文章目录
- 引言
- 我用 PostgreSQL 替换了 Redis(而且更快)
引言
本文很好的介绍了使用pg替换redis,虽然单个操作变慢了,但是结合常用业务的整体操作却变快了,是个路子。
我用 PostgreSQL 替换了 Redis(而且更快)
我有一个典型的 Web 应用技术栈:
- PostgreSQL 用于持久化数据
- Redis 用于缓存、发布/订阅和后台任务
两个数据库。两套东西要管理。两个故障点。
然后我意识到:PostgreSQL 可以做 Redis 做的所有事情。
我完全移除了 Redis。以下是发生的事情。
设置:我之前用 Redis 做什么
在改变之前,Redis 处理三件事:
1. 缓存(70% 的使用量)
# 缓存 API 响应
redis-cli SET "user:${id}" '{"id":123,"name":"John"}' EX 3600
2. 发布/订阅(20% 的使用量)
# 实时通知
redis-cli PUBLISH notifications '{"userId":123,"message":"Hello"}'
3. 后台任务队列(10% 的使用量)
# 使用 Bull/BullMQ(这里展示 Redis CLI 的基础操作)
redis-cli LPUSH queue:send-email '{"to":"user@example.com","subject":"Hi"}'
痛点:
- 两个数据库要备份
- Redis 使用内存(大规模时很昂贵)
- Redis 持久化…很复杂
- Postgres 和 Redis 之间的网络跳转
为什么我考虑替换 Redis
原因 #1:成本
我的 Redis 设置:
- AWS ElastiCache:$45/月(2GB)
- 增长到 5GB 将花费 $110/月
PostgreSQL:
- 已经为 RDS 付费:$50/月(20GB 存储)
- 增加 5GB 数据:$0.50/月
潜在节省: 约 $100/月
原因 #2:运维复杂性
使用 Redis:
Postgres 备份 ✅
Redis 备份 ❓(RDB?AOF?两者都要?)
Postgres 监控 ✅
Redis 监控 ❓
Postgres 故障转移 ✅
Redis Sentinel/Cluster ❓
不使用 Redis:
Postgres 备份 ✅
Postgres 监控 ✅
Postgres 故障转移 ✅
少一个移动部件。
原因 #3:数据一致性
经典问题:
# 更新数据库
psql -c "UPDATE users SET name = 'John' WHERE id = 123;"
# 使缓存失效
redis-cli DEL "user:123"
# ⚠️ 如果 Redis 宕机了怎么办?
# ⚠️ 如果这个操作失败了怎么办?
# 现在缓存和数据库不同步了
使用 Postgres 处理一切:事务解决了这个问题。
PostgreSQL 功能 #1:使用 UNLOGGED 表进行缓存
Redis:
redis-cli SET "session:abc123" '{"userId":123,"role":"admin"}' EX 3600
PostgreSQL:
CREATE UNLOGGED TABLE cache (
key TEXT PRIMARY KEY,
value JSONB NOT NULL,
expires_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX idx_cache_expires ON cache(expires_at);
插入:
INSERT INTO cache (key, value, expires_at)
VALUES ('user:123', '{"id":123,"name":"John"}'::jsonb, NOW() + INTERVAL '1 hour')
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value,
expires_at = EXCLUDED.expires_at;
读取:
SELECT value FROM cache
WHERE key = 'user:123' AND expires_at > NOW();
清理(定期运行):
DELETE FROM cache WHERE expires_at < NOW();
什么是 UNLOGGED?
UNLOGGED 表:
- 跳过预写日志(WAL)
- 写入速度更快
- 崩溃后不保留(非常适合缓存!)
性能:
Redis SET: 0.05ms
Postgres UNLOGGED INSERT: 0.08ms
对于缓存来说足够接近。
PostgreSQL 功能 #2:使用 LISTEN/NOTIFY 进行发布/订阅
这里变得有趣了。
PostgreSQL 有原生发布/订阅功能,大多数开发者都不知道。
Redis 发布/订阅
# 发布者
redis-cli PUBLISH notifications '{"userId":123,"msg":"Hello"}'
# 订阅者(在另一个终端)
redis-cli SUBSCRIBE notifications
PostgreSQL 发布/订阅
-- 发布者
NOTIFY notifications, '{"userId": 123, "msg": "Hello"}';
// 订阅者(Java with PostgreSQL JDBC)
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Properties;
import com.fasterxml.jackson.databind.ObjectMapper;
// 建立连接
String url = System.getenv("DATABASE_URL");
Properties props = new Properties();
Connection conn = DriverManager.getConnection(url, props);
Statement stmt = conn.createStatement();
stmt.execute("LISTEN notifications");
stmt.close();
// 获取 PGConnection 以接收通知
PGConnection pgConn = conn.unwrap(PGConnection.class);
org.postgresql.PGNotification[] notifications = pgConn.getNotifications();
// 在单独的线程中监听通知
new Thread(() -> {
while (true) {
try {
PGNotification[] notifications = pgConn.getNotifications();
if (notifications != null) {
for (PGNotification notification : notifications) {
String payload = notification.getParameter();
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> data = mapper.readValue(payload, Map.class);
System.out.println(data);
}
}
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
性能比较:
Redis pub/sub 延迟:1-2ms
Postgres NOTIFY 延迟:2-5ms
稍慢一些,但是:
- 无需额外基础设施
- 可以在事务中使用
- 可以与查询结合使用
真实世界示例:实时日志流
在我的日志管理应用中,我需要实时日志流。
使用 Redis:
# 当新日志到达时
psql -c "INSERT INTO logs ..."
redis-cli PUBLISH logs:new '{"id":123,"message":"..."}'
# 前端监听
redis-cli SUBSCRIBE logs:new
问题: 两个操作。如果发布失败怎么办?
使用 PostgreSQL:
CREATE FUNCTION notify_new_log() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('logs_new', row_to_json(NEW)::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER log_inserted
AFTER INSERT ON logs
FOR EACH ROW EXECUTE FUNCTION notify_new_log();
现在是原子性的。插入和通知一起发生,或者都不发生。
// 前端(通过 SSE)- Spring Boot 示例
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import java.sql.Connection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@GetMapping(value = "/logs/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamLogs() {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
try {
Connection conn = dataSource.getConnection();
Statement stmt = conn.createStatement();
stmt.execute("LISTEN logs_new");
stmt.close();
PGConnection pgConn = conn.unwrap(PGConnection.class);
while (true) {
PGNotification[] notifications = pgConn.getNotifications();
if (notifications != null) {
for (PGNotification notification : notifications) {
String payload = notification.getParameter();
emitter.send(SseEmitter.event()
.data("data: " + payload + "\n\n"));
}
}
Thread.sleep(100);
}
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
结果: 零 Redis 的实时日志流。
PostgreSQL 功能 #3:使用 SKIP LOCKED 的任务队列
Redis(使用 Bull/BullMQ):
# 入队
redis-cli LPUSH queue:send-email '{"to":"user@example.com","subject":"Hi"}'
# 出队(使用阻塞操作)
redis-cli BRPOP queue:send-email 5
PostgreSQL:
CREATE TABLE jobs (
id BIGSERIAL PRIMARY KEY,
queue TEXT NOT NULL,
payload JSONB NOT NULL,
attempts INT DEFAULT 0,
max_attempts INT DEFAULT 3,
scheduled_at TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_jobs_queue ON jobs(queue, scheduled_at)
WHERE attempts < max_attempts;
入队:
INSERT INTO jobs (queue, payload)
VALUES ('send-email', '{"to": "user@example.com", "subject": "Hi"}'::jsonb);
工作进程(出队):
WITH next_job AS (
SELECT id FROM jobs
WHERE queue = 'send-email'
AND attempts < max_attempts
AND scheduled_at <= NOW()
ORDER BY scheduled_at
LIMIT 1
FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET attempts = attempts + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING *;
魔法:FOR UPDATE SKIP LOCKED
这使得 PostgreSQL 成为一个无锁队列:
- 多个工作进程可以并发拉取任务
- 没有任务被处理两次
- 如果工作进程崩溃,任务会再次变为可用
性能:
Redis BRPOP: 0.1ms
Postgres SKIP LOCKED: 0.3ms
对于大多数工作负载来说差异可忽略。
【译注】:下面的例子可能更简单:
BEGIN;
-- 取出 1 个 pending 状态的任务,加锁并跳过已锁定的
UPDATE tasks
SET status = 'processing'
WHERE id = (
SELECT id
FROM tasks
WHERE status = 'pending'
ORDER BY id
LIMIT 1 --- 可以LIMIT 10来达到批量拉取的效果
FOR UPDATE SKIP LOCKED -- 👈 核心在这里
)
RETURNING *;
COMMIT;
--- 需要考虑对僵尸任务的释放
-- 任务处理超时 5 分钟,自动释放
UPDATE tasks
SET status = 'pending'
WHERE status = 'processing'
AND updated_at < NOW() - INTERVAL '5 minutes';
PostgreSQL 功能 #4:速率限制
Redis(经典速率限制器):
# 检查并增加计数
redis-cli INCR "ratelimit:${userId}"
redis-cli EXPIRE "ratelimit:${userId}" 60
# 检查是否超过限制
redis-cli GET "ratelimit:${userId}"
PostgreSQL:
CREATE TABLE rate_limits (
user_id INT PRIMARY KEY,
request_count INT DEFAULT 0,
window_start TIMESTAMPTZ DEFAULT NOW()
);
-- 检查并增加
WITH current AS (
SELECT
request_count,
CASE
WHEN window_start < NOW() - INTERVAL '1 minute'
THEN 1 -- 重置计数器
ELSE request_count + 1
END AS new_count
FROM rate_limits
WHERE user_id = 123
FOR UPDATE
)
UPDATE rate_limits
SET
request_count = (SELECT new_count FROM current),
window_start = CASE
WHEN window_start < NOW() - INTERVAL '1 minute'
THEN NOW()
ELSE window_start
END
WHERE user_id = 123
RETURNING request_count;
或者使用窗口函数更简单:
CREATE TABLE api_requests (
user_id INT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 检查速率限制
SELECT COUNT(*) FROM api_requests
WHERE user_id = 123
AND created_at > NOW() - INTERVAL '1 minute';
-- 如果在限制内,插入
INSERT INTO api_requests (user_id) VALUES (123);
-- 定期清理旧请求
DELETE FROM api_requests WHERE created_at < NOW() - INTERVAL '5 minutes';
Postgres 更好的时候:
- 需要基于复杂逻辑进行速率限制(不仅仅是计数)
- 希望速率限制数据与业务逻辑在同一事务中
Redis 更好的时候:
- 需要亚毫秒级速率限制
- 极高的吞吐量(每秒数百万请求)
PostgreSQL 功能 #5:使用 JSONB 的会话
Redis:
redis-cli SET "session:${sessionId}" '{"userId":123,"role":"admin"}' EX 86400
PostgreSQL:
CREATE TABLE sessions (
id TEXT PRIMARY KEY,
data JSONB NOT NULL,
expires_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX idx_sessions_expires ON sessions(expires_at);
-- 插入/更新
INSERT INTO sessions (id, data, expires_at)
VALUES ('abc123', '{"userId":123,"role":"admin"}'::jsonb, NOW() + INTERVAL '24 hours')
ON CONFLICT (id) DO UPDATE
SET data = EXCLUDED.data,
expires_at = EXCLUDED.expires_at;
-- 读取
SELECT data FROM sessions
WHERE id = 'abc123' AND expires_at > NOW();
奖励:JSONB 操作符
你可以查询会话内部:
-- 查找特定用户的所有会话
SELECT * FROM sessions
WHERE data->>'userId' = '123';
-- 查找具有特定角色的会话
SELECT * FROM sessions
WHERE data->'user'->>'role' = 'admin';
使用 Redis 无法做到这一点!
真实世界基准测试
我在生产数据集上运行了基准测试:
测试设置
- 硬件: AWS RDS db.t3.medium(2 vCPU,4GB RAM)
- 数据集: 100 万缓存条目,1 万会话
- 工具: pgbench(自定义脚本)
结果
| 操作 | Redis | PostgreSQL | 差异 |
|---|---|---|---|
| 缓存 SET | 0.05ms | 0.08ms | +60% 更慢 |
| 缓存 GET | 0.04ms | 0.06ms | +50% 更慢 |
| 发布/订阅 | 1.2ms | 3.1ms | +158% 更慢 |
| 队列推送 | 0.08ms | 0.15ms | +87% 更慢 |
| 队列弹出 | 0.12ms | 0.31ms | +158% 更慢 |
PostgreSQL 更慢…但是:
- 所有操作仍然在 1ms 以下
- 消除了到 Redis 的网络跳转
- 减少了基础设施复杂性
组合操作(真正的优势)
场景: 插入数据 + 使缓存失效 + 通知订阅者
使用 Redis:
psql -c "INSERT INTO posts ..." # 2ms
redis-cli DEL "posts:latest" # 1ms(网络跳转)
redis-cli PUBLISH posts:new '{"id":123}' # 1ms(网络跳转)
# 总计:~4ms
使用 PostgreSQL:
BEGIN;
INSERT INTO posts ...; -- 2ms
DELETE FROM cache WHERE key = 'posts:latest'; -- 0.1ms(同一连接)
NOTIFY posts_new, '...'; -- 0.1ms(同一连接)
COMMIT;
-- 总计:~2.2ms
当操作组合时,PostgreSQL 更快。
何时保留 Redis
如果以下情况,不要替换 Redis:
1. 你需要极致性能
Redis: 100,000+ 操作/秒(单实例)
Postgres: 10,000-50,000 操作/秒
如果你每秒进行数百万次缓存读取,保留 Redis。
2. 你使用 Redis 特定的数据结构
Redis 有:
- 有序集合(排行榜)
- HyperLogLog(唯一计数估计)
- 地理空间索引
- 流(高级发布/订阅)
Postgres 等价物存在但更笨拙:
-- Postgres 中的排行榜(更慢)
SELECT user_id, score
FROM leaderboard
ORDER BY score DESC
LIMIT 10;
-- vs Redis
redis-cli ZREVRANGE leaderboard 0 9 WITHSCORES
3. 你有独立的缓存层要求
如果你的架构要求独立的缓存层(例如微服务),保留 Redis。
迁移策略
不要一夜之间移除 Redis。 以下是我的做法:
阶段 1:并行运行(第 1 周)
// 写入两者
jedis.set(key, value);
jdbcTemplate.update("INSERT INTO cache ...", key, value);
// 从 Redis 读取(仍然是主要的)
String data = jedis.get(key);
监控: 比较命中率、延迟。
阶段 2:从 Postgres 读取(第 2 周)
// 先尝试 Postgres
String data = jdbcTemplate.queryForObject(
"SELECT value FROM cache WHERE key = ? AND expires_at > NOW()",
String.class, key);
// 回退到 Redis
if (data == null || data.isEmpty()) {
data = jedis.get(key);
}
监控: 错误率、性能。
阶段 3:只写入 Postgres(第 3 周)
// 只写入 Postgres
jdbcTemplate.update("INSERT INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
key, value, LocalDateTime.now().plusHours(1));
监控: 一切仍然正常工作?
阶段 4:移除 Redis(第 4 周)
# 关闭 Redis
# 观察错误
# 没有破坏?成功!
代码示例:完整实现
缓存模块(PostgreSQL)
// PostgresCache.java
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Map;
@Component
public class PostgresCache {
private final JdbcTemplate jdbcTemplate;
public PostgresCache(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public String get(String key) {
return jdbcTemplate.queryForObject(
"SELECT value FROM cache WHERE key = ? AND expires_at > NOW()",
String.class, key);
}
public void set(String key, String value, int ttlSeconds) {
jdbcTemplate.update(
"INSERT INTO cache (key, value, expires_at) " +
"VALUES (?, ?::jsonb, NOW() + INTERVAL ? || ' seconds') " +
"ON CONFLICT (key) DO UPDATE " +
"SET value = EXCLUDED.value, expires_at = EXCLUDED.expires_at",
key, value, ttlSeconds);
}
public void set(String key, String value) {
set(key, value, 3600);
}
public void delete(String key) {
jdbcTemplate.update("DELETE FROM cache WHERE key = ?", key);
}
public void cleanup() {
jdbcTemplate.update("DELETE FROM cache WHERE expires_at < NOW()");
}
}
发布/订阅模块
// PostgresPubSub.java
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@Component
public class PostgresPubSub {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
private final Map<String, Connection> listeners = new ConcurrentHashMap<>();
private final ExecutorService executor = Executors.newCachedThreadPool();
public PostgresPubSub(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = new ObjectMapper();
}
public void publish(String channel, Object message) throws Exception {
String payload = objectMapper.writeValueAsString(message);
jdbcTemplate.update("SELECT pg_notify(?, ?)", channel, payload);
}
public void subscribe(String channel, Consumer<Map<String, Object>> callback) {
executor.execute(() -> {
try {
Connection conn = jdbcTemplate.getDataSource().getConnection();
Statement stmt = conn.createStatement();
stmt.execute("LISTEN " + channel);
stmt.close();
listeners.put(channel, conn);
PGConnection pgConn = conn.unwrap(PGConnection.class);
while (listeners.containsKey(channel)) {
PGNotification[] notifications = pgConn.getNotifications();
if (notifications != null) {
for (PGNotification notification : notifications) {
if (notification.getName().equals(channel)) {
Map<String, Object> data = objectMapper.readValue(
notification.getParameter(), Map.class);
callback.accept(data);
}
}
}
Thread.sleep(100);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
public void unsubscribe(String channel) {
Connection conn = listeners.remove(channel);
if (conn != null) {
try {
Statement stmt = conn.createStatement();
stmt.execute("UNLISTEN " + channel);
stmt.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
任务队列模块
// PostgresQueue.java
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Map;
@Component
public class PostgresQueue {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
public PostgresQueue(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = new ObjectMapper();
}
public void enqueue(String queue, Map<String, Object> payload, LocalDateTime scheduledAt) {
try {
String payloadJson = objectMapper.writeValueAsString(payload);
jdbcTemplate.update(
"INSERT INTO jobs (queue, payload, scheduled_at) VALUES (?, ?::jsonb, ?)",
queue, payloadJson, scheduledAt);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void enqueue(String queue, Map<String, Object> payload) {
enqueue(queue, payload, LocalDateTime.now());
}
public Job dequeue(String queue) {
String sql = "WITH next_job AS (" +
" SELECT id FROM jobs " +
" WHERE queue = ? " +
" AND attempts < max_attempts " +
" AND scheduled_at <= NOW() " +
" ORDER BY scheduled_at " +
" LIMIT 1 " +
" FOR UPDATE SKIP LOCKED " +
") " +
"UPDATE jobs " +
"SET attempts = attempts + 1 " +
"FROM next_job " +
"WHERE jobs.id = next_job.id " +
"RETURNING jobs.*";
return jdbcTemplate.queryForObject(sql, new JobRowMapper(), queue);
}
public void complete(Long jobId) {
jdbcTemplate.update("DELETE FROM jobs WHERE id = ?", jobId);
}
public void fail(Long jobId, Exception error) {
try {
String errorJson = objectMapper.writeValueAsString(Map.of("error", error.getMessage()));
jdbcTemplate.update(
"UPDATE jobs " +
"SET attempts = max_attempts, " +
" payload = payload || ?::jsonb " +
"WHERE id = ?",
errorJson, jobId);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static class JobRowMapper implements RowMapper<Job> {
@Override
public Job mapRow(ResultSet rs, int rowNum) throws SQLException {
Job job = new Job();
job.setId(rs.getLong("id"));
job.setQueue(rs.getString("queue"));
job.setPayload(rs.getString("payload"));
job.setAttempts(rs.getInt("attempts"));
job.setMaxAttempts(rs.getInt("max_attempts"));
job.setScheduledAt(rs.getTimestamp("scheduled_at").toLocalDateTime());
job.setCreatedAt(rs.getTimestamp("created_at").toLocalDateTime());
return job;
}
}
public static class Job {
private Long id;
private String queue;
private String payload;
private Integer attempts;
private Integer maxAttempts;
private LocalDateTime scheduledAt;
private LocalDateTime createdAt;
// Getters and Setters
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getQueue() { return queue; }
public void setQueue(String queue) { this.queue = queue; }
public String getPayload() { return payload; }
public void setPayload(String payload) { this.payload = payload; }
public Integer getAttempts() { return attempts; }
public void setAttempts(Integer attempts) { this.attempts = attempts; }
public Integer getMaxAttempts() { return maxAttempts; }
public void setMaxAttempts(Integer maxAttempts) { this.maxAttempts = maxAttempts; }
public LocalDateTime getScheduledAt() { return scheduledAt; }
public void setScheduledAt(LocalDateTime scheduledAt) { this.scheduledAt = scheduledAt; }
public LocalDateTime getCreatedAt() { return createdAt; }
public void setCreatedAt(LocalDateTime createdAt) { this.createdAt = createdAt; }
}
}
性能调优技巧
1. 使用连接池
// Spring Boot 配置(application.yml)
spring:
datasource:
url: jdbc:postgresql://localhost:5432/mydb
username: user
password: password
hikari:
maximum-pool-size: 20 # 最大连接数
minimum-idle: 5 # 最小空闲连接数
connection-timeout: 2000 # 连接超时(毫秒)
idle-timeout: 30000 # 空闲超时(毫秒)
max-lifetime: 1800000 # 连接最大生命周期(毫秒)
// 或者使用 Java 配置
@Configuration
public class DataSourceConfig {
@Bean
public DataSource dataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/mydb");
config.setUsername("user");
config.setPassword("password");
config.setMaximumPoolSize(20);
config.setMinimumIdle(5);
config.setConnectionTimeout(2000);
config.setIdleTimeout(30000);
return new HikariDataSource(config);
}
}
2. 添加适当的索引
CREATE INDEX CONCURRENTLY idx_cache_key ON cache(key) WHERE expires_at > NOW();
CREATE INDEX CONCURRENTLY idx_jobs_pending ON jobs(queue, scheduled_at)
WHERE attempts < max_attempts;
3. 调整 PostgreSQL 配置
# postgresql.conf
shared_buffers = 2GB # RAM 的 25%
effective_cache_size = 6GB # RAM 的 75%
work_mem = 50MB # 用于复杂查询
maintenance_work_mem = 512MB # 用于 VACUUM
4. 定期维护
-- 每天运行
VACUUM ANALYZE cache;
VACUUM ANALYZE jobs;
-- 或启用 autovacuum(推荐)
ALTER TABLE cache SET (autovacuum_vacuum_scale_factor = 0.1);
结果:3 个月后
我节省了什么:
- ✅ $100/月(不再需要 ElastiCache)
- ✅ 备份复杂性减少 50%
- ✅ 少一个服务要监控
- ✅ 更简单的部署(少一个依赖)
我失去了什么:
- ❌ 缓存操作约 0.5ms 延迟
- ❌ Redis 的异域数据结构(我不需要它们)
我会再做一次吗? 是的,对于这个用例。
我会普遍推荐吗? 不会。
决策矩阵
用 Postgres 替换 Redis,如果:
- ✅ 你使用 Redis 进行简单缓存/会话
- ✅ 缓存命中率 < 95%(大量写入)
- ✅ 你想要事务一致性
- ✅ 你可以接受 0.1-1ms 更慢的操作
- ✅ 你是一个小团队,运维资源有限
保留 Redis,如果:
- ❌ 你需要 100k+ 操作/秒
- ❌ 你使用 Redis 数据结构(有序集合等)
- ❌ 你有专门的运维团队
- ❌ 亚毫秒延迟至关重要
- ❌ 你正在进行地理复制
资源
PostgreSQL 功能:
工具:
- pgBouncer - 连接池
- pg_stat_statements - 查询性能
替代解决方案:
- Graphile Worker - 基于 Postgres 的任务队列
- pg-boss - 另一个 Postgres 队列
TL;DR
我用 PostgreSQL 替换了 Redis,用于:
- 缓存 → UNLOGGED 表
- 发布/订阅 → LISTEN/NOTIFY
- 任务队列 → SKIP LOCKED
- 会话 → JSONB 表
结果:
- 节省 $100/月
- 减少运维复杂性
- 稍慢(0.1-1ms)但可接受
- 保证事务一致性
何时这样做:
- 小型到中型应用
- 简单的缓存需求
- 想要减少移动部件
何时不这样做:
- 高性能要求(100k+ 操作/秒)
- 使用 Redis 特定功能
- 有专门的运维团队
你用 Postgres 替换了 Redis(或反之)吗? 你的经验是什么?在评论中分享你的基准测试!👇
P.S. - 想要后续的”PostgreSQL 隐藏功能”或”何时 Redis 实际上更好”吗?告诉我!