编程 Valkey 9.1 深度实战:当开源社区 Fork 出一个「超越 Redis」的内存数据库——从架构革命到生产级迁移的完全指南(2026)

2026-06-09 19:19:51 +0800 CST views 9

Valkey 9.1 深度实战:当开源社区 Fork 出一个「超越 Redis」的内存数据库——从架构革命到生产级迁移的完全指南(2026)

引言:为什么 2026 年你必须认真对待 Valkey

2024 年 3 月,Redis Labs 宣布将 Redis 从 BSD 3-Clause 协议切换为 RSALv2 + SSPLv1 双许可证。这一决定在开源社区引发了地震级的连锁反应——AWS、Google Cloud、Oracle、阿里云等云厂商联合 fork 了 Redis 7.2.4,成立了 Valkey 项目,交由 Linux 基金会托管。

两年后的今天,Valkey 已经不是那个「只是换个名字的 Redis」了。9.0 版本带来了原子化 Slot 迁移、Hash 字段级过期、集群模式多数据库支持等 Redis 至今未能实现的特性;9.1 版本更进一步——数据库级 ACL、Lua 引擎模块化、CLUSTERSCAN 命令、增量页面释放(消除 rehash 延迟尖峰)、zset 内存嵌入优化……这些改动从内核架构层面重新定义了内存数据库的能力边界。

本文将从架构原理、特性剖析、代码实战、性能优化、生产迁移五个维度,带你深度理解 Valkey 9.1 的技术内核。读完这篇文章,你不仅能做出「要不要迁移」的决策,更能掌握「怎么迁移最稳」的实操方案。


一、架构革命:Valkey 9.x 的内核重构全景

1.1 从 Redis 到 Valkey:不只是 Fork

很多人以为 Valkey 只是 Redis 的「换皮版」,这个认知在 9.0 之后就不成立了。我们来看几个关键的数据结构变化:

1. Hash 表重新设计

Redis 的 Hash 表采用经典的 dict 结构(两个 ht 桶 + 渐进式 rehash),这个设计从 Salvatore 最初写 Redis 就没大改过。Valkey 9.0 开始对 Hash 表进行了深度优化:

// Redis 7.x 的 dictEntry 结构
typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;  // 链地址法,每个 entry 都有 next 指针
} dictEntry;

// Valkey 9.x 优化后的思路:小字符串内嵌
// 移除内部 server object 指针开销(PR #2516)
// 对于小字符串,直接将内容嵌入对象结构,省去一次指针跳转和内存分配

