编程 MarkItDown 深度实战:微软 13 万 Star 的「万物转 Markdown」神器——从架构原理到生产级 RAG 数据管线完全指南(2026)

2026-06-05 17:40:42 +0800 CST views 11

MarkItDown 深度实战:微软 13 万 Star 的「万物转 Markdown」神器——从架构原理到生产级 RAG 数据管线完全指南(2026)

为什么 MarkItDown 能引爆开发者圈?

如果你做过 RAG(检索增强生成)系统,你一定被文档预处理折磨过。PDF 表格错位、Word 嵌套样式丢失、PPT 图文分离、Excel 合并单元格变成乱码——这些不是小问题,而是吃掉整个项目 60% 以上精力的黑洞。

MarkItDown 是微软 AutoGen 团队开源的轻量级 Python 工具(MIT 协议),它的使命极其明确:把 PDF、Word、PPT、Excel、图片、音频、HTML、ZIP 等 20+ 种格式,一键转成结构完整的 Markdown

这不是又一个「文档转换器」。MarkItDown 的核心洞察是:大语言模型对 Markdown 的理解远超任何其他格式。Markdown 极度接近纯文本,保留关键文档结构(标题、列表、表格、链接),同时 Token 消耗极低。当你把一份 50 页的 PDF 喂给 GPT-4o 时,转换后的 Markdown 比原始 PDF 文本提取节省 40-60% 的 Token——这在生产环境中意味着真金白银的 API 成本节约。

截至 2026 年 6 月,MarkItDown 在 GitHub 上已突破 13 万 Star,PyPI 周下载量超 150 万,日增 Star 超 2000。这不是营销泡沫,而是开发者用脚投票的结果。

核心架构:模块化解析器 + 插件扩展

MarkItDown 的架构设计非常优雅,核心思想是每个文件格式对应一个独立的 Converter,通过注册机制动态加载:

MarkItDown (入口)
  ├── convert()         → 统一入口,自动路由到对应 Converter
  ├── convert_local()   → 仅本地文件
  ├── convert_stream()  → 字节流输入
  ├── convert_response()→ HTTP Response 输入
  │
  ├── ConverterRegistry → 格式 → Converter 映射
  │   ├── PdfConverter
  │   ├── DocxConverter
  │   ├── PptxConverter
  │   ├── XlsxConverter
  │   ├── HtmlConverter
  │   ├── ImageConverter      (EXIF + LLM 描述)
  │   ├── AudioConverter      (EXIF + 语音转写)
  │   ├── CsvConverter
  │   ├── JsonConverter
  │   ├── XmlConverter
  │   ├── ZipConverter        (递归遍历内容)
  │   ├── YoutubeConverter
  │   ├── EpubConverter
  │   └── ... 更多格式
  │
  └── PluginSystem      → 第三方插件扩展
      ├── markitdown-ocr       (LLM Vision OCR)
      ├── Azure DocIntel       (云布局分析)
      └── Azure ContentUnderstanding (多模态)

Converter 接口设计

每个 Converter 都遵循统一的接口协议:

from markitdown import MarkItDown, ConverterResult

# 核心接口:所有 Converter 必须实现
class BaseConverter:
    def convert(self, input_source) -> ConverterResult:
        """
        输入:文件路径 / 字节流 / HTTP Response
        输出:ConverterResult(text_content=str, title=Optional[str])
        """
        raise NotImplementedError

# ConverterResult 数据结构
@dataclass
class ConverterResult:
    text_content: str    # Markdown 文本
    title: Optional[str] # 文档标题(如果可提取)

这种设计的关键优势:

  1. 格式隔离:每个 Converter 独立维护,修改 PDF 转换逻辑不影响 Word 转换
  2. 渐进安装:不需要一次性安装所有依赖,pip install markitdown[pdf] 只装 PDF 相关
  3. 插件扩展:第三方可以通过 #markitdown-plugin 标签发布自己的 Converter

格式路由机制

convert() 方法内部的格式路由逻辑:

def convert(self, source, **kwargs):
    """统一入口,自动识别格式并路由"""
    
    # 1. 判断输入类型
    if isinstance(source, str):
        if source.startswith(('http://', 'https://')):
            if 'youtube.com' in source or 'youtu.be' in source:
                return self._converters['youtube'].convert(source)
            return self._convert_from_url(source)
        elif source.endswith('.pdf'):
            return self._converters['pdf'].convert(source)
        elif source.endswith('.docx'):
            return self._converters['docx'].convert(source)
        # ... 更多格式匹配
    
    elif isinstance(source, (bytes, BytesIO)):
        # 流式输入,尝试格式嗅探
        return self._convert_from_stream(source)
    
    elif isinstance(source, requests.Response):
        return self._convert_from_response(source)

值得注意的是,MarkItDown 提供了四个粒度不同的转换入口,这是安全设计的重要体现:

方法输入范围安全等级适用场景
convert()本地文件 + 远程 URL + 字节流⚠️ 最宽松开发调试、可信环境
convert_local()仅本地文件路径✅ 中等服务端批处理
convert_response()requests.Response✅ 较严格Web 服务中转
convert_stream()仅字节流✅ 最严格沙箱环境、不可信输入

