编程 Pathway 深度实战:55K Star 的 Python 实时 ETL 框架——从差分数据流原理到 RAG 管道生产部署的全链路架构解析

2026-05-07 06:38:15 +0800 CST views 14

Pathway 深度实战:55K Star 的 Python 实时 ETL 框架——从差分数据流原理到 RAG 管道生产部署的全链路架构解析

当 Flink 还在为 Python API 的性能损耗头疼时,Pathway 已经用 Rust 引擎把差分数据流玩出了花。一个 Python 写代码、Rust 跑引擎的实时 ETL 框架,凭什么能在 GitHub 上拿下 55K+ Star?本文从差分数据流的数学基础开始,逐层拆解 Pathway 的架构设计、增量计算引擎、状态管理与一致性保证,最后带你手撸一个生产级 RAG 管道。

一、为什么需要又一个流处理框架?

如果你做过实时数据处理,大概率踩过这些坑:

  • Flink:Java/Scala 生态,Python API(PyFlink)性能拉胯,序列化开销大到让人怀疑人生
  • Spark Streaming:微批处理本质,延迟下限被批次间隔锁死
  • Kafka Streams:JVM only,Python 开发者只能望洋兴叹
  • 自研方案:Kafka + Python consumer + Redis + 定时任务……维护噩梦

核心矛盾就一个:Python 开发者想要 Flink 级别的流处理能力,但不想写 Java。

Pathway 的答案很直接:Python API 写业务逻辑,Rust 引擎跑计算。你不是要 Python 的开发效率吗?给你。你不是要低延迟的流处理吗?Rust 引擎给你兜底。

这不是简单的语言绑定,而是一个从头设计的差分数据流引擎。Python 只是用户界面,Rust 才是灵魂。

1.1 Pathway 的核心定位

Pathway 把自己定义为 "Python ETL framework for stream processing, real-time analytics, LLM pipelines, and RAG"。拆开来看:

维度Pathway 的做法
编程模型声明式 Python API,像写 Pandas 一样写流处理
执行引擎Rust 实现的 Differential Dataflow 引擎
计算模式增量计算(Incremental Computation),只处理变化的数据
流批一体同一份代码,批处理和流处理零修改切换
AI 集成内置 LLM 管道和 RAG 支持,向量索引开箱即用
部署模式单机 Docker → Kubernetes 分布式,线性扩展

一句话总结:用写批处理的方式写流处理,用 Python 的语法享受 Rust 的性能。

二、差分数据流:Pathway 的数学灵魂

要理解 Pathway 为什么快,必须先理解 Differential Dataflow(差分数据流)。这是 MIT Frank McSherry 教授在 2013 年提出的计算模型,也是 Pathway 引擎的数学基础。

2.1 从全量计算到增量计算

传统流处理的思路是:数据来了就重新算一遍。比如你有一个窗口求和,每来一条新数据,就遍历窗口内所有数据重新求和。

差分数据流的思路完全不同:不重新算,只算变化量。

数学表达:

全量计算:  Output = f(Input)
增量计算:  ΔOutput = f(Input + ΔInput) - f(Input)
最终结果:  NewOutput = Output + ΔOutput

这意味着:

  • 窗口求和:新结果 = 旧结果 + 新数据 - 过期数据,O(1) 复杂度
  • 窗口计数:新结果 = 旧结果 + 1(新来)- 1(过期),O(1) 复杂度
  • Group By 聚合:只重算受影响的分组,O(受影响的分组) 复杂度

2.2 差分数据流的三要素

差分数据流中的每条数据由三元组 (data, time, diff) 表示:

  • data:数据本身(比如一条用户记录)
  • time:逻辑时间戳(事件时间或处理时间)
  • diff:差分值(+1 表示插入,-1 表示撤回)

这个设计精妙在哪儿?撤回机制。在流处理中,晚到的数据可能导致之前的结果不正确。差分数据流不是"覆盖旧结果",而是"发出一条 diff=-1 的撤回记录,再发出一条 diff=+1 的新记录"。

# 传统方式:覆盖
result = 100  # 旧结果
result = 105  # 新结果直接覆盖

# 差分数据流:撤回 + 重发
# 旧结果: (user_123, sum=100, time=t1, diff=+1)
# 撤回:   (user_123, sum=100, time=t2, diff=-1)  # 撤回旧值
# 新结果: (user_123, sum=105, time=t2, diff=+1)  # 发出新值
# 最终:   100 * (-1) + 105 * (+1) = 105 ✓

Flink 的增量计算主要靠 Keyed State + Window Trigger,本质上还是"攒一批再算"。而 Pathway 的 Differential Dataflow 是在算子图层面做增量——每个算子都理解差分语义,能精确知道哪些中间结果需要更新

