【翻译】我用 PostgreSQL 替换了 Redis(而且更快)

文章目录

引言

本文很好的介绍了使用pg替换redis,虽然单个操作变慢了,但是结合常用业务的整体操作却变快了,是个路子。

原文

我用 PostgreSQL 替换了 Redis(而且更快)

我有一个典型的 Web 应用技术栈:

  • PostgreSQL 用于持久化数据
  • Redis 用于缓存、发布/订阅和后台任务

两个数据库。两套东西要管理。两个故障点。

然后我意识到:PostgreSQL 可以做 Redis 做的所有事情。

我完全移除了 Redis。以下是发生的事情。


设置:我之前用 Redis 做什么

在改变之前,Redis 处理三件事:

1. 缓存(70% 的使用量)

# 缓存 API 响应
redis-cli SET "user:${id}" '{"id":123,"name":"John"}' EX 3600

2. 发布/订阅(20% 的使用量)

# 实时通知
redis-cli PUBLISH notifications '{"userId":123,"message":"Hello"}'

3. 后台任务队列(10% 的使用量)

# 使用 Bull/BullMQ(这里展示 Redis CLI 的基础操作)
redis-cli LPUSH queue:send-email '{"to":"user@example.com","subject":"Hi"}'

痛点:

  • 两个数据库要备份
  • Redis 使用内存(大规模时很昂贵)
  • Redis 持久化…很复杂
  • Postgres 和 Redis 之间的网络跳转

为什么我考虑替换 Redis

原因 #1:成本

我的 Redis 设置:

  • AWS ElastiCache:$45/月(2GB)
  • 增长到 5GB 将花费 $110/月

PostgreSQL:

  • 已经为 RDS 付费:$50/月(20GB 存储)
  • 增加 5GB 数据:$0.50/月

潜在节省: 约 $100/月

原因 #2:运维复杂性

使用 Redis:

Postgres 备份 ✅
Redis 备份 ❓(RDB?AOF?两者都要?)
Postgres 监控 ✅
Redis 监控 ❓
Postgres 故障转移 ✅
Redis Sentinel/Cluster ❓

不使用 Redis:

Postgres 备份 ✅
Postgres 监控 ✅
Postgres 故障转移 ✅

少一个移动部件。

原因 #3:数据一致性

经典问题:

# 更新数据库
psql -c "UPDATE users SET name = 'John' WHERE id = 123;"

# 使缓存失效
redis-cli DEL "user:123"

# ⚠️ 如果 Redis 宕机了怎么办?
# ⚠️ 如果这个操作失败了怎么办?
# 现在缓存和数据库不同步了

使用 Postgres 处理一切:事务解决了这个问题。


PostgreSQL 功能 #1:使用 UNLOGGED 表进行缓存

Redis:

redis-cli SET "session:abc123" '{"userId":123,"role":"admin"}' EX 3600

PostgreSQL:

CREATE UNLOGGED TABLE cache (
  key TEXT PRIMARY KEY,
  value JSONB NOT NULL,
  expires_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX idx_cache_expires ON cache(expires_at);

插入:

INSERT INTO cache (key, value, expires_at)
VALUES ('user:123', '{"id":123,"name":"John"}'::jsonb, NOW() + INTERVAL '1 hour')
ON CONFLICT (key) DO UPDATE
  SET value = EXCLUDED.value,
      expires_at = EXCLUDED.expires_at;

读取:

SELECT value FROM cache
WHERE key = 'user:123' AND expires_at > NOW();

清理(定期运行):

DELETE FROM cache WHERE expires_at < NOW();

什么是 UNLOGGED?

UNLOGGED 表:

  • 跳过预写日志(WAL)
  • 写入速度更快
  • 崩溃后不保留(非常适合缓存!)

性能:

Redis SET: 0.05ms
Postgres UNLOGGED INSERT: 0.08ms

对于缓存来说足够接近。


PostgreSQL 功能 #2:使用 LISTEN/NOTIFY 进行发布/订阅

这里变得有趣了。

PostgreSQL 有原生发布/订阅功能,大多数开发者都不知道。

Redis 发布/订阅

# 发布者
redis-cli PUBLISH notifications '{"userId":123,"msg":"Hello"}'

# 订阅者(在另一个终端)
redis-cli SUBSCRIBE notifications

PostgreSQL 发布/订阅

-- 发布者
NOTIFY notifications, '{"userId": 123, "msg": "Hello"}';
// 订阅者(Java with PostgreSQL JDBC)
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Properties;
import com.fasterxml.jackson.databind.ObjectMapper;

// 建立连接
String url = System.getenv("DATABASE_URL");
Properties props = new Properties();
Connection conn = DriverManager.getConnection(url, props);
Statement stmt = conn.createStatement();
stmt.execute("LISTEN notifications");
stmt.close();

// 获取 PGConnection 以接收通知
PGConnection pgConn = conn.unwrap(PGConnection.class);
org.postgresql.PGNotification[] notifications = pgConn.getNotifications();

// 在单独的线程中监听通知
new Thread(() -> {
    while (true) {
        try {
            PGNotification[] notifications = pgConn.getNotifications();
            if (notifications != null) {
                for (PGNotification notification : notifications) {
                    String payload = notification.getParameter();
                    ObjectMapper mapper = new ObjectMapper();
                    Map<String, Object> data = mapper.readValue(payload, Map.class);
                    System.out.println(data);
                }
            }
            Thread.sleep(500);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}).start();

性能比较:

Redis pub/sub 延迟:1-2ms
Postgres NOTIFY 延迟:2-5ms

稍慢一些,但是:

  • 无需额外基础设施
  • 可以在事务中使用
  • 可以与查询结合使用

真实世界示例:实时日志流

在我的日志管理应用中,我需要实时日志流

使用 Redis:

# 当新日志到达时
psql -c "INSERT INTO logs ..."
redis-cli PUBLISH logs:new '{"id":123,"message":"..."}'

# 前端监听
redis-cli SUBSCRIBE logs:new

问题: 两个操作。如果发布失败怎么办?

使用 PostgreSQL:

CREATE FUNCTION notify_new_log() RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('logs_new', row_to_json(NEW)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER log_inserted
AFTER INSERT ON logs
FOR EACH ROW EXECUTE FUNCTION notify_new_log();

现在是原子性的。插入和通知一起发生,或者都不发生。

// 前端(通过 SSE)- Spring Boot 示例
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import java.sql.Connection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@GetMapping(value = "/logs/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamLogs() {
    SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    
    executor.execute(() -> {
        try {
            Connection conn = dataSource.getConnection();
            Statement stmt = conn.createStatement();
            stmt.execute("LISTEN logs_new");
            stmt.close();
            
            PGConnection pgConn = conn.unwrap(PGConnection.class);
            
            while (true) {
                PGNotification[] notifications = pgConn.getNotifications();
                if (notifications != null) {
                    for (PGNotification notification : notifications) {
                        String payload = notification.getParameter();
                        emitter.send(SseEmitter.event()
                            .data("data: " + payload + "\n\n"));
                    }
                }
                Thread.sleep(100);
            }
        } catch (Exception e) {
            emitter.completeWithError(e);
        }
    });
    
    return emitter;
}

结果: 零 Redis 的实时日志流。


PostgreSQL 功能 #3:使用 SKIP LOCKED 的任务队列

Redis(使用 Bull/BullMQ):

# 入队
redis-cli LPUSH queue:send-email '{"to":"user@example.com","subject":"Hi"}'

# 出队(使用阻塞操作)
redis-cli BRPOP queue:send-email 5

PostgreSQL:

CREATE TABLE jobs (
  id BIGSERIAL PRIMARY KEY,
  queue TEXT NOT NULL,
  payload JSONB NOT NULL,
  attempts INT DEFAULT 0,
  max_attempts INT DEFAULT 3,
  scheduled_at TIMESTAMPTZ DEFAULT NOW(),
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_jobs_queue ON jobs(queue, scheduled_at) 
WHERE attempts < max_attempts;

入队:

INSERT INTO jobs (queue, payload)
VALUES ('send-email', '{"to": "user@example.com", "subject": "Hi"}'::jsonb);

工作进程(出队):

WITH next_job AS (
  SELECT id FROM jobs
  WHERE queue = 'send-email'
    AND attempts < max_attempts
    AND scheduled_at <= NOW()
  ORDER BY scheduled_at
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET attempts = attempts + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING *;

魔法:FOR UPDATE SKIP LOCKED

这使得 PostgreSQL 成为一个无锁队列

  • 多个工作进程可以并发拉取任务
  • 没有任务被处理两次
  • 如果工作进程崩溃,任务会再次变为可用

性能:

Redis BRPOP: 0.1ms
Postgres SKIP LOCKED: 0.3ms

对于大多数工作负载来说差异可忽略。
【译注】:下面的例子可能更简单:

BEGIN;

-- 取出 1 个 pending 状态的任务,加锁并跳过已锁定的
UPDATE tasks 
SET status = 'processing'
WHERE id = (
    SELECT id 
    FROM tasks 
    WHERE status = 'pending'
    ORDER BY id
    LIMIT 1  --- 可以LIMIT 10来达到批量拉取的效果
    FOR UPDATE SKIP LOCKED  -- 👈 核心在这里
)
RETURNING *;

COMMIT;

--- 需要考虑对僵尸任务的释放
-- 任务处理超时 5 分钟,自动释放
UPDATE tasks 
SET status = 'pending'
WHERE status = 'processing'
AND updated_at < NOW() - INTERVAL '5 minutes';

PostgreSQL 功能 #4:速率限制

Redis(经典速率限制器):

# 检查并增加计数
redis-cli INCR "ratelimit:${userId}"
redis-cli EXPIRE "ratelimit:${userId}" 60

# 检查是否超过限制
redis-cli GET "ratelimit:${userId}"

PostgreSQL:

CREATE TABLE rate_limits (
  user_id INT PRIMARY KEY,
  request_count INT DEFAULT 0,
  window_start TIMESTAMPTZ DEFAULT NOW()
);

-- 检查并增加
WITH current AS (
  SELECT 
    request_count,
    CASE 
      WHEN window_start < NOW() - INTERVAL '1 minute'
      THEN 1  -- 重置计数器
      ELSE request_count + 1
    END AS new_count
  FROM rate_limits
  WHERE user_id = 123
  FOR UPDATE
)
UPDATE rate_limits
SET 
  request_count = (SELECT new_count FROM current),
  window_start = CASE
    WHEN window_start < NOW() - INTERVAL '1 minute'
    THEN NOW()
    ELSE window_start
  END
WHERE user_id = 123
RETURNING request_count;

或者使用窗口函数更简单:

CREATE TABLE api_requests (
  user_id INT NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- 检查速率限制
SELECT COUNT(*) FROM api_requests
WHERE user_id = 123
  AND created_at > NOW() - INTERVAL '1 minute';

-- 如果在限制内,插入
INSERT INTO api_requests (user_id) VALUES (123);

-- 定期清理旧请求
DELETE FROM api_requests WHERE created_at < NOW() - INTERVAL '5 minutes';

Postgres 更好的时候:

  • 需要基于复杂逻辑进行速率限制(不仅仅是计数)
  • 希望速率限制数据与业务逻辑在同一事务中

Redis 更好的时候:

  • 需要亚毫秒级速率限制
  • 极高的吞吐量(每秒数百万请求)

PostgreSQL 功能 #5:使用 JSONB 的会话

Redis:

redis-cli SET "session:${sessionId}" '{"userId":123,"role":"admin"}' EX 86400

PostgreSQL:

CREATE TABLE sessions (
  id TEXT PRIMARY KEY,
  data JSONB NOT NULL,
  expires_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX idx_sessions_expires ON sessions(expires_at);

-- 插入/更新
INSERT INTO sessions (id, data, expires_at)
VALUES ('abc123', '{"userId":123,"role":"admin"}'::jsonb, NOW() + INTERVAL '24 hours')
ON CONFLICT (id) DO UPDATE
  SET data = EXCLUDED.data,
      expires_at = EXCLUDED.expires_at;

-- 读取
SELECT data FROM sessions
WHERE id = 'abc123' AND expires_at > NOW();

奖励:JSONB 操作符

你可以查询会话内部:

-- 查找特定用户的所有会话
SELECT * FROM sessions
WHERE data->>'userId' = '123';

-- 查找具有特定角色的会话
SELECT * FROM sessions
WHERE data->'user'->>'role' = 'admin';

使用 Redis 无法做到这一点!


真实世界基准测试

我在生产数据集上运行了基准测试:

测试设置

  • 硬件: AWS RDS db.t3.medium(2 vCPU,4GB RAM)
  • 数据集: 100 万缓存条目,1 万会话
  • 工具: pgbench(自定义脚本)

结果

操作 Redis PostgreSQL 差异
缓存 SET 0.05ms 0.08ms +60% 更慢
缓存 GET 0.04ms 0.06ms +50% 更慢
发布/订阅 1.2ms 3.1ms +158% 更慢
队列推送 0.08ms 0.15ms +87% 更慢
队列弹出 0.12ms 0.31ms +158% 更慢

PostgreSQL 更慢…但是:

  • 所有操作仍然在 1ms 以下
  • 消除了到 Redis 的网络跳转
  • 减少了基础设施复杂性

组合操作(真正的优势)

场景: 插入数据 + 使缓存失效 + 通知订阅者

使用 Redis:

psql -c "INSERT INTO posts ..."                    # 2ms
redis-cli DEL "posts:latest"                        # 1ms(网络跳转)
redis-cli PUBLISH posts:new '{"id":123}'            # 1ms(网络跳转)
# 总计:~4ms

使用 PostgreSQL:

BEGIN;
INSERT INTO posts ...;                              -- 2ms
DELETE FROM cache WHERE key = 'posts:latest';      -- 0.1ms(同一连接)
NOTIFY posts_new, '...';                            -- 0.1ms(同一连接)
COMMIT;
-- 总计:~2.2ms

当操作组合时,PostgreSQL 更快。


何时保留 Redis

如果以下情况,不要替换 Redis:

1. 你需要极致性能

Redis: 100,000+ 操作/秒(单实例)
Postgres: 10,000-50,000 操作/秒

如果你每秒进行数百万次缓存读取,保留 Redis。

2. 你使用 Redis 特定的数据结构

Redis 有:

  • 有序集合(排行榜)
  • HyperLogLog(唯一计数估计)
  • 地理空间索引
  • 流(高级发布/订阅)

Postgres 等价物存在但更笨拙:

-- Postgres 中的排行榜(更慢)
SELECT user_id, score
FROM leaderboard
ORDER BY score DESC
LIMIT 10;

-- vs Redis
redis-cli ZREVRANGE leaderboard 0 9 WITHSCORES

3. 你有独立的缓存层要求

如果你的架构要求独立的缓存层(例如微服务),保留 Redis。


迁移策略

不要一夜之间移除 Redis。 以下是我的做法:

阶段 1:并行运行(第 1 周)

// 写入两者
jedis.set(key, value);
jdbcTemplate.update("INSERT INTO cache ...", key, value);

// 从 Redis 读取(仍然是主要的)
String data = jedis.get(key);

监控: 比较命中率、延迟。

阶段 2:从 Postgres 读取(第 2 周)

// 先尝试 Postgres
String data = jdbcTemplate.queryForObject(
    "SELECT value FROM cache WHERE key = ? AND expires_at > NOW()",
    String.class, key);

// 回退到 Redis
if (data == null || data.isEmpty()) {
    data = jedis.get(key);
}

监控: 错误率、性能。

阶段 3:只写入 Postgres(第 3 周)

// 只写入 Postgres
jdbcTemplate.update("INSERT INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
    key, value, LocalDateTime.now().plusHours(1));

监控: 一切仍然正常工作?

阶段 4:移除 Redis(第 4 周)

# 关闭 Redis
# 观察错误
# 没有破坏?成功!

代码示例:完整实现

缓存模块(PostgreSQL)

// PostgresCache.java
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Map;

@Component
public class PostgresCache {
    private final JdbcTemplate jdbcTemplate;
    
    public PostgresCache(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
    
    public String get(String key) {
        return jdbcTemplate.queryForObject(
            "SELECT value FROM cache WHERE key = ? AND expires_at > NOW()",
            String.class, key);
    }
    
    public void set(String key, String value, int ttlSeconds) {
        jdbcTemplate.update(
            "INSERT INTO cache (key, value, expires_at) " +
            "VALUES (?, ?::jsonb, NOW() + INTERVAL ? || ' seconds') " +
            "ON CONFLICT (key) DO UPDATE " +
            "SET value = EXCLUDED.value, expires_at = EXCLUDED.expires_at",
            key, value, ttlSeconds);
    }
    
    public void set(String key, String value) {
        set(key, value, 3600);
    }
    
    public void delete(String key) {
        jdbcTemplate.update("DELETE FROM cache WHERE key = ?", key);
    }
    
    public void cleanup() {
        jdbcTemplate.update("DELETE FROM cache WHERE expires_at < NOW()");
    }
}

发布/订阅模块

// PostgresPubSub.java
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@Component
public class PostgresPubSub {
    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;
    private final Map<String, Connection> listeners = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();
    
    public PostgresPubSub(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
        this.objectMapper = new ObjectMapper();
    }
    
    public void publish(String channel, Object message) throws Exception {
        String payload = objectMapper.writeValueAsString(message);
        jdbcTemplate.update("SELECT pg_notify(?, ?)", channel, payload);
    }
    
    public void subscribe(String channel, Consumer<Map<String, Object>> callback) {
        executor.execute(() -> {
            try {
                Connection conn = jdbcTemplate.getDataSource().getConnection();
                Statement stmt = conn.createStatement();
                stmt.execute("LISTEN " + channel);
                stmt.close();
                
                listeners.put(channel, conn);
                
                PGConnection pgConn = conn.unwrap(PGConnection.class);
                while (listeners.containsKey(channel)) {
                    PGNotification[] notifications = pgConn.getNotifications();
                    if (notifications != null) {
                        for (PGNotification notification : notifications) {
                            if (notification.getName().equals(channel)) {
                                Map<String, Object> data = objectMapper.readValue(
                                    notification.getParameter(), Map.class);
                                callback.accept(data);
                            }
                        }
                    }
                    Thread.sleep(100);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
    
    public void unsubscribe(String channel) {
        Connection conn = listeners.remove(channel);
        if (conn != null) {
            try {
                Statement stmt = conn.createStatement();
                stmt.execute("UNLISTEN " + channel);
                stmt.close();
                conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

任务队列模块

// PostgresQueue.java
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Map;

@Component
public class PostgresQueue {
    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;
    
    public PostgresQueue(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
        this.objectMapper = new ObjectMapper();
    }
    
    public void enqueue(String queue, Map<String, Object> payload, LocalDateTime scheduledAt) {
        try {
            String payloadJson = objectMapper.writeValueAsString(payload);
            jdbcTemplate.update(
                "INSERT INTO jobs (queue, payload, scheduled_at) VALUES (?, ?::jsonb, ?)",
                queue, payloadJson, scheduledAt);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
    public void enqueue(String queue, Map<String, Object> payload) {
        enqueue(queue, payload, LocalDateTime.now());
    }
    
    public Job dequeue(String queue) {
        String sql = "WITH next_job AS (" +
            "  SELECT id FROM jobs " +
            "  WHERE queue = ? " +
            "    AND attempts < max_attempts " +
            "    AND scheduled_at <= NOW() " +
            "  ORDER BY scheduled_at " +
            "  LIMIT 1 " +
            "  FOR UPDATE SKIP LOCKED " +
            ") " +
            "UPDATE jobs " +
            "SET attempts = attempts + 1 " +
            "FROM next_job " +
            "WHERE jobs.id = next_job.id " +
            "RETURNING jobs.*";
        
        return jdbcTemplate.queryForObject(sql, new JobRowMapper(), queue);
    }
    
    public void complete(Long jobId) {
        jdbcTemplate.update("DELETE FROM jobs WHERE id = ?", jobId);
    }
    
    public void fail(Long jobId, Exception error) {
        try {
            String errorJson = objectMapper.writeValueAsString(Map.of("error", error.getMessage()));
            jdbcTemplate.update(
                "UPDATE jobs " +
                "SET attempts = max_attempts, " +
                "    payload = payload || ?::jsonb " +
                "WHERE id = ?",
                errorJson, jobId);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
    private static class JobRowMapper implements RowMapper<Job> {
        @Override
        public Job mapRow(ResultSet rs, int rowNum) throws SQLException {
            Job job = new Job();
            job.setId(rs.getLong("id"));
            job.setQueue(rs.getString("queue"));
            job.setPayload(rs.getString("payload"));
            job.setAttempts(rs.getInt("attempts"));
            job.setMaxAttempts(rs.getInt("max_attempts"));
            job.setScheduledAt(rs.getTimestamp("scheduled_at").toLocalDateTime());
            job.setCreatedAt(rs.getTimestamp("created_at").toLocalDateTime());
            return job;
        }
    }
    
    public static class Job {
        private Long id;
        private String queue;
        private String payload;
        private Integer attempts;
        private Integer maxAttempts;
        private LocalDateTime scheduledAt;
        private LocalDateTime createdAt;
        
        // Getters and Setters
        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getQueue() { return queue; }
        public void setQueue(String queue) { this.queue = queue; }
        public String getPayload() { return payload; }
        public void setPayload(String payload) { this.payload = payload; }
        public Integer getAttempts() { return attempts; }
        public void setAttempts(Integer attempts) { this.attempts = attempts; }
        public Integer getMaxAttempts() { return maxAttempts; }
        public void setMaxAttempts(Integer maxAttempts) { this.maxAttempts = maxAttempts; }
        public LocalDateTime getScheduledAt() { return scheduledAt; }
        public void setScheduledAt(LocalDateTime scheduledAt) { this.scheduledAt = scheduledAt; }
        public LocalDateTime getCreatedAt() { return createdAt; }
        public void setCreatedAt(LocalDateTime createdAt) { this.createdAt = createdAt; }
    }
}

性能调优技巧

1. 使用连接池

// Spring Boot 配置(application.yml)
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/mydb
    username: user
    password: password
    hikari:
      maximum-pool-size: 20  # 最大连接数
      minimum-idle: 5        # 最小空闲连接数
      connection-timeout: 2000  # 连接超时(毫秒)
      idle-timeout: 30000    # 空闲超时(毫秒)
      max-lifetime: 1800000  # 连接最大生命周期(毫秒)

// 或者使用 Java 配置
@Configuration
public class DataSourceConfig {
    @Bean
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/mydb");
        config.setUsername("user");
        config.setPassword("password");
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        config.setConnectionTimeout(2000);
        config.setIdleTimeout(30000);
        return new HikariDataSource(config);
    }
}

2. 添加适当的索引

CREATE INDEX CONCURRENTLY idx_cache_key ON cache(key) WHERE expires_at > NOW();
CREATE INDEX CONCURRENTLY idx_jobs_pending ON jobs(queue, scheduled_at) 
  WHERE attempts < max_attempts;

3. 调整 PostgreSQL 配置

# postgresql.conf
shared_buffers = 2GB           # RAM 的 25%
effective_cache_size = 6GB     # RAM 的 75%
work_mem = 50MB                # 用于复杂查询
maintenance_work_mem = 512MB   # 用于 VACUUM

4. 定期维护

-- 每天运行
VACUUM ANALYZE cache;
VACUUM ANALYZE jobs;

-- 或启用 autovacuum(推荐)
ALTER TABLE cache SET (autovacuum_vacuum_scale_factor = 0.1);

结果:3 个月后

我节省了什么:

  • ✅ $100/月(不再需要 ElastiCache)
  • ✅ 备份复杂性减少 50%
  • ✅ 少一个服务要监控
  • ✅ 更简单的部署(少一个依赖)

我失去了什么:

  • ❌ 缓存操作约 0.5ms 延迟
  • ❌ Redis 的异域数据结构(我不需要它们)

我会再做一次吗? 是的,对于这个用例。

我会普遍推荐吗? 不会。


决策矩阵

用 Postgres 替换 Redis,如果:

  • ✅ 你使用 Redis 进行简单缓存/会话
  • ✅ 缓存命中率 < 95%(大量写入)
  • ✅ 你想要事务一致性
  • ✅ 你可以接受 0.1-1ms 更慢的操作
  • ✅ 你是一个小团队,运维资源有限

保留 Redis,如果:

  • ❌ 你需要 100k+ 操作/秒
  • ❌ 你使用 Redis 数据结构(有序集合等)
  • ❌ 你有专门的运维团队
  • ❌ 亚毫秒延迟至关重要
  • ❌ 你正在进行地理复制

资源

PostgreSQL 功能:

工具:

替代解决方案:


TL;DR

我用 PostgreSQL 替换了 Redis,用于:

  1. 缓存 → UNLOGGED 表
  2. 发布/订阅 → LISTEN/NOTIFY
  3. 任务队列 → SKIP LOCKED
  4. 会话 → JSONB 表

结果:

  • 节省 $100/月
  • 减少运维复杂性
  • 稍慢(0.1-1ms)但可接受
  • 保证事务一致性

何时这样做:

  • 小型到中型应用
  • 简单的缓存需求
  • 想要减少移动部件

何时不这样做:

  • 高性能要求(100k+ 操作/秒)
  • 使用 Redis 特定功能
  • 有专门的运维团队

你用 Postgres 替换了 Redis(或反之)吗? 你的经验是什么?在评论中分享你的基准测试!👇

P.S. - 想要后续的”PostgreSQL 隐藏功能”或”何时 Redis 实际上更好”吗?告诉我!