安全提示:在服务端应用中,务必使用最窄的 API。不要将用户输入直接传给 convert(),因为 convert() 可能尝试读取本地文件或发起网络请求。

从零开始:安装与环境配置

基础安装

# 创建虚拟环境(Python 3.10+)
python -m venv .venv
source .venv/bin/activate  # macOS/Linux
# .venv\Scripts\activate   # Windows

# 安装全部依赖(推荐,省心)
pip install 'markitdown[all]'

# 或者只安装需要的格式
pip install 'markitdown[pdf,docx,pptx,xlsx]'

用 uv 安装(更快)

uv venv --python=3.12 .venv
source .venv/bin/activate
uv pip install 'markitdown[all]'

各格式依赖拆解

安装选项支持格式核心依赖
markitdown[pdf]PDFpymupdf
markitdown[docx]Word (.docx)python-docx
markitdown[pptx]PowerPointpython-pptx
markitdown[xlsx]Excel (.xlsx)openpyxl
markitdown[xls]旧版 Excel (.xls)xlrd
markitdown[outlook]Outlook 邮件extract-msg
markitdown[az-doc-intel]Azure 文档智能azure-ai-documentintelligence
markitdown[az-content-understanding]Azure 内容理解azure-identity
markitdown[audio-transcription]音频转写SpeechRecognition
markitdown[youtube-transcription]YouTube 字幕youtube-transcript-api
markitdown[all]以上全部全部

可选系统工具

# Pandoc — 某些格式的底层转换引擎
# macOS
brew install pandoc
# Ubuntu/Debian
sudo apt install pandoc

# Tesseract OCR — 处理扫描版 PDF 的补充方案
# macOS
brew install tesseract
# Ubuntu/Debian
sudo apt install tesseract-ocr

命令行实战:5 分钟上手

基础用法

# 最简单的用法:转换并输出到 stdout
markitdown report.pdf

# 保存到文件
markitdown report.pdf -o report.md

# 管道输入
cat report.pdf | markitdown

# 批量转换(配合 find)
find ./docs -name "*.pdf" -exec sh -c 'markitdown "$1" -o "${1%.pdf}.md"' _ {} \;

启用插件

# 查看已安装插件
markitdown --list-plugins

# 使用插件转换
markitdown --use-plugins scanned-doc.pdf

# 使用 Azure 文档智能(云端高质量转换)
markitdown report.pdf -d -e "https://your-doc-intel.cognitiveservices.azure.com"

# 使用 Azure Content Understanding
markitdown report.pdf --use-cu --cu-endpoint "https://your-cu.cognitiveservices.azure.com"

Python API 深度实战

基础转换

from markitdown import MarkItDown

md = MarkItDown()

# 转换单个文件
result = md.convert("报告.docx")
print(result.text_content)

# 转换 PDF
result = md.convert("presentation.pdf")
print(result.text_content)

# 保存结果
with open("output.md", "w", encoding="utf-8") as f:
    f.write(result.text_content)

批量转换:生产级文件处理管线

import os
import json
import time
from pathlib import Path
from markitdown import MarkItDown
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class ConversionResult:
    source_file: str
    output_file: str
    success: bool
    char_count: int
    duration_ms: int
    error: Optional[str] = None

class DocumentPipeline:
    """生产级文档转换管线"""
    
    SUPPORTED_EXTENSIONS = {
        '.pdf', '.docx', '.pptx', '.xlsx', '.xls',
        '.html', '.htm', '.csv', '.json', '.xml',
        '.jpg', '.jpeg', '.png', '.gif', '.webp',
        '.mp3', '.wav', '.mp4', '.epub', '.zip'
    }
    
    def __init__(self, max_workers: int = 4, output_dir: str = "./output"):
        self.md = MarkItDown()
        self.max_workers = max_workers
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.results: list[ConversionResult] = []
    
    def _convert_one(self, file_path: Path) -> ConversionResult:
        """转换单个文件"""
        output_file = self.output_dir / f"{file_path.stem}.md"
        start = time.monotonic()
        
        try:
            result = self.md.convert(str(file_path))
            duration_ms = int((time.monotonic() - start) * 1000)
            
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(result.text_content)
            
            return ConversionResult(
                source_file=str(file_path),
                output_file=str(output_file),
                success=True,
                char_count=len(result.text_content),
                duration_ms=duration_ms
            )
        except Exception as e:
            duration_ms = int((time.monotonic() - start) * 1000)
            return ConversionResult(
                source_file=str(file_path),
                output_file=str(output_file),
                success=False,
                char_count=0,
                duration_ms=duration_ms,
                error=str(e)
            )
    
    def run(self, input_dir: str, pattern: str = "**/*") -> list[ConversionResult]:
        """批量转换目录下所有支持的文件"""
        input_path = Path(input_dir)
        
        # 收集所有支持的文件
        files = [
            f for f in input_path.glob(pattern)
            if f.is_file() and f.suffix.lower() in self.SUPPORTED_EXTENSIONS
        ]
        
        print(f"找到 {len(files)} 个待转换文件")
        
        # 并行转换
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self._convert_one, f): f for f in files}
            
            for future in as_completed(futures):
                result = future.result()
                self.results.append(result)
                status = "✅" if result.success else "❌"
                print(f"{status} {Path(result.source_file).name} "
                      f"→ {result.char_count} chars, {result.duration_ms}ms")
        
        # 输出统计
        self._print_stats()
        return self.results
    
    def _print_stats(self):
        total = len(self.results)
        success = sum(1 for r in self.results if r.success)
        failed = total - success
        total_chars = sum(r.char_count for r in self.results if r.success)
        avg_time = sum(r.duration_ms for r in self.results) / max(total, 1)
        
        print(f"\n{'='*50}")
        print(f"转换完成: {success}/{total} 成功, {failed} 失败")
        print(f"总字符数: {total_chars:,}")
        print(f"平均耗时: {avg_time:.0f}ms")
        
        if failed > 0:
            print(f"\n失败文件:")
            for r in self.results:
                if not r.success:
                    print(f"  ❌ {Path(r.source_file).name}: {r.error}")