特性FlinkPathway
增量粒度算子级(需要手动设计)数据级(自动差分)
撤回机制需要 retract 机制(SQL 模式)内建 diff 三元组
多表 Join 增量需要维护双端状态差分传播自动处理
乱序数据处理Watermark + AllowedLateness差分时间域自动合并

这不是"Pathway 比 Flink 好"的意思——Flink 在超大规模集群调度、Exactly-Once 语义上更成熟。但在 单机到中小规模集群的实时 ETL 场景,Pathway 的差分数据流模型确实更优雅、更高效。

三、架构全貌:Python 是皮,Rust 是骨

3.1 分层架构

Pathway 的架构可以清晰地分为四层:

┌─────────────────────────────────────────┐
│           Python API Layer              │  ← 用户写代码的地方
│  pw.Table / pw.Schema / pw.io / pw.ml   │
├─────────────────────────────────────────┤
│         Computation Graph               │  ← 声明式的 DAG
│  节点 = Transformation / 边 = Channel    │
├─────────────────────────────────────────┤
│       Rust Execution Engine             │  ← 真正干活的引擎
│  Differential Dataflow / State Mgmt     │
├─────────────────────────────────────────┤
│         Connector Layer                 │  ← 数据进出
│  Kafka / PostgreSQL / GDrive / REST     │
└─────────────────────────────────────────┘

关键设计决策:Python 和 Rust 之间的通信不是 RPC,而是共享内存 + FFI。

Python 层只做两件事:

  1. 构建计算图(DAG)
  2. 发送初始快照给 Rust 引擎

Rust 引擎接管后,所有数据流转、状态管理、增量计算都在 Rust 侧完成,不需要跨语言调用。这就是为什么 Pathway 的性能可以接近纯 Rust 实现。

3.2 Python API 层:像 Pandas 一样写流处理

Pathway 的 Python API 设计哲学是:让你觉得在写批处理,实际跑的是流处理。

import pathway as pw

# 定义数据模式 —— 和 Pydantic 一样直觉
class TransactionSchema(pw.Schema):
    tx_id: str = pw.column_definition(primary_key=True)
    user_id: int
    amount: float
    currency: str
    timestamp: pw.DateTimeNaive

# 读取 Kafka 数据 —— 声明式,不需要管 offset
transactions = pw.io.kafka.read(
    bootstrap_servers="localhost:9092",
    topic="transactions",
    schema=TransactionSchema,
    autocommit_duration_ms=1000,
)

# 过滤 —— 和 Pandas 一模一样的语法
usd_transactions = transactions.filter(
    transactions.currency == "USD"
)

# 窗口聚合 —— 声明式窗口定义
daily_stats = usd_transactions.groupby(
    usd_transactions.user_id
).reduce(
    user_id=pw.this.user_id,
    total_amount=pw.reducers.sum(usd_transactions.amount),
    tx_count=pw.reducers.count(),
    avg_amount=pw.reducers.avg(usd_transactions.amount),
)

# 输出到 PostgreSQL —— 自动创建表、自动 schema 推断
pw.io.postgres.write(
    daily_stats,
    connection_string="postgresql://user:pass@localhost/db",
    table_name="daily_transaction_stats",
)

# 启动引擎 —— 这一行把 DAG 交给 Rust 引擎执行
pw.run()

注意 pw.run() 这一行。在它之前,所有操作都只是在构建一个 声明式的计算图,没有数据流动。pw.run() 之后,Rust 引擎接管,开始消费 Kafka 数据、执行增量计算、写入 PostgreSQL。

同一个脚本,你改一下输入源,就能从流处理切换到批处理:

# 流处理:从 Kafka 实时读
transactions = pw.io.kafka.read(...)

# 批处理:从 CSV 文件读(只需改这一行)
transactions = pw.io.csv.read("./data/transactions.csv")

# 后续的过滤、聚合、输出代码完全不变

这就是 流批一体 的威力:开发用批数据调试,生产用流数据运行,零代码修改。

3.3 Rust 引擎:差分数据流的工程实现

Pathway 的 Rust 引擎在 src/ 目录下,核心模块包括:

模块路径功能
差分数据流核心src/engine/Differential Dataflow 的 Rust 实现
状态管理src/engine/state/持久化状态、快照、恢复
调度器src/engine/scheduler/算子调度、优先级管理
通道src/engine/channel/算子间数据传输
连接器src/connector/Kafka、PostgreSQL 等连接器实现
Python 绑定src/api/FFI 接口,Python↔Rust 桥接

引擎的核心数据结构是 安排追踪排列(Arrangement)——一种支持高效差分查找的索引结构。你可以把它理解为一个多级 HashMap,其中键是 (key, time),值是 (value, diff) 的集合。

// 简化的 Arrangement 结构
pub struct Arrangement<K, V> {
    // 外层: key -> 内层排列
    data: HashMap<K, OrdOffset<V>>,
}

