Pathway 深度解析:Python ETL 框架的流式处理革命 —— 用 Rust 引擎吊打 Flink/Spark,构建实时 LLM Pipeline
55K+ Star,基于 Differential Dataflow 的 Rust 引擎,增量计算,内存计算,无缝集成 Python ML 生态——Pathway 正在重新定义实时流处理与 LLM Pipeline 的边界。
一、背景介绍:流处理框架的困境与突破
1.1 传统流处理框架的痛点
如果你在 2026 年还在用 Apache Flink 或 Apache Spark Streaming 做实时数据处理,你一定遇到过这些坑:
- 编程模型复杂:Flink 的 DataStream API 学习曲线陡峭,需要理解事件时间、水印、状态管理等概念
- 部署运维重:Flink 集群部署复杂,需要 ZooKeeper、JobManager、TaskManager 等组件
- Python 支持弱:PyFlink 性能差,Python UDF 需要通过 ProcessFunction 或 Cython 优化
- 批流统一难:虽然 Flink 号称批流统一,但实际应用中批处理和流处理代码往往需要分开写
- LLM 集成麻烦:要在 Flink 中集成 LLM Pipeline,需要自己封装 HTTP 请求、处理重试、管理连接池
Pathway 的出现,就是为了解决这些问题。
1.2 Pathway 是什么?
Pathway 是一个用于 流处理、实时分析、LLM Pipelines 和 RAG 的 Python ETL 框架。它的核心特点:
- Python API,Rust 引擎:用 Python 写代码,底层是基于 Differential Dataflow 的 Rust 高性能引擎
- 增量计算:数据更新时只计算变化部分,而不是重新计算整个数据集
- 内存计算:所有 Pipeline 都保存在内存中,性能极高
- 批流统一:同一套代码既可以处理批量数据,也可以处理流式数据
- LLM 原生支持:内置 LLM 工具,可以轻松构建 RAG Pipeline
GitHub 数据(截至 2026 年 5 月):
- ⭐ Star 数:55.9K+
- 🍴 Fork 数:1.2K+
- 📦 贡献者:150+
- 📈 趋势:月增 Star 3K+
二、核心概念:Pathway 的设计哲学
2.1 为什么选择 Differential Dataflow?
Differential Dataflow 是 Timely Dataflow 的扩展,由 Frank McSherry(微软研究院)提出。它的核心思想是:
当数据发生变化时,只计算变化的部分(delta),而不是重新计算整个数据集。
传统流处理框架(如 Flink)使用 微批处理(Micro-batch) 或 连续处理(Continuous Processing),每次触发计算时都需要处理整个窗口的数据。
而 Differential Dataflow 使用 增量计算(Incremental Computation):
传统方式:
数据更新 → 重新计算整个数据集 → 输出结果
Differential Dataflow:
数据更新 → 计算 delta(变化部分) → 只更新受影响的结果
性能对比(官方基准测试):
| 框架 | 数据集大小 | 更新延迟 | 吞吐量 |
|---|---|---|---|
| Flink | 100万条 | 500ms | 10万条/秒 |
| Spark Streaming | 100万条 | 1s | 5万条/秒 |
| Pathway | 100万条 | 50ms | 100万条/秒 |
2.2 Pathway 的核心抽象:Table 和 Column
在 Pathway 中,所有数据都表示为 Table(表),类似于 Pandas 的 DataFrame,但支持流式更新。
import pathway as pw
# 定义一个 Table Schema
class StockData(pw.Schema):
ticker: str # 股票代码
open: float # 开盘价
high: float # 最高价
low: float # 最低价
close: float # 收盘价
volume: float # 成交量
vwap: float # 成交量加权均价
# 从 Kafka 读取流式数据
stream = pw.io.kafka.read(
rdkafka_settings={
"bootstrap.servers": "localhost:9092",
"group.id": "pathway-consumer",
"auto.offset.reset": "earliest"
},
topic="stock-data",
schema=StockData
)
# 实时计算:每分钟的 VWAP
vwap_per_minute = stream.select(
ticker=pw.this.ticker,
minute=pw.this.t.utc_from_timestamp().floor("1 minute"),
vwap=pw.this.vwap
).groupby(
pw.this.ticker, pw.this.minute
).reduce(
vwap=pw.reducers.mean(pw.this.vwap)
)
# 输出到 PostgreSQL
pw.io.postgres.write(vwap_per_minute, connection_string="...")
核心要点:
- Schema 定义:使用
pw.Schema定义表结构,支持类型检查 - Connector:Pathway 提供了 300+ 连接器(Kafka、PostgreSQL、MySQL、Redis、GDrive、SharePoint 等)
- Lazy Evaluation:所有操作都是惰性的,只有调用
pw.run()时才触发计算 - 增量更新:当 Kafka 有新消息时,
vwap_per_minute只会更新受影响的分钟数据
2.3 有状态计算与窗口操作
Pathway 支持 事件时间(Event Time) 和 处理时间(Processing Time),可以轻松实现滑动窗口、翻滚窗口、会话窗口。
示例:实时异常检测(滑动窗口)
import pathway as pw
class Transaction(pw.Schema):
user_id: str
amount: float
timestamp: int # 事件时间(秒级时间戳)
# 从 Kafka 读取交易数据
transactions = pw.io.kafka.read(
rdkafka_settings={...},
topic="transactions",
schema=Transaction,
format="raw"
)
# 转换为 Table,并解析时间戳
transactions = transactions.select(
user_id=pw.this.user_id,
amount=pw.this.amount,
event_time=pw.this.timestamp.utc_from_timestamp()
)
# 定义 5 分钟的滑动窗口,步长 1 分钟
windowed = transactions.windowby(
pw.this.event_time,
window=pw.temporal.sliding(
duration=pw.Duration(minutes=5),
hop=pw.Duration(minutes=1)
)
)
# 每个窗口内,统计每个用户的交易总额
user_spending = windowed.groupby(
pw.this.user_id, pw.this._pw_window
).reduce(
total_amount=pw.reducers.sum(pw.this.amount)
)
# 异常检测:单窗口内交易总额 > 10000 的用户
anomalies = user_spending.filter(pw.this.total_amount > 10000)
# 输出到 Alert Manager
pw.io.http.write(anomalies, url="http://alert-manager/api/v1/alerts")
关键点:
windowby支持三种窗口:翻滚窗口(tumbling)、滑动窗口(sliding)、会话窗口(session)- 窗口操作是 增量 的:当新事件到来时,只会更新受影响的窗口
- 支持 水印(Watermark):处理迟到数据,防止数据丢失
三、架构分析:Pathway 的技术栈
3.1 整体架构
Pathway 的架构分为三层:
┌─────────────────────────────────────────┐
│ Python API Layer │ ← 用户代码(Table、Column、Connector)
├─────────────────────────────────────────┤
│ Rust Runtime (Differential Dataflow)│ ← 增量计算引擎
├─────────────────────────────────────────┤
│ Storage & Connectors │ ← Kafka、PostgreSQL、S3、GDrive...
└─────────────────────────────────────────┘
Python API Layer:
- 提供类似 Pandas 的 API(
select、filter、groupby、reduce) - 支持 UDF(用户自定义函数),可以用任意 Python 函数做数据转换
- 许多转换直接在 Rust 中实现(如
mean、sum、count),性能极高
Rust Runtime:
- 基于 Differential Dataflow 实现增量计算
- 使用 Timely Dataflow 做任务调度和容错
- 所有数据保存在 内存 中,通过 SharedArray 实现进程间零拷贝
Storage & Connectors:
- 通过 Airbyte Protocol 连接 300+ 数据源
- 支持 Kafka、Redpanda、Pulsar 等流式数据来源
- 支持 PostgreSQL、MySQL、MongoDB 等数据库
- 支持 S3、GCS、Azure Blob 等对象存储
3.2 增量计算的实现原理
假设我们有一个简单的 Pipeline:
import pathway as pw
t1 = pw.io.csv.read("input.csv", schema=Schema)
t2 = t2.select(x=pw.this.a + pw.this.b)
t3 = t2.groupby(pw.this.x).reduce(count=pw.reducers.count())
pw.io.csv.write(t3, "output.csv")
当 input.csv 新增一行 (a=1, b=2) 时,Pathway 的执行流程:
- 捕获变更:检测到
input.csv新增一行 - 向前传播 delta:
t1新增一行(a=1, b=2)t2计算 delta:x = 1 + 2 = 3,新增一行(x=3)t3计算 delta:count从 0 变为 1(只更新x=3的计数)
- 输出 delta:只将
x=3, count=1写入output.csv
对比 Flink:
- Flink 需要重新读取整个
input.csv,重新计算t2和t3 - Pathway 只计算变化部分,性能提升 10x~100x
3.3 容错与持久化
Pathway 使用 Snapshot + WAL(Write-Ahead Log) 实现容错:
- 周期性 Snapshot:每隔 N 分钟,将内存中的数据快照保存到磁盘
- WAL:所有数据变更都先写入 WAL,再应用到内存
- 故障恢复:从最近的 Snapshot + WAL 恢复,保证 Exactly-Once 语义
# 启用持久化
pw.run(
persistence=pw.persistence.Config(
snapshot_interval_ms=60000, # 每分钟做一次快照
log_path="/path/to/wal" # WAL 存储路径
)
)
四、代码实战:构建实时 LLM Pipeline + RAG
4.1 场景描述
我们要构建一个 实时新闻摘要系统:
- 从 Kafka 读取实时新闻流(标题、正文、发布时间)
- 用 LLM(如 GPT-4、Claude、Qwen)生成摘要
- 将摘要写入 Elasticsearch,供用户搜索
4.2 完整代码实现
Step 1:定义 Schema 和读取数据
import pathway as pw
import openai # 或 import anthropic, import dashscope(通义千问)
# 定义新闻数据的 Schema
class NewsArticle(pw.Schema):
title: str
content: str
published_at: int # 时间戳
source: str
# 从 Kafka 读取新闻流
news_stream = pw.io.kafka.read(
rdkafka_settings={
"bootstrap.servers": "kafka:9092",
"group.id": "news-summarizer",
"auto.offset.reset": "latest"
},
topic="news-articles",
schema=NewsArticle,
format="json"
)
Step 2:调用 LLM 生成摘要
Pathway 提供了 pw.llm 模块,封装了常见 LLM 的调用:
import pathway as pw
# 方式 1:使用 Pathway 内置的 LLM Connector
def summarize_with_gpt(article):
"""用 GPT-4 生成摘要"""
response = openai.chat.completions.create(
model="gpt-4-turbo",
messages=[
{"role": "system", "content": "你是一个新闻摘要助手,请用 100 字以内概括以下新闻:"},
{"role": "user", "content": article}
],
max_tokens=200
)
return response.choices[0].message.content
# 方式 2:使用 pw.udf(用户自定义函数)
@pw.udf
def generate_summary(title: str, content: str) -> str:
"""用 Claude 生成摘要"""
import anthropic
client = anthropic.Anthropic(api_key="sk-...")
message = client.messages.create(
model="claude-3-opus-20240229",
max_tokens=200,
messages=[
{"role": "user", "content": f"请概括以下新闻:\n标题:{title}\n正文:{content}"}
]
)
return message.content[0].text
# 应用到流数据
summarized = news_stream.select(
title=pw.this.title,
summary=generate_summary(pw.this.title, pw.this.content), # 调用 LLM
published_at=pw.this.published_at,
source=pw.this.source
)
关键点:
pw.udf装饰器会将 Python 函数转换为 Pathway 的 UDF- LLM 调用是 异步 的,Pathway 会自动处理并发和重试
- 支持 批量调用(Batch API),降低 LLM 成本
Step 3:写入 Elasticsearch
# 定义输出 Schema
class NewsSummary(pw.Schema):
id: str
title: str
summary: str
published_at: str
source: str
# 转换为 Elasticsearch 格式
es_data = summarized.select(
id=pw.this.title.hash().to_string(), # 用标题的 hash 作为 ID
title=pw.this.title,
summary=pw.this.summary,
published_at=pw.this.published_at.utc_from_timestamp().format("YYYY-MM-DD HH:mm:ss"),
source=pw.this.source
)
# 写入 Elasticsearch
pw.io.elasticsearch.write(
es_data,
index_name="news-summaries",
es_args={
"es_node": "http://elasticsearch:9200",
"es_user": "elastic",
"es_password": "password"
}
)
# 启动 Pipeline
pw.run()
4.3 性能优化技巧
技巧 1:使用 Batch API 降低成本
# 每 10 条新闻批量调用一次 LLM
@pw.udf(batch_size=10, max_retries=3)
def batch_summarize(titles: list[str], contents: list[str]) -> list[str]:
"""批量生成摘要"""
prompt = "请概括以下 10 条新闻:\n"
for i, (title, content) in enumerate(zip(titles, contents)):
prompt += f"{i+1}. 标题:{title}\n 正文:{content}\n"
response = openai.chat.completions.create(
model="gpt-4-turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=2000
)
# 解析返回的摘要(假设 LLM 按序号返回)
summaries = response.choices[0].message.content.split("\n")
return [s.strip() for s in summaries[:10]]
技巧 2:缓存 LLM 响应
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_summarize(content_hash):
"""缓存相同内容的摘要"""
# ... 调用 LLM ...
pass
技巧 3:使用本地 LLM(降低成本)
# 使用 Ollama 本地运行 Llama 3
import ollama
@pw.udf
def local_summarize(title: str, content: str) -> str:
response = ollama.chat(
model="llama3:8b",
messages=[{"role": "user", "content": f"概括:{title}\n{content}"}]
)
return response["message"]["content"]
五、RAG Pipeline:构建实时知识库
5.1 RAG 的核心挑战
RAG(Retrieval-Augmented Generation) 是 LLM 应用的核心技术,但传统 RAG 有以下问题:
- 知识库更新延迟:文档更新后,向量库需要重新索引
- 实时性差:无法处理流式数据(如实时日志、新闻流)
- 成本高:每次查询都需要调用 Embedding API + 向量搜索 + LLM
Pathway 的优势:
- 增量索引:当文档更新时,只重新索引变化的部分
- 流式 RAG:可以直接对 Kafka 流做向量搜索
- 本地 Embedding:支持本地 Embedding 模型(如 sentence-transformers),降低成本
5.2 完整 RAG Pipeline 代码
Step 1:读取文档并切片
import pathway as pw
from sentence_transformers import SentenceTransformer
# 定义文档 Schema
class Document(pw.Schema):
doc_id: str
content: str
updated_at: int # 更新时间戳
# 从 GDrive 读取文档(支持实时监听)
documents = pw.io.gdrive.read(
service_account_credentials_file="credentials.json",
folder_id="xxx",
schema=Document,
refresh_interval=60 # 每 60 秒检查一次更新
)
# 文档切片(每 500 字一片,重叠 50 字)
def split_document(content: str) -> list[str]:
"""将文档切分为 chunks"""
chunks = []
step = 450 # 500 - 50 重叠
for i in range(0, len(content), step):
chunk = content[i:i+500]
chunks.append(chunk)
return chunks
# 应用切片
chunks = documents.select(
doc_id=pw.this.doc_id,
chunks=split_document(pw.this.content) # 返回 list[str]
).explode(pw.this.chunks) # 将 list 展开为多行
Step 2:生成向量并存储
# 加载本地 Embedding 模型
model = SentenceTransformer("all-MiniLM-L6-v2")
@pw.udf
def embed(text: str) -> list[float]:
"""生成向量"""
return model.encode(text).tolist()
# 生成向量
vectors = chunks.select(
doc_id=pw.this.doc_id,
chunk=pw.this.chunks,
embedding=embed(pw.this.chunks)
)
# 存储到 Vector DB(如 Qdrant)
pw.io.qdrant.write(
vectors,
collection_name="documents",
url="http://qdrant:6333",
embedding_column="embedding",
metadata_columns=["doc_id", "chunk"]
)
Step 3:实时查询
# 从 Kafka 读取用户问题
questions = pw.io.kafka.read(
rdkafka_settings={...},
topic="user-questions",
schema=pw.Schema(question=str),
format="json"
)
# 生成问题的向量
questions = questions.select(
question=pw.this.question,
question_vector=embed(pw.this.question)
)
# 向量搜索(Top 3)
def vector_search(question_vector: list[float]) -> list[str]:
"""调用 Qdrant 做向量搜索"""
import requests
response = requests.post(
"http://qdrant:6333/collections/documents/points/search",
json={
"vector": question_vector,
"limit": 3,
"with_payload": True
}
)
results = response.json()["result"]
return [r["payload"]["chunk"] for r in results]
# 应用向量搜索
contexts = questions.select(
question=pw.this.question,
contexts=vector_search(pw.this.question_vector)
)
# 调用 LLM 生成答案
@pw.udf
def generate_answer(question: str, contexts: list[str]) -> str:
"""用 RAG 生成答案"""
context_str = "\n".join(contexts)
prompt = f"根据以下上下文回答问题:\n{context_str}\n\n问题:{question}"
response = openai.chat.completions.create(
model="gpt-4-turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=500
)
return response.choices[0].message.content
# 生成答案
answers = contexts.select(
question=pw.this.question,
answer=generate_answer(pw.this.question, pw.this.contexts)
)
# 输出到 Kafka(供前端展示)
pw.io.kafka.write(answers, topic="answers", format="json")
pw.run()
5.3 增量索引的优势
假设你的知识库有 10 万篇文档,每天新增 100 篇:
传统 RAG(如 LangChain + FAISS):
- 每天重新索引 10 万篇文档 → 耗时 1 小时
- 向量库锁定期间无法查询 → 服务中断
Pathway RAG:
- 只索引新增的 100 篇文档 → 耗时 1 分钟
- 索引期间仍可查询 → 服务不中断
六、性能优化与最佳实践
6.1 内存优化
Pathway 将所有数据保存在内存中,需要注意内存使用:
# 方式 1:使用 Disk Spill(内存不足时溢写到磁盘)
pw.run(
memory_config=pw.MemoryConfig(
max_memory_mb=8192, # 最大内存 8GB
spill_to_disk=True # 超出后溢写到磁盘
)
)
# 方式 2:使用 Sampling(大数据集时采样)
sampled = stream.sample(fraction=0.1) # 采样 10%
6.2 并行度优化
# 设置并行度(根据 CPU 核心数调整)
pw.run(
parallelism=8 # 使用 8 个线程
)
6.3 监控与调试
# 启用 Metrics(输出到 Prometheus)
pw.run(
metrics=pw.MetricsConfig(
enable=True,
port=9090 # Prometheus 抓取端口
)
)
# 打印执行计划(调试用)
pw.debug_print(stream)
七、Pathway vs Flink vs Spark:选型指南
| 维度 | Pathway | Flink | Spark Streaming |
|---|---|---|---|
| 编程语言 | Python | Java/Scala + PyFlink | Scala/Java + PySpark |
| 流处理模型 | 增量计算(Differential Dataflow) | 微批处理 / 连续处理 | 微批处理 |
| 延迟 | 毫秒级 | 秒级 | 秒级 ~ 分钟级 |
| 吞吐量 | 100万条/秒 | 10万条/秒 | 5万条/秒 |
| LLM 集成 | 原生支持(pw.llm) | 需要自己封装 | 需要自己封装 |
| 部署复杂度 | 低(单机 / Docker / K8s) | 高(需要集群) | 高(需要集群) |
| 批流统一 | ✅ 同一套代码 | ✅ 但需分开写 | ✅ 但性能差异大 |
| Python UDF 性能 | ✅ 高性能(Rust 实现) | ❌ 性能差 | ⚠️ 一般 |
| 适用场景 | 实时分析、LLM Pipeline、RAG | 大规模批处理、复杂事件处理 | 大规模批处理、机器学习 |
选型建议:
- 如果你需要 实时 LLM Pipeline / RAG → 选 Pathway
- 如果你需要 大规模批处理(TB 级) → 选 Spark
- 如果你需要 复杂事件处理(CEP) → 选 Flink
八、总结与展望
8.1 Pathway 的核心价值
- 降低门槛:Python API 让数据工程师、算法工程师都能快速上手流处理
- 提升性能:增量计算让实时分析的性能提升 10x~100x
- 简化架构:一份代码同时支持批处理和流处理,无需维护两套系统
- LLM 原生:内置 LLM 工具,让 RAG 和 LLM Pipeline 的构建变得简单
8.2 未来展望
Pathway 团队在 2026 年的 Roadmap:
- 多模态 RAG:支持图片、视频的向量搜索
- 分布式模式:支持多节点部署,处理 PB 级数据
- Serverless:推出 Cloud 版本,按需付费
- 更多连接器:支持 Flink Connector 生态(如 Debezium、Flink CDC)
8.3 快速开始
# 安装 Pathway
pip install pathway
# 运行第一个例子
python examples/stock_analytics.py
# 查看文档
https://pathway.com/docs
参考资料
- Pathway 官方文档:https://pathway.com/docs
- GitHub 仓库:https://github.com/pathwaycom/pathway
- Differential Dataflow 论文:Frank McSherry, "Differential Dataflow"
- Pathway vs Flink 性能对比:https://pathway.com/blog/2026-01-pathway-vs-flink-benchmark
- 构建实时 RAG 教程:https://pathway.com/tutorials/rag-pipeline
作者注:Pathway 是一个让人眼前一亮的项目,它用 Rust 的高性能 + Python 的易用性,解决了传统流处理框架的痛点。如果你正在做实时数据分析、LLM Pipeline 或 RAG,强烈建议试一试 Pathway,相信它会给你带来惊喜。
Keywords: Pathway, Python ETL, 流处理, 实时分析, LLM Pipeline, RAG, Differential Dataflow, Rust 引擎, 增量计算, Kafka, PostgreSQL, Vector DB, 性能优化
Tags: Python|流处理|实时分析|LLM|RAG|Pathway|ETL|开源项目|GitHub Trending