# 使用示例
pipeline = DocumentPipeline(max_workers=4, output_dir="./markdown_output")
results = pipeline.run("./documents")

LLM 集成:图片描述与 OCR

MarkItDown 最巧妙的设计之一是直接集成 LLM Vision 能力来处理图片:

from markitdown import MarkItDown
from openai import OpenAI

# 方式一:基础 LLM 图片描述
client = OpenAI()
md = MarkItDown(
    llm_client=client,
    llm_model="gpt-4o",
    llm_prompt="请用中文详细描述这张图片中的内容,包括文字、图表和数据"
)

result = md.convert("dashboard-screenshot.png")
print(result.text_content)
# 输出:![dashboard-screenshot.png](dashboard-screenshot.png)
# 
# 这是一份销售仪表板截图,展示了以下关键指标:
# - 总销售额:¥2,340,000(同比+15.3%)
# - 活跃用户:128,500
# - 转化率:3.7%
# ...

markitdown-ocr 插件:扫描文档的救星

对于包含图片的 PDF/DOCX/PPTX,markitdown-ocr 插件使用 LLM Vision 提取嵌入图片中的文字:

from markitdown import MarkItDown
from openai import OpenAI

# 安装:pip install markitdown-ocr openai
md = MarkItDown(
    enable_plugins=True,
    llm_client=OpenAI(),
    llm_model="gpt-4o",
)

# 处理含扫描图片的 PDF
result = md.convert("scanned-invoice.pdf")
print(result.text_content)
# 扫描图片中的文字会被提取并插入到 Markdown 对应位置

# 不提供 llm_client 时,插件加载但 OCR 静默跳过
# 回退到内置转换器处理

各格式转换深度解析

PDF 转换:最复杂的战场

PDF 是文档转换中最难啃的骨头。MarkItDown 的 PDF 转换策略:

from markitdown import MarkItDown

md = MarkItDown()

# 基础 PDF 转换(使用 pymupdf)
result = md.convert("report.pdf")

# 对于复杂 PDF,推荐使用 Azure Document Intelligence
md_cloud = MarkItDown(
    docintel_endpoint="https://your-resource.cognitiveservices.azure.com"
)
result = md_cloud.convert("complex-tables.pdf")

PDF 转换效果矩阵

PDF 类型内置转换Azure DocIntelAzure CU
文本型 PDF✅ 优秀✅ 优秀✅ 优秀
简单表格⚠️ 可用✅ 优秀✅ 优秀
复杂嵌套表格❌ 错乱✅ 良好✅ 优秀
双栏论文⚠️ 部分正确✅ 优秀✅ 优秀
扫描版 PDF❌ 无文字✅ 优秀✅ 优秀
图片型 PDF❌ 无文字✅ 优秀✅ 优秀

内置转换的局限与对策

# 内置 PDF 转换的典型问题和解决方案

# 问题1:表格错位
# 原始 PDF 表格:
# | 姓名 | 部门 | 工号 |
# | 张三 | 技术部 | 001 |
# 
# 转换后可能变成:
# 姓名 部门 工号 张三 技术部 001
#
# 解决方案:使用 Azure DocIntel 或手动后处理

# 问题2:双栏布局串行
# 左栏和右栏的文字可能被交错提取
#
# 解决方案:使用 Azure DocIntel 的布局分析

# 简单的后处理修正(处理常见表格格式问题)
import re

def fix_markdown_tables(text: str) -> str:
    """修正 Markdown 表格格式"""
    lines = text.split('\n')
    in_table = False
    result = []
    
    for line in lines:
        stripped = line.strip()
        # 检测可能的表格行(包含 | 或连续 tab 分隔)
        if '|' in stripped or '\t' in stripped:
            if not in_table:
                in_table = True
            # 统一分隔符为 |
            if '\t' in stripped:
                stripped = '| ' + ' | '.join(stripped.split('\t')) + ' |'
            result.append(stripped)
        else:
            if in_table:
                in_table = False
            result.append(line)
    
    return '\n'.join(result)

# 使用
result = md.convert("messy-tables.pdf")
fixed = fix_markdown_tables(result.text_content)

Word (.docx) 转换:最稳定的选择

Word 文档的结构化程度最高,转换效果通常最好:

# Word 转换保留的结构:
# - 标题层级 (H1-H6)
# - 有序/无序列表
# - 表格(含合并单元格)
# - 链接
# - 图片(转为图片引用)
# - 文本样式(粗体、斜体)

result = md.convert("proposal.docx")
print(result.text_content)
# # 项目提案
# 
# ## 背景
# 
# 本项目旨在...
# 
# ## 技术方案
# 
# | 模块 | 技术栈 | 负责人 |
# |------|--------|--------|
# | 前端 | React | 张三 |
# | 后端 | Go | 李四 |

PowerPoint (.pptx) 转换:幻灯片到结构化文档

# PPT 转换会将每页幻灯片转为标题+内容
result = md.convert("presentation.pptx")
print(result.text_content)
# # 演示文稿标题
# 
# <!-- Slide 1 -->
# 
# ## 封面标题
# 
# 副标题内容
# 
# ![image1.png](image1.png)
# 
# <!-- Slide 2 -->
# 
# ## 核心数据
# 
# - 用户增长 300%
# - 营收突破 1 亿
# - NPS 评分 85+

Excel (.xlsx) 转换:表格到 Markdown 表格

# 每个 Sheet 转为独立的 Markdown 表格
result = md.convert("sales-data.xlsx")
print(result.text_content)
# # Sheet1
# 
# | 月份 | 销售额 | 同比增长 |
# |------|--------|---------|
# | 1月 | 1,200,000 | 15.3% |
# | 2月 | 980,000 | 12.1% |
# | 3月 | 1,500,000 | 22.7% |
# 
# # Sheet2
# 
# | 区域 | 负责人 | 目标 | 实际 |
# |------|--------|------|------|
# | 华东 | 王五 | 500万 | 480万 |

音频转换:语音到文字

# 安装:pip install 'markitdown[audio-transcription]'

result = md.convert("meeting-recording.mp3")
print(result.text_content)
# 输出包含:
# 1. EXIF 元数据(时长、比特率、采样率)
# 2. 语音转写文本

# 元数据示例:
# **Duration**: 00:45:30
# **Bit Rate**: 128 kbps
# **Sample Rate**: 44100 Hz
# 
# **Transcription**:
# 今天我们讨论了三个议题...

ZIP 文件:递归遍历

# ZIP 文件会被自动解压并逐个转换内部文件
result = md.convert("project-archive.zip")
print(result.text_content)
# # project-archive.zip
# 
# ## readme.md
# 
# # 项目名称
# ...
# 
# ## data/report.xlsx
# 
# | 月份 | 销售额 |
# ...

Azure Content Understanding:生产级多模态转换

Azure Content Understanding(CU)是 MarkItDown 最强大的云转换后端,提供三个核心能力层级:

层级一:零配置自动路由

from markitdown import MarkItDown

# 只需指定 endpoint,自动按文件类型选择分析器
md = MarkItDown(cu_endpoint="https://your-cu.cognitiveservices.azure.com")

# 文档 → prebuilt-documentSearch 分析器
result = md.convert("report.pdf")

# 视频 → prebuilt-videoSearch 分析器(内置转换器不支持视频!)
result = md.convert("meeting.mp4")

# 音频 → prebuilt-audioSearch 分析器
result = md.convert("call.wav")

print(result.markdown)

层级二:自定义分析器 + 结构化字段提取

这是 CU 最强大的能力——提取领域特定的结构化字段:

from markitdown import MarkItDown

# 使用自定义分析器提取发票字段
md = MarkItDown(
    cu_endpoint="https://your-cu.cognitiveservices.azure.com",
    cu_analyzer_id="my-invoice-analyzer",
)

result = md.convert("invoice.pdf")
print(result.markdown)
# 输出包含 YAML front matter:
# ---
# contentType: document
# fields:
#   VendorName: CONTOSO LTD.
#   InvoiceDate: '2019-11-15'
#   TotalAmount: '125,600.00'
#   Currency: USD
#   DueDate: '2019-12-15'
#   LineItems:
#     - Description: Cloud Services
#       Quantity: 12
#       UnitPrice: '5,000.00'
#     - Description: Support Package
#       Quantity: 1
#       UnitPrice: '65,600.00'
# ---
# 
# <!-- page 1 -->
# INVOICE #INV-2024-001
# ...

如何创建自定义分析器

  1. 在 Azure Content Understanding Studio 中创建分析器
  2. 定义字段模式(如发票号、日期、金额、行项目)
  3. 上传样本文档进行训练
  4. 获取分析器 ID 用于 MarkItDown

层级三:精确控制路由

from markitdown.converters import ContentUnderstandingFileType

# 只对 PDF 使用 CU,其他格式用内置转换器
md = MarkItDown(
    cu_endpoint="https://your-cu.cognitiveservices.azure.com",
    cu_file_types=[ContentUnderstandingFileType.PDF],
)

# PDF 走 CU(付费但高质量)
result1 = md.convert("complex-report.pdf")

# DOCX 走内置转换器(免费)
result2 = md.convert("simple-doc.docx")

三种转换后端的完整对比