pub struct OrdOffset<V> {
    // 按时间排序的 (value, diff) 列表
    entries: Vec<(V, i64, Timestamp)>,  // (value, diff, time)
}

当增量数据到达时,引擎不需要扫描整个 Arrangement,而是:

  1. 通过 key 定位到对应的 OrdOffset
  2. 通过 time 找到受影响的条目
  3. 应用 diff 更新,生成新的差分输出

这种 按键分区 + 按时间排序 的双重索引,让增量查找的复杂度从 O(n) 降到 O(log n)。

3.4 连接器生态:300+ 数据源的接入能力

Pathway 的连接器分两大类:

原生连接器(Rust 实现,零拷贝高性能):

连接器输入/输出特点
Kafka输入+输出最常用,支持 offset 管理、Consumer Group
PostgreSQL输入+输出CDC 支持,逻辑解码
MongoDB输入+输出Change Stream 实时监听
CSV/JSON输入支持目录监听,自动发现新文件
REST API输入+输出轮询或 Webhook 模式
WebSocket输入+输出实时双向通信

Airbyte 连接器(通过 Airbyte 协议,300+ 数据源):

# 通过 Airbyte 连接 Salesforce
salesforce_data = pw.io.airbyte.read(
    source="salesforce",
    config={
        "client_id": "...",
        "client_secret": "...",
        "refresh_token": "...",
    },
    streams=["Contact", "Opportunity"],
)

连接器的设计统一遵循 "变更流"(Change Stream) 语义——每个连接器都输出 (insert/update/delete, data, timestamp) 三元组,天然适配差分数据流的 diff 语义。

四、核心概念深度解析

4.1 Table:流批统一的数据结构

Pathway 的 pw.Table 是最核心的数据结构。它同时表示:

  • 批处理中的一张静态表
  • 流处理中的一个持续变化的动态表

这个统一是怎么实现的?Table 内部存储的不是数据快照,而是差分日志。

import pathway as pw

class EventSchema(pw.Schema):
    event_id: str = pw.column_definition(primary_key=True)
    user_id: int
    event_type: str
    value: float

# 创建 Table —— 这不是一个静态快照,而是一个持续变化的流
events = pw.io.kafka.read(
    bootstrap_servers="localhost:9092",
    topic="events",
    schema=EventSchema,
)

# 此时 events 是一个 Table,它的内容会随着 Kafka 消息持续变化
# 但你操作它的方式和操作静态表一模一样

Table 的关键属性:

属性说明
_id每行的唯一标识(自动生成或由 primary_key 指定)
类型安全的列,支持 int、str、float、datetime、Optional 等
差分内部维护 (data, time, diff) 三元组
更新模式pw.Mode.BATCHING(批处理)或 pw.Mode.STREAMING(流处理)

4.2 Transformation:声明式的数据变换

Pathway 提供了丰富的 Transformation 操作,覆盖 ETL 的所有场景:

基础变换

# 选择列
selected = events.select(
    user_id=events.user_id,
    event_type=events.event_type,
    value=events.value,
)

# 过滤
high_value = events.filter(events.value > 100)

# 映射 —— 支持 Python 表达式和 UDF
enriched = events.select(
    **events,  # 展开所有列
    value_usd=pw.if_else(
        events.currency == "EUR",
        events.value * 1.08,
        events.value,
    ),
    is_premium=pw.if_else(events.value > 1000, True, False),
)

# 排序(注意:流处理中排序是全局操作,慎用)
sorted_events = events.sort(pw.this.timestamp)

聚合变换

# Group By + Reduce
user_stats = events.groupby(events.user_id).reduce(
    user_id=pw.this.user_id,
    total_value=pw.reducers.sum(events.value),
    event_count=pw.reducers.count(),
    max_value=pw.reducers.max(events.value),
    min_value=pw.reducers.min(events.value),
    avg_value=pw.reducers.avg(events.value),
    unique_types=pw.reducers.any(events.event_type),  # 任意值
)

# 内置聚合器
import pathway as pw
print(dir(pw.reducers))
# ['any', 'argmax', 'argmin', 'avg', 'count', 'max', 'min', 'npsum', 
#  'sorted_tuples', 'sum', 'unique', ...]

Join 变换

class UserSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str
    country: str

users = pw.io.csv.read("./users.csv", schema=UserSchema)

# Inner Join
enriched_events = events.join(
    users,
    events.user_id == users.user_id,
).select(
    event_id=events.event_id,
    user_name=users.name,
    country=users.country,
    event_type=events.event_type,
    value=events.value,
)

# Left Join
left_joined = events.join_left(
    users,
    events.user_id == users.user_id,
).select(
    **events,
    user_name=users.name,
    country=users.country,
)

# 增量 Join 的威力:当 users 表更新时,只重算受影响的 join 结果
# 这是差分数据流的天然优势

4.3 窗口操作:时间语义的精确控制

流处理的核心难题之一是 时间。Pathway 提供了三种窗口类型:

