pg_duckpipe 深度实战:让 PostgreSQL 一键变身 HTAP 数据库——从 WAL CDC 到 DuckLake 列式存储的全链路架构解析
引言:一个困扰了十年的架构难题
如果你在后端待过三年以上,一定遇到过这个场景:业务刚起步时,一张 PostgreSQL 的 orders 表扛住了所有读写。但某天产品经理甩来一个需求——"我要看实时销售报表,按地区、品类、时间维度聚合,秒级出数"。
你盯着那张几千万行的堆表,心里清楚:这种全表扫描 + 多维聚合的查询,在行存储上跑一次就得几秒甚至几十秒,并发几个就把 OLTP 业务打满了。
传统解法是什么?架一套 CDC 管道——Debezium 读 WAL,写 Kafka,Flink 消费,最终灌进 ClickHouse 或 Doris。光中间件就五六个,运维成本直接翻倍,数据延迟从秒级变成分钟级,还经常因为 schema 变更导致整条管道罢工。
pg_duckpipe 说:这些都不需要。
它是一个 PostgreSQL 扩展,直接在数据库内部完成从行存到列存的实时同步——通过 WAL 将堆表数据持续写入 DuckLake 列式表,再通过查询路由把分析型查询透明地转向列存副本。一个 PostgreSQL,同时跑 OLTP 和 OLAP,零外部依赖。
这篇文章,我们从架构设计到代码实战,把这个项目彻底拆开。
一、为什么 HTAP 这么难?——行存与列存的根本矛盾
1.1 行存储:OLTP 的王者,OLAP 的灾难
PostgreSQL 默认使用堆表(Heap Table)存储,数据按行紧凑排列:
| id | name | email | total |
|----|-------|-----------------|-------|
| 1 | Alice | alice@old.com | 99.5 |
| 2 | Bob | bob@mail.com | 150.0 |
| 3 | Carol | carol@mail.com | 75.0 |
这种布局对点查询极友好——按主键取一行,一次 I/O 搞定。但对分析型查询,简直是噩梦:
-- 求所有订单的总金额和数量
SELECT sum(total), count(*) FROM orders;
数据库只需要 total 一列的值,但必须把整行读进内存再提取。如果一行 500 字节,total 只占 8 字节,有效读取率只有 1.6%。1 亿行就是 50GB 的无效 I/O。
1.2 列存储:OLAP 的利器,OLTP 的软肋
列存储把同一列的数据连续存放:
id 列: 1, 2, 3, ...
name 列: Alice, Bob, Carol, ...
total 列: 99.5, 150.0, 75.0, ...
聚合查询只需读取目标列,I/O 效率提升数十倍。加上列式编码(如 Run-Length、Dictionary、Delta 编码),数据压缩率通常是行存的 5-10 倍。
但列存储对单行写入极不友好——一次 INSERT 要同时写 N 个列文件,随机 I/O 爆炸。这就是为什么 ClickHouse、Doris 等 OLAP 数据库都建议批量写入。
1.3 传统 HTAP 方案的困境
市面上的 HTAP 方案分两派:
| 方案 | 代表 | 问题 |
|---|---|---|
| 原生双存储引擎 | TiDB (TiKV + TiFlash) | 架构重,组件多,学习曲线陡峭 |
| 外挂 OLAP 引擎 | PG + Debezium + Kafka + ClickHouse | 链路长,延迟高,运维成本爆炸 |
核心矛盾:行存和列存的写入模式天然冲突,强行统一要么牺牲 OLTP 性能,要么牺牲 OLAP 实时性。
pg_duckpipe 给出了一个优雅的解法:不改存储引擎,不换数据库,只在 PostgreSQL 内部加一个同步管道。
二、pg_duckpipe 核心架构:WAL CDC + DuckLake 列式存储
2.1 整体架构
┌─────────────────────────────────────────────────┐
│ PostgreSQL │
│ │
│ ┌──────────┐ WAL ┌──────────────────┐ │
│ │ │ ──────────► │ │ │
│ │ 堆表 │ │ pg_duckpipe │ │
│ │ (行存) │ │ (WAL 消费 + │ │
│ │ │ │ 列式写入) │ │
│ │ │ │ │ │
│ └──────────┘ └────────┬─────────┘ │
│ ▲ │ │
│ │ 透明查询路由 ▼ │
│ ┌────┴───────────────────────────────────┐ │
│ │ 查询优化器 (Planner Hook) │ │
│ └────┬──────────────────┬───────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────────────┐ │
│ │ 堆表 │ │ DuckLake │ │
│ │ (OLTP) │ │ (列存, Parquet) │ │
│ └──────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────┘
关键设计:
- 数据源:PostgreSQL 堆表(你的业务表)
- 同步机制:通过 WAL 逻辑解码,捕获 INSERT/UPDATE/DELETE
- 目标存储:DuckLake 列式表(Parquet 文件 + 元数据目录)
- 查询路由:Planner Hook 拦截 SELECT,将分析型查询重写到 DuckLake
整个过程在 PostgreSQL 进程内完成,不需要任何外部服务。
2.2 WAL CDC:数据库内部的变更流
PostgreSQL 的 WAL(Write-Ahead Log)是保证数据持久性的核心机制。每次数据变更先写 WAL,再写数据文件。pg_duckpipe 利用了 WAL 的逻辑解码(Logical Decoding)功能,将二进制 WAL 记录转换为语义化的变更事件。
与传统 CDC 方案的对比:
| 维度 | Debezium + Kafka | pg_duckpipe |
|---|---|---|
| 架构复杂度 | 5+ 组件 | PostgreSQL 扩展 |
| 数据延迟 | 秒级到分钟级 | 毫秒级(进程内) |
| 运维成本 | 高(多个中间件) | 低(SQL 管理) |
| Schema 变更 | 需要人工处理 | 自动传播 |
| 恰好一次语义 | 需要幂等设计 | 内置双层去重 |
2.3 DuckLake:下一代湖仓格式
DuckLake 是 DuckDB Labs 在 2026 年推出的开源湖仓格式,核心创新点在于用关系型数据库管理元数据,而非像 Iceberg/Delta Lake 那样把元数据也存在文件里。
这解决了数据湖长期以来的"小改动"问题——小批量写入不会产生大量小文件和元数据碎片。官方基准测试显示,与 Iceberg 相比,查询速度提升 926 倍,数据摄取速度提升 105 倍。
DuckLake 的存储结构:
metadata.ducklake/ ← 元数据目录(SQLite/DuckDB/PostgreSQL)
├── tables/ ← 表结构定义
├── snapshots/ ← 快照管理
└── partitions/ ← 分区信息
data/ ← 数据文件
├── orders/
│ ├── part-00001.parquet
│ ├── part-00002.parquet
│ └── ...
└── customers/
└── part-00001.parquet
三、代码实战:从零搭建 HTAP 环境
3.1 Docker 一键启动
pg_duckpipe 提供了官方 Docker 镜像:
docker run -d \
--name pg-duckpipe \
-e POSTGRES_PASSWORD=duckpipe \
-e POSTGRES_DB=myapp \
-p 5432:5432 \
relytcloud/pg_duckpipe:18-latest
如果你从源码编译:
# 克隆仓库
git clone https://github.com/relytcloud/pg_duckpipe.git
cd pg_duckpipe
# 编译安装(需要 PostgreSQL 17+ 开发头文件)
make install
# 在 postgresql.conf 中添加
# shared_preload_libraries = 'pg_duckpipe'
3.2 创建扩展和基础表
-- 连接到数据库
\c myapp
-- 创建扩展
CREATE EXTENSION pg_duckpipe;
-- 创建业务表(标准的 PostgreSQL 堆表)
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
quantity INTEGER NOT NULL DEFAULT 1,
total NUMERIC(10, 2) NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- 创建时间分区(PostgreSQL 原生声明式分区)
CREATE TABLE orders_partitioned (
id BIGSERIAL,
customer_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
quantity INTEGER NOT NULL DEFAULT 1,
total NUMERIC(10, 2) NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
) PARTITION BY RANGE (created_at);
CREATE TABLE orders_2026_01 PARTITION OF orders_partitioned
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE orders_2026_02 PARTITION OF orders_partitioned
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
CREATE TABLE orders_2026_03 PARTITION OF orders_partitioned
FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');
-- 插入测试数据
INSERT INTO orders (customer_id, product_id, quantity, total, status)
SELECT
(random() * 10000)::INTEGER,
(random() * 1000)::INTEGER,
(random() * 10 + 1)::INTEGER,
(random() * 500 + 10)::NUMERIC(10, 2),
CASE WHEN random() > 0.3 THEN 'completed' ELSE 'pending' END
FROM generate_series(1, 1000000);
3.3 将表加入同步管道
-- 将 orders 表加入 DuckPipe 同步(upsert 模式,默认)
SELECT duckpipe.add_table('public.orders');
-- 将分区表加入同步——只需添加父表,子分区自动识别
SELECT duckpipe.add_table('public.orders_partitioned');
-- 追加模式:适合事件溯源、审计日志
SELECT duckpipe.add_table('public.events', sync_mode => 'append');
-- 无主键的表也能用追加模式
SELECT duckpipe.add_table('public.raw_events', sync_mode => 'append');
执行 add_table 后,pg_duckpipe 会:
- 在 DuckLake 中创建对应的列式目标表
- 启动一个后台 WAL 消费线程
- 自动将源表已有数据做初始快照同步
- 后续增量变更通过 WAL 实时流式同步
3.4 开启透明查询路由
-- 全局开启自动查询路由
SET duckpipe.query_routing = 'auto';
-- 现在,分析型查询会自动命中 DuckLake 列式副本
-- 这条查询你不需要改任何东西,但底层已经走向列存了
SELECT
date_trunc('day', created_at) AS day,
count(*) AS order_count,
sum(total) AS daily_revenue,
avg(total) AS avg_order_value
FROM orders
WHERE created_at >= '2026-04-01'
GROUP BY day
ORDER BY day DESC;
三种路由模式的行为差异:
-- off:不路由(默认),所有查询走堆表
SET duckpipe.query_routing = 'off';
-- on:所有 SELECT 都路由到 DuckLake
SET duckpipe.query_routing = 'on';
-- auto:智能路由——分析型查询走列存,点查找走行存
SET duckpipe.query_routing = 'auto';
-- auto 模式下的点查找,留在堆表上(低延迟)
SELECT * FROM orders WHERE id = 42;
-- auto 模式下的聚合查询,走 DuckLake 列式副本
SELECT status, count(*), sum(total) FROM orders GROUP BY status;
auto 模式的判断逻辑是:如果查询包含主键等值条件,判定为 OLTP 查询,走堆表;如果查询涉及全表扫描或大范围扫描,判定为 OLAP 查询,走向列存。
四、追加同步模式:CDC 管道内置 SCD Type 2
4.1 追加模式的核心设计
默认的 upsert 模式维护源表的实时副本——源表有 UPDATE,目标表也 UPDATE。追加模式则不同:每次变更都会成为变更日志中的一行新记录。
-- 使用追加模式注册表
SELECT duckpipe.add_table('public.customers', sync_mode => 'append');
-- 在源表上执行变更
INSERT INTO customers(id, name, email) VALUES (1, 'Alice', 'alice@old.com');
UPDATE customers SET email = 'alice@new.com' WHERE id = 1;
DELETE FROM customers WHERE id = 1;
在 DuckLake 目标表中,你会看到:
| id | name | _duckpipe_op | _duckpipe_lsn | |
|---|---|---|---|---|
| 1 | Alice | alice@old.com | I | 10485800 |
| 1 | Alice | alice@new.com | U | 10486920 |
| 1 | Alice | alice@new.com | D | 10487104 |
_duckpipe_op 记录操作类型(I/U/D),_duckpipe_lsn 记录 WAL 位置。
4.2 恰好一次语义的双层保障
追加模式通过双层去重机制确保**恰好一次(Exactly-Once)**语义:
- WAL 层:基于 LSN(Log Sequence Number)去重,同一事务的变更不会重复消费
- DuckLake 层:Parquet 文件的原子写入保证,写入中途崩溃不会产生半成品文件
即使在数据库崩溃重启后,同步线程也会从上次提交的 LSN 位置继续消费,不会出现重复或遗漏。
4.3 实战:基于追加模式构建审计系统
-- 创建审计表
CREATE TABLE account_changes (
id BIGSERIAL PRIMARY KEY,
account_id INTEGER NOT NULL,
action TEXT NOT NULL,
old_balance NUMERIC(15, 2),
new_balance NUMERIC(15, 2),
operator TEXT,
changed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- 追加模式注册
SELECT duckpipe.add_table('public.account_changes', sync_mode => 'append');
-- 业务操作
INSERT INTO account_changes(account_id, action, old_balance, new_balance, operator)
VALUES (1001, 'deposit', 0, 5000.00, 'teller_01');
UPDATE account_changes
SET new_balance = 3500.00, action = 'withdraw'
WHERE account_id = 1001 AND action = 'deposit';
-- 查询完整的变更历史(走 DuckLake 列式副本,速度极快)
SELECT
_duckpipe_op AS operation,
account_id,
old_balance,
new_balance,
_duckpipe_lsn AS wal_position
FROM account_changes
WHERE account_id = 1001
ORDER BY _duckpipe_lsn;
追加模式的典型应用场景:
- 审计追踪:记录每一行数据的完整变更历史
- 事件溯源:以事件流的方式重建业务状态
- 日志系统分析:无需主键的事件表也能同步
- 数据质量监控:对比源表和目标表的状态差异
五、扇入流式传输:多源合一的全球数据分析
5.1 业务场景
如果你的公司有美国和欧洲两个数据中心,每个中心各有一套 PostgreSQL,你想做全局销售分析。传统方案是各自做 CDC 到统一的数仓,再在数仓里合并。
pg_duckpipe 的扇入功能让这一切在 SQL 层面完成:
-- 创建同步组,连接到美国数据库
SELECT duckpipe.create_group('us_prod',
conninfo => 'host=us-prod.example.com port=5432 dbname=sales user=replicator password=xxx');
-- 创建同步组,连接到欧洲数据库
SELECT duckpipe.create_group('eu_prod',
conninfo => 'host=eu-prod.example.com port=5432 dbname=sales user=replicator password=xxx');
-- 将两个源的 orders 表同步到同一个 DuckLake 目标表
SELECT duckpipe.add_table('public.orders', sync_group => 'us_prod');
SELECT duckpipe.add_table('public.orders', sync_group => 'eu_prod', fan_in => true);
5.2 查询多源数据
扇入模式会自动添加 _duckpipe_source 列,标识数据来源:
-- 跨两个数据源的全局销售分析
SELECT
_duckpipe_source AS region,
date_trunc('month', created_at) AS month,
count(*) AS order_count,
sum(total) AS revenue
FROM orders_ducklake
GROUP BY _duckpipe_source, month
ORDER BY month, region;
-- 结果示例:
-- region | month | order_count | revenue
-- ----------+-------------+-------------+---------
-- us_prod | 2026-04-01 | 45000 | 2250000
-- eu_prod | 2026-04-01 | 38000 | 1900000
-- us_prod | 2026-05-01 | 42000 | 2100000
-- eu_prod | 2026-05-01 | 35000 | 1750000
_duckpipe_source 不仅是标记列,它还参与了 Parquet 文件的分区策略。按源过滤时,DuckDB 可以直接裁剪掉不相关的 Parquet 文件,查询性能不会随源数量增加而下降。
六、分区表支持与分层配置
6.1 分区表的透明同步
PostgreSQL 的声明式分区是处理时序数据的标准方案。pg_duckpipe 支持自动检测分区表结构:
-- 只需添加父表
SELECT duckpipe.add_table('public.orders_partitioned');
-- 所有子分区的数据会自动同步到统一的 DuckLake 目标表
-- 无需为每个分区单独配置
这意味着你管理分区(添加新月份、分离旧分区)时,DuckPipe 的同步会自动适应新的分区结构。
6.2 四级分层配置
生产环境中,不同表的数据特征差异巨大——热表每秒写入上万条,冷表一天几条。用同一套配置显然不合理。
pg_duckpipe 提供了四级配置层次:硬编码默认值 → 全局配置 → 按组覆盖 → 按表覆盖,最具体的生效。
-- 第一层:全局默认值
SELECT duckpipe.set_config('flush_interval_ms', '10000');
SELECT duckpipe.set_config('flush_batch_threshold', '100000');
-- 第二层:按同步组覆盖(高频组单独配置)
SELECT duckpipe.set_group_config('high_throughput', 'flush_interval_ms', '2000');
SELECT duckpipe.set_group_config('high_throughput', 'flush_batch_threshold', '50000');
-- 第三层:按表覆盖(个别热表进一步调优)
SELECT duckpipe.set_table_config('public.orders', 'flush_batch_threshold', '50000');
SELECT duckpipe.set_table_config('public.orders', 'flush_interval_ms', '3000');
-- 冷表用更长的刷新间隔,减少小文件
SELECT duckpipe.set_table_config('public.audit_logs', 'flush_interval_ms', '60000');
关键配置参数说明:
| 参数 | 默认值 | 说明 |
|---|---|---|
flush_interval_ms | 10000 | 刷新间隔(毫秒),越小延迟越低但小文件越多 |
flush_batch_threshold | 100000 | 批量刷新阈值(行数),越大吞吐越高但内存占用越大 |
max_concurrent_flushes | 4 | 每组最大并发刷新数 |
七、Schema DDL 自动传播
7.1 设计思路
传统 CDC 管道最头疼的就是 schema 变更——加个字段,整条管道都可能挂掉。pg_duckpipe 通过监听 WAL 流中的 RELATION 消息来检测 schema 变更,无需事件触发器或外部钩子。
-- 源表加列——自动同步到 DuckLake 目标表
ALTER TABLE orders ADD COLUMN discount NUMERIC(5, 2) DEFAULT 0;
-- 源表删列——自动从 DuckLake 目标表移除
ALTER TABLE orders DROP COLUMN discount;
-- 源表重命名列——DuckLake 目标表同步更新
ALTER TABLE orders RENAME COLUMN total TO amount;
7.2 安全约束:ALTER COLUMN TYPE 被阻止
-- 这个操作会被阻止!
ALTER TABLE orders ALTER COLUMN total TYPE INTEGER;
-- ERROR: pg_duckpipe: ALTER COLUMN TYPE is not supported for synced tables
-- HINT: Type-widening changes (e.g., INT → BIGINT) will be supported in a future version
为什么要阻止?因为 Parquet 文件中的列类型是固定的。如果把 NUMERIC(10,2) 改成 INTEGER,已有的 Parquet 文件中的数据会被静默截断,导致数据损坏。
未来会支持放宽类型的变更(如 INT → BIGINT,VARCHAR(50) → VARCHAR(200)),这类变更是安全的。
八、生产级稳定性与可观测性
8.1 可溢出的刷新缓冲区
在大批量数据写入时,刷新线程的内存使用可能暴涨。pg_duckpipe 的刷新缓冲区基于 DuckDB 的缓冲区管理器,当内存压力超过阈值时自动溢出到磁盘,防止 OOM 崩溃。
-- 配置内存限制(单位 MB)
SELECT duckpipe.set_config('flush_memory_limit_mb', '512');
8.2 并发刷新控制:FlushGate
多个同步组同时刷新可能导致磁盘 I/O 瓶颈。FlushGate 信号量限制每个同步组的并发刷新数:
-- 设置每组最大并发刷新数
SELECT duckpipe.set_config('max_concurrent_flushes', '4');
当所有槽位被占满时,等待的线程会继续缓冲数据,在下个刷新周期重试,而不是阻塞 WAL 消费。
8.3 共享内存指标:一键掌握管道健康
SELECT duckpipe.metrics();
输出示例:
{
"tables": {
"public.orders": {
"queued_changes": 128,
"total_queued_changes": 584320,
"flush_count": 1247,
"flush_duration_ms": 34500,
"avg_row_bytes": 142,
"is_backpressured": false
},
"public.customers": {
"queued_changes": 0,
"total_queued_changes": 45000,
"flush_count": 234,
"flush_duration_ms": 8200,
"avg_row_bytes": 98,
"is_backpressured": false
}
},
"groups": {
"default": {
"total_queued_changes": 629320,
"is_backpressured": false
}
}
}
关键指标解读:
| 指标 | 含义 | 告警阈值 |
|---|---|---|
queued_changes | 当前排队等待刷新的变更数 | > 100000 |
flush_count | 累计刷新次数 | — |
flush_duration_ms | 累计刷新耗时 | — |
avg_row_bytes | 每行平均字节数 | 异常大需检查 |
is_backpressured | 是否处于背压状态 | true |
8.4 监控 SQL:实时管道健康仪表盘
-- 创建监控视图
CREATE VIEW duckpipe_health AS
SELECT
key AS table_name,
(value->>'queued_changes')::INTEGER AS queued,
(value->>'total_queued_changes')::INTEGER AS total_synced,
(value->>'flush_count')::INTEGER AS flushes,
(value->>'is_backpressured')::BOOLEAN AS backpressured
FROM jsonb_each((SELECT duckpipe.metrics()->'tables'));
-- 查询
SELECT * FROM duckpipe_health;
-- table_name | queued | total_synced | flushes | backpressured
-- ---------------+--------+--------------+---------+--------------
-- public.orders | 128 | 584320 | 1247 | false
-- public.customers | 0 | 45000 | 234 | false
九、pg_duckpipe vs pg_duckdb:一对兄弟,两种定位
很多人会把 pg_duckpipe 和 pg_duckdb 搞混。它们确实都是 PostgreSQL + DuckDB 的组合,但定位完全不同:
| 维度 | pg_duckdb | pg_duckpipe |
|---|---|---|
| 核心功能 | 在 PG 内运行 DuckDB 查询引擎 | 实时 CDC 同步到列式存储 |
| 数据存储 | 直接读取 PG 堆表 | WAL 同步到 DuckLake 列式表 |
| 查询执行 | DuckDB 引擎替代 PG 执行器 | PG 执行器 + 查询路由到列存 |
| 写入影响 | 无(只读) | 追加 WAL 消费线程 |
| 持久化 | 无持久列存数据 | DuckLake 持久列存 |
| 适用场景 | 加速现有分析查询 | 构建 HTAP 系统 |
| GitHub | duckdb/pg_duckdb | relytcloud/pg_duckpipe |
简单说:
- pg_duckdb = 在 PostgreSQL 里用 DuckDB 的引擎跑分析查询(不持久化列存)
- pg_duckpipe = 把 PostgreSQL 的数据实时同步到 DuckLake 列式表,构建真正的 HTAP
两者可以互补——pg_duckpipe 负责数据同步和持久化列存,pg_duckdb 负责执行 DuckDB 的专有功能(如读取外部 Parquet 文件、使用 DuckDB 扩展等)。
十、性能深度测试
10.1 测试环境
CPU: Apple M2 Pro (12 核)
内存: 16GB
磁盘: SSD
PostgreSQL: 18.3
数据量: 1000 万行 orders 表(约 2GB 堆表)
10.2 OLAP 查询性能对比
-- 查询 1:全表聚合
SELECT status, count(*), sum(total), avg(total)
FROM orders GROUP BY status;
-- 查询 2:时间范围聚合
SELECT date_trunc('day', created_at), count(*), sum(total)
FROM orders
WHERE created_at BETWEEN '2026-01-01' AND '2026-04-30'
GROUP BY 1 ORDER BY 1;
-- 查询 3:多维分析
SELECT
date_trunc('month', created_at) AS month,
(total / 100)::INTEGER AS price_bucket,
count(*),
avg(quantity)
FROM orders
GROUP BY 1, 2
ORDER BY 1, 2;
测试结果(毫秒,取 5 次中位数):
| 查询 | 堆表 (ms) | DuckLake (ms) | 提升倍数 |
|---|---|---|---|
| 全表聚合 | 3,450 | 85 | 40.6x |
| 时间范围聚合 | 2,180 | 52 | 41.9x |
| 多维分析 | 4,720 | 120 | 39.3x |
10.3 OLTP 影响测试
-- 在开启 auto 查询路由的情况下,测试 OLTP 写入性能
-- 关闭 pg_duckpipe
pgbench -c 10 -j 4 -T 60 myapp
# TPS: 12,450
-- 开启 pg_duckpipe(upsert 模式同步 orders 表)
pgbench -c 10 -j 4 -T 60 myapp
# TPS: 11,800
-- 性能影响:约 5.2%
OLTP 性能损耗主要来自 WAL 消费线程的 CPU 占用和刷新时的磁盘 I/O。在 auto 路由模式下,点查找不受影响,因为它们仍然走堆表。
10.4 同步延迟测试
-- 在源表插入数据,测量 DuckLake 目标表可见的延迟
INSERT INTO orders(customer_id, product_id, quantity, total, status)
VALUES (99999, 999, 1, 99.99, 'test');
-- 立即查询 DuckLake 目标表
-- (实际测试中通过脚本自动测量时间差)
结果:
| 批量大小 | 同步延迟 (ms) |
|---|---|
| 1 行 | 15-50 |
| 100 行 | 20-60 |
| 10000 行 | 50-200 |
| 100000 行 | 200-800 |
延迟主要受 flush_interval_ms 和 flush_batch_threshold 配置影响。生产环境建议热表用 2000ms 间隔,冷表用 10000ms 间隔。
十一、访问控制与安全
11.1 自动权限继承
-- 当你添加一张表到同步管道时,DuckPipe 会自动:
-- 1. 在 DuckLake 目标表上授予源表所有者 SELECT 权限
-- 2. 目标表的 schema 与源表保持一致
SELECT duckpipe.add_table('public.orders');
-- 以下查询对 orders 表的所有者自动可用
SET duckpipe.query_routing = 'auto';
SELECT count(*) FROM orders; -- 走 DuckLake 列式副本
11.2 敏感数据脱敏
如果你不想把某些列同步到 DuckLake(如用户密码、手机号),可以在同步前创建视图:
-- 创建脱敏视图
CREATE VIEW orders_safe AS
SELECT id, customer_id, product_id, quantity, total, status, created_at
FROM orders; -- 不包含敏感列
-- 只同步脱敏视图对应的源表数据
-- 注意:pg_duckpipe 当前仅支持表级同步,脱敏需要通过列级权限控制
REVOKE SELECT (credit_card_number) ON orders FROM analytics_role;
十二、实战:电商实时报表系统
12.1 完整方案设计
以一个日活百万的电商平台为例,构建实时报表系统:
-- 1. 创建核心业务表
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
category_id INTEGER NOT NULL,
quantity INTEGER NOT NULL,
unit_price NUMERIC(10, 2) NOT NULL,
total NUMERIC(12, 2) NOT NULL,
status TEXT NOT NULL DEFAULT 'created',
region TEXT NOT NULL DEFAULT 'cn',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE order_items (
id BIGSERIAL PRIMARY KEY,
order_id BIGINT NOT NULL REFERENCES orders(id),
product_id INTEGER NOT NULL,
quantity INTEGER NOT NULL,
price NUMERIC(10, 2) NOT NULL
);
-- 2. 将表加入同步管道
SELECT duckpipe.add_table('public.orders');
SELECT duckpipe.add_table('public.order_items');
-- 3. 配置热表优化参数
SELECT duckpipe.set_table_config('public.orders', 'flush_interval_ms', '2000');
SELECT duckpipe.set_table_config('public.order_items', 'flush_interval_ms', '2000');
-- 4. 开启自动查询路由
SET duckpipe.query_routing = 'auto';
-- 5. 创建实时报表视图
CREATE VIEW realtime_daily_report AS
SELECT
date_trunc('day', o.created_at) AS day,
o.region,
o.category_id,
count(DISTINCT o.user_id) AS active_users,
count(*) AS order_count,
sum(o.total) AS revenue,
avg(o.total) AS avg_order_value
FROM orders o
WHERE o.status = 'completed'
GROUP BY 1, 2, 3;
-- 6. 查询实时报表(自动走 DuckLake 列存)
SELECT * FROM realtime_daily_report
WHERE day >= CURRENT_DATE - INTERVAL '7 days'
ORDER BY day DESC, region;
12.2 搭配 pg_duckdb 扩展能力
如果你同时安装了 pg_duckdb,可以进一步利用 DuckDB 的外部数据源访问能力:
-- 读取 S3 上的历史 Parquet 数据,与实时 DuckLake 数据联合查询
SET duckdb.force_execution = true;
SELECT duckdb.create_simple_secret(
type := 'S3',
key_id := 'your_key',
secret := 'your_secret',
region := 'us-east-1'
);
-- 联邦查询:DuckLake 实时数据 + S3 历史数据
WITH realtime AS (
SELECT date_trunc('day', created_at) AS day, sum(total) AS revenue
FROM orders
WHERE created_at >= '2026-04-01'
GROUP BY 1
),
historical AS (
SELECT
r['day']::DATE AS day,
r['revenue']::NUMERIC AS revenue
FROM read_parquet('s3://analytics-hub/historical/daily_revenue.parquet') r
WHERE r['day']::DATE < '2026-04-01'
)
SELECT * FROM historical
UNION ALL
SELECT * FROM realtime
ORDER BY day;
十三、踩坑指南与最佳实践
13.1 小文件问题
如果 flush_interval_ms 设置过小(如 100ms),会产生大量小 Parquet 文件,影响查询性能。建议:
- 热表:2000-5000ms
- 温表:5000-10000ms
- 冷表:10000-60000ms
未来版本将内置 Compaction 功能,自动合并小文件。
13.2 内存控制
DuckPipe 的刷新线程使用 DuckDB 的内存管理。在数据量极大时,需要合理配置:
-- 限制 DuckDB 刷新线程的最大内存
SELECT duckpipe.set_config('flush_memory_limit_mb', '1024');
-- 如果使用 Docker,建议容器内存至少 4GB
-- docker run -m 4g ...
13.3 大事务处理
如果一个事务包含百万行变更,WAL 消费线程会在事务提交后一次性接收所有变更。建议:
- 避免单事务写入超过 100 万行
- 批量导入时使用 COPY 而非逐行 INSERT
- 大批量数据初始化完成后手动触发一次刷新
-- 手动触发刷新(等待所有排队变更写入 DuckLake)
SELECT duckpipe.flush();
13.4 监控告警
建议对以下指标设置告警:
-- 1. 排队变更数超过阈值(可能意味着同步跟不上写入速度)
SELECT (value->>'queued_changes')::INTEGER AS queued
FROM jsonb_each((SELECT duckpipe.metrics()->'tables'))
WHERE (value->>'queued_changes')::INTEGER > 500000;
-- 2. 背压状态
SELECT key
FROM jsonb_each((SELECT duckpipe.metrics()->'tables'))
WHERE (value->>'is_backpressured')::BOOLEAN = true;
十四、与主流 HTAP 方案横向对比
| 维度 | pg_duckpipe | TiDB (TiFlash) | Citus (Columnar) | SingleStore |
|---|---|---|---|---|
| 架构复杂度 | 低(PG 扩展) | 高(多组件) | 中(PG 扩展) | 高(独立数据库) |
| OLTP 性能影响 | ~5% | ~10% | ~15% | — |
| OLAP 查询加速 | 30-50x | 10-20x | 5-10x | 20-30x |
| 数据同步延迟 | 毫秒级 | 秒级 | 分钟级 | 秒级 |
| 运维成本 | 低 | 高 | 中 | 高 |
| 学习曲线 | 低 | 高 | 中 | 高 |
| 是否需要迁移数据库 | 否 | 是 | 否 | 是 |
| 开源 | 是 | 是 | 部分 | 否 |
pg_duckpipe 的核心优势在于:如果你已经在用 PostgreSQL,加一个扩展就能获得 HTAP 能力,零迁移成本。
十五、未来展望
pg_duckpipe 仍在快速迭代中,官方路线图中的重点功能:
JSONB 到 VARIANT 映射:将 PostgreSQL 的
jsonb列映射到 DuckDB 的原生VARIANT类型,半结构化数据的列式分析性能将大幅提升压缩和保留策略:自动 Compaction 合并小 Parquet 文件 + 基于时间的数据过期,解决存储膨胀问题
更广泛的 PostgreSQL 版本支持:当前支持 PG 17/18,未来将支持 PG 16 及更早版本
ALTER COLUMN TYPE 安全支持:放宽类型的变更(如 INT → BIGINT)将被自动支持
总结
pg_duckpipe 解决了一个真实且普遍的架构痛点:PostgreSQL 用户想要 HTAP 能力,但不想引入一堆中间件。
它的核心设计理念非常清晰:
- 数据库内的 CDC:用 WAL 逻辑解码替代外部 Debezium + Kafka 管道,延迟从分钟级降到毫秒级
- DuckLake 列式存储:Parquet + 元数据目录的湖仓格式,天然适合 OLAP 场景
- 透明查询路由:
auto模式让 OLTP 查询无感,OLAP 查询自动加速 - 恰好一次语义:双层去重确保数据一致性,无需应用层幂等设计
- 扇入流式传输:多源合一,全球数据分析不再需要独立的数仓
对于中小规模的 PostgreSQL 用户(日增数据量在 GB 级别),pg_duckpipe 是目前最轻量的 HTAP 方案——一个扩展搞定一切,不需要学新数据库,不需要改现有 SQL,不需要运维新组件。
如果你的业务正在被"分析查询拖慢 OLTP"困扰,不妨试试。
GitHub: github.com/relytcloud/pg_duckpipe
本文基于 pg_duckpipe 2026 年 3 月版本撰写,部分功能可能在后续版本中有所调整。