能力内置转换器Azure DocIntelAzure CU
运行位置本地云端云端
成本免费按 API 调用计费按 API 调用计费
文档转换格式特定提取云布局分析云多模态提取
结构化字段❌(本集成不暴露)✅ YAML front matter
自定义分析器❌(本集成不可配)✅ 支持自定义
音频基础转写✅ 音频分析器
视频✅ 视频分析器
复杂表格⚠️✅✅
扫描 PDF
离线使用
部署复杂度pip installAzure 资源 + 认证Azure 资源 + 认证

Docker 部署:容器化转换服务

MarkItDown 提供了 Dockerfile,可以快速构建容器化转换服务:

# 构建镜像
docker build -t markitdown:latest .

# 单文件转换
docker run --rm -i markitdown:latest < ~/report.pdf > output.md

# 批量转换
docker run --rm -v $(pwd)/docs:/input -v $(pwd)/output:/output \
  markitdown:latest \
  sh -c 'for f in /input/*; do markitdown "$f" > "/output/$(basename "${f%.*}").md"; done'

构建 REST API 微服务

将 MarkItDown 包装为 HTTP 服务,供 RAG 系统调用:

# server.py — MarkItDown REST API 服务
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import PlainTextResponse
from markitdown import MarkItDown
import tempfile
import os

app = FastAPI(title="MarkItDown API")
md = MarkItDown()

@app.post("/convert", response_class=PlainTextResponse)
async def convert_file(file: UploadFile = File(...)):
    """上传文件,返回 Markdown"""
    
    # 验证文件大小(限制 50MB)
    content = await file.read()
    if len(content) > 50 * 1024 * 1024:
        raise HTTPException(413, "文件超过 50MB 限制")
    
    # 写入临时文件
    suffix = os.path.splitext(file.filename)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(content)
        tmp_path = tmp.name
    
    try:
        result = md.convert_local(tmp_path)
        return result.text_content
    except Exception as e:
        raise HTTPException(500, f"转换失败: {str(e)}")
    finally:
        os.unlink(tmp_path)