import pathway as pw

# 1. 滑动窗口(Sliding Window)
# 每 5 分钟统计过去 1 小时的数据
sliding_stats = events.windowby(
    events.timestamp,
    window=pw.temporal.sliding(
        duration=pw.Duration.minutes(60),  # 窗口大小
        hop=pw.Duration.minutes(5),        # 滑动步长
    ),
).reduce(
    window_start=pw.this._pw_window_start,
    window_end=pw.this._pw_window_end,
    total_value=pw.reducers.sum(events.value),
    count=pw.reducers.count(),
)

# 2. 滚动窗口(Tumbling Window)
# 每 1 小时统计一次
tumbling_stats = events.windowby(
    events.timestamp,
    window=pw.temporal.tumbling(
        duration=pw.Duration.hours(1),
    ),
).reduce(
    window_start=pw.this._pw_window_start,
    total_value=pw.reducers.sum(events.value),
)

# 3. 会话窗口(Session Window)
# 用户活跃期间的数据,30 分钟不活跃则断开
session_stats = events.windowby(
    events.timestamp,
    window=pw.temporal.session(
        gap=pw.Duration.minutes(30),
    ),
    instance=events.user_id,  # 按用户分窗口
).reduce(
    user_id=events.user_id,
    session_start=pw.this._pw_window_start,
    session_end=pw.this._pw_window_end,
    total_value=pw.reducers.sum(events.value),
    event_count=pw.reducers.count(),
)

乱序数据处理

# Pathway 通过 late 延迟参数处理乱序数据
windowed = events.windowby(
    events.timestamp,
    window=pw.temporal.tumbling(duration=pw.Duration.minutes(5)),
    behavior=pw.temporal.common_behavior(
        cutoff=pw.Duration.minutes(2),  # 允许 2 分钟的迟到数据
    ),
).reduce(
    total_value=pw.reducers.sum(events.value),
)

cutoff 的含义是:窗口关闭后,仍然保留 2 分钟的时间窗口来接收迟到数据。这比 Flink 的 Watermark + AllowedLateness 更直觉——你不需要理解 Watermark 机制,只需要告诉 Pathway "数据最多迟到多久"。

4.4 状态管理与持久化

流处理最怕什么?宕机重算。

Pathway 的状态管理基于 Rust 引擎的内存状态 + 持久化后端

import pathway as pw

# 启用持久化
pw.run(
    persistent_storage="./pathway_state",  # 本地持久化目录
    # 或者使用 S3
    # persistent_storage="s3://my-bucket/pathway-state",
)

持久化的实现原理:

  1. 检查点(Checkpoint):定期将内存状态快照到持久化后端
  2. 预写日志(WAL):每个变更在应用前先写入 WAL
  3. 恢复(Recovery):重启时从最新检查点恢复,回放 WAL 中的增量变更
# 配置持久化参数
pw.run(
    persistent_storage="./pathway_state",
    persistence_mode="speed",  # "speed" 或 "recovery"
    # "speed": 优先性能,较少检查点
    # "recovery": 优先恢复能力,更频繁检查点
)

4.5 一致性保证

Pathway 对一致性的分层设计:

版本一致性级别实现方式
开源版At-Least-OnceWAL + 检查点恢复
企业版Exactly-Once两阶段提交 + 幂等写入

对于大多数 ETL 场景,At-Least-Once + 下游去重就够了:

# 下游去重:利用 primary_key 天然去重
class DedupedSchema(pw.Schema):
    tx_id: str = pw.column_definition(primary_key=True)  # 主键去重
    amount: float
    timestamp: pw.DateTimeNaive

deduped = pw.io.kafka.read(
    bootstrap_servers="localhost:9092",
    topic="transactions",
    schema=DedupedSchema,
    # primary_key 保证同一 tx_id 只保留最新值
)

五、实战:从零构建实时交易风控管道

现在把上面学到的概念串起来,构建一个生产级的实时交易风控管道。

5.1 场景描述

需求:

  • 实时监听交易流(Kafka)
  • 按用户统计最近 1 小时的交易总额
  • 检测异常交易(单笔金额过大、短时高频、异地交易)
  • 告警输出到 Webhook
  • 正常统计数据写入 PostgreSQL

5.2 完整实现

import pathway as pw
from datetime import datetime

# ===== 数据模式定义 =====

class TransactionSchema(pw.Schema):
    tx_id: str = pw.column_definition(primary_key=True)
    user_id: int
    amount: float
    currency: str
    merchant: str
    city: str
    timestamp: pw.DateTimeNaive

class UserProfileSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str
    home_city: str
    risk_level: str  # low / medium / high
    daily_limit: float


# ===== 数据源接入 =====

# 实时交易流
transactions = pw.io.kafka.read(
    bootstrap_servers="kafka:9092",
    topic="transactions",
    schema=TransactionSchema,
    autocommit_duration_ms=5000,
)

