MarkItDown 深度实战:微软 13 万 Star 的「万物转 Markdown」神器——从架构原理到生产级 RAG 数据管线完全指南(2026)
为什么 MarkItDown 能引爆开发者圈?
如果你做过 RAG(检索增强生成)系统,你一定被文档预处理折磨过。PDF 表格错位、Word 嵌套样式丢失、PPT 图文分离、Excel 合并单元格变成乱码——这些不是小问题,而是吃掉整个项目 60% 以上精力的黑洞。
MarkItDown 是微软 AutoGen 团队开源的轻量级 Python 工具(MIT 协议),它的使命极其明确:把 PDF、Word、PPT、Excel、图片、音频、HTML、ZIP 等 20+ 种格式,一键转成结构完整的 Markdown。
这不是又一个「文档转换器」。MarkItDown 的核心洞察是:大语言模型对 Markdown 的理解远超任何其他格式。Markdown 极度接近纯文本,保留关键文档结构(标题、列表、表格、链接),同时 Token 消耗极低。当你把一份 50 页的 PDF 喂给 GPT-4o 时,转换后的 Markdown 比原始 PDF 文本提取节省 40-60% 的 Token——这在生产环境中意味着真金白银的 API 成本节约。
截至 2026 年 6 月,MarkItDown 在 GitHub 上已突破 13 万 Star,PyPI 周下载量超 150 万,日增 Star 超 2000。这不是营销泡沫,而是开发者用脚投票的结果。
核心架构:模块化解析器 + 插件扩展
MarkItDown 的架构设计非常优雅,核心思想是每个文件格式对应一个独立的 Converter,通过注册机制动态加载:
MarkItDown (入口)
├── convert() → 统一入口,自动路由到对应 Converter
├── convert_local() → 仅本地文件
├── convert_stream() → 字节流输入
├── convert_response()→ HTTP Response 输入
│
├── ConverterRegistry → 格式 → Converter 映射
│ ├── PdfConverter
│ ├── DocxConverter
│ ├── PptxConverter
│ ├── XlsxConverter
│ ├── HtmlConverter
│ ├── ImageConverter (EXIF + LLM 描述)
│ ├── AudioConverter (EXIF + 语音转写)
│ ├── CsvConverter
│ ├── JsonConverter
│ ├── XmlConverter
│ ├── ZipConverter (递归遍历内容)
│ ├── YoutubeConverter
│ ├── EpubConverter
│ └── ... 更多格式
│
└── PluginSystem → 第三方插件扩展
├── markitdown-ocr (LLM Vision OCR)
├── Azure DocIntel (云布局分析)
└── Azure ContentUnderstanding (多模态)
Converter 接口设计
每个 Converter 都遵循统一的接口协议:
from markitdown import MarkItDown, ConverterResult
# 核心接口:所有 Converter 必须实现
class BaseConverter:
def convert(self, input_source) -> ConverterResult:
"""
输入:文件路径 / 字节流 / HTTP Response
输出:ConverterResult(text_content=str, title=Optional[str])
"""
raise NotImplementedError
# ConverterResult 数据结构
@dataclass
class ConverterResult:
text_content: str # Markdown 文本
title: Optional[str] # 文档标题(如果可提取)
这种设计的关键优势:
- 格式隔离:每个 Converter 独立维护,修改 PDF 转换逻辑不影响 Word 转换
- 渐进安装:不需要一次性安装所有依赖,
pip install markitdown[pdf]只装 PDF 相关 - 插件扩展:第三方可以通过
#markitdown-plugin标签发布自己的 Converter
格式路由机制
convert() 方法内部的格式路由逻辑:
def convert(self, source, **kwargs):
"""统一入口,自动识别格式并路由"""
# 1. 判断输入类型
if isinstance(source, str):
if source.startswith(('http://', 'https://')):
if 'youtube.com' in source or 'youtu.be' in source:
return self._converters['youtube'].convert(source)
return self._convert_from_url(source)
elif source.endswith('.pdf'):
return self._converters['pdf'].convert(source)
elif source.endswith('.docx'):
return self._converters['docx'].convert(source)
# ... 更多格式匹配
elif isinstance(source, (bytes, BytesIO)):
# 流式输入,尝试格式嗅探
return self._convert_from_stream(source)
elif isinstance(source, requests.Response):
return self._convert_from_response(source)
值得注意的是,MarkItDown 提供了四个粒度不同的转换入口,这是安全设计的重要体现:
| 方法 | 输入范围 | 安全等级 | 适用场景 |
|---|---|---|---|
convert() | 本地文件 + 远程 URL + 字节流 | ⚠️ 最宽松 | 开发调试、可信环境 |
convert_local() | 仅本地文件路径 | ✅ 中等 | 服务端批处理 |
convert_response() | 仅 requests.Response | ✅ 较严格 | Web 服务中转 |
convert_stream() | 仅字节流 | ✅ 最严格 | 沙箱环境、不可信输入 |
安全提示:在服务端应用中,务必使用最窄的 API。不要将用户输入直接传给 convert(),因为 convert() 可能尝试读取本地文件或发起网络请求。
从零开始:安装与环境配置
基础安装
# 创建虚拟环境(Python 3.10+)
python -m venv .venv
source .venv/bin/activate # macOS/Linux
# .venv\Scripts\activate # Windows
# 安装全部依赖(推荐,省心)
pip install 'markitdown[all]'
# 或者只安装需要的格式
pip install 'markitdown[pdf,docx,pptx,xlsx]'
用 uv 安装(更快)
uv venv --python=3.12 .venv
source .venv/bin/activate
uv pip install 'markitdown[all]'
各格式依赖拆解
| 安装选项 | 支持格式 | 核心依赖 |
|---|---|---|
markitdown[pdf] | pymupdf | |
markitdown[docx] | Word (.docx) | python-docx |
markitdown[pptx] | PowerPoint | python-pptx |
markitdown[xlsx] | Excel (.xlsx) | openpyxl |
markitdown[xls] | 旧版 Excel (.xls) | xlrd |
markitdown[outlook] | Outlook 邮件 | extract-msg |
markitdown[az-doc-intel] | Azure 文档智能 | azure-ai-documentintelligence |
markitdown[az-content-understanding] | Azure 内容理解 | azure-identity |
markitdown[audio-transcription] | 音频转写 | SpeechRecognition |
markitdown[youtube-transcription] | YouTube 字幕 | youtube-transcript-api |
markitdown[all] | 以上全部 | 全部 |
可选系统工具
# Pandoc — 某些格式的底层转换引擎
# macOS
brew install pandoc
# Ubuntu/Debian
sudo apt install pandoc
# Tesseract OCR — 处理扫描版 PDF 的补充方案
# macOS
brew install tesseract
# Ubuntu/Debian
sudo apt install tesseract-ocr
命令行实战:5 分钟上手
基础用法
# 最简单的用法:转换并输出到 stdout
markitdown report.pdf
# 保存到文件
markitdown report.pdf -o report.md
# 管道输入
cat report.pdf | markitdown
# 批量转换(配合 find)
find ./docs -name "*.pdf" -exec sh -c 'markitdown "$1" -o "${1%.pdf}.md"' _ {} \;
启用插件
# 查看已安装插件
markitdown --list-plugins
# 使用插件转换
markitdown --use-plugins scanned-doc.pdf
# 使用 Azure 文档智能(云端高质量转换)
markitdown report.pdf -d -e "https://your-doc-intel.cognitiveservices.azure.com"
# 使用 Azure Content Understanding
markitdown report.pdf --use-cu --cu-endpoint "https://your-cu.cognitiveservices.azure.com"
Python API 深度实战
基础转换
from markitdown import MarkItDown
md = MarkItDown()
# 转换单个文件
result = md.convert("报告.docx")
print(result.text_content)
# 转换 PDF
result = md.convert("presentation.pdf")
print(result.text_content)
# 保存结果
with open("output.md", "w", encoding="utf-8") as f:
f.write(result.text_content)
批量转换:生产级文件处理管线
import os
import json
import time
from pathlib import Path
from markitdown import MarkItDown
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, asdict
from typing import Optional
@dataclass
class ConversionResult:
source_file: str
output_file: str
success: bool
char_count: int
duration_ms: int
error: Optional[str] = None
class DocumentPipeline:
"""生产级文档转换管线"""
SUPPORTED_EXTENSIONS = {
'.pdf', '.docx', '.pptx', '.xlsx', '.xls',
'.html', '.htm', '.csv', '.json', '.xml',
'.jpg', '.jpeg', '.png', '.gif', '.webp',
'.mp3', '.wav', '.mp4', '.epub', '.zip'
}
def __init__(self, max_workers: int = 4, output_dir: str = "./output"):
self.md = MarkItDown()
self.max_workers = max_workers
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.results: list[ConversionResult] = []
def _convert_one(self, file_path: Path) -> ConversionResult:
"""转换单个文件"""
output_file = self.output_dir / f"{file_path.stem}.md"
start = time.monotonic()
try:
result = self.md.convert(str(file_path))
duration_ms = int((time.monotonic() - start) * 1000)
with open(output_file, "w", encoding="utf-8") as f:
f.write(result.text_content)
return ConversionResult(
source_file=str(file_path),
output_file=str(output_file),
success=True,
char_count=len(result.text_content),
duration_ms=duration_ms
)
except Exception as e:
duration_ms = int((time.monotonic() - start) * 1000)
return ConversionResult(
source_file=str(file_path),
output_file=str(output_file),
success=False,
char_count=0,
duration_ms=duration_ms,
error=str(e)
)
def run(self, input_dir: str, pattern: str = "**/*") -> list[ConversionResult]:
"""批量转换目录下所有支持的文件"""
input_path = Path(input_dir)
# 收集所有支持的文件
files = [
f for f in input_path.glob(pattern)
if f.is_file() and f.suffix.lower() in self.SUPPORTED_EXTENSIONS
]
print(f"找到 {len(files)} 个待转换文件")
# 并行转换
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self._convert_one, f): f for f in files}
for future in as_completed(futures):
result = future.result()
self.results.append(result)
status = "✅" if result.success else "❌"
print(f"{status} {Path(result.source_file).name} "
f"→ {result.char_count} chars, {result.duration_ms}ms")
# 输出统计
self._print_stats()
return self.results
def _print_stats(self):
total = len(self.results)
success = sum(1 for r in self.results if r.success)
failed = total - success
total_chars = sum(r.char_count for r in self.results if r.success)
avg_time = sum(r.duration_ms for r in self.results) / max(total, 1)
print(f"\n{'='*50}")
print(f"转换完成: {success}/{total} 成功, {failed} 失败")
print(f"总字符数: {total_chars:,}")
print(f"平均耗时: {avg_time:.0f}ms")
if failed > 0:
print(f"\n失败文件:")
for r in self.results:
if not r.success:
print(f" ❌ {Path(r.source_file).name}: {r.error}")
# 使用示例
pipeline = DocumentPipeline(max_workers=4, output_dir="./markdown_output")
results = pipeline.run("./documents")
LLM 集成:图片描述与 OCR
MarkItDown 最巧妙的设计之一是直接集成 LLM Vision 能力来处理图片:
from markitdown import MarkItDown
from openai import OpenAI
# 方式一:基础 LLM 图片描述
client = OpenAI()
md = MarkItDown(
llm_client=client,
llm_model="gpt-4o",
llm_prompt="请用中文详细描述这张图片中的内容,包括文字、图表和数据"
)
result = md.convert("dashboard-screenshot.png")
print(result.text_content)
# 输出:
#
# 这是一份销售仪表板截图,展示了以下关键指标:
# - 总销售额:¥2,340,000(同比+15.3%)
# - 活跃用户:128,500
# - 转化率:3.7%
# ...
markitdown-ocr 插件:扫描文档的救星
对于包含图片的 PDF/DOCX/PPTX,markitdown-ocr 插件使用 LLM Vision 提取嵌入图片中的文字:
from markitdown import MarkItDown
from openai import OpenAI
# 安装:pip install markitdown-ocr openai
md = MarkItDown(
enable_plugins=True,
llm_client=OpenAI(),
llm_model="gpt-4o",
)
# 处理含扫描图片的 PDF
result = md.convert("scanned-invoice.pdf")
print(result.text_content)
# 扫描图片中的文字会被提取并插入到 Markdown 对应位置
# 不提供 llm_client 时,插件加载但 OCR 静默跳过
# 回退到内置转换器处理
各格式转换深度解析
PDF 转换:最复杂的战场
PDF 是文档转换中最难啃的骨头。MarkItDown 的 PDF 转换策略:
from markitdown import MarkItDown
md = MarkItDown()
# 基础 PDF 转换(使用 pymupdf)
result = md.convert("report.pdf")
# 对于复杂 PDF,推荐使用 Azure Document Intelligence
md_cloud = MarkItDown(
docintel_endpoint="https://your-resource.cognitiveservices.azure.com"
)
result = md_cloud.convert("complex-tables.pdf")
PDF 转换效果矩阵:
| PDF 类型 | 内置转换 | Azure DocIntel | Azure CU |
|---|---|---|---|
| 文本型 PDF | ✅ 优秀 | ✅ 优秀 | ✅ 优秀 |
| 简单表格 | ⚠️ 可用 | ✅ 优秀 | ✅ 优秀 |
| 复杂嵌套表格 | ❌ 错乱 | ✅ 良好 | ✅ 优秀 |
| 双栏论文 | ⚠️ 部分正确 | ✅ 优秀 | ✅ 优秀 |
| 扫描版 PDF | ❌ 无文字 | ✅ 优秀 | ✅ 优秀 |
| 图片型 PDF | ❌ 无文字 | ✅ 优秀 | ✅ 优秀 |
内置转换的局限与对策:
# 内置 PDF 转换的典型问题和解决方案
# 问题1:表格错位
# 原始 PDF 表格:
# | 姓名 | 部门 | 工号 |
# | 张三 | 技术部 | 001 |
#
# 转换后可能变成:
# 姓名 部门 工号 张三 技术部 001
#
# 解决方案:使用 Azure DocIntel 或手动后处理
# 问题2:双栏布局串行
# 左栏和右栏的文字可能被交错提取
#
# 解决方案:使用 Azure DocIntel 的布局分析
# 简单的后处理修正(处理常见表格格式问题)
import re
def fix_markdown_tables(text: str) -> str:
"""修正 Markdown 表格格式"""
lines = text.split('\n')
in_table = False
result = []
for line in lines:
stripped = line.strip()
# 检测可能的表格行(包含 | 或连续 tab 分隔)
if '|' in stripped or '\t' in stripped:
if not in_table:
in_table = True
# 统一分隔符为 |
if '\t' in stripped:
stripped = '| ' + ' | '.join(stripped.split('\t')) + ' |'
result.append(stripped)
else:
if in_table:
in_table = False
result.append(line)
return '\n'.join(result)
# 使用
result = md.convert("messy-tables.pdf")
fixed = fix_markdown_tables(result.text_content)
Word (.docx) 转换:最稳定的选择
Word 文档的结构化程度最高,转换效果通常最好:
# Word 转换保留的结构:
# - 标题层级 (H1-H6)
# - 有序/无序列表
# - 表格(含合并单元格)
# - 链接
# - 图片(转为图片引用)
# - 文本样式(粗体、斜体)
result = md.convert("proposal.docx")
print(result.text_content)
# # 项目提案
#
# ## 背景
#
# 本项目旨在...
#
# ## 技术方案
#
# | 模块 | 技术栈 | 负责人 |
# |------|--------|--------|
# | 前端 | React | 张三 |
# | 后端 | Go | 李四 |
PowerPoint (.pptx) 转换:幻灯片到结构化文档
# PPT 转换会将每页幻灯片转为标题+内容
result = md.convert("presentation.pptx")
print(result.text_content)
# # 演示文稿标题
#
# <!-- Slide 1 -->
#
# ## 封面标题
#
# 副标题内容
#
# 
#
# <!-- Slide 2 -->
#
# ## 核心数据
#
# - 用户增长 300%
# - 营收突破 1 亿
# - NPS 评分 85+
Excel (.xlsx) 转换:表格到 Markdown 表格
# 每个 Sheet 转为独立的 Markdown 表格
result = md.convert("sales-data.xlsx")
print(result.text_content)
# # Sheet1
#
# | 月份 | 销售额 | 同比增长 |
# |------|--------|---------|
# | 1月 | 1,200,000 | 15.3% |
# | 2月 | 980,000 | 12.1% |
# | 3月 | 1,500,000 | 22.7% |
#
# # Sheet2
#
# | 区域 | 负责人 | 目标 | 实际 |
# |------|--------|------|------|
# | 华东 | 王五 | 500万 | 480万 |
音频转换:语音到文字
# 安装:pip install 'markitdown[audio-transcription]'
result = md.convert("meeting-recording.mp3")
print(result.text_content)
# 输出包含:
# 1. EXIF 元数据(时长、比特率、采样率)
# 2. 语音转写文本
# 元数据示例:
# **Duration**: 00:45:30
# **Bit Rate**: 128 kbps
# **Sample Rate**: 44100 Hz
#
# **Transcription**:
# 今天我们讨论了三个议题...
ZIP 文件:递归遍历
# ZIP 文件会被自动解压并逐个转换内部文件
result = md.convert("project-archive.zip")
print(result.text_content)
# # project-archive.zip
#
# ## readme.md
#
# # 项目名称
# ...
#
# ## data/report.xlsx
#
# | 月份 | 销售额 |
# ...
Azure Content Understanding:生产级多模态转换
Azure Content Understanding(CU)是 MarkItDown 最强大的云转换后端,提供三个核心能力层级:
层级一:零配置自动路由
from markitdown import MarkItDown
# 只需指定 endpoint,自动按文件类型选择分析器
md = MarkItDown(cu_endpoint="https://your-cu.cognitiveservices.azure.com")
# 文档 → prebuilt-documentSearch 分析器
result = md.convert("report.pdf")
# 视频 → prebuilt-videoSearch 分析器(内置转换器不支持视频!)
result = md.convert("meeting.mp4")
# 音频 → prebuilt-audioSearch 分析器
result = md.convert("call.wav")
print(result.markdown)
层级二:自定义分析器 + 结构化字段提取
这是 CU 最强大的能力——提取领域特定的结构化字段:
from markitdown import MarkItDown
# 使用自定义分析器提取发票字段
md = MarkItDown(
cu_endpoint="https://your-cu.cognitiveservices.azure.com",
cu_analyzer_id="my-invoice-analyzer",
)
result = md.convert("invoice.pdf")
print(result.markdown)
# 输出包含 YAML front matter:
# ---
# contentType: document
# fields:
# VendorName: CONTOSO LTD.
# InvoiceDate: '2019-11-15'
# TotalAmount: '125,600.00'
# Currency: USD
# DueDate: '2019-12-15'
# LineItems:
# - Description: Cloud Services
# Quantity: 12
# UnitPrice: '5,000.00'
# - Description: Support Package
# Quantity: 1
# UnitPrice: '65,600.00'
# ---
#
# <!-- page 1 -->
# INVOICE #INV-2024-001
# ...
如何创建自定义分析器:
- 在 Azure Content Understanding Studio 中创建分析器
- 定义字段模式(如发票号、日期、金额、行项目)
- 上传样本文档进行训练
- 获取分析器 ID 用于 MarkItDown
层级三:精确控制路由
from markitdown.converters import ContentUnderstandingFileType
# 只对 PDF 使用 CU,其他格式用内置转换器
md = MarkItDown(
cu_endpoint="https://your-cu.cognitiveservices.azure.com",
cu_file_types=[ContentUnderstandingFileType.PDF],
)
# PDF 走 CU(付费但高质量)
result1 = md.convert("complex-report.pdf")
# DOCX 走内置转换器(免费)
result2 = md.convert("simple-doc.docx")
三种转换后端的完整对比
| 能力 | 内置转换器 | Azure DocIntel | Azure CU |
|---|---|---|---|
| 运行位置 | 本地 | 云端 | 云端 |
| 成本 | 免费 | 按 API 调用计费 | 按 API 调用计费 |
| 文档转换 | 格式特定提取 | 云布局分析 | 云多模态提取 |
| 结构化字段 | ❌ | ❌(本集成不暴露) | ✅ YAML front matter |
| 自定义分析器 | ❌ | ❌(本集成不可配) | ✅ 支持自定义 |
| 音频 | 基础转写 | ❌ | ✅ 音频分析器 |
| 视频 | ❌ | ❌ | ✅ 视频分析器 |
| 复杂表格 | ⚠️ | ✅ | ✅✅ |
| 扫描 PDF | ❌ | ✅ | ✅ |
| 离线使用 | ✅ | ❌ | ❌ |
| 部署复杂度 | pip install | Azure 资源 + 认证 | Azure 资源 + 认证 |
Docker 部署:容器化转换服务
MarkItDown 提供了 Dockerfile,可以快速构建容器化转换服务:
# 构建镜像
docker build -t markitdown:latest .
# 单文件转换
docker run --rm -i markitdown:latest < ~/report.pdf > output.md
# 批量转换
docker run --rm -v $(pwd)/docs:/input -v $(pwd)/output:/output \
markitdown:latest \
sh -c 'for f in /input/*; do markitdown "$f" > "/output/$(basename "${f%.*}").md"; done'
构建 REST API 微服务
将 MarkItDown 包装为 HTTP 服务,供 RAG 系统调用:
# server.py — MarkItDown REST API 服务
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import PlainTextResponse
from markitdown import MarkItDown
import tempfile
import os
app = FastAPI(title="MarkItDown API")
md = MarkItDown()
@app.post("/convert", response_class=PlainTextResponse)
async def convert_file(file: UploadFile = File(...)):
"""上传文件,返回 Markdown"""
# 验证文件大小(限制 50MB)
content = await file.read()
if len(content) > 50 * 1024 * 1024:
raise HTTPException(413, "文件超过 50MB 限制")
# 写入临时文件
suffix = os.path.splitext(file.filename)[1]
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
tmp.write(content)
tmp_path = tmp.name
try:
result = md.convert_local(tmp_path)
return result.text_content
except Exception as e:
raise HTTPException(500, f"转换失败: {str(e)}")
finally:
os.unlink(tmp_path)
@app.post("/convert/batch")
async def convert_batch(files: list[UploadFile] = File(...)):
"""批量转换"""
results = []
for file in files:
content = await file.read()
suffix = os.path.splitext(file.filename)[1]
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
tmp.write(content)
tmp_path = tmp.name
try:
result = md.convert_local(tmp_path)
results.append({
"filename": file.filename,
"markdown": result.text_content,
"char_count": len(result.text_content)
})
except Exception as e:
results.append({
"filename": file.filename,
"error": str(e)
})
finally:
os.unlink(tmp_path)
return {"results": results}
@app.get("/health")
async def health():
return {"status": "ok"}
# 启动:uvicorn server:app --host 0.0.0.0 --port 8000
Docker Compose 部署:
# docker-compose.yml
version: '3.8'
services:
markitdown-api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
volumes:
- ./uploads:/tmp/uploads
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
生产级 RAG 数据管线实战
这是 MarkItDown 最核心的应用场景。以下是一个完整的 RAG 文档预处理管线:
"""
RAG 文档预处理管线
━━━━━━━━━━━━━━━━━
功能:将多种格式文档转换为 Markdown → 清洗 → 分块 → 生成 Embedding → 存入向量数据库
"""
import os
import re
import json
import hashlib
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional
from markitdown import MarkItDown
@dataclass
class Document:
"""标准化的文档对象"""
doc_id: str
source_file: str
title: str
content: str
metadata: dict = field(default_factory=dict)
chunks: list['Chunk'] = field(default_factory=list)
@dataclass
class Chunk:
"""文档分块"""
chunk_id: str
doc_id: str
content: str
index: int # 在原文中的顺序
char_count: int
metadata: dict = field(default_factory=dict)
class RAGPreprocessor:
"""RAG 文档预处理器"""
def __init__(
self,
chunk_size: int = 500,
chunk_overlap: int = 50,
min_chunk_size: int = 100,
):
self.md = MarkItDown()
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.min_chunk_size = min_chunk_size
def process_file(self, file_path: str) -> Optional[Document]:
"""处理单个文件:转换 → 清洗 → 分块"""
# Step 1: 转换为 Markdown
try:
result = self.md.convert(file_path)
raw_markdown = result.text_content
except Exception as e:
print(f"转换失败 {file_path}: {e}")
return None
# Step 2: 清洗 Markdown
cleaned = self._clean_markdown(raw_markdown)
# Step 3: 提取元数据
title = self._extract_title(cleaned, Path(file_path).stem)
doc_id = hashlib.md5(file_path.encode()).hexdigest()[:12]
# Step 4: 分块
chunks = self._split_into_chunks(cleaned, doc_id)
return Document(
doc_id=doc_id,
source_file=file_path,
title=title,
content=cleaned,
chunks=chunks,
metadata={
"source_format": Path(file_path).suffix,
"total_chars": len(cleaned),
"total_chunks": len(chunks),
}
)
def _clean_markdown(self, text: str) -> str:
"""清洗 Markdown 文本"""
# 1. 移除空行过多
text = re.sub(r'\n{4,}', '\n\n\n', text)
# 2. 修正表格格式(tab → |)
lines = text.split('\n')
result = []
for line in lines:
if '\t' in line and line.strip():
cells = line.split('\t')
result.append('| ' + ' | '.join(c.strip() for c in cells) + ' |')
else:
result.append(line)
text = '\n'.join(result)
# 3. 移除控制字符(保留换行和 tab)
text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', text)
# 4. 统一 Unicode 空白
text = re.sub(r'[\u00a0\u2000-\u200b\u202f\u205f\u3000]', ' ', text)
# 5. 移除空的 Markdown 链接
text = re.sub(r'\[([^\]]*)\]\(\s*\)', r'\1', text)
# 6. 修正标题格式(确保 # 后有空格)
text = re.sub(r'^(#{1,6})([^ #\n])', r'\1 \2', text, flags=re.MULTILINE)
return text.strip()
def _extract_title(self, text: str, fallback: str) -> str:
"""从 Markdown 中提取文档标题"""
# 尝试获取第一个 H1 标题
match = re.search(r'^#\s+(.+)$', text, re.MULTILINE)
if match:
return match.group(1).strip()
# 回退到文件名
return fallback
def _split_into_chunks(self, text: str, doc_id: str) -> list[Chunk]:
"""按语义分块(尊重标题边界)"""
chunks = []
# 按标题分割
sections = re.split(r'(?=^#{1,3}\s)', text, flags=re.MULTILINE)
current_chunk = ""
chunk_index = 0
for section in sections:
if not section.strip():
continue
# 如果当前块 + 新段落超过阈值,先保存当前块
if current_chunk and len(current_chunk) + len(section) > self.chunk_size:
if len(current_chunk) >= self.min_chunk_size:
chunks.append(self._make_chunk(
current_chunk, doc_id, chunk_index
))
chunk_index += 1
# 保留 overlap 部分
overlap_text = current_chunk[-self.chunk_overlap:] if self.chunk_overlap > 0 else ""
current_chunk = overlap_text + section
else:
current_chunk += "\n\n" + section
else:
current_chunk = current_chunk + "\n\n" + section if current_chunk else section
# 最后一块
if current_chunk and len(current_chunk) >= self.min_chunk_size:
chunks.append(self._make_chunk(current_chunk, doc_id, chunk_index))
return chunks
def _make_chunk(self, text: str, doc_id: str, index: int) -> Chunk:
chunk_id = hashlib.md5(f"{doc_id}:{index}:{text[:50]}".encode()).hexdigest()[:16]
return Chunk(
chunk_id=chunk_id,
doc_id=doc_id,
content=text.strip(),
index=index,
char_count=len(text.strip()),
)
def process_directory(self, input_dir: str) -> list[Document]:
"""批量处理目录下所有文档"""
documents = []
input_path = Path(input_dir)
for file_path in sorted(input_path.rglob("*")):
if file_path.suffix.lower() in {
'.pdf', '.docx', '.pptx', '.xlsx',
'.html', '.csv', '.json', '.xml', '.epub'
}:
doc = self.process_file(str(file_path))
if doc:
documents.append(doc)
print(f"✅ {file_path.name} → {len(doc.chunks)} chunks")
total_chunks = sum(len(d.chunks) for d in documents)
print(f"\n处理完成: {len(documents)} 个文档, {total_chunks} 个分块")
return documents
# 使用示例
preprocessor = RAGPreprocessor(
chunk_size=500,
chunk_overlap=50,
min_chunk_size=100,
)
documents = preprocessor.process_directory("./knowledge_base")
# 导出到 JSON(供下游 Embedding 生成使用)
export_data = []
for doc in documents:
for chunk in doc.chunks:
export_data.append({
"chunk_id": chunk.chunk_id,
"doc_id": chunk.doc_id,
"content": chunk.content,
"metadata": {
**doc.metadata,
"title": doc.title,
"source_file": doc.source_file,
"chunk_index": chunk.index,
}
})
with open("chunks.json", "w", encoding="utf-8") as f:
json.dump(export_data, f, ensure_ascii=False, indent=2)
自定义插件开发
MarkItDown 的插件系统允许你扩展对新格式的支持。以下是开发一个自定义插件的完整流程:
"""
markitdown-mycustom — 自定义 MarkItDown 插件示例
支持转换 .myformat 文件
"""
from markitdown import MarkItDown, ConverterProtocol, ConverterResult
class MyCustomConverter(ConverterProtocol):
"""自定义格式转换器"""
def convert(self, input_source) -> ConverterResult:
"""
转换 .myformat 文件到 Markdown
Args:
input_source: 文件路径或字节流
Returns:
ConverterResult(text_content=str, title=Optional[str])
"""
# 读取输入
if isinstance(input_source, str):
with open(input_source, 'r', encoding='utf-8') as f:
raw = f.read()
elif isinstance(input_source, (bytes, bytearray)):
raw = input_source.decode('utf-8')
else:
raise ValueError(f"不支持的输入类型: {type(input_source)}")
# 解析并转换为 Markdown
markdown = self._parse_myformat(raw)
return ConverterResult(
text_content=markdown,
title=self._extract_title(raw)
)
def _parse_myformat(self, raw: str) -> str:
"""解析 .myformat 并转为 Markdown"""
# 实现你的解析逻辑
lines = raw.split('\n')
md_lines = []
for line in lines:
# 示例:将 @TITLE 转为 H1
if line.startswith('@TITLE '):
md_lines.append(f"# {line[7:]}")
# 将 @SECTION 转为 H2
elif line.startswith('@SECTION '):
md_lines.append(f"## {line[9:]}")
# 将 @ITEM 转为列表项
elif line.startswith('@ITEM '):
md_lines.append(f"- {line[6:]}")
else:
md_lines.append(line)
return '\n'.join(md_lines)
def _extract_title(self, raw: str) -> str:
for line in raw.split('\n'):
if line.startswith('@TITLE '):
return line[7:]
return "Untitled"
# 插件注册(通过 entry_points)
# 在 pyproject.toml 中:
# [project.entry-points."markitdown.converter"]
# mycustom = "markitdown_mycustom:MyCustomConverter"
性能优化:大规模文档处理的实战经验
内存优化:流式处理大文件
import mmap
from markitdown import MarkItDown
def convert_large_pdf(file_path: str, output_path: str):
"""处理大型 PDF 文件"""
md = MarkItDown()
# 方式一:使用 convert_stream 减少内存占用
with open(file_path, 'rb') as f:
# 使用 mmap 避免将整个文件加载到内存
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
result = md.convert_stream(mm)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(result.text_content)
并行处理:榨干多核性能
import multiprocessing as mp
from pathlib import Path
from markitdown import MarkItDown
def convert_file(args):
"""Worker 函数(注意:MarkItDown 实例不能跨进程共享)"""
file_path, output_dir = args
md = MarkItDown() # 每个进程创建自己的实例
try:
result = md.convert(file_path)
output_path = Path(output_dir) / f"{Path(file_path).stem}.md"
output_path.write_text(result.text_content, encoding='utf-8')
return (file_path, True, len(result.text_content))
except Exception as e:
return (file_path, False, str(e))
def batch_convert_parallel(input_dir: str, output_dir: str, workers: int = None):
"""多进程批量转换"""
if workers is None:
workers = min(mp.cpu_count(), 8)
files = [
str(f) for f in Path(input_dir).rglob("*")
if f.suffix.lower() in {'.pdf', '.docx', '.pptx', '.xlsx', '.html'}
]
args = [(f, output_dir) for f in files]
with mp.Pool(workers) as pool:
results = pool.map(convert_file, args)
success = sum(1 for _, ok, _ in results if ok)
print(f"完成: {success}/{len(results)} 成功")
return results
缓存策略:避免重复转换
import hashlib
import json
from pathlib import Path
from markitdown import MarkItDown
class CachedConverter:
"""带缓存的文档转换器"""
def __init__(self, cache_dir: str = "./cache"):
self.md = MarkItDown()
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
def _file_hash(self, file_path: str) -> str:
"""计算文件内容哈希"""
h = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b''):
h.update(chunk)
return h.hexdigest()[:16]
def convert(self, file_path: str, force: bool = False) -> str:
"""转换文件(带缓存)"""
file_hash = self._file_hash(file_path)
cache_file = self.cache_dir / f"{file_hash}.md"
# 检查缓存
if not force and cache_file.exists():
return cache_file.read_text(encoding='utf-8')
# 执行转换
result = self.md.convert(file_path)
# 写入缓存
cache_file.write_text(result.text_content, encoding='utf-8')
return result.text_content
def clear_cache(self):
"""清除所有缓存"""
for f in self.cache_dir.glob("*.md"):
f.unlink()
# 使用
converter = CachedConverter()
markdown = converter.convert("large-report.pdf")
# 第二次调用相同文件时直接从缓存读取
markdown = converter.convert("large-report.pdf")
Token 消耗对比:为什么 Markdown 是 LLM 的最优输入
一份 50 页的技术报告,不同格式的 Token 消耗对比:
| 输入格式 | Token 数量 | 相对 Markdown | 说明 |
|---|---|---|---|
| 原始 PDF 文本提取 | ~45,000 | 2.0x | 包含大量格式噪声 |
| HTML | ~38,000 | 1.7x | 标签和样式占大量 Token |
| JSON 结构化 | ~42,000 | 1.9x | 字段名和嵌套开销 |
| Markdown | ~22,000 | 1.0x | 最紧凑的结构化格式 |
| 纯文本 | ~20,000 | 0.9x | 丢失所有结构信息 |
Markdown 是唯一在保持文档结构的同时接近纯文本 Token 效率的格式。这意味着:
- 50 页报告的 RAG 查询成本降低 40-55%
- 上下文窗口可容纳更多文档片段
- 检索精度更高(结构信息帮助 LLM 理解语义层级)
安全考量:生产环境必读
MarkItDown 官方文档明确警告了安全风险。核心要点:
1. 输入验证
# ❌ 危险:直接将用户输入传给 convert()
user_input = request.args.get('url')
result = md.convert(user_input) # 可能读取本地文件或发起 SSRF
# ✅ 安全:使用最窄 API
from markitdown import MarkItDown
from pathlib import Path
ALLOWED_EXTENSIONS = {'.pdf', '.docx', '.pptx', '.xlsx'}
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
def safe_convert(uploaded_file_path: str) -> str:
"""安全转换:验证输入后使用 convert_local"""
path = Path(uploaded_file_path)
# 验证扩展名
if path.suffix.lower() not in ALLOWED_EXTENSIONS:
raise ValueError(f"不支持的文件格式: {path.suffix}")
# 验证文件大小
if path.stat().st_size > MAX_FILE_SIZE:
raise ValueError("文件超过大小限制")
# 使用最窄 API
md = MarkItDown()
result = md.convert_local(str(path))
return result.text_content
2. SSRF 防护
# ❌ 危险:convert() 可以发起网络请求
result = md.convert("http://internal-server/admin")
# ✅ 安全:使用 convert_stream 或 convert_local
import requests
# 自己控制 HTTP 请求
ALLOWED_DOMAINS = {"example.com", "docs.example.com"}
def safe_fetch_and_convert(url: str) -> str:
from urllib.parse import urlparse
parsed = urlparse(url)
if parsed.hostname not in ALLOWED_DOMAINS:
raise ValueError(f"不允许的域名: {parsed.hostname}")
response = requests.get(url, timeout=30)
response.raise_for_status()
md = MarkItDown()
return md.convert_stream(response.content)
3. 路径遍历防护
import os
def safe_resolve_path(base_dir: str, user_path: str) -> str:
"""防止路径遍历攻击"""
base = Path(base_dir).resolve()
target = (base / user_path).resolve()
if not str(target).startswith(str(base)):
raise ValueError("路径遍历攻击检测")
return str(target)
生态与未来
插件生态
MarkItDown 的插件生态正在快速成长。在 GitHub 上搜索 #markitdown-plugin 可以找到社区插件:
- markitdown-ocr:使用 LLM Vision 提取嵌入图片中的文字
- markitdown-latex:LaTeX 数学公式转换
- markitdown-cad:CAD 图纸元数据提取
- markitdown-onenote:OneNote 笔记转换
与 AutoGen 的集成
MarkItDown 是微软 AutoGen 生态的一部分,与 AutoGen Agent 框架深度集成:
# 在 AutoGen Agent 中使用 MarkItDown
from autogen import AssistantAgent
from markitdown import MarkItDown
md = MarkItDown()
# Agent 可以直接消费转换后的 Markdown
def document_tool(file_path: str) -> str:
"""将文档转为 Markdown 供 Agent 使用"""
result = md.convert(file_path)
return result.text_content
agent = AssistantAgent(
name="doc_analyst",
tools=[document_tool],
)
发展趋势
- 视频转换:通过 Azure CU 支持,MarkItDown 已经可以处理视频文件,未来可能内置视频转 Markdown
- 更多云后端:除了 Azure,未来可能支持 AWS Textract、Google Document AI 等
- 性能优化:Rust 扩展或 WASM 加速大文件处理
- 结构化输出:JSON Schema 驱动的结构化字段提取,不仅限于 Markdown 输出
总结
MarkItDown 的价值可以用一个公式概括:
文档预处理成本 = 格式解析 + 结构提取 + 内容清洗 + Token 优化
MarkItDown 将这四步合一,把开发者从重复的文档解析劳动中解放出来。它的核心优势:
- 格式全覆盖:20+ 种文件格式,一个 API 全搞定
- LLM 原生:输出 Markdown,天然适配大语言模型
- 渐进增强:内置转换免费够用,Azure 云端按需升级
- 插件扩展:自定义格式支持,社区生态持续成长
- 安全可控:四级 API 粒度,输入验证、SSRF 防护
如果你在做 RAG、知识库、文档搜索、内容迁移中的任何一项,MarkItDown 都值得你认真研究。13 万 Star 不是终点,而是「万物转 Markdown」这个理念的起点。
项目地址:https://github.com/microsoft/markitdown
协议:MIT
Python 版本:3.10+
PyPI:https://pypi.org/project/markitdown/