@app.post("/convert/batch")
async def convert_batch(files: list[UploadFile] = File(...)):
    """批量转换"""
    results = []
    for file in files:
        content = await file.read()
        suffix = os.path.splitext(file.filename)[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp.write(content)
            tmp_path = tmp.name
        
        try:
            result = md.convert_local(tmp_path)
            results.append({
                "filename": file.filename,
                "markdown": result.text_content,
                "char_count": len(result.text_content)
            })
        except Exception as e:
            results.append({
                "filename": file.filename,
                "error": str(e)
            })
        finally:
            os.unlink(tmp_path)
    
    return {"results": results}

@app.get("/health")
async def health():
    return {"status": "ok"}

# 启动:uvicorn server:app --host 0.0.0.0 --port 8000

Docker Compose 部署

# docker-compose.yml
version: '3.8'
services:
  markitdown-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./uploads:/tmp/uploads
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

生产级 RAG 数据管线实战

这是 MarkItDown 最核心的应用场景。以下是一个完整的 RAG 文档预处理管线:

"""
RAG 文档预处理管线
━━━━━━━━━━━━━━━━━
功能:将多种格式文档转换为 Markdown → 清洗 → 分块 → 生成 Embedding → 存入向量数据库
"""

import os
import re
import json
import hashlib
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional
from markitdown import MarkItDown

@dataclass
class Document:
    """标准化的文档对象"""
    doc_id: str
    source_file: str
    title: str
    content: str
    metadata: dict = field(default_factory=dict)
    chunks: list['Chunk'] = field(default_factory=list)

@dataclass
class Chunk:
    """文档分块"""
    chunk_id: str
    doc_id: str
    content: str
    index: int  # 在原文中的顺序
    char_count: int
    metadata: dict = field(default_factory=dict)

class RAGPreprocessor:
    """RAG 文档预处理器"""
    
    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        min_chunk_size: int = 100,
    ):
        self.md = MarkItDown()
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size
    
    def process_file(self, file_path: str) -> Optional[Document]:
        """处理单个文件:转换 → 清洗 → 分块"""
        
        # Step 1: 转换为 Markdown
        try:
            result = self.md.convert(file_path)
            raw_markdown = result.text_content
        except Exception as e:
            print(f"转换失败 {file_path}: {e}")
            return None
        
        # Step 2: 清洗 Markdown
        cleaned = self._clean_markdown(raw_markdown)
        
        # Step 3: 提取元数据
        title = self._extract_title(cleaned, Path(file_path).stem)
        doc_id = hashlib.md5(file_path.encode()).hexdigest()[:12]
        
        # Step 4: 分块
        chunks = self._split_into_chunks(cleaned, doc_id)
        
        return Document(
            doc_id=doc_id,
            source_file=file_path,
            title=title,
            content=cleaned,
            chunks=chunks,
            metadata={
                "source_format": Path(file_path).suffix,
                "total_chars": len(cleaned),
                "total_chunks": len(chunks),
            }
        )
    
    def _clean_markdown(self, text: str) -> str:
        """清洗 Markdown 文本"""
        
        # 1. 移除空行过多
        text = re.sub(r'\n{4,}', '\n\n\n', text)
        
        # 2. 修正表格格式(tab → |)
        lines = text.split('\n')
        result = []
        for line in lines:
            if '\t' in line and line.strip():
                cells = line.split('\t')
                result.append('| ' + ' | '.join(c.strip() for c in cells) + ' |')
            else:
                result.append(line)
        text = '\n'.join(result)
        
        # 3. 移除控制字符(保留换行和 tab)
        text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', text)
        
        # 4. 统一 Unicode 空白
        text = re.sub(r'[\u00a0\u2000-\u200b\u202f\u205f\u3000]', ' ', text)
        
        # 5. 移除空的 Markdown 链接
        text = re.sub(r'\[([^\]]*)\]\(\s*\)', r'\1', text)
        
        # 6. 修正标题格式(确保 # 后有空格)
        text = re.sub(r'^(#{1,6})([^ #\n])', r'\1 \2', text, flags=re.MULTILINE)
        
        return text.strip()
    
    def _extract_title(self, text: str, fallback: str) -> str:
        """从 Markdown 中提取文档标题"""
        # 尝试获取第一个 H1 标题
        match = re.search(r'^#\s+(.+)$', text, re.MULTILINE)
        if match:
            return match.group(1).strip()
        # 回退到文件名
        return fallback
    
    def _split_into_chunks(self, text: str, doc_id: str) -> list[Chunk]:
        """按语义分块(尊重标题边界)"""
        
        chunks = []
        
        # 按标题分割
        sections = re.split(r'(?=^#{1,3}\s)', text, flags=re.MULTILINE)
        
        current_chunk = ""
        chunk_index = 0
        
        for section in sections:
            if not section.strip():
                continue
            
            # 如果当前块 + 新段落超过阈值,先保存当前块
            if current_chunk and len(current_chunk) + len(section) > self.chunk_size:
                if len(current_chunk) >= self.min_chunk_size:
                    chunks.append(self._make_chunk(
                        current_chunk, doc_id, chunk_index
                    ))
                    chunk_index += 1
                    
                    # 保留 overlap 部分
                    overlap_text = current_chunk[-self.chunk_overlap:] if self.chunk_overlap > 0 else ""
                    current_chunk = overlap_text + section
                else:
                    current_chunk += "\n\n" + section
            else:
                current_chunk = current_chunk + "\n\n" + section if current_chunk else section
        
        # 最后一块
        if current_chunk and len(current_chunk) >= self.min_chunk_size:
            chunks.append(self._make_chunk(current_chunk, doc_id, chunk_index))
        
        return chunks
    
    def _make_chunk(self, text: str, doc_id: str, index: int) -> Chunk:
        chunk_id = hashlib.md5(f"{doc_id}:{index}:{text[:50]}".encode()).hexdigest()[:16]
        return Chunk(
            chunk_id=chunk_id,
            doc_id=doc_id,
            content=text.strip(),
            index=index,
            char_count=len(text.strip()),
        )
    
    def process_directory(self, input_dir: str) -> list[Document]:
        """批量处理目录下所有文档"""
        documents = []
        input_path = Path(input_dir)
        
        for file_path in sorted(input_path.rglob("*")):
            if file_path.suffix.lower() in {
                '.pdf', '.docx', '.pptx', '.xlsx',
                '.html', '.csv', '.json', '.xml', '.epub'
            }:
                doc = self.process_file(str(file_path))
                if doc:
                    documents.append(doc)
                    print(f"✅ {file_path.name} → {len(doc.chunks)} chunks")
        
        total_chunks = sum(len(d.chunks) for d in documents)
        print(f"\n处理完成: {len(documents)} 个文档, {total_chunks} 个分块")
        return documents


# 使用示例
preprocessor = RAGPreprocessor(
    chunk_size=500,
    chunk_overlap=50,
    min_chunk_size=100,
)

documents = preprocessor.process_directory("./knowledge_base")

# 导出到 JSON(供下游 Embedding 生成使用)
export_data = []
for doc in documents:
    for chunk in doc.chunks:
        export_data.append({
            "chunk_id": chunk.chunk_id,
            "doc_id": chunk.doc_id,
            "content": chunk.content,
            "metadata": {
                **doc.metadata,
                "title": doc.title,
                "source_file": doc.source_file,
                "chunk_index": chunk.index,
            }
        })

with open("chunks.json", "w", encoding="utf-8") as f:
    json.dump(export_data, f, ensure_ascii=False, indent=2)

自定义插件开发

MarkItDown 的插件系统允许你扩展对新格式的支持。以下是开发一个自定义插件的完整流程:

"""
markitdown-mycustom — 自定义 MarkItDown 插件示例
支持转换 .myformat 文件
"""

from markitdown import MarkItDown, ConverterProtocol, ConverterResult