# 用户画像(低频更新的维度表)
users = pw.io.postgres.read(
    connection_string="postgresql://user:pass@postgres:5432/risk_db",
    table_name="user_profiles",
    schema=UserProfileSchema,
)


# ===== 数据预处理 =====

# 1. 统一货币(简化版,实际用实时汇率 API)
normalized_tx = transactions.select(
    tx_id=transactions.tx_id,
    user_id=transactions.user_id,
    amount_usd=pw.if_else(
        transactions.currency == "EUR",
        transactions.amount * 1.08,
        pw.if_else(
            transactions.currency == "GBP",
            transactions.amount * 1.27,
            transactions.amount,  # 默认 USD
        ),
    ),
    merchant=transactions.merchant,
    city=transactions.city,
    timestamp=transactions.timestamp,
)

# 2. 关联用户画像
enriched_tx = normalized_tx.join_left(
    users,
    normalized_tx.user_id == users.user_id,
).select(
    tx_id=normalized_tx.tx_id,
    user_id=normalized_tx.user_id,
    amount_usd=normalized_tx.amount_usd,
    merchant=normalized_tx.merchant,
    city=normalized_tx.city,
    timestamp=normalized_tx.timestamp,
    home_city=users.home_city,
    risk_level=users.risk_level,
    daily_limit=users.daily_limit,
)


# ===== 风控规则引擎 =====

# 规则1:单笔大额交易
large_amount_alert = enriched_tx.filter(
    enriched_tx.amount_usd > enriched_tx.daily_limit * 0.8  # 单笔超过日限额 80%
).select(
    alert_type="LARGE_AMOUNT",
    tx_id=enriched_tx.tx_id,
    user_id=enriched_tx.user_id,
    amount=enriched_tx.amount_usd,
    limit=enriched_tx.daily_limit,
    timestamp=enriched_tx.timestamp,
)

# 规则2:短时高频交易(1 小时内超过 10 笔)
tx_count_window = enriched_tx.windowby(
    enriched_tx.timestamp,
    window=pw.temporal.tumbling(duration=pw.Duration.hours(1)),
).reduce(
    user_id=enriched_tx.user_id,
    window_start=pw.this._pw_window_start,
    tx_count=pw.reducers.count(),
    total_amount=pw.reducers.sum(enriched_tx.amount_usd),
)

high_freq_alert = tx_count_window.filter(
    tx_count_window.tx_count > 10
).select(
    alert_type="HIGH_FREQUENCY",
    user_id=tx_count_window.user_id,
    tx_count=tx_count_window.tx_count,
    total_amount=tx_count_window.total_amount,
    window_start=tx_count_window.window_start,
)

# 规则3:异地交易(交易城市 ≠ 常驻城市)
remote_city_alert = enriched_tx.filter(
    enriched_tx.city != enriched_tx.home_city
).select(
    alert_type="REMOTE_CITY",
    tx_id=enriched_tx.tx_id,
    user_id=enriched_tx.user_id,
    tx_city=enriched_tx.city,
    home_city=enriched_tx.home_city,
    amount=enriched_tx.amount_usd,
    timestamp=enriched_tx.timestamp,
)


# ===== 输出 =====

# 告警推送到 Webhook
pw.io.jsonlines.write(
    large_amount_alert,
    "./alerts/large_amount.jsonl",
)
pw.io.jsonlines.write(
    high_freq_alert,
    "./alerts/high_freq.jsonl",
)
pw.io.jsonlines.write(
    remote_city_alert,
    "./alerts/remote_city.jsonl",
)

# 正常统计数据写入 PostgreSQL
user_hourly_stats = enriched_tx.windowby(
    enriched_tx.timestamp,
    window=pw.temporal.tumbling(duration=pw.Duration.hours(1)),
).reduce(
    user_id=enriched_tx.user_id,
    window_start=pw.this._pw_window_start,
    tx_count=pw.reducers.count(),
    total_amount=pw.reducers.sum(enriched_tx.amount_usd),
    avg_amount=pw.reducers.avg(enriched_tx.amount_usd),
    max_amount=pw.reducers.max(enriched_tx.amount_usd),
)

pw.io.postgres.write(
    user_hourly_stats,
    connection_string="postgresql://user:pass@postgres:5432/risk_db",
    table_name="user_hourly_stats",
)


# ===== 启动引擎 =====

pw.run(
    persistent_storage="./pathway_state",
    persistence_mode="speed",
)

5.3 架构亮点分析

这个管道的几个设计亮点:

1. 声明式规则引擎:每条风控规则都是一个独立的 Table 变换,新增规则只需加一段 filter + select,不影响其他规则。

2. 增量计算的性能优势:用户画像表(users)更新时,只重算受影响的 join 结果。传统方案需要全量重刷。

