Google LangExtract 深度实战:从非结构化文本到结构化知识的工程化完全指南(2026)
关键词:LangExtract、LLM 结构化提取、Schema 驱动、Source Grounding、RAG、Google、Python
目录
- 背景与痛点:非结构化文本的「提取困境」
- LangExtract 是什么?核心设计哲学
- 核心概念深度解析
- 3.1 Schema 驱动抽取引擎
- 3.2 Source Grounding(精确溯源)
- 3.3 交互式可视化
- 3.4 模型路由与成本优化
- 架构分析:LangExtract 内部工作原理
- 4.1 整体架构图
- 4.2 Prompt 工程层
- 4.3 模型调用层
- 4.4 输出解析与验证层
- 4.5 可视化渲染层
- 安装与配置完全指南
- 代码实战:从入门到生产级应用
- 6.1 快速入门:从客服对话中提取投诉原因
- 6.2 医疗场景:从电子病历中提取关键实体
- 6.3 法律场景:合同关键条款提取
- 6.4 批量处理与异步调用
- 6.5 与 Milvus 集成:构建语义搜索系统
- 高级技巧与性能优化
- 7.1 模型选择策略(Gemini Flash vs Pro)
- 7.2 Prompt 优化技巧
- 7.3 批量处理与速率限制
- 7.4 成本优化实战
- 7.5 错误处理与重试机制
- 生产级部署方案
- 8.1 容器化部署
- 8.2 与现有系统集成
- 8.3 监控与可观测性
- LangExtract vs 其他方案对比
- 总结与展望
- 参考资料
1. 背景与痛点:非结构化文本的「提取困境」
在当今的数字化时代,全球每天产生的文本数据是天文数字——邮件、客服对话、医疗记录、法律合同、社交媒体内容……这些数据中蕴含着巨大的价值,但有一个共同特点:它们是非结构化的。
1.1 传统方法的天花板
传统的信息提取方法主要依赖:
基于规则的方法(正则表达式、关键词匹配):
# 传统方法:从客服对话中提取投诉原因
import re
def extract_complaint(text):
if "不满意" in text or "投诉" in text:
return "客户投诉"
return "未知"
# 问题:无法处理语义变体
# "你们这服务真的垃圾" → 无法识别
# "非常失望,下次不来了" → 无法识别
基于机器学习的方法(NER、文本分类):
- 需要大量标注数据
- 泛化能力差,换领域就要重新训练
- 无法处理复杂语义关系
基于 LLM 的直接调用(Naive Approach):
# 直接调用 LLM 提取信息
import google.generativeai as genai
response = genai.generate_text(
prompt=f"从以下文本中提取产品缺陷:{text}",
model="gemini-pro"
)
# 问题:输出不稳定、无法溯源、难以验证
1.2 核心痛点
| 痛点 | 描述 | 影响 |
|---|---|---|
| 输出不稳定 | 同样输入,多次调用输出可能不同 | 生产环境不可靠 |
| 无法溯源 | 不知道提取结果来自原文哪一段 | 无法验证、无法审计 |
| 格式不统一 | 输出格式不一致,难以程序化处理 | 需要大量后处理 |
| 成本不可控 | 每次都调用大模型,成本高昂 | 大规模应用受限 |
| 缺乏可视化 | 提取结果难以直观展示和验证 | 调试困难 |
Google LangExtract 正是为了解决这些痛点而生。
2. LangExtract 是什么?核心设计哲学
LangExtract 是 Google 开源的一个 Python 库,利用大语言模型(LLM)从非结构化文本中精确提取结构化信息,并附带精确的源文本溯源(Source Grounding)和交互式可视化。
2.1 核心特性
✅ Schema 驱动 —— 用 Pydantic Model 定义提取结构
✅ 精确溯源 —— 每个提取结果都标注原文出处(字符级定位)
✅ 交互式可视化 —— 生成 HTML 报告,点击结果高亮原文
✅ 模型无关 —— 支持 Gemini 系列,可扩展其他模型
✅ 批量处理 —— 支持大规模文档处理
✅ 成本优化 —— 智能模型路由,平衡质量与成本
2.2 设计哲学
LangExtract 的设计哲学可以归纳为三点:
1. 声明式提取(Declarative Extraction)
传统方法需要写大量的提取逻辑,而 LangExtract 采用声明式范式——你只需要定义你想提取什么(Schema),而不需要关心怎么提取。
from pydantic import BaseModel
class ComplaintInfo(BaseModel):
reason: str # 投诉原因
product: str | None # 涉及产品
sentiment: str # 情感倾向:positive/negative/neutral
severity: int # 严重程度:1-5
# 只需要定义 Schema,LangExtract 自动处理提取逻辑
2. 可验证性优先(Verifiability First)
LangExtract 的每个输出都附带精确的原文引用——你可以直接点击查看模型是基于哪一段文本得出的结论。这极大地提升了系统的可审计性和可信度。
3. 工程化思维(Engineering-Grade Design)
LangExtract 不是玩具项目,而是按照生产级标准设计的:
- 完整的错误处理
- 速率限制和重试机制
- 详细的日志和调试信息
- 支持异步批量处理
3. 核心概念深度解析
3.1 Schema 驱动抽取引擎
Schema 是 LangExtract 的核心——它定义了你想从文本中提取什么信息。
3.1.1 基础 Schema 定义
from pydantic import BaseModel, Field
from typing import Optional, List
class ProductReview(BaseModel):
"""从产品评论中提取结构化信息"""
product_name: str = Field(
description="产品名称,如 'iPhone 15 Pro'"
)
rating: Optional[int] = Field(
default=None,
description="用户评分,1-5 之间的整数"
)
pros: List[str] = Field(
default_factory=list,
description="用户提到的优点列表"
)
cons: List[str] = Field(
default_factory=list,
description="用户提到的缺点列表"
)
sentiment: str = Field(
description="总体情感:positive / negative / neutral"
)
would_recommend: bool = Field(
description="是否愿意推荐给朋友"
)
3.1.2 嵌套 Schema(复杂结构提取)
class Feature(BaseModel):
name: str = Field(description="功能名称")
rating: int = Field(description="该功能评分 1-5")
comment: Optional[str] = Field(default=None)
class DetailedReview(BaseModel):
product: str
overall_rating: int
features: List[Feature] # 嵌套结构
purchase_date: Optional[str] = Field(
default=None,
description="购买日期,格式 YYYY-MM-DD"
)
verified_purchase: bool
3.1.3 Schema 设计最佳实践
原则一:字段描述要精确
# ❌ 不好的描述
class BadSchema(BaseModel):
info: str # 太模糊
# ✅ 好的描述
class GoodSchema(BaseModel):
payment_method: str = Field(
description="""
支付方式,必须是以下之一:
- 'credit_card'(信用卡)
- 'paypal'(PayPal)
- 'bank_transfer'(银行转账)
- 'cash'(现金)
如果无法确定,返回 'unknown'
"""
)
原则二:使用 Optional 处理缺失信息
# 真实文本中信息往往不完整
class RealWorldSchema(BaseModel):
name: str # 必填
email: Optional[str] = None # 选填,可能不存在
phone: Optional[str] = None # 选填
原则三:用 description 引导模型
模型完全依赖 description 来理解每个字段的含义——描述越精确,提取质量越高。
3.2 Source Grounding(精确溯源)
Source Grounding 是 LangExtract 的杀手级特性——它不仅能提取信息,还能告诉你这个信息来自原文的哪个位置。
3.2.1 Grounding 的工作原理
import langextract as lx
result = lx.extract(
text_or_documents="用户表示对产品非常不满意,说是买了两周就坏了。",
prompt_description="提取客户反馈信息",
examples=[...],
model_id="gemini-2.5-flash",
grounding=True # 启用溯源
)
# 输出结构
{
"sentiment": "negative",
"_grounding": {
"sentiment": {
"text": "非常不满意",
"start_char": 6,
"end_char": 11,
"score": 0.98
}
}
}
3.2.2 Grounding 的生产价值
| 场景 | 价值 |
|---|---|
| 审计合规 | 每个提取结果都可追溯到原文,满足金融/医疗行业的合规要求 |
| 错误调试 | 提取错误时,可以直接看到模型基于哪段文本得出结论 |
| 人机协作 | 人工审核时,可以直接看到原文依据,快速判断正确性 |
| 模型评估 | 可以量化分析模型在哪些类型的文本上表现好/差 |
3.3 交互式可视化
LangExtract 内置了强大的可视化功能,可以生成交互式 HTML 报告。
import langextract as lx
# 提取并生成可视化报告
result = lx.extract(
text_or_documents=long_text,
prompt_description="提取所有关键信息",
examples=examples,
model_id="gemini-2.5-pro",
visualization=True # 启用可视化
)
# 生成 HTML 报告
lx.visualize(
result=result,
output_path="report.html",
highlight_color="#fff3cd" # 自定义高亮颜色
)
可视化报告的功能:
- 原文与提取结果并排显示
- 点击提取结果,自动高亮原文对应位置
- 支持按实体类型筛选显示
- 可导出为独立 HTML 文件
3.4 模型路由与成本优化
LangExtract 支持多种 Gemini 模型,并提供了智能路由策略:
| 模型 | 速度 | 质量 | 成本 | 适用场景 |
|---|---|---|---|---|
gemini-2.5-flash | ⚡⚡⚡ | ★★★ | $ | 大规模批量处理、简单提取 |
gemini-2.5-pro | ⚡⚡ | ★★★★★ | $$$ | 复杂语义、高精度要求 |
gemini-1.5-flash | ⚡⚡⚡ | ★★ | $ | 超大规模、低精度要求 |
# 智能模型选择策略
def select_model(text_length, complexity):
if text_length < 500 and complexity == "low":
return "gemini-2.5-flash"
elif text_length > 10000 or complexity == "high":
return "gemini-2.5-pro"
else:
return "gemini-2.5-flash" # 默认选择
4. 架构分析:LangExtract 内部工作原理
4.1 整体架构图
输入文本
│
▼
┌─────────────────────────────────────┐
│ Schema 验证层 │
│ (Pydantic Model 解析与验证) │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Prompt 工程层 │
│ • Schema → JSON Schema 转换 │
│ • Few-shot 示例注入 │
│ • Grounding 指令注入 │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ 模型调用层 │
│ • 速率限制 │
│ • 重试机制 │
│ • 成本追踪 │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ 输出解析与验证层 │
│ • JSON 解析 │
│ • Schema 验证 │
│ • Grounding 解析 │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ 可视化渲染层 │
│ • HTML 生成 │
│ • 交互式组件 │
└─────────────────────────────────────┘
│
▼
输出结果 + 可视化报告
4.2 Prompt 工程层深度分析
LangExtract 的 Prompt 工程是其核心竞争优势之一。它自动将 Schema 转换为精心设计的 Prompt。
4.2.1 自动生成的 Prompt 结构
# LangExtract 内部生成的 Prompt(简化版)
prompt = f"""
你是一个专业的信息提取助手。
## 任务描述
{prompt_description}
## 提取 Schema
{json_schema} # 从 Pydantic Model 自动生成
## 示例(Few-shot)
{examples} # 用户提供的示例
## 输出要求
1. 严格按 JSON 格式输出,符合上述 Schema
2. 每个字段的提取结果必须附带 grounding 信息:
{{
"field_name": "提取结果",
"_grounding": {{
"field_name": {{
"text": "原文片段",
"start_char": 0,
"end_char": 10,
"confidence": 0.95
}}
}}
}}
3. 如果某个字段在文本中不存在,返回 null
## 待提取文本
{text}
"""
4.2.2 Few-shot 示例的作用
examples = [
{
"text": "我买了个iPhone,用了一周就卡得不行,非常失望。",
"output": {
"product": "iPhone",
"sentiment": "negative",
"issue": "卡顿",
"_grounding": {...}
}
},
{
"text": "MacBook Pro 太好用了,性能强悍,就是有点重。",
"output": {
"product": "MacBook Pro",
"sentiment": "positive",
"pros": ["性能强悍"],
"cons": ["有点重"],
"_grounding": {...}
}
}
]
# LangExtract 会将示例注入到 Prompt 中
# 这显著提升了提取质量和一致性
4.3 模型调用层
# 模型调用层的核心逻辑(概念性代码)
class ModelCaller:
def __init__(self, model_id, rate_limiter, cost_tracker):
self.model_id = model_id
self.rate_limiter = rate_limiter
self.cost_tracker = cost_tracker
def call(self, prompt, max_retries=3):
for attempt in range(max_retries):
try:
# 速率限制
self.rate_limiter.acquire()
# 调用模型
response = genai.generate_text(
prompt=prompt,
model=self.model_id,
temperature=0.1 # 低温度确保一致性
)
# 成本追踪
self.cost_tracker.track(
model=self.model_id,
input_tokens=count_tokens(prompt),
output_tokens=count_tokens(response)
)
return response
except RateLimitError:
wait_time = 2 ** attempt # 指数退避
time.sleep(wait_time)
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(1)
4.4 输出解析与验证层
# 输出解析的核心挑战:LLM 输出可能不严格符合 JSON 格式
import json
import re
def parse_llm_output(raw_output, schema):
# Step 1: 尝试直接解析 JSON
try:
parsed = json.loads(raw_output)
return validate_against_schema(parsed, schema)
except json.JSONDecodeError:
pass
# Step 2: 使用正则提取 JSON 块
json_match = re.search(r'```json\n(.*?)\n```', raw_output, re.DOTALL)
if json_match:
try:
parsed = json.loads(json_match.group(1))
return validate_against_schema(parsed, schema)
except:
pass
# Step 3: 容错解析(跳过无法解析的字段)
return fallback_parse(raw_output, schema)
4.5 可视化渲染层
可视化渲染层使用 Jinja2 模板生成交互式 HTML:
# 可视化渲染的核心逻辑
def render_visualization(result, output_path):
# 1. 将提取结果转换为高亮区间
highlights = []
for field_name, grounding in result.grounding.items():
highlights.append({
"start": grounding.start_char,
"end": grounding.end_char,
"field": field_name,
"color": get_color_for_field(field_name)
})
# 2. 渲染 HTML 模板
html = template.render(
original_text=result.original_text,
extracted_data=result.data,
highlights=highlights,
grounding=result.grounding
)
# 3. 写入文件
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html)
5. 安装与配置完全指南
5.1 环境要求
Python >= 3.10
Google Generative AI SDK
Pydantic >= 2.0
5.2 安装步骤
# 从 PyPI 安装(如果已发布)
pip install langextract
# 从源码安装(当前推荐方式)
git clone https://github.com/google/langextract.git
cd langextract
pip install -e .
5.3 配置 Google API Key
import google.generativeai as genai
# 方法一:环境变量
export GOOGLE_API_KEY="your-api-key-here"
# 方法二:代码中配置
genai.configure(api_key="your-api-key-here")
5.4 验证安装
import langextract as lx
# 简单测试
result = lx.extract(
text_or_documents="这是一个测试。",
prompt_description="测试提取",
examples=[],
model_id="gemini-2.5-flash"
)
print(result)
6. 代码实战:从入门到生产级应用
6.1 快速入门:从客服对话中提取投诉原因
场景描述
某电商平台的客服系统每天处理数万条对话,需要自动提取:
- 投诉原因
- 涉及产品
- 情感倾向
- 处理优先级
完整代码实现
import langextract as lx
from pydantic import BaseModel, Field
from typing import Optional, List
# ========== Step 1: 定义 Schema ==========
class CustomerComplaint(BaseModel):
"""客服对话投诉信息提取 Schema"""
complaint_reason: str = Field(
description="""
客户投诉的具体原因,用一句话概括。
例如:"手机电池续航时间短"、"物流配送延迟"
"""
)
product: Optional[str] = Field(
default=None,
description="涉及的产品名称,如果未提及则为 null"
)
sentiment: str = Field(
description="""
客户情感倾向,必须是以下之一:
- 'angry'(愤怒)
- 'dissatisfied'(不满意)
- 'neutral'(中性)
- 'satisfied'(满意)
"""
)
priority: int = Field(
description="""
处理优先级,根据投诉严重程度判断:
- 1 = 低优先级(一般咨询)
- 2 = 中优先级(普通投诉)
- 3 = 高优先级(严重问题)
- 4 = 紧急(需要立即处理)
"""
)
requested_solution: Optional[str] = Field(
default=None,
description="客户要求的解决方案,如'退款'、'换货'等"
)
# ========== Step 2: 准备 Few-shot 示例 ==========
examples = [
{
"text": "你们这个破手机才买了三天就黑屏了,我要退款!",
"output": {
"complaint_reason": "手机黑屏故障",
"product": "手机",
"sentiment": "angry",
"priority": 4,
"requested_solution": "退款"
}
},
{
"text": "物流太慢了,说我上周就能到,现在还没发货,不高兴",
"output": {
"complaint_reason": "物流配送延迟",
"product": None,
"sentiment": "dissatisfied",
"priority": 2,
"requested_solution": None
}
},
{
"text": "商品还不错,就是包装有点简陋,整体满意",
"output": {
"complaint_reason": "包装简陋",
"product": None,
"sentiment": "satisfied",
"priority": 1,
"requested_solution": None
}
}
]
# ========== Step 3: 执行提取 ==========
customer_text = """
客服你好,我上个月在你们这里买的 AirPods Pro 2,
left 耳机昨天突然没声音了,重启了好几次都没用。
我之前用的索尼耳机都没这问题,有点失望。
能不能帮我换个新的?
"""
result = lx.extract(
text_or_documents=customer_text,
prompt_description="""
从客服对话中提取投诉信息。
注意:
1. complaint_reason 要具体,不要只写'产品质量问题'
2. priority 要根据问题的严重程度合理判断
3. sentiment 要准确反映客户的情绪状态
""",
examples=examples,
model_id="gemini-2.5-flash", # 性价比高的选择
grounding=True # 启用溯源
)
# ========== Step 4: 解析结果 ==========
print("=== 提取结果 ===")
print(f"投诉原因: {result.data.complaint_reason}")
print(f"涉及产品: {result.data.product}")
print(f"情感倾向: {result.data.sentiment}")
print(f"优先级: {result.data.priority}")
print(f"要求方案: {result.data.requested_solution}")
print("\n=== Grounding 溯源信息 ===")
for field_name, grounding in result.grounding.items():
print(f"[{field_name}]")
print(f" 原文: {grounding.text}")
print(f" 位置: {grounding.start_char}-{grounding.end_char}")
print(f" 置信度: {grounding.confidence:.2f}")
# ========== Step 5: 生成可视化报告 ==========
lx.visualize(
result=result,
output_path="complaint_analysis.html",
title="客服投诉信息提取报告"
)
print("\n可视化报告已生成: complaint_analysis.html")
输出示例
=== 提取结果 ===
投诉原因: AirPods Pro 2 左耳机突然无声音
涉及产品: AirPods Pro 2
情感倾向: dissatisfied
优先级: 3
要求方案: 换货
=== Grounding 溯源信息 ===
[complaint_reason]
原文: left 耳机昨天突然没声音了
位置: 45-60
置信度: 0.97
[product]
原文: AirPods Pro 2
位置: 12-28
置信度: 1.00
...
6.2 医疗场景:从电子病历中提取关键实体
场景描述
医疗机构需要从大量非结构化电子病历(EMR)中提取结构化信息,用于:
- 疾病编码(ICD-10)
- 药物不良反应监测
- 临床决策支持
完整实现
from pydantic import BaseModel, Field
from typing import Optional, List
import langextract as lx
from datetime import date
class Medication(BaseModel):
"""药物信息"""
name: str = Field(description="药物名称")
dosage: str = Field(description="剂量,如 '500mg'、'10ml'")
frequency: str = Field(description="用药频率,如 '每日两次'")
duration: Optional[str] = Field(default=None, description="用药时长")
class LabTest(BaseModel):
"""实验室检查"""
test_name: str = Field(description="检查项目名称")
result: str = Field(description="检查结果/数值")
unit: Optional[str] = Field(default=None, description="单位")
reference_range: Optional[str] = Field(default=None, description="参考范围")
class Diagnosis(BaseModel):
"""诊断信息"""
disease_name: str = Field(description="疾病名称")
icd10_code: Optional[str] = Field(default=None, description="ICD-10 编码")
severity: Optional[str] = Field(default=None, description="严重程度")
class EMRRecord(BaseModel):
"""电子病历结构化 Schema"""
patient_id: Optional[str] = Field(default=None, description="患者ID(如果提及)")
chief_complaint: str = Field(
description="主诉:患者最主要的症状和持续时间"
)
diagnoses: List[Diagnosis] = Field(
default_factory=list,
description="诊断列表"
)
medications: List[Medication] = Field(
default_factory=list,
description="当前用药列表"
)
lab_tests: List[LabTest] = Field(
default_factory=list,
description="实验室检查结果"
)
allergies: List[str] = Field(
default_factory=list,
description="已知药物过敏史"
)
vital_signs: Optional[str] = Field(
default=None,
description="生命体征摘要(血压、心率、体温等)"
)
# 示例电子病历文本
emr_text = """
患者男性,45岁,因"反复上腹痛3个月,加重1周"入院。
【主诉】反复上腹痛3个月,餐后加重,伴有反酸、嗳气。
【现病史】患者3个月前无明显诱因出现上腹痛,多为餐后1-2小时出现,
疼痛为烧灼样,伴反酸、嗳气,无放射痛。1周前症状加重,
夜间也有疼痛,影响睡眠。
【既往史】高血压5年,目前在用"苯磺酸氨氯地平 5mg qd"。
否认糖尿病、冠心病史。青霉素过敏。
【体格检查】BP 145/95 mmHg,HR 82 bpm,T 36.5°C。
上腹部有压痛,无反跳痛。
【实验室检查】
- 血常规:WBC 6.5×10^9/L,Hb 142 g/L
- 胃镜:胃窦黏膜充血、水肿,可见糜烂,快速尿素酶试验阳性
【诊断】
1. 慢性胃炎(ICD-10: K29.5)
2. 幽门螺杆菌感染(ICD-10: A49.86)
3. 高血压1级(ICD-10: I10)
【处理】
1. 标准四联根除治疗:
- 艾司奥美拉唑 20mg bid
- 阿莫西林 1000mg bid
- 克拉霉素 500mg bid
- 枸橼酸铋钾 220mg bid
疗程14天
2. 生活方式干预:戒烟酒,规律饮食
3. 4周后复查胃镜
【随访】建议4周后门诊随访,评估症状改善情况。
"""
# 执行提取
result = lx.extract(
text_or_documents=emr_text,
prompt_description="""
从电子病历文本中提取结构化医疗信息。
注意事项:
1. diagnoses 中的 icd10_code 如果文本中没有明确给出,可以推断但需标注为不确定
2. medications 要完整提取剂量和用药频次
3. lab_tests 的 reference_range 如果文本中有"正常"字样,可以写"正常范围"
""",
examples=[], # 医疗场景建议提供示例
model_id="gemini-2.5-pro", # 使用高精度模型
grounding=True
)
# 输出结构化结果
import json
print(json.dumps(result.data.model_dump(), ensure_ascii=False, indent=2))
6.3 法律场景:合同关键条款提取
class ContractParty(BaseModel):
name: str = Field(description="当事方名称")
role: str = Field(description="角色:甲方/乙方/卖方/买方等")
address: Optional[str] = Field(default=None)
class PaymentTerms(BaseModel):
amount: str = Field(description="金额(含货币单位)")
payment_method: Optional[str] = Field(default=None, description="支付方式")
payment_deadline: Optional[str] = Field(default=None, description="付款期限")
class KeyClause(BaseModel):
clause_type: str = Field(
description="""
条款类型,如:
- 'payment'(付款条款)
- 'delivery'(交付条款)
- 'confidentiality'(保密条款)
- 'termination'(终止条款)
- 'liability'(责任限制条款)
"""
)
content: str = Field(description="条款内容摘要")
risk_level: Optional[str] = Field(
default=None,
description="风险等级:low / medium / high"
)
class ContractInfo(BaseModel):
"""合同关键信息提取 Schema"""
contract_title: str = Field(description="合同标题/名称")
parties: List[ContractParty] = Field(
description="合同当事方列表"
)
effective_date: Optional[str] = Field(
default=None,
description="合同生效日期 (YYYY-MM-DD)"
)
expiration_date: Optional[str] = Field(
default=None,
description="合同到期日期 (YYYY-MM-DD)"
)
payment_terms: Optional[PaymentTerms] = Field(default=None)
key_clauses: List[KeyClause] = Field(
default_factory=list,
description="关键条款列表(识别对甲方/乙方有重大影响的条款)"
)
governing_law: Optional[str] = Field(
default=None,
description="管辖法律(如'中华人民共和国法律')"
)
dispute_resolution: Optional[str] = Field(
default=None,
description="争议解决方式(如'仲裁'、'诉讼')"
)
6.4 批量处理与异步调用
生产环境中往往需要批量处理大量文档,LangExtract 支持异步批量处理:
import asyncio
from typing import List
import langextract as lx
async def batch_extract(
documents: List[str],
schema,
prompt_description: str,
model_id: str = "gemini-2.5-flash",
max_concurrent: int = 5
) -> List[dict]:
"""
批量异步提取
Args:
documents: 文档列表
schema: Pydantic Schema 类
prompt_description: 提取任务描述
model_id: 模型 ID
max_concurrent: 最大并发数
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def extract_one(text: str, idx: int) -> dict:
async with semaphore:
try:
result = await lx.aextract( # 异步版本
text_or_documents=text,
prompt_description=prompt_description,
examples=[],
model_id=model_id,
grounding=True
)
return {
"index": idx,
"success": True,
"data": result.data.model_dump(),
"grounding": result.grounding
}
except Exception as e:
return {
"index": idx,
"success": False,
"error": str(e)
}
tasks = [extract_one(doc, i) for i, doc in enumerate(documents)]
results = await asyncio.gather(*tasks)
return results
# 使用示例
async def main():
# 假设有 100 份客服对话需要批量处理
with open("customer_conversations.txt", "r") as f:
documents = [line.strip() for line in f if line.strip()]
results = await batch_extract(
documents=documents,
schema=CustomerComplaint,
prompt_description="提取客服对话中的投诉信息",
model_id="gemini-2.5-flash",
max_concurrent=10 # 控制并发,避免速率限制
)
# 统计结果
success_count = sum(1 for r in results if r["success"])
print(f"成功处理: {success_count}/{len(documents)}")
# 保存结果
with open("extraction_results.json", "w") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
if __name__ == "__main__":
asyncio.run(main())
6.5 与 Milvus 集成:构建语义搜索系统
LangExtract + Milvus 的组合可以实现强大的智能文档处理系统:
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import langextract as lx
import numpy as np
class DocumentProcessor:
"""
将 LangExtract 提取的结构化信息存储到 Milvus,
实现语义搜索 + 精确元数据过滤
"""
def __init__(self, milvus_host="localhost", milvus_port="19530"):
# 连接 Milvus
connections.connect(host=milvus_host, port=milvus_port)
# 定义 Collection Schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="content_vector", dtype=DataType.FLOAT_VECTOR, dim=768),
FieldSchema(name="extracted_entities", dtype=DataType.JSON), # LangExtract 提取的实体
FieldSchema(name="source_text", dtype=DataType.VARCHAR, max_length=5000),
FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50),
FieldSchema(name="timestamp", dtype=DataType.INT64)
]
schema = CollectionSchema(fields, description="智能文档处理 Collection")
# 创建或加载 Collection
self.collection = Collection(name="smart_documents", schema=schema)
self.collection.create_index(
field_name="content_vector",
index_params={"metric_type": "IP", "index_type": "IVF_FLAT", "params": {"nlist": 1024}}
)
def process_and_store(self, document_id: str, text: str, category: str):
"""处理文档并存储到 Milvus"""
# Step 1: 使用 LangExtract 提取结构化信息
extraction_result = lx.extract(
text_or_documents=text,
prompt_description="提取文档中的关键实体和信息",
examples=[],
model_id="gemini-2.5-flash"
)
# Step 2: 将提取的信息转换为向量(使用 Embedding 模型)
entity_text = json.dumps(extraction_result.data.model_dump(), ensure_ascii=False)
vector = self.get_embedding(entity_text) # 需要实现 Embedding 函数
# Step 3: 插入 Milvus
self.collection.insert([
[document_id],
[vector.tolist()],
[extraction_result.data.model_dump()],
[text],
[category],
[int(time.time())]
])
print(f"文档 {document_id} 已处理并存储")
def search(self, query: str, category: Optional[str] = None, top_k: int = 5):
"""语义搜索 + 元数据过滤"""
# 将查询转换为向量
query_vector = self.get_embedding(query)
# 构建搜索参数
search_params = {"metric_type": "IP", "params": {"nprobe": 10}}
# 执行搜索
results = self.collection.search(
data=[query_vector.tolist()],
anns_field="content_vector",
param=search_params,
limit=top_k,
expr=f"category == '{category}'" if category else None # 元数据过滤
)
return results
# 使用示例
processor = DocumentProcessor()
# 处理文档
processor.process_and_store(
document_id="doc_001",
text="患者男性,45岁,因反复上腹痛入院...", # 病历文本
category="medical"
)
# 语义搜索
results = processor.search(
query="胃病患者用药记录",
category="medical",
top_k=5
)
for hit in results[0]:
print(f"文档ID: {hit.entity.get('document_id')}")
print(f"相关度: {hit.distance}")
print(f"提取的实体: {hit.entity.get('extracted_entities')}")
7. 高级技巧与性能优化
7.1 模型选择策略
from enum import Enum
class TaskComplexity(Enum):
LOW = "low" # 简单信息提取(命名实体、日期等)
MEDIUM = "medium" # 中等复杂度(关系提取、情感分析)
HIGH = "high" # 高复杂度(推理、摘要、多跳关系)
def smart_model_selection(text_length: int, complexity: TaskComplexity) -> str:
"""
智能模型选择策略
Args:
text_length: 文本长度(字符数)
complexity: 任务复杂度
"""
# 超短文本 + 低复杂度 → 最快模型
if text_length < 200 and complexity == TaskComplexity.LOW:
return "gemini-2.5-flash"
# 长文本 + 高复杂度 → 最强模型
if text_length > 10000 or complexity == TaskComplexity.HIGH:
return "gemini-2.5-pro"
# 默认:性价比最高的模型
return "gemini-2.5-flash"
# 实际使用
model_id = smart_model_selection(
text_length=len(long_medical_text),
complexity=TaskComplexity.HIGH
)
7.2 Prompt 优化技巧
技巧一:使用角色扮演(Role Playing)
# ❌ 普通 Prompt
prompt_description = "提取合同中的关键信息"
# ✅ 角色扮演 Prompt
prompt_description = """
你是一位拥有 20 年经验的企业法务顾问,擅长快速识别合同中的关键条款和潜在风险。
请从以下合同中提取关键信息,特别关注对甲方有重大影响的条款。
"""
技巧二:链式思考(Chain of Thought)
prompt_description = """
请按以下步骤分析文本:
Step 1: 通读全文,理解整体内容和上下文
Step 2: 识别所有命名实体(人名、地名、组织名、产品名)
Step 3: 识别实体之间的关系(谁做了什么)
Step 4: 根据上述分析,提取结构化信息
让我们一步步来,确保不遗漏任何关键信息。
"""
技巧三:输出格式强化
prompt_description = """
提取信息并以严格 JSON 格式输出。
重要要求:
1. 所有字符串必须用双引号(不能是单引号)
2. 不能有 trailing comma
3. null 必须小写
4. 如果某个字段无法确定,显式返回 null,不要返回空字符串
5. 输出必须是合法 JSON,可以被 json.loads() 解析
错误示例(不要这样):
{'name': 'John',} # 单引号 + trailing comma
正确示例:
{"name": "John"}
"""
7.3 批量处理与速率限制
import time
from dataclasses import dataclass
from typing import Iterable
@dataclass
class RateLimiter:
"""令牌桶速率限制器"""
max_calls: int # 时间窗口内最大调用次数
period: float = 60.0 # 时间窗口(秒)
def __post_init__(self):
self.calls: list[float] = []
def acquire(self):
"""获取一个调用令牌,如果超限则阻塞等待"""
now = time.time()
# 移除时间窗口之外的记录
self.calls = [t for t in self.calls if now - t < self.period]
if len(self.calls) >= self.max_calls:
# 计算需要等待的时间
wait_time = self.period - (now - self.calls[0])
if wait_time > 0:
time.sleep(wait_time)
self.calls.append(time.time())
# 使用速率限制器
rate_limiter = RateLimiter(max_calls=60, period=60) # 60次/分钟
def extract_with_rate_limit(text: str, **kwargs):
rate_limiter.acquire() # 自动限速
return lx.extract(text_or_documents=text, **kwargs)
7.4 成本优化实战
import tiktoken
def estimate_cost(text: str, model_id: str) -> float:
"""
估算提取成本(美元)
Args:
text: 输入文本
model_id: 模型 ID
"""
# 价格表(示例,实际价格请参考 Google AI 官方文档)
pricing = {
"gemini-2.5-flash": {"input": 0.075 / 1e6, "output": 0.30 / 1e6}, # $/token
"gemini-2.5-pro": {"input": 1.25 / 1e6, "output": 5.0 / 1e6}
}
# 估算 Token 数(简化估算:1 token ≈ 4 characters)
input_tokens = len(text) / 4
# 估算输出 Token 数(根据 Schema 复杂度)
estimated_output_tokens = 500 # 可以根据 Schema 动态调整
model_pricing = pricing[model_id]
cost = (input_tokens * model_pricing["input"] +
estimated_output_tokens * model_pricing["output"])
return cost
# 成本对比
text = "这是一个很长的合同文档..." * 100 # 假设 3000 字
flash_cost = estimate_cost(text, "gemini-2.5-flash")
pro_cost = estimate_cost(text, "gemini-2.5-pro")
print(f"Gemini Flash 估算成本: ${flash_cost:.4f}")
print(f"Gemini Pro 估算成本: ${pro_cost:.4f}")
print(f"成本差异: {pro_cost / flash_cost:.1f}x")
7.5 错误处理与重试机制
from functools import wraps
import random
def retry_with_exponential_backoff(
max_retries: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True
):
"""
指数退避重试装饰器
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
delay = initial_delay
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except RateLimitError:
if attempt == max_retries - 1:
raise
sleep_time = delay
if jitter:
sleep_time = sleep_time * (1 + random.random())
sleep_time = min(sleep_time, max_delay)
time.sleep(sleep_time)
delay *= exponential_base
except Exception as e:
# 对于非速率限制错误,直接抛出
if "rate" not in str(e).lower():
raise
if attempt == max_retries - 1:
raise
time.sleep(delay)
delay *= exponential_base
return wrapper
return decorator
# 使用装饰器
@retry_with_exponential_backoff(max_retries=5, initial_delay=2.0)
def robust_extract(text: str, **kwargs):
return lx.extract(text_or_documents=text, **kwargs)
8. 生产级部署方案
8.1 容器化部署
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 安装 LangExtract
COPY langextract /app/langextract
RUN pip install -e /app/langextract
# 配置环境变量
ENV GOOGLE_API_KEY=""
ENV MAX_CONCURRENT_REQUESTS="10"
ENV LOG_LEVEL="INFO"
# 复制应用代码
COPY app.py .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
# app.py - FastAPI 封装
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Any, Optional
import langextract as lx
app = FastAPI(title="LangExtract API")
class ExtractRequest(BaseModel):
text: str
schema_definition: dict # JSON Schema 格式
model_id: str = "gemini-2.5-flash"
grounding: bool = True
class ExtractResponse(BaseModel):
success: bool
data: Optional[dict] = None
grounding: Optional[dict] = None
error: Optional[str] = None
@app.post("/extract", response_model=ExtractResponse)
async def extract_endpoint(request: ExtractRequest):
try:
# 将 schema_definition 转换为 Pydantic Model
# (实际实现需要动态创建 Pydantic Model)
result = lx.extract(
text_or_documents=request.text,
prompt_description="提取结构化信息",
examples=[],
model_id=request.model_id,
grounding=request.grounding
)
return ExtractResponse(
success=True,
data=result.data.model_dump(),
grounding=result.grounding
)
except Exception as e:
return ExtractResponse(
success=False,
error=str(e)
)
@app.get("/health")
async def health_check():
return {"status": "healthy"}
8.2 与现有系统集成
与 Apache Airflow 集成
# Airflow DAG: 批量处理文档
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
import langextract as lx
from datetime import datetime
def process_document(file_path: str, **context):
"""处理单个文档的 Airflow Task"""
# 从 GCS 读取文档
from google.cloud import storage
client = storage.Client()
bucket = client.bucket("my-document-bucket")
blob = bucket.blob(file_path)
text = blob.download_as_text()
# 使用 LangExtract 提取
result = lx.extract(
text_or_documents=text,
prompt_description="提取文档关键信息",
examples=[],
model_id="gemini-2.5-flash"
)
# 将结果写回 GCS
result_blob = bucket.blob(f"extracted/{file_path}.json")
result_blob.upload_from_string(
json.dumps(result.data.model_dump(), ensure_ascii=False)
)
with DAG(
dag_id="batch_document_extraction",
start_date=datetime(2026, 1, 1),
schedule_interval="0 2 * * *", # 每天凌晨2点
catchup=False
) as dag:
# Task 1: 列出所有待处理文档
list_files = GCSListObjectsOperator(
task_id="list_files",
bucket="my-document-bucket",
prefix="raw/"
)
# Task 2: 并行处理文档
process_files = PythonOperator(
task_id="process_documents",
python_callable=process_document,
op_kwargs={"file_path": "{{ ti.xcom_pull(task_ids='list_files')[0] }}"},
execution_timeout=300 # 5分钟超时
)
list_files >> process_files
8.3 监控与可观测性
import logging
from prometheus_client import Counter, Histogram, start_http_server
import time
# Prometheus 指标
EXTRACTION_REQUESTS = Counter(
"lagextract_requests_total",
"Total extraction requests",
["model_id", "status"]
)
EXTRACTION_LATENCY = Histogram(
"lagextract_latency_seconds",
"Extraction latency in seconds",
["model_id"]
)
EXTRACTION_COST = Counter(
"lagextract_cost_total",
"Total cost in USD",
["model_id"]
)
def monitored_extract(text: str, model_id: str, **kwargs):
"""带监控的提取函数"""
start_time = time.time()
try:
result = lx.extract(
text_or_documents=text,
model_id=model_id,
**kwargs
)
# 记录成功指标
EXTRACTION_REQUESTS.labels(model_id=model_id, status="success").inc()
# 记录延迟
latency = time.time() - start_time
EXTRACTION_LATENCY.labels(model_id=model_id).observe(latency)
# 记录成本(需要实际实现成本计算)
estimated_cost = estimate_cost(text, model_id)
EXTRACTION_COST.labels(model_id=model_id).inc(estimated_cost)
return result
except Exception as e:
# 记录失败指标
EXTRACTION_REQUESTS.labels(model_id=model_id, status="error").inc()
raise
# 启动 Prometheus metrics 服务器
start_http_server(8000) # metrics 暴露在 :8000/metrics
9. LangExtract vs 其他方案对比
| 维度 | LangExtract | LangChain | LlamaIndex | 直接使用 LLM API |
|---|---|---|---|---|
| 学习曲线 | 低(声明式) | 中 | 中 | 高 |
| Source Grounding | ✅ 原生支持 | ❌ 不支持 | ❌ 不支持 | ❌ 需自己实现 |
| 可视化 | ✅ 内置 HTML 报告 | ❌ 不支持 | ❌ 不支持 | ❌ 需自己实现 |
| Schema 驱动 | ✅ Pydantic 原生 | ✅ 支持 | ✅ 支持 | ❌ 需自己实现 |
| 批量处理 | ✅ 原生异步支持 | ✅ 支持 | ✅ 支持 | ❌ 需自己实现 |
| 成本优化 | ✅ 智能模型路由 | ⚠️ 需自己实现 | ⚠️ 需自己实现 | ❌ 需自己实现 |
| 开源 | ✅ Google 官方开源 | ✅ | ✅ | N/A |
| 适合场景 | 信息提取专项 | 通用 LLM 应用 | RAG 应用 | 高度定制需求 |
10. 总结与展望
10.1 核心收获
通过本文的深度实战,我们系统地掌握了 Google LangExtract 的:
- 设计哲学:声明式提取、可验证性优先、工程化思维
- 核心概念:Schema 驱动、Source Grounding、交互式可视化、模型路由
- 架构原理:从 Prompt 工程到输出解析的完整流水线
- 实战代码:客服、医疗、法律三大场景的完整实现
- 性能优化:模型选择、Prompt 优化、批量处理、成本优化
- 生产部署:容器化、与 Airflow 集成、监控可观测性
10.2 LangExtract 的适用边界
✅ 适合的场景:
- 从大量非结构化文本中提取结构化信息
- 对提取结果的可审计性有要求(金融、医疗、法律)
- 需要人机协作审核的提取任务
- 批量文档处理
❌ 不适合的场景:
- 实时性要求极高的场景(LLM 调用有延迟)
- 成本极度敏感的大规模应用(需仔细评估)
- 需要复杂推理链的任务(考虑使用 Agent 框架)
10.3 未来展望
随着 LLM 能力的不断提升,信息提取领域正在经历一场革命:
- 多模态提取:未来的 LangExtract 可能支持从图片、视频中提取结构化信息
- 实时流式提取:支持对实时数据流(如直播字幕)的在线提取
- 领域自适应:自动根据领域调整 Prompt 和模型选择策略
- 与 Agent 框架深度融合:作为 Agent 的「感知层」,从环境中提取结构化信息
10.4 实践建议
如果你打算在生产环境中使用 LangExtract,建议按照以下步骤推进:
Phase 1: 原型验证(1-2周)
→ 选择1-2个典型场景
→ 手工准备 Few-shot 示例
→ 验证提取质量
Phase 2: 小规模试点(2-4周)
→ 处理 100-1000 份真实文档
→ 评估成本和时间
→ 优化 Prompt 和模型选择
Phase 3: 生产部署(4-8周)
→ 容器化部署
→ 接入监控和可观测性
→ 建立人工审核流程
→ 逐步扩大规模
11. 参考资料
- LangExtract GitHub: https://github.com/google/langextract
- Google Gemini API 文档: https://ai.google.dev/docs
- Pydantic 官方文档: https://docs.pydantic.dev/
- Milvus 向量数据库: https://milvus.io/
- LangExtract + Milvus 教程: https://github.com/milvus-io/bootcamp
本文完成于 2026 年 5 月,基于 LangExtract 最新版本。如有问题或建议,欢迎通过 程序员茄子 联系作者。
版权声明:本文由 程序员茄子 原创,转载请注明出处。