class MyCustomConverter(ConverterProtocol):
    """自定义格式转换器"""
    
    def convert(self, input_source) -> ConverterResult:
        """
        转换 .myformat 文件到 Markdown
        
        Args:
            input_source: 文件路径或字节流
        
        Returns:
            ConverterResult(text_content=str, title=Optional[str])
        """
        # 读取输入
        if isinstance(input_source, str):
            with open(input_source, 'r', encoding='utf-8') as f:
                raw = f.read()
        elif isinstance(input_source, (bytes, bytearray)):
            raw = input_source.decode('utf-8')
        else:
            raise ValueError(f"不支持的输入类型: {type(input_source)}")
        
        # 解析并转换为 Markdown
        markdown = self._parse_myformat(raw)
        
        return ConverterResult(
            text_content=markdown,
            title=self._extract_title(raw)
        )
    
    def _parse_myformat(self, raw: str) -> str:
        """解析 .myformat 并转为 Markdown"""
        # 实现你的解析逻辑
        lines = raw.split('\n')
        md_lines = []
        
        for line in lines:
            # 示例:将 @TITLE 转为 H1
            if line.startswith('@TITLE '):
                md_lines.append(f"# {line[7:]}")
            # 将 @SECTION 转为 H2
            elif line.startswith('@SECTION '):
                md_lines.append(f"## {line[9:]}")
            # 将 @ITEM 转为列表项
            elif line.startswith('@ITEM '):
                md_lines.append(f"- {line[6:]}")
            else:
                md_lines.append(line)
        
        return '\n'.join(md_lines)
    
    def _extract_title(self, raw: str) -> str:
        for line in raw.split('\n'):
            if line.startswith('@TITLE '):
                return line[7:]
        return "Untitled"


# 插件注册(通过 entry_points)
# 在 pyproject.toml 中:
# [project.entry-points."markitdown.converter"]
# mycustom = "markitdown_mycustom:MyCustomConverter"

性能优化:大规模文档处理的实战经验

内存优化:流式处理大文件

import mmap
from markitdown import MarkItDown

def convert_large_pdf(file_path: str, output_path: str):
    """处理大型 PDF 文件"""
    md = MarkItDown()
    
    # 方式一:使用 convert_stream 减少内存占用
    with open(file_path, 'rb') as f:
        # 使用 mmap 避免将整个文件加载到内存
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            result = md.convert_stream(mm)
    
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(result.text_content)

并行处理:榨干多核性能

import multiprocessing as mp
from pathlib import Path
from markitdown import MarkItDown

def convert_file(args):
    """Worker 函数(注意:MarkItDown 实例不能跨进程共享)"""
    file_path, output_dir = args
    md = MarkItDown()  # 每个进程创建自己的实例
    
    try:
        result = md.convert(file_path)
        output_path = Path(output_dir) / f"{Path(file_path).stem}.md"
        output_path.write_text(result.text_content, encoding='utf-8')
        return (file_path, True, len(result.text_content))
    except Exception as e:
        return (file_path, False, str(e))

def batch_convert_parallel(input_dir: str, output_dir: str, workers: int = None):
    """多进程批量转换"""
    if workers is None:
        workers = min(mp.cpu_count(), 8)
    
    files = [
        str(f) for f in Path(input_dir).rglob("*")
        if f.suffix.lower() in {'.pdf', '.docx', '.pptx', '.xlsx', '.html'}
    ]
    
    args = [(f, output_dir) for f in files]
    
    with mp.Pool(workers) as pool:
        results = pool.map(convert_file, args)
    
    success = sum(1 for _, ok, _ in results if ok)
    print(f"完成: {success}/{len(results)} 成功")
    return results

缓存策略:避免重复转换

import hashlib
import json
from pathlib import Path
from markitdown import MarkItDown

class CachedConverter:
    """带缓存的文档转换器"""
    
    def __init__(self, cache_dir: str = "./cache"):
        self.md = MarkItDown()
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
    
    def _file_hash(self, file_path: str) -> str:
        """计算文件内容哈希"""
        h = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                h.update(chunk)
        return h.hexdigest()[:16]
    
    def convert(self, file_path: str, force: bool = False) -> str:
        """转换文件(带缓存)"""
        file_hash = self._file_hash(file_path)
        cache_file = self.cache_dir / f"{file_hash}.md"
        
        # 检查缓存
        if not force and cache_file.exists():
            return cache_file.read_text(encoding='utf-8')
        
        # 执行转换
        result = self.md.convert(file_path)
        
        # 写入缓存
        cache_file.write_text(result.text_content, encoding='utf-8')
        
        return result.text_content
    
    def clear_cache(self):
        """清除所有缓存"""
        for f in self.cache_dir.glob("*.md"):
            f.unlink()

# 使用
converter = CachedConverter()
markdown = converter.convert("large-report.pdf")
# 第二次调用相同文件时直接从缓存读取
markdown = converter.convert("large-report.pdf")

Token 消耗对比:为什么 Markdown 是 LLM 的最优输入

一份 50 页的技术报告,不同格式的 Token 消耗对比:

输入格式Token 数量相对 Markdown说明
原始 PDF 文本提取~45,0002.0x包含大量格式噪声
HTML~38,0001.7x标签和样式占大量 Token
JSON 结构化~42,0001.9x字段名和嵌套开销
Markdown~22,0001.0x最紧凑的结构化格式
纯文本~20,0000.9x丢失所有结构信息

Markdown 是唯一在保持文档结构的同时接近纯文本 Token 效率的格式。这意味着:

  • 50 页报告的 RAG 查询成本降低 40-55%
  • 上下文窗口可容纳更多文档片段
  • 检索精度更高(结构信息帮助 LLM 理解语义层级)