3. 流批一体的开发体验:开发时用 CSV 文件调试,生产时改一行代码切 Kafka。

4. 自动容错persistent_storage 保证宕机后从检查点恢复,不丢数据。

六、RAG 管道:Pathway 的 AI 杀手级场景

如果说实时 ETL 是 Pathway 的基本功,那 RAG(检索增强生成) 就是它的杀手级场景。Pathway 内建了从文档摄入、向量索引到 LLM 查询的完整 RAG 管道。

6.1 为什么 Pathway 适合做 RAG?

传统 RAG 系统的痛点:

  1. 数据陈旧:向量索引是静态的,文档更新后索引不会自动更新
  2. 管道复杂:文档加载 → 切分 → Embedding → 向量存储 → 检索,5 个步骤 5 个组件
  3. 运维困难:每个组件独立部署,排查问题要跨 5 个系统

Pathway 的方案:一个 Python 脚本搞定所有。

import pathway as pw
from pathway.xpacks.llm import parsers, splitters, embedders
from pathway.xpacks.llm.vector_store import VectorStoreServer

# ===== 1. 文档摄入 =====
# 支持多种数据源:Google Drive、SharePoint、本地文件、S3 等

# Google Drive 文档
gdrive_docs = pw.io.gdrive.read(
    object_id="1A2B3C4D5E6F",  # Google Drive 文件夹 ID
    service_user_credentials_file="./credentials.json",
)

# 本地文件
local_docs = pw.io.fs.read(
    "./documents/",
    format="binary",
)

# SharePoint 文档
sharepoint_docs = pw.io.sharepoint.read(
    url="https://company.sharepoint.com/sites/docs",
    client_id="...",
    client_secret="...",
    tenant_id="...",
)

# 合并所有文档源
all_docs = pw.combine(gdrive_docs, local_docs, sharepoint_docs)


# ===== 2. 文档解析与切分 =====

# 解析:PDF、Word、HTML、Markdown 自动识别
parsed = pw.apply(parsers.parse, all_docs.data)

# 切分:Token 计数切分器,确保每个块不超过 LLM 上下文
split = pw.apply(
    lambda text: splitters.TokenCountSplitter(
        max_tokens=500,
        overlap_tokens=50,  # 块之间 50 token 重叠
    )(text),
    parsed,
)


# ===== 3. 向量索引 =====

# 使用 OpenAI Embedding
embedder = embedders.OpenAIEmbedder(
    model="text-embedding-3-small",
    api_key="sk-...",
)

# 构建向量存储服务器
vector_server = VectorStoreServer(
    data=split,
    embedder=embedder,
    search_top_k=5,  # 检索 top-5 相关文档
)

# 启动 REST API 服务
vector_server.serve(
    host="0.0.0.0",
    port=8000,
    with_cache=True,  # 内置缓存层
    cache_strategy=pw.upatterns.TemporalJoin(
        cutoff=pw.Duration.minutes(5),  # 5 分钟缓存
    ),
)


# ===== 4. 自动更新 =====
# 当 Google Drive / SharePoint / 本地文件有更新时,
# Pathway 自动重新解析、切分、Embedding,并更新向量索引
# 不需要手动触发重新索引!

pw.run()

6.2 向量索引的差分更新

这是 Pathway RAG 最核心的竞争力。传统方案(如 LangChain + Pinecone)的向量索引是 追加式 的——文档更新后,你需要手动删除旧向量、插入新向量。Pathway 的差分数据流让这个过程 完全自动化

  1. 文档更新 → 连接器发出 (doc_id, time, diff=-1) 撤回旧版本
  2. 差分传播 → 向量索引自动移除旧 Embedding
  3. 重新解析 → 发出 (doc_id, time, diff=+1) 插入新版本
  4. 新 Embedding → 向量索引自动添加

整个过程无需人工干预,延迟取决于 Embedding API 的响应时间。

6.3 与 LangChain 集成

Pathway 提供了 LangChain 集成,让你可以直接在 LangChain 应用中使用 Pathway 的实时向量索引:

from langchain_community.vectorstores import PathwayVectorClient

# 连接到 Pathway 向量服务器
client = PathwayVectorClient(url="http://localhost:8000")

# 检索相关文档
results = client.similarity_search(
    "如何配置 Kafka 连接器?",
    k=5,
)

# 在 LangChain Chain 中使用
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=client.as_retriever(search_kwargs={"k": 5}),
)

# 每次查询都拿到最新的文档内容
# 因为 Pathway 的向量索引是实时更新的
answer = qa_chain.run("如何配置 Kafka 连接器?")

七、性能优化实战

7.1 内存优化

Pathway 默认把所有状态保存在内存中。对于大规模数据流,内存管理至关重要。

# 1. 控制 Table 的保留时间
# 对于时间序列数据,可以设置 TTL 自动清理过期数据
events_with_ttl = events.filter(
    pw.this.timestamp > pw.now() - pw.Duration.hours(24)
)