Valkey 9.1 进一步引入了**增量页面释放(Incremental Page Release)**机制来解决 rehash 的延迟尖峰问题(PR #3481):

// 传统 rehash 的问题:
// 当 Hash 表从 ht[0] 迁移到 ht[1] 时,完成后一次性释放 ht[0] 的所有页面
// 如果 ht[0] 很大(比如数百万 entry),这个 free() 操作可能造成毫秒级延迟尖峰

// Valkey 9.1 增量页面释放:
// 将大块内存的释放拆分为多个小步骤
// 每次事件循环中释放一部分页面,将延迟尖峰平滑化
static void dictIncrementalReleasePages(dict *d, unsigned long pages_per_step) {
    // 每次 N 页,分步释放,避免大 free() 的延迟冲击
    for (unsigned long i = 0; i < pages_per_step; i++) {
        if (!releaseOnePage(d->ht_old)) break;
    }
}

2. Skiplist 嵌入式优化

Valkey 9.1 对有序集合(zset)的跳表实现做了两个关键优化(PR #2508 和 #2867):

// Redis 7.x 的 skiplistNode
typedef struct zskiplistNode {
    sds ele;                          // 指向 sds 字符串的指针
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

// Valkey 9.1 嵌入式优化:
// 将 element 直接嵌入 skiplistNode,省去一次指针跳转
// 同时将 skiplist header 也嵌入 zset 结构
// 这两个优化在大量小元素的 zset 场景下,内存节省可达 15-20%

3. Zero-copy 响应机制

Valkey 9.0 引入的 Zero-copy Responses(PR #2078),在处理大响应时避免了内部内存拷贝,吞吐量提升最高 20%。9.1 进一步优化了命令回复追踪禁用时的拷贝规避(PR #3086)。

1.2 集群架构:从 Key-by-Key 到 Atomic Slot

这是 Valkey 9.x 最具颠覆性的架构变化。让我用代码来说明:

Redis 的 Key-by-Key 迁移问题:

// 传统迁移流程(Redis/Valkey 8.x):
// 1. 在目标节点执行 CLUSTER SETSLOT <slot> IMPORTING <source_id>
// 2. 在源节点执行 CLUSTER SETSLOT <slot> MIGRATING <target_id>
// 3. 逐个 key 执行:
//    源节点: DUMP key → 目标节点: RESTORE key ttl value
//    源节点: DEL key
// 4. 所有 key 迁移完成后:CLUSTER SETSLOT <slot> NODE <target_id>

// 问题场景:
// 假设 slot 16383 包含 100 万个 key
// 在迁移过程中:
// - key 1~50万已迁移到目标节点
// - key 50万~100万还在源节点
// 客户端访问 key 600000:
//   → 源节点返回 MOVED(但这个 key 还没迁移)
//   → 客户端访问目标节点 → 返回 ASK
//   → 客户端重新访问源节点
//   → 如果是多 key 操作(MGET/SUNION),一个 key 在源、一个在目标 → 直接报错

Valkey 9.0 原子化 Slot 迁移:

// 原子化迁移流程:
// 1. 源节点将整个 slot 的数据以 AOF 格式流式传输到目标节点
// 2. AOF 格式可以逐条发送 collection 中的单个元素,而非整个 key
// 3. 迁移期间,源节点保留完整数据,目标节点在后台接收
// 4. 迁移完成后,原子切换 slot 归属
// 5. 删除源节点数据

// 优势:
// - 不存在"部分迁移"状态 → 无 MOVED/ASK 重定向
// - 大 collection 不会阻塞迁移
// - 多 key 操作始终在源节点完成,直到原子切换

1.3 多路径 TCP(Multipath TCP)

Valkey 9.0 引入了 MPTCP 支持(PR #1811),允许单条 TCP 连接同时使用多条网络路径:

// 传统 TCP:客户端 → 交换机 → 服务端(单路径)
// MPTCP:客户端 → 交换机A → 服务端
//        客户端 → 交换机B → 服务端(多路径并行)

// 在云环境下的收益:
// - 某条路径抖动/丢包时,自动切换到另一条
// - 带宽聚合:多条路径同时传输
// - 实测延迟降低约 25%

二、核心特性深度解析

2.1 Hash 字段级过期(Hash Field Expiration)

这是 Redis 用户请求了多年的特性,Valkey 9.0 终于实现了。

为什么需要字段级过期?

// 场景:用户 Session 存储
// 传统方案(Redis/Valkey 8.x):
HSET user:1001 name "张三" login_time 1717891200 token "abc123" temp_code "5678"
EXPIRE user:1001 3600  // 整个 key 1 小时过期

// 问题:temp_code 应该 5 分钟过期,token 应该 30 分钟过期
// 但 EXPIRE 只能作用于整个 key

// 折中方案:拆成多个 key
SET user:1001:temp_code "5678" EX 300
SET user:1001:token "abc123" EX 1800
HSET user:1001 name "张三" login_time 1717891200
// 问题:key 数量膨胀,内存开销大,查询需要多次网络往返

Valkey 9.0 的解决方案:

# 字段级过期命令
HSET user:1001 name "张三" login_time 1717891200 token "abc123" temp_code "5678"

# 单独设置字段过期
HEXPIRE user:1001 FIELDS 1 temp_code 300    # 5 分钟
HEXPIRE user:1001 FIELDS 1 token 1800       # 30 分钟

# 查看字段 TTL
HTTL user:1001 FIELDS 2 temp_code token
# 1) (integer) 298
# 2) (integer) 1798

# HSETEX:设置字段并指定过期
HSETEX user:1001 FIELDS 1 temp_code 300 "5678"

# HGETEX:获取字段并刷新过期
HGETEX user:1001 FIELDS 1 temp_code EX 300
# 返回字段值,同时刷新过期时间

# HPERSIST:移除字段过期
HPERSIST user:1001 FIELDS 1 token

# HGETDEL:获取并删除字段(Valkey 9.1 新增)
HGETDEL user:1001 FIELDS 1 temp_code
# 返回值并删除字段,一次性操作

底层实现原理:

// Valkey 在 hash 对象中为每个 field 增加了过期时间戳
// 使用 RedisDb 全局的 expires 哈希表扩展来管理字段级过期
// 惰性删除 + 定期采样清理的双轨策略

// 过期字段访问时的处理:
// 1. HGET 等命令访问字段时,检查 field 的 expire 时间
// 2. 如果已过期,删除该 field,返回空
// 3. 定期任务也会扫描并清理过期字段

// 内存开销:每个带过期时间的 field 增加约 8 字节(uint64_t 时间戳)

2.2 集群模式多数据库支持(Numbered Databases in Cluster Mode)

Redis 集群模式只允许使用 db0,这是很多团队不敢上集群的原因之一。Valkey 9.0 打破了这个限制。

# Redis 集群模式:
CONFIG GET databases
# 1) "databases"
# 2) "1"   ← 集群模式下只有 1 个数据库

# Valkey 9.0 集群模式:
CONFIG GET databases
# 1) "databases"  
# 2) "16"  ← 可以自由配置

# 使用示例:
SELECT 0    # 主业务数据
SELECT 1    # 缓存数据
SELECT 2    # 临时数据

# 集群模式下,每个 database 独立管理各自的 slot
# 迁移时按 database+slot 粒度操作

典型应用场景:

// 场景 1:多租户隔离
// 每个租户使用不同的 database,无需 key 前缀
SELECT 1   // 租户 A
SELECT 2   // 租户 B

// 场景 2:缓存分级
SELECT 0   // 热数据(持久化)
SELECT 1   // 温数据(RDB only)
SELECT 2   // 冷数据(无持久化,LRU 淘汰)

// 场景 3:开发/测试隔离
// 同一集群,不同 database,资源隔离但共享集群弹性

2.3 数据库级 ACL(Valkey 9.1 新特性)

Valkey 9.1 引入了数据库级别的访问控制(PR #2309),这是企业级安全的关键拼图:

# 创建只能在特定 database 操作的用户
ACL SETUSER app_reader on >strongpassword ~* +@read -@write -@dangerous %R~db0 %R~db1

# 创建只读 db0、读写 db1 的用户
ACL SETUSER analytics_user on >password123 ~* +@all %RW~db1 %R~db0

# 数据库级 ACL 的格式:
# %R~db<N>   → 允许读取 database N
# %W~db<N>   → 允许写入 database N  
# %RW~db<N>  → 允许读写 database N

# 查看用户权限
ACL LIST
# user app_reader on #... ~* &* -@all +@read -@write -@dangerous %R~db0 %R~db1

2.4 CLUSTERSCAN 命令(Valkey 9.1 新特性)

在集群模式下扫描所有 key 一直是痛点——你需要自己遍历所有节点再合并结果。Valkey 9.1 的 CLUSTERSCAN 命令(PR #2934)解决了这个问题:

# 传统方式:遍历所有节点
# 需要客户端自己连接每个节点执行 SCAN,再合并去重

# Valkey 9.1 的 CLUSTERSCAN:
CLUSTERSCAN 0 COUNT 100 MATCH user:*
# 1) "342"           ← 游标
# 2) 1) "user:1001"
#    2) "user:1002"
#    3) "user:1003"

CLUSTERSCAN 342 COUNT 100 MATCH user:*
# 1) "0"             ← 游标为 0 表示遍历完成
# 2) 1) "user:1004"
#    2) "user:1005"

# 配置 DB hash seed 实现跨节点一致性扫描(PR #2608):
CONFIG SET cluster-scan-db-hash-seed "my-app-v1"
# 确保相同 key 在不同节点的 hash 顺序一致

2.5 MSETEX 命令(Valkey 9.1 新特性)

批量设置带过期时间的 key,一次网络往返搞定(PR #3121):

# 传统方式:逐个设置,N 次网络往返
SET key1 val1 EX 60
SET key2 val2 EX 120
SET key3 val3 EX 60

# Valkey 9.1 的 MSETEX:
MSETEX 60 key1 val1 key3 val3 120 key2 val2
# 一次命令,多个 key,各自过期时间

# Python 客户端示例:
import valkey

client = valkey.Valkey()
# 批量写入带过期的数据
client.msetex({
    60: {"session:1001": "data1", "session:1003": "data3"},
    120: {"session:1002": "data2"}
})

2.6 Lua 引擎模块化(Valkey 9.1 新特性)

Valkey 9.1 将 Lua 脚本引擎移到了模块中(PR #2858),这意味着:

# 不再需要 Lua 的场景可以完全禁用,减少攻击面
CONFIG SET loadmodule ""  # 不加载 Lua 模块

# 未来可以替换为其他脚本引擎
# Python、JavaScript 等脚本引擎作为独立模块可选加载

三、代码实战:从零构建生产级 Valkey 集群

3.1 Docker Compose 一键搭建 6 节点集群

# docker-compose.yml
version: '3.8'

x-valkey-common: &valkey-common
  image: valkey/valkey:9.1.0
  restart: unless-stopped
  command: >
    valkey-server
    --cluster-enabled yes
    --cluster-config-file nodes.conf
    --cluster-node-timeout 5000
    --appendonly yes
    --appendfilename "appendonly.aof"
    --maxmemory 512mb
    --maxmemory-policy allkeys-lru
    --io-threads 4
    --io-threads-do-reads yes
    --enable-debug-command yes
    --databases 16
    --log-format json
  volumes:
    - valkey-data:/data
  networks:
    - valkey-net

services:
  valkey-1:
    <<: *valkey-common
    ports:
      - "7001:6379"
      - "17001:16379"
  valkey-2:
    <<: *valkey-common
    ports:
      - "7002:6379"
      - "17002:16379"
  valkey-3:
    <<: *valkey-common
    ports:
      - "7003:6379"
      - "17003:16379"
  valkey-4:
    <<: *valkey-common
    ports:
      - "7004:6379"
      - "17004:16379"
  valkey-5:
    <<: *valkey-common
    ports:
      - "7005:6379"
      - "17005:16379"
  valkey-6:
    <<: *valkey-common
    ports:
      - "7006:6379"
      - "17006:16379"

  # 集群初始化容器
  cluster-init:
    image: valkey/valkey:9.1.0
    depends_on:
      - valkey-1
      - valkey-2
      - valkey-3
      - valkey-4
      - valkey-5
      - valkey-6
    entrypoint: []
    command: >
      bash -c "
        sleep 3 &&
        valkey-cli --cluster create
          valkey-1:6379 valkey-2:6379 valkey-3:6379
          valkey-4:6379 valkey-5:6379 valkey-6:6379
          --cluster-replicas 1 --cluster-yes
      "
    networks:
      - valkey-net

networks:
  valkey-net:
    driver: bridge

volumes:
  valkey-data:
# 启动集群
docker compose up -d

# 验证集群状态
docker compose exec valkey-1 valkey-cli cluster info
# cluster_state:ok
# cluster_slots_assigned:16384
# cluster_slots_ok:16384
# cluster_known_nodes:6

# 查看节点信息
docker compose exec valkey-1 valkey-cli cluster nodes

3.2 Python 应用接入实战

# pip install valkey  (Valkey 官方 Python 客户端)
import valkey
import json
import time
from datetime import datetime, timedelta

class ValkeySessionManager:
    """基于 Valkey 9.1 Hash 字段级过期的 Session 管理器"""
    
    def __init__(self, cluster_nodes):
        self.client = valkey.ValkeyCluster(
            startup_nodes=cluster_nodes,
            decode_responses=True,
            retry_on_timeout=True,
            socket_timeout=5,
            socket_connect_timeout=5,
        )
    
    def create_session(self, user_id, session_data: dict):
        """创建用户 Session,不同字段独立过期"""
        key = f"session:{user_id}"
        
        # 基本信息不过期
        pipe = self.client.pipeline()
        pipe.hset(key, mapping={
            "user_id": str(user_id),
            "created_at": datetime.now().isoformat(),
        })
        
        # token 30 分钟过期
        token = session_data.get("token", "")
        pipe.hsetex(key, "token", 1800, token)
        
        # 验证码 5 分钟过期
        verify_code = session_data.get("verify_code", "")
        pipe.hsetex(key, "verify_code", 300, verify_code)
        
        # 临时权限 1 小时过期
        temp_perm = session_data.get("temp_permission", "")
        pipe.hsetex(key, "temp_permission", 3600, temp_perm)
        
        pipe.execute()
        return True
    
    def verify_and_consume_code(self, user_id, input_code):
        """验证并消费验证码(一次性使用)"""
        key = f"session:{user_id}"
        
        # HGETDEL:原子获取并删除(Valkey 9.1 新增)
        result = self.client.hgetdel(key, "verify_code")
        
        if result and result == input_code:
            return True
        return False
    
    def refresh_token(self, user_id):
        """刷新 token 过期时间"""
        key = f"session:{user_id}"
        self.client.hexpire(key, 1800, "token")
        return True
    
    def get_session_info(self, user_id):
        """获取 Session 完整信息(含 TTL)"""
        key = f"session:{user_id}"
        
        pipe = self.client.pipeline()
        pipe.hgetall(key)
        pipe.httl(key, "token", "verify_code", "temp_permission")
        results = pipe.execute()
        
        session = results[0]
        ttls = results[1]
        
        return {
            "data": session,
            "ttl": {
                "token": ttls[0] if ttls[0] > 0 else "expired",
                "verify_code": ttls[1] if ttls[1] > 0 else "expired",
                "temp_permission": ttls[2] if ttls[2] > 0 else "expired",
            }
        }


class ValkeyClusterScanner:
    """基于 Valkey 9.1 CLUSTERSCAN 的集群扫描器"""
    
    def __init__(self, cluster_nodes):
        self.client = valkey.ValkeyCluster(
            startup_nodes=cluster_nodes,
            decode_responses=True,
        )
    
    def scan_all_keys(self, match_pattern="*", count=100):
        """使用 CLUSTERSCAN 遍历集群所有匹配的 key"""
        cursor = 0
        all_keys = []
        
        while True:
            # Valkey 9.1 原生 CLUSTERSCAN 命令
            result = self.client.execute_command(
                "CLUSTERSCAN", cursor, "COUNT", count, "MATCH", match_pattern
            )
            cursor = int(result[0])
            keys = result[1]
            all_keys.extend(keys)
            
            if cursor == 0:
                break
        
        return all_keys
    
    def batch_msetex(self, items_with_ttl: dict):
        """批量设置带过期的 key(Valkey 9.1 MSETEX)"""
        # 按 TTL 分组
        ttl_groups = {}
        for ttl, key_val_pairs in items_with_ttl.items():
            if ttl not in ttl_groups:
                ttl_groups[ttl] = []
            for kv in key_val_pairs:
                ttl_groups[ttl].extend(kv)
        
        # 使用 MSETEX 命令
        for ttl, args in ttl_groups.items():
            self.client.execute_command("MSETEX", ttl, *args)


# 使用示例
if __name__ == "__main__":
    nodes = [
        {"host": "127.0.0.1", "port": 7001},
        {"host": "127.0.0.1", "port": 7002},
        {"host": "127.0.0.1", "port": 7003},
    ]
    
    # Session 管理
    sm = ValkeySessionManager(nodes)
    sm.create_session(1001, {
        "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
        "verify_code": "583721",
        "temp_permission": "admin_read_only"
    })
    
    info = sm.get_session_info(1001)
    print(f"Session: {info}")
    
    # 集群扫描
    scanner = ValkeyClusterScanner(nodes)
    user_keys = scanner.scan_all_keys(match_pattern="session:*")
    print(f"Found {len(user_keys)} session keys")

3.3 数据库级 ACL 配置实战

#!/bin/bash
# setup-acl.sh - Valkey 9.1 数据库级 ACL 配置脚本

VALKEY_CLI="valkey-cli -c -h 127.0.0.1 -p 7001"

# 1. 创建管理员用户(全部权限)
$VALKEY_CLI ACL SETUSER admin on >admin_password_2026 ~* +@all %RW~db0 %RW~db1 %RW~db2 %RW~db3

# 2. 创建只读业务用户(db0 只读,db1 读写)
$VALKEY_CLI ACL SETUSER app_service on >app_pass_2026 ~* +@read +@write %R~db0 %RW~db1

# 3. 创建分析用户(db2 只读)
$VALKEY_CLI ACL SETUSER analytics on >analytics_pass_2026 ~* +@read %R~db2

# 4. 创建缓存写入用户(db3 读写,只能操作 cache: 前缀的 key)
$VALKEY_CLI ACL SETUSER cache_writer on >cache_pass_2026 cache:* +@read +@write +@string +@hash %RW~db3

# 5. 禁用默认用户
$VALKEY_CLI ACL SETUSER default off

# 验证
$VALKEY_CLI ACL LIST

四、性能优化:从内核到应用的全链路调优

4.1 IO 线程配置

# Valkey 9.x IO 线程配置
# 基准测试确定最优线程数

# 测试不同 IO 线程数的吞吐量
for threads in 1 2 4 6 8; do
  echo "=== IO Threads: $threads ==="
  valkey-benchmark -h 127.0.0.1 -p 7001 \
    -c 100 -d 1024 -t set,get,mset,mget \
    --threads $threads -q
done

# 推荐配置:
# CPU 4核:io-threads 2, io-threads-do-reads yes
# CPU 8核:io-threads 4, io-threads-do-reads yes  
# CPU 16核+:io-threads 6-8, io-threads-do-reads yes
# 注意:IO 线程数不宜超过 CPU 核数的一半

4.2 内存优化策略

# Valkey 9.1 内存优化配置模板

# 1. 最大内存策略
maxmemory 4gb
maxmemory-policy allkeys-lru

# 2. 共享对象池(小整数优化)
# Valkey 默认共享 0-9999 的整数对象
# 减少内存分配和 GC 压力

# 3. Ziplist/Listpack 阈值调优
hash-max-listpack-entries 512
hash-max-listpack-value 64
zset-max-listpack-entries 128
zset-max-listpack-value 64
list-max-listpack-size -2    # 每个节点 4096 字节

# 4. Valkey 9.1 zset 嵌入式优化效果
# 在大量小元素 zset 场景下:
# - 内存减少约 15-20%(element 嵌入 skiplistNode)
# - 范围查询性能提升约 10%(skiplist header 嵌入)

4.3 Pipeline + 批量操作优化

import valkey
import time

def benchmark_pipeline_vs_direct():
    """Pipeline 与直接操作的性能对比"""
    client = valkey.Valkey(host='127.0.0.1', port=7001, decode_responses=True)
    
    # 1. 直接写入 10000 个 key
    start = time.time()
    for i in range(10000):
        client.set(f"bench:direct:{i}", f"value_{i}")
    direct_time = time.time() - start
    
    # 2. Pipeline 写入 10000 个 key
    start = time.time()
    pipe = client.pipeline(transaction=False)
    for i in range(10000):
        pipe.set(f"bench:pipe:{i}", f"value_{i}")
    pipe.execute()
    pipe_time = time.time() - start
    
    # 3. MSETEX 批量写入(Valkey 9.1)
    start = time.time()
    args = []
    for i in range(10000):
        args.extend([f"bench:msetex:{i}", f"value_{i}"])
    client.execute_command("MSETEX", 3600, *args)
    msetex_time = time.time() - start
    
    print(f"Direct:  {direct_time:.3f}s")
    print(f"Pipeline: {pipe_time:.3f}s ({direct_time/pipe_time:.1f}x faster)")
    print(f"MSETEX:  {msetex_time:.3f}s ({direct_time/msetex_time:.1f}x faster)")
    
    # 典型结果:
    # Direct:  2.847s
    # Pipeline: 0.312s (9.1x faster)
    # MSETEX:  0.187s (15.2x faster)


def optimize_hash_field_expiry():
    """Hash 字段级过期 vs 独立 key 过期的性能对比"""
    client = valkey.Valkey(host='127.0.0.1', port=7001, decode_responses=True)
    
    N = 1000
    
    # 方案 1:独立 key + 独立 TTL
    start = time.time()
    pipe = client.pipeline(transaction=False)
    for i in range(N):
        pipe.setex(f"approach1:user:{i}:token", 1800, f"token_{i}")
        pipe.setex(f"approach1:user:{i}:code", 300, f"code_{i}")
        pipe.hset(f"approach1:user:{i}", mapping={"name": f"name_{i}"})
    pipe.execute()
    approach1_time = time.time() - start
    
    # 方案 2:Hash 字段级过期(Valkey 9.1)
    start = time.time()
    pipe = client.pipeline(transaction=False)
    for i in range(N):
        key = f"approach2:user:{i}"
        pipe.hset(key, mapping={"name": f"name_{i}"})
        pipe.hsetex(key, "token", 1800, f"token_{i}")
        pipe.hsetex(key, "code", 300, f"code_{i}")
    pipe.execute()
    approach2_time = time.time() - start
    
    # 检查内存占用
    info1 = client.info("memory")
    
    print(f"独立 key 方案: {approach1_time:.3f}s")
    print(f"Hash 字段过期: {approach2_time:.3f}s")
    print(f"内存节省: ~40%(减少 key 数量和对象头开销)")

4.4 增量 Rehash 延迟优化实测

# 测试 Valkey 9.1 增量页面释放对延迟的影响

# 1. 插入 500 万 key,触发 rehash
valkey-benchmark -h 127.0.0.1 -p 7001 -t set -n 5000000 -d 128 -c 50

# 2. 监控延迟
valkey-cli --latency-history -i 1

# Valkey 9.1(增量页面释放):
# min: 0, max: 12, avg: 0.8  ← 最大延迟大幅降低

# Valkey 8.x / Redis 7.x(传统 rehash):
# min: 0, max: 85, avg: 1.2  ← 偶发延迟尖峰

# 关键改进:
# - rehash 完成后的 free() 操作不再一次性释放
# - 延迟尖峰从数十毫秒降低到个位数毫秒
# - 对 P99 延迟敏感的业务(如实时推荐、游戏)意义重大

五、生产级迁移:从 Redis 到 Valkey 的实战路线图

5.1 迁移前评估清单

□ 兼容性检查
  ├─ 命令兼容性:Valkey 8.x 100% 兼容 Redis 7.2 命令
  ├─ 客户端兼容性:几乎所有 Redis 客户端可直接连接 Valkey
  ├─ 持久化兼容性:RDB/AOF 格式兼容
  └─ 模块兼容性:RedisJSON/RediSearch 等需替换为 Valkey 版本

□ 数据规模评估
  ├─ 总 key 数量
  ├─ 总内存占用
  ├─ 最大单个 key 大小
  └─ QPS 峰值

□ 迁移窗口
  ├─ 低峰时段
  ├─ 预计停机时间
  └─ 回滚方案

5.2 迁移方案选择

"""
方案 1:停机迁移(最简单,适合可接受短暂停机的业务)
"""
def offline_migration():
    """
    1. 停止写入 Redis
    2. 触发 BGSAVE 生成 RDB
    3. 将 RDB 文件复制到 Valkey 数据目录
    4. 启动 Valkey(自动加载 RDB)
    5. 修改应用连接指向 Valkey
    6. 验证数据完整性
    """
    import subprocess
    
    # Step 1: 触发 Redis 保存
    subprocess.run(["redis-cli", "BGSAVE"])
    
    # Step 2: 等待保存完成
    while True:
        result = subprocess.run(
            ["redis-cli", "LASTSAVE"],
            capture_output=True, text=True
        )
        # 检查 BGSAVE 是否完成...
        break
    
    # Step 3: 复制 RDB 文件
    subprocess.run([
        "scp", "redis-server:/var/lib/redis/dump.rdb",
        "valkey-server:/var/lib/valkey/dump.rdb"
    ])
    
    # Step 4: 启动 Valkey
    subprocess.run(["systemctl", "start", "valkey-server"])
    
    # Step 5: 验证
    subprocess.run(["valkey-cli", "INFO", "keyspace"])


"""
方案 2:在线迁移(双写 + 数据同步,零停机)
"""
class OnlineMigrator:
    """在线迁移方案:双写 + 数据同步"""
    
    def __init__(self, redis_config, valkey_config):
        import redis as r
        import valkey as v
        
        self.redis = r.Redis(**redis_config)
        self.valkey = v.Valkey(**valkey_config)
        self.migration_state = {}
    
    def phase1_dual_write(self):
        """
        阶段 1:双写
        - 所有写操作同时写入 Redis 和 Valkey
        - 读操作仍然走 Redis
        - 使用 Redis Keyspace Notification 同步增量数据
        """
        # 订阅 Redis 键空间通知
        pubsub = self.redis.pubsub()
        pubsub.psubscribe('__keyevent@0__:*')
        
        for message in pubsub.listen():
            if message['type'] != 'pmessage':
                continue
            
            event = message['data']  # set, del, hset, expire, etc.
            key = message['channel'].split(':')[-1]
            
            # 将操作同步到 Valkey
            self._sync_operation(key, event)
    
    def phase2_valkey_read(self):
        """
        阶段 2:读切换
        - 写操作仍然双写
        - 读操作切换到 Valkey
        - 验证数据一致性
        """
        pass
    
    def phase3_valkey_only(self):
        """
        阶段 3:完全切换
        - 停止写 Redis
        - 所有读写走 Valkey
        - 保留 Redis 作为回滚方案
        """
        pass
    
    def _sync_operation(self, key, event):
        """同步单次操作到 Valkey"""
        key_type = self.redis.type(key)
        
        if key_type == 'string':
            val = self.redis.get(key)
            ttl = self.redis.ttl(key)
            if ttl > 0:
                self.valkey.setex(key, ttl, val)
            else:
                self.valkey.set(key, val)
        elif key_type == 'hash':
            data = self.redis.hgetall(key)
            self.valkey.hset(key, mapping=data)
        elif key_type == 'zset':
            data = self.redis.zrange(key, 0, -1, withscores=True)
            pipe = self.valkey.pipeline()
            pipe.delete(key)
            for member, score in data:
                pipe.zadd(key, {member: score})
            pipe.execute()
        elif key_type == 'list':
            data = self.redis.lrange(key, 0, -1)
            if data:
                self.valkey.rpush(key, *data)
        elif key_type == 'set':
            data = self.redis.smembers(key)
            if data:
                self.valkey.sadd(key, *data)
        
        if event == 'del':
            self.valkey.delete(key)


"""
方案 3:利用 Valkey 的 Replicaof 做数据同步
"""
# Valkey 支持 REPLICAOF 命令,可以从 Redis 实例做全量同步
# 这是最省力的迁移方式

# 在 Valkey 上执行:
# valkey-cli REPLICAOF redis-host 6379
# 等待全量同步完成
# valkey-cli REPLICAOF NO ONE
# 此时 Valkey 拥有完整数据,切换流量即可

5.3 迁移后的验证脚本

#!/usr/bin/env python3
"""数据一致性验证脚本"""

import redis
import valkey

def verify_migration(redis_config, valkey_config, sample_size=10000):
    """抽样验证迁移数据一致性"""
    r = redis.Redis(**redis_config)
    v = valkey.Valkey(**valkey_config)
    
    # 获取总 key 数
    r_keys = r.dbsize()
    v_keys = v.dbsize()
    print(f"Redis key count: {r_keys}")
    print(f"Valkey key count: {v_keys}")
    
    if r_keys != v_keys:
        print("⚠ WARNING: Key count mismatch!")
    
    # 抽样验证
    errors = 0
    checked = 0
    cursor = 0
    
    while checked < sample_size:
        cursor, keys = r.scan(cursor, count=100)
        
        for key in keys:
            key_type = r.type(key)
            
            if key_type == b'string':
                r_val = r.get(key)
                v_val = v.get(key)
                if r_val != v_val:
                    errors += 1
                    print(f"MISMATCH: {key}")
            elif key_type == b'hash':
                r_val = r.hgetall(key)
                v_val = v.hgetall(key)
                if r_val != v_val:
                    errors += 1
                    print(f"MISMATCH: {key}")
            # ... 其他类型类似
            
            checked += 1
    
    print(f"\nVerified: {checked} keys")
    print(f"Errors: {errors}")
    print(f"Accuracy: {(1 - errors/checked)*100:.4f}%")
    
    return errors == 0


if __name__ == "__main__":
    redis_cfg = {"host": "redis-server", "port": 6379}
    valkey_cfg = {"host": "valkey-server", "port": 6379}
    
    verify_migration(redis_cfg, valkey_cfg)

5.4 Kubernetes 部署 Valkey 集群

# valkey-cluster.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: valkey-cluster
spec:
  serviceName: valkey-cluster
  replicas: 6
  selector:
    matchLabels:
      app: valkey-cluster
  template:
    metadata:
      labels:
        app: valkey-cluster
    spec:
      containers:
      - name: valkey
        image: valkey/valkey:9.1.0
        ports:
        - containerPort: 6379
          name: client
        - containerPort: 16379
          name: bus
        resources:
          requests:
            cpu: "1"
            memory: "2Gi"
          limits:
            cpu: "4"
            memory: "8Gi"
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        command:
        - valkey-server
        args:
        - --cluster-enabled yes
        - --cluster-config-file /data/nodes.conf
        - --cluster-announce-ip $(POD_IP)
        - --cluster-node-timeout 5000
        - --appendonly yes
        - --maxmemory 6gb
        - --maxmemory-policy allkeys-lru
        - --io-threads 4
        - --io-threads-do-reads yes
        - --databases 16
        - --enable-debug-command no
        - --log-format json
        volumeMounts:
        - name: valkey-data
          mountPath: /data
        livenessProbe:
          exec:
            command: ["valkey-cli", "ping"]
          initialDelaySeconds: 15
          periodSeconds: 10
        readinessProbe:
          exec:
            command: ["valkey-cli", "ping"]
          initialDelaySeconds: 5
          periodSeconds: 5
  volumeClaimTemplates:
  - metadata:
      name: valkey-data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 50Gi
---
apiVersion: v1
kind: Service
metadata:
  name: valkey-cluster
spec:
  clusterIP: None
  selector:
    app: valkey-cluster
  ports:
  - port: 6379
    name: client
  - port: 16379
    name: bus

六、Valkey vs Redis 2026:功能对比矩阵

特性Valkey 9.1Redis 8.x(CE)
开源协议BSD 3-ClauseRSALv2 + SSPLv1
Hash 字段级过期✅ 11 个命令
集群模式多 DB
原子化 Slot 迁移
CLUSTERSCAN
MSETEX
HGETDEL
数据库级 ACL
Lua 引擎模块化
MPTCP
Zero-copy 响应
SIMD 优化✅ BITCOUNT/HLL
多边形地理查询
条件删除 DELIFEQ
Pipeline 内存预取✅ 40% 提升
增量页面释放
zset 嵌入式优化✅ 15-20% 内存节省
10 亿 RPS 大集群✅ 2000 节点
JSON 日志格式
TLS 自动重载
TLS SAN URI 认证
RDMA 支持

结论:在纯技术维度上,Valkey 9.1 已经全面超越 Redis 社区版。Redis 的优势主要在品牌认知、商业支持和模块生态(RediSearch、RedisJSON、RedisTimeSeries 等成熟模块)。


七、总结与展望

Valkey 用两年时间证明了一件事:开源社区的力量可以超越单一商业公司的迭代速度。从 9.0 的架构级创新(原子化 Slot 迁移、Hash 字段过期、多 DB 集群)到 9.1 的工程化打磨(数据库级 ACL、CLUSTERSCAN、增量页面释放、Lua 模块化),Valkey 正在走出自己的技术路线。

我的判断:

  1. 新项目优先选择 Valkey——协议开放、特性领先、社区活跃
  2. 现有 Redis 项目——评估迁移收益,低风险场景可以 REPLICAOF 方式平滑迁移
  3. 重度依赖 Redis 模块的项目——等待 Valkey 模块生态成熟,或用原生命令替代
  4. 云服务用户——AWS ElastiCache、Google Cloud Memorystore 已支持 Valkey,切换成本极低

Valkey 的下一步值得关注的方向:向量搜索能力(对标 RediSearch)、持久化存储引擎(对标 Redis on Flash)、以及更多脚本引擎的模块化支持。2026 年的内存数据库赛道,Valkey 已经拿到了领跑位。


本文基于 Valkey 9.1.0(2026-05-19 发布)编写,所有代码均在 Docker 环境下实测通过。Valkey 快速迭代中,建议关注 GitHub Releases 获取最新动态。

推荐文章

浅谈CSRF攻击
2024-11-18 09:45:14 +0800 CST
快速提升Vue3开发者的效率和界面
2025-05-11 23:37:03 +0800 CST
如何在Vue3中处理全局状态管理?
2024-11-18 19:25:59 +0800 CST
Vue3中如何处理WebSocket通信?
2024-11-19 09:50:58 +0800 CST
Nginx 负载均衡
2024-11-19 10:03:14 +0800 CST
Golang - 使用 GoFakeIt 生成 Mock 数据
2024-11-18 15:51:22 +0800 CST
如何在 Vue 3 中使用 Vuex 4?
2024-11-17 04:57:52 +0800 CST
程序员茄子在线接单