安全考量:生产环境必读

MarkItDown 官方文档明确警告了安全风险。核心要点:

1. 输入验证

# ❌ 危险:直接将用户输入传给 convert()
user_input = request.args.get('url')
result = md.convert(user_input)  # 可能读取本地文件或发起 SSRF

# ✅ 安全:使用最窄 API
from markitdown import MarkItDown
from pathlib import Path

ALLOWED_EXTENSIONS = {'.pdf', '.docx', '.pptx', '.xlsx'}
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB

def safe_convert(uploaded_file_path: str) -> str:
    """安全转换:验证输入后使用 convert_local"""
    path = Path(uploaded_file_path)
    
    # 验证扩展名
    if path.suffix.lower() not in ALLOWED_EXTENSIONS:
        raise ValueError(f"不支持的文件格式: {path.suffix}")
    
    # 验证文件大小
    if path.stat().st_size > MAX_FILE_SIZE:
        raise ValueError("文件超过大小限制")
    
    # 使用最窄 API
    md = MarkItDown()
    result = md.convert_local(str(path))
    return result.text_content

2. SSRF 防护

# ❌ 危险:convert() 可以发起网络请求
result = md.convert("http://internal-server/admin")

# ✅ 安全:使用 convert_stream 或 convert_local
import requests

# 自己控制 HTTP 请求
ALLOWED_DOMAINS = {"example.com", "docs.example.com"}

def safe_fetch_and_convert(url: str) -> str:
    from urllib.parse import urlparse
    parsed = urlparse(url)
    
    if parsed.hostname not in ALLOWED_DOMAINS:
        raise ValueError(f"不允许的域名: {parsed.hostname}")
    
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    
    md = MarkItDown()
    return md.convert_stream(response.content)

3. 路径遍历防护

import os

def safe_resolve_path(base_dir: str, user_path: str) -> str:
    """防止路径遍历攻击"""
    base = Path(base_dir).resolve()
    target = (base / user_path).resolve()
    
    if not str(target).startswith(str(base)):
        raise ValueError("路径遍历攻击检测")
    
    return str(target)

生态与未来

插件生态

MarkItDown 的插件生态正在快速成长。在 GitHub 上搜索 #markitdown-plugin 可以找到社区插件:

  • markitdown-ocr:使用 LLM Vision 提取嵌入图片中的文字
  • markitdown-latex:LaTeX 数学公式转换
  • markitdown-cad:CAD 图纸元数据提取
  • markitdown-onenote:OneNote 笔记转换

与 AutoGen 的集成

MarkItDown 是微软 AutoGen 生态的一部分,与 AutoGen Agent 框架深度集成:

# 在 AutoGen Agent 中使用 MarkItDown
from autogen import AssistantAgent
from markitdown import MarkItDown

md = MarkItDown()

# Agent 可以直接消费转换后的 Markdown
def document_tool(file_path: str) -> str:
    """将文档转为 Markdown 供 Agent 使用"""
    result = md.convert(file_path)
    return result.text_content

agent = AssistantAgent(
    name="doc_analyst",
    tools=[document_tool],
)

发展趋势

  1. 视频转换:通过 Azure CU 支持,MarkItDown 已经可以处理视频文件,未来可能内置视频转 Markdown
  2. 更多云后端:除了 Azure,未来可能支持 AWS Textract、Google Document AI 等
  3. 性能优化:Rust 扩展或 WASM 加速大文件处理
  4. 结构化输出:JSON Schema 驱动的结构化字段提取,不仅限于 Markdown 输出

总结

MarkItDown 的价值可以用一个公式概括:

文档预处理成本 = 格式解析 + 结构提取 + 内容清洗 + Token 优化

MarkItDown 将这四步合一,把开发者从重复的文档解析劳动中解放出来。它的核心优势:

  1. 格式全覆盖:20+ 种文件格式,一个 API 全搞定
  2. LLM 原生:输出 Markdown,天然适配大语言模型
  3. 渐进增强:内置转换免费够用,Azure 云端按需升级
  4. 插件扩展:自定义格式支持,社区生态持续成长
  5. 安全可控:四级 API 粒度,输入验证、SSRF 防护

如果你在做 RAG、知识库、文档搜索、内容迁移中的任何一项,MarkItDown 都值得你认真研究。13 万 Star 不是终点,而是「万物转 Markdown」这个理念的起点。


项目地址:https://github.com/microsoft/markitdown
协议:MIT
Python 版本:3.10+
PyPI:https://pypi.org/project/markitdown/

推荐文章

Vue中的表单处理有哪几种方式?
2024-11-18 01:32:42 +0800 CST
联系我们
2024-11-19 02:17:12 +0800 CST
Golang 中你应该知道的 noCopy 策略
2024-11-19 05:40:53 +0800 CST
一文详解回调地狱
2024-11-19 05:05:31 +0800 CST
CSS 中的 `scrollbar-width` 属性
2024-11-19 01:32:55 +0800 CST
Vue3中的v-model指令有什么变化?
2024-11-18 20:00:17 +0800 CST
程序员茄子在线接单