# 2. 减少不必要的数据携带
# 只 select 需要的列,减少内存占用
lean_events = events.select(
    user_id=events.user_id,
    amount=events.amount,
    # 不带 event_type、merchant 等不需要的列
)

# 3. 合理使用 groupby + reduce
# 避免大粒度的 groupby,尽量缩小分组键的范围

7.2 并行度调优

# Pathway 支持多线程和多进程执行
pw.run(
    # 多线程:适用于 IO 密集型任务
    # 引擎自动利用所有 CPU 核心
    
    # 持久化配置
    persistent_storage="./pathway_state",
    
    # 检查点间隔(秒)
    # 默认 60s,低延迟场景可以减小
    checkpoint_interval_ms=30000,  # 30 秒
)

对于分布式部署,Pathway 支持 Kubernetes 部署:

# pathway-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pathway-etl
spec:
  replicas: 3
  selector:
    matchLabels:
      app: pathway-etl
  template:
    metadata:
      labels:
        app: pathway-etl
    spec:
      containers:
      - name: pathway
        image: pathwaycom/pathway:latest
        command: ["python", "/app/pipeline.py"]
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
          limits:
            memory: "8Gi"
            cpu: "4"
        volumeMounts:
        - name: state
          mountPath: /pathway_state
        - name: config
          mountPath: /app
      volumes:
      - name: state
        persistentVolumeClaim:
          claimName: pathway-state-pvc
      - name: config
        configMap:
          name: pathway-config

一个简单的 Word Count 基准测试(1000 万条数据,1KB/条):

指标PyFlinkPathway
吞吐量~50K events/s~200K events/s
端到端延迟(p99)~500ms~150ms
内存占用4GB2GB
代码行数~80 行~20 行

注意:这个基准测试是 单机场景 下的结果。在多节点集群场景下,Flink 的扩展性更好。选择哪个框架取决于你的场景:

  • 单机 / 中小规模集群 → Pathway(开发效率高,性能足够)
  • 超大规模集群 / 复杂 Exactly-Once 语义 → Flink(生态成熟,稳定性验证充分)

7.4 UDF 性能优化

Pathway 的 UDF(用户自定义函数)有两种执行模式:

# 1. Python UDF —— 逐行调用,性能较低
@pw.udf
def slow_embed(text: str) -> list:
    import openai
    response = openai.Embedding.create(
        input=text,
        model="text-embedding-3-small",
    )
    return response.data[0].embedding

# 使用
embedded = documents.select(
    text=documents.text,
    embedding=slow_embed(documents.text),
)

# 2. 批量 UDF —— 批量调用,性能更高
@pw.udf(batch_size=32)  # 每批 32 条
def fast_embed(texts: list[str]) -> list[list]:
    import openai
    response = openai.Embedding.create(
        input=texts,
        model="text-embedding-3-small",
    )
    return [item.embedding for item in response.data]

# 使用
embedded = documents.select(
    text=documents.text,
    embedding=fast_embed(documents.text),
)

批量 UDF 的性能提升主要来自:

  1. 减少函数调用开销(32 次合 1 次)
  2. API 调用的批量优化(OpenAI Embedding API 支持批量输入)
  3. Rust 引擎的批量调度优化

八、生产部署最佳实践

8.1 Docker 部署

FROM pathwaycom/pathway:latest

WORKDIR /app
COPY pipeline.py .

# 持久化状态目录
VOLUME /pathway_state

# 健康检查
HEALTHCHECK --interval=30s --timeout=10s \
    CMD curl -f http://localhost:8000/health || exit 1

CMD ["python", "pipeline.py"]
# 构建并运行
docker build -t pathway-etl .
docker run -d \
    --name pathway-etl \
    -p 8000:8000 \
    -v pathway_state:/pathway_state \
    -e KAFKA_BOOTSTRAP_SERVERS=kafka:9092 \
    -e POSTGRES_CONNECTION_STRING=postgresql://user:pass@postgres:5432/db \
    pathway-etl

8.2 监控

Pathway 内置了监控面板,跟踪每个连接器的吞吐量和延迟:

# 启用监控面板
pw.run(
    monitoring_level=pw.MonitoringLevel.ALL,
    # 可选: NONE, STATS, ALL
)

同时支持 Prometheus 指标输出:

# Prometheus 集成
pw.io.prometheus.export(
    port=9090,
    metrics_prefix="pathway_",
)

8.3 常见陷阱

陷阱1:Table 的不可变性

Pathway 的 Table 是不可变的——你不能修改已经创建的 Table,只能创建新的:

# ❌ 错误:Table 是不可变的
events.add_column(new_col=events.amount * 2)  # 不支持

# ✅ 正确:创建新的 Table
enriched = events.select(
    **events,
    new_col=events.amount * 2,
)

陷阱2:全局排序的性能问题

# ❌ 危险:全局排序需要维护完整状态
sorted_all = events.sort(events.timestamp)

# ✅ 正确:窗口内排序
windowed_sorted = events.windowby(
    events.timestamp,
    window=pw.temporal.tumbling(duration=pw.Duration.minutes(5)),
).reduce(
    sorted_data=pw.reducers.sorted_tuples(events.amount),
)

陷阱3:Join 的状态膨胀

# ❌ 危险:无时间约束的 Join 会维护两端完整状态
result = large_table.join(another_large_table, ...)

# ✅ 正确:先窗口聚合再 Join
hourly_left = large_table.windowby(
    large_table.timestamp,
    window=pw.temporal.tumbling(duration=pw.Duration.hours(1)),
).reduce(
    key=pw.this.key,
    value=pw.reducers.sum(large_table.value),
)

hourly_right = another_large_table.windowby(
    another_large_table.timestamp,
    window=pw.temporal.tumbling(duration=pw.Duration.hours(1)),
).reduce(
    key=pw.this.key,
    value=pw.reducers.sum(another_large_table.value),
)

result = hourly_left.join(hourly_right, hourly_left.key == hourly_right.key)

陷阱4:UDF 的副作用

# ❌ 危险:UDF 中不要有副作用
@pw.udf
def bad_udf(value: float) -> float:
    requests.post("http://api/notify", json={"value": value})  # 副作用!
    return value * 2

# Pathway 可能因为增量计算而多次调用 UDF(撤回+重发),
# 导致副作用被执行多次

# ✅ 正确:将副作用输出到 Connector
result = events.select(processed=events.value * 2)
pw.io.jsonlines.write(result, "./output.jsonl")

九、Pathway 的局限与适用场景

客观说,Pathway 不是银弹。以下是它的局限:

局限说明替代方案
超大规模集群单集群建议 < 10 节点,不如 Flink 成熟Flink / Spark
Exactly-Once 语义开源版只有 At-Least-OnceFlink / Kafka Streams
SQL 支持不支持 SQL,只能用 Python APIFlink SQL / ksQLDB
生态成熟度2023 年才 1.0,社区资源较少Flink / Spark
调试工具缺少像 Flink Web UI 那样的成熟监控自建 Grafana 面板

Pathway 的最佳适用场景:

  1. 中小规模实时 ETL:QPS < 100K,节点 < 10,延迟要求秒级
  2. RAG 管道:需要实时更新向量索引的知识库系统
  3. 实时监控告警:业务指标实时聚合 + 阈值告警
  4. 数据同步管道:多数据源 → 多目标的实时同步
  5. AI Feature Store:实时特征计算和在线推理

不适合的场景:

  1. 每日数据量 PB 级别的超大规模数据处理
  2. 需要严格 Exactly-Once 语义的金融交易系统
  3. 团队只有 Java/Scala 工程师,没有 Python 经验

十、总结与展望

Pathway 的核心价值主张很清晰:让 Python 开发者用写批处理的体验,获得流处理的实时能力。

它的技术密码是 差分数据流——这个从学术界走出来的计算模型,经过 Rust 引擎的工程化实现,变成了一个对 Python 开发者友好的流处理框架。

从 ETL 到 RAG,从实时监控到特征工程,Pathway 的适用范围正在快速扩展。2026 年它已经获得了 55K+ Star,社区活跃度持续上升。

但也要清醒地认识到:在大规模生产环境中,Flink 和 Spark 仍然是更稳妥的选择。Pathway 的定位是 "Python 开发者的第一个流处理框架"——它不是要取代 Flink,而是让更多 Python 开发者能够低门槛地进入流处理的世界。

我的判断:在 AI 时代,数据管道的实时性不再是"nice to have",而是"must have"。Pathway 以 Python 优先 + Rust 性能 + 内建 AI 能力的组合,有潜力成为 AI 时代的数据管道基础设施。


本文涉及的完整代码示例可在 GitHub 仓库 pathwaycom/pathway 中找到。Pathway 文档:https://pathway.com/developers/

推荐文章

使用Vue 3和Axios进行API数据交互
2024-11-18 22:31:21 +0800 CST
Go语言SQL操作实战
2024-11-18 19:30:51 +0800 CST
Vue3中如何使用计算属性?
2024-11-18 10:18:12 +0800 CST
SQL常用优化的技巧
2024-11-18 15:56:06 +0800 CST
黑客帝国代码雨效果
2024-11-19 01:49:31 +0800 CST
Vue3结合Driver.js实现新手指引功能
2024-11-19 08:46:50 +0800 CST
前端代码规范 - Commit 提交规范
2024-11-18 10:18:08 +0800 CST
mysql 计算附近的人
2024-11-18 13:51:11 +0800 CST
服务器购买推荐
2024-11-18 23:48:02 +0800 CST
程序员茄子在线接单