从零构建技术基础设施:build-your-own-x 49万星背后的编程教育革命
在2026年的开源社区,有一个项目以近50万Star的数量级稳居编程学习类仓库榜首。它不提供任何可直接运行的软件,却教会了无数开发者一件事:真正理解一门技术的最佳方式,是从零实现它。
引言:为什么我们要"自己造轮子"?
"不要重复造轮子"是软件工程中最常被引用的格言之一。但这句话的完整版本往往是:"在理解轮子的工作原理之前,你没资格说不需要造轮子。"
build-your-own-x 项目的核心价值,正是为开发者提供了一份系统性的"造轮子"指南——从零实现 Redis、Docker、Git、数据库、操作系统、编译器……这些构成了现代技术基础设施的关键组件。
本文将深入剖析这个项目的技术教育体系、核心实现思路,以及它如何改变了无数开发者的技术成长路径。
第一部分:项目全景——从应用开发者到系统架构师的阶梯
1.1 项目起源与社区影响力
build-your-own-x 最初由开发者 Daniel Stefanovic 创建,初衷非常简单:收集网络上那些"从零实现某技术"的优质教程。这些教程有一个共同特点——不教你如何使用工具,而是教你如何创造工具。
截至2026年5月,该项目在 GitHub 上获得了:
- Star 数:超过 490,000
- Fork 数:超过 38,000
- 贡献者:超过 200 位
- 覆盖技术领域:30+ 个核心方向
这个数据意味着什么?在 GitHub 历史上,达到这个 Star 数量级的项目屈指可数,且大多是可运行的知名开源软件(如 React、Vue、TensorFlow)。而 build-your-own-x 是一个纯教育性质的资源聚合项目。
1.2 技术领域的系统覆盖
项目将"从零实现"的教程按技术领域分类,涵盖了从底层系统到上层应用的完整技术栈:
| 技术领域 | 代表实现目标 | 核心价值 |
|---|---|---|
| 数据库 | Redis、PostgreSQL、SQLite | 理解存储引擎、索引结构、事务处理 |
| 容器技术 | Docker、Kubernetes | 理解 namespace、cgroup、镜像分层 |
| 版本控制 | Git | 理解内容寻址存储、MERGE 算法 |
| 编程语言 | Lisp、C、Python、Forth | 理解解释器、编译器、虚拟机 |
| 操作系统 | Unix Shell、内核、文件系统 | 理解系统调用、进程管理、内存管理 |
| 网络协议 | HTTP 服务器、WebSocket、BitTorrent | 理解 TCP/IP、应用层协议设计 |
| 分布式系统 | Raft 共识算法、分布式锁 | 理解一致性、容错、CAP 理论 |
| 机器学习 | 神经网络、CNN、Transformer | 理解反向传播、梯度下降、注意力机制 |
| 加密技术 | TLS、区块链、哈希函数 | 理解非对称加密、数字签名、零知识证明 |
第二部分:深度实战——以"从零实现 Redis"为例
为了让你真正理解 build-your-own-x 的价值,我们选取其中最经典的一条学习路径——从零实现 Redis——进行深度实战分析。
2.1 为什么选择 Redis 作为实现目标?
Redis 是技术面试和系统设计中高频出现的技术,但很多开发者对它的理解停留在"一个很快的 KV 存储"。实际上,Redis 的工程实现涉及:
- 高性能 I/O 模型:epoll/kqueue 事件驱动
- 内存数据结构:跳跃表、字典、压缩列表
- 持久化机制:RDB 快照与 AOF 日志
- 集群与高可用:主从复制、哨兵模式、Cluster 分片
通过从零实现一个简化版 Redis,你将理解这些设计决策背后的工程权衡。
2.2 实战:从零实现一个支持 String 和 Hash 的 Redis-like 服务器
下面我们用 Python 实现一个教学版 Redis 服务器,支持:
- RESP 协议解析
- String 和 Hash 两种数据结构
- 非阻塞 I/O(基于
select) - 简单的 AOF 持久化
2.2.1 RESP 协议解析
Redis 使用名为 RESP(REdis Serialization Protocol)的协议进行客户端-服务器通信。RESP 的设计极其简洁:
# 客户端发送命令
*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n
# 服务器响应
+OK\r\n
协议规则:
*开头:数组,后跟元素个数$开头:批量字符串,后跟字节长度+开头:简单字符串(成功响应)-开头:错误响应:开头:整数
解析器实现:
import socket
import threading
class RESPParser:
"""
RESP 协议解析器
RESP 是 Redis 的通信协议,设计极简:
- *<元素个数>\r\n — 数组
- $<字节长度>\r\n — 批量字符串
- +<字符串>\r\n — 简单字符串(成功)
- -<错误信息>\r\n — 错误
- :<整数>\r\n — 整数
"""
def __init__(self, reader: socket.socket):
self.reader = reader
self.buffer = b""
def read_line(self) -> bytes:
"""读取一行(以 \\r\\n 结尾)"""
while b"\r\n" not in self.buffer:
chunk = self.reader.recv(1024)
if not chunk:
raise ConnectionResetError("Client disconnected")
self.buffer += chunk
line, self.buffer = self.buffer.split(b"\r\n", 1)
return line
def parse(self):
"""解析 RESP 消息,返回 Python 对象"""
line = self.read_line()
if line.startswith(b"*"):
# 数组:*<count>
count = int(line[1:])
return [self.parse() for _ in range(count)]
elif line.startswith(b"$"):
# 批量字符串:$<length>\r\n<data>\r\n
length = int(line[1:])
if length == -1:
return None # NULL Bulk String
# 读取指定长度的字节
while len(self.buffer) < length + 2: # +2 for \r\n
chunk = self.reader.recv(1024)
if not chunk:
raise ConnectionResetError("Incomplete bulk string")
self.buffer += chunk
data = self.buffer[:length]
self.buffer = self.buffer[length + 2:] # 跳过 \r\n
return data.decode("utf-8")
elif line.startswith(b"+"):
# 简单字符串
return line[1:].decode("utf-8")
elif line.startswith(b"-"):
# 错误
raise Exception(line[1:].decode("utf-8"))
elif line.startswith(b":"):
# 整数
return int(line[1:])
else:
# 内联命令(兼容 telnet 等简单客户端)
return line.decode("utf-8")
设计要点分析:
为什么 RESP 使用
\\r\\n作为分隔符?
这是 HTTP 的传统,使协议对人类可读(可用 telnet 手动调试)。为什么批量字符串要显式声明长度?
这样接收方可以精确分配内存,且能区分"空字符串"和"NULL"。$和*的嵌套组合如何实现复杂数据结构?
RESP 的数组元素可以是批量字符串,也可以是另一个数组。这使得它可以表示任意嵌套的数据(如[[1, 2], [3, 4]])。
2.2.2 内存数据结构设计
Redis 的核心竞争力之一是其丰富的数据结构。我们实现两个最基础的:
String(动态字符串):
class RedisString:
"""
Redis 的 String 不是简单的 C 字符串(以 \\0 结尾),
而是一个带有长度信息的结构体,支持:
- O(1) 获取字符串长度
- 二进制安全(可存储 \\0)
- 预分配空间减少内存分配次数
"""
def __init__(self):
self.data = {}
self.expire_at = {} # key -> 过期时间戳(秒)
def get(self, key: str) -> str | None:
"""获取值,若已过期则返回 None"""
if self._is_expired(key):
self.delete(key)
return None
return self.data.get(key)
def set(self, key: str, value: str, ex: int | None = None) -> str:
"""
设置键值
ex: 过期时间(秒),如 SET key value EX 60
"""
self.data[key] = value
if ex is not None:
import time
self.expire_at[key] = time.time() + ex
elif key in self.expire_at:
del self.expire_at[key] # 清除旧过期时间
return "OK"
def append(self, key: str, value: str) -> int:
"""追加字符串,返回新长度"""
old = self.get(key) or ""
new = old + value
self.data[key] = new
return len(new)
def incr(self, key: str) -> int:
"""将字符串值增 1(要求值为整数)"""
val = self.get(key)
if val is None:
val = "0"
try:
num = int(val)
except ValueError:
raise Exception("ERR value is not an integer or out of range")
num += 1
self.data[key] = str(num)
return num
def _is_expired(self, key: str) -> bool:
"""检查 key 是否已过期"""
if key not in self.expire_at:
return False
import time
return time.time() > self.expire_at[key]
def delete(self, key: str) -> int:
"""删除 key,返回删除的个数"""
removed = 0
if key in self.data:
del self.data[key]
removed += 1
if key in self.expire_at:
del self.expire_at[key]
removed += 1
return removed
def ttl(self, key: str) -> int:
"""返回 key 的剩余生存时间(秒),-1 表示永不过期,-2 表示不存在"""
if key not in self.data:
return -2
if key not in self.expire_at:
return -1
import time
remaining = int(self.expire_at[key] - time.time())
return remaining if remaining > 0 else -2
Hash(哈希表):
class RedisHash:
"""
Redis 的 Hash 采用两种内部编码:
- ziplist(压缩列表):小哈希(元素少且值小)时使用,省内存
- hashtable(字典):大哈希时使用,O(1) 查找
我们这里简化,直接用 Python dict 实现
"""
def __init__(self):
# 外层 dict: key -> {field -> value}
self.data = {}
def hset(self, key: str, field: str, value: str) -> int:
"""
设置哈希字段
返回:1(字段是新创建的)或 0(字段已存在且被更新)
"""
if key not in self.data:
self.data[key] = {}
is_new = 0 if field in self.data[key] else 1
self.data[key][field] = value
return is_new
def hget(self, key: str, field: str) -> str | None:
"""获取哈希字段值"""
if key not in self.data:
return None
return self.data[key].get(field)
def hgetall(self, key: str) -> list:
"""
获取哈希所有字段和值
返回格式:[field1, value1, field2, value2, ...]
(这是 RESP 数组的表示方式)
"""
if key not in self.data:
return []
hash_data = self.data[key]
result = []
for field, value in hash_data.items():
result.append(field)
result.append(value)
return result
def hdel(self, key: str, *fields) -> int:
"""删除哈希中的一个或多个字段,返回成功删除的字段数"""
if key not in self.data:
return 0
removed = 0
for field in fields:
if field in self.data[key]:
del self.data[key][field]
removed += 1
# 如果哈希变空,删除整个 key
if not self.data[key]:
del self.data[key]
return removed
def hexists(self, key: str, field: str) -> bool:
"""判断字段是否存在"""
return key in self.data and field in self.data[key]
def hlen(self, key: str) -> int:
"""获取哈希中的字段数量"""
if key not in self.data:
return 0
return len(self.data[key])
数据结构设计分析:
为什么 Redis 的 String 要自己管理长度?
C 字符串以\\0结尾,获取长度需要 O(n) 遍历。Redis 的SDS(Simple Dynamic String)在结构体头部存储len,实现 O(1) 长度获取。为什么 Hash 有两种内部编码?
当哈希元素很少时(如存储用户对象的几个字段),使用压缩列表(ziplist)可以大幅减少内存开销。只有当元素增多时,才转换为哈希表。这是空间与时间的经典权衡。
2.2.3 非阻塞 I/O 服务器
Redis 是单线程的,但性能极高。核心原因是它使用了 I/O 多路复用(epoll/kqueue/select)。
import select
import socket
import sys
from typing import Dict, Callable
class RedisServer:
"""
教学版 Redis 服务器
核心架构:
1. 主线程运行事件循环(event loop)
2. 使用 select/epoll 监听多个客户端连接
3. 单线程处理所有命令(避免锁竞争)
4. 命令执行是纯内存操作,极快
注意:生产级 Redis 使用 epoll(Linux)或 kqueue(macOS),
这里用 select 做教学演示(跨平台但性能较低)
"""
def __init__(self, host: str = "127.0.0.1", port: int = 6379):
self.host = host
self.port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.setblocking(False) # 非阻塞模式
# 数据存储
self.string_db = RedisString()
self.hash_db = RedisHash()
# 客户端管理
self.clients: Dict[socket.socket, bytes] = {} # socket -> 未处理数据
# 命令路由
self.commands: Dict[str, Callable] = {
"PING": self.cmd_ping,
"SET": self.cmd_set,
"GET": self.cmd_get,
"DEL": self.cmd_del,
"EXISTS": self.cmd_exists,
"APPEND": self.cmd_append,
"INCR": self.cmd_incr,
"TTL": self.cmd_ttl,
"HSET": self.cmd_hset,
"HGET": self.cmd_hget,
"HGETALL": self.cmd_hgetall,
"HDEL": self.cmd_hdel,
"HEXISTS": self.cmd_hexists,
"HLEN": self.cmd_hlen,
}
# ==================== 命令实现 ====================
def cmd_ping(self, args: list) -> str:
"""PING -> PONG"""
return "+PONG\r\n"
def cmd_set(self, args: list) -> str:
"""
SET key value [EX seconds]
示例:SET name "Alice" EX 60
"""
if len(args) < 2:
return "-ERR wrong number of arguments for 'set' command\r\n"
key = args[0]
value = args[1]
ex = None
# 解析可选参数(EX)
if len(args) > 2 and args[2].upper() == "EX":
if len(args) < 4:
return "-ERR syntax error\r\n"
try:
ex = int(args[3])
except ValueError:
return "-ERR value is not an integer or out of range\r\n"
result = self.string_db.set(key, value, ex)
return f"+{result}\r\n"
def cmd_get(self, args: list) -> str:
"""GET key"""
if len(args) != 1:
return "-ERR wrong number of arguments for 'get' command\r\n"
value = self.string_db.get(args[0])
if value is None:
return "$-1\r\n" # NULL Bulk String
return f"${len(value)}\r\n{value}\r\n"
def cmd_del(self, args: list) -> str:
"""DEL key [key ...]"""
count = 0
for key in args:
count += self.string_db.delete(key)
# 也要从 hash 中删除
count += self.hash_db.hdel(key, *[]) # hdel 需要特殊处理
if key in self.hash_db.data:
del self.hash_db.data[key]
count += 1
return f":{count}\r\n"
def cmd_exists(self, args: list) -> str:
"""EXISTS key"""
if len(args) != 1:
return "-ERR wrong number of arguments\r\n"
exists = 1 if args[0] in self.string_db.data or args[0] in self.hash_db.data else 0
return f":{exists}\r\n"
def cmd_append(self, args: list) -> str:
"""APPEND key value"""
if len(args) != 2:
return "-ERR wrong number of arguments\r\n"
try:
new_len = self.string_db.append(args[0], args[1])
return f":{new_len}\r\n"
except Exception as e:
return f"-ERR {str(e)}\r\n"
def cmd_incr(self, args: list) -> str:
"""INCR key"""
if len(args) != 1:
return "-ERR wrong number of arguments\r\n"
try:
new_val = self.string_db.incr(args[0])
return f":{new_val}\r\n"
except Exception as e:
return f"-ERR {str(e)}\r\n"
def cmd_ttl(self, args: list) -> str:
"""TTL key"""
if len(args) != 1:
return "-ERR wrong number of arguments\r\n"
return f":{self.string_db.ttl(args[0])}\r\n"
def cmd_hset(self, args: list) -> str:
"""HSET key field value"""
if len(args) != 3:
return "-ERR wrong number of arguments for 'hset' command\r\n"
result = self.hash_db.hset(args[0], args[1], args[2])
return f":{result}\r\n"
def cmd_hget(self, args: list) -> str:
"""HGET key field"""
if len(args) != 2:
return "-ERR wrong number of arguments for 'hget' command\r\n"
value = self.hash_db.hget(args[0], args[1])
if value is None:
return "$-1\r\n"
return f"${len(value)}\r\n{value}\r\n"
def cmd_hgetall(self, args: list) -> str:
"""HGETALL key"""
if len(args) != 1:
return "-ERR wrong number of arguments\r\n"
items = self.hash_db.hgetall(args[0])
# 返回数组:*<元素个数>\r\n<每个元素>
response = f"*{len(items)}\r\n"
for item in items:
response += f"${len(item)}\r\n{item}\r\n"
return response
def cmd_hdel(self, args: list) -> str:
"""HDEL key field [field ...]"""
if len(args) < 2:
return "-ERR wrong number of arguments for 'hdel' command\r\n"
key = args[0]
fields = args[1:]
count = self.hash_db.hdel(key, *fields)
return f":{count}\r\n"
def cmd_hexists(self, args: list) -> str:
"""HEXISTS key field"""
if len(args) != 2:
return "-ERR wrong number of arguments\r\n"
exists = self.hash_db.hexists(args[0], args[1])
return f":{1 if exists else 0}\r\n"
def cmd_hlen(self, args: list) -> str:
"""HLEN key"""
if len(args) != 1:
return "-ERR wrong number of arguments\r\n"
return f":{self.hash_db.hlen(args[0])}\r\n"
# ==================== 服务器主循环 ====================
def run(self):
"""启动服务器主循环"""
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(128)
print(f"Redis server started on {self.host}:{self.port}")
# 需要监听的 socket 列表
read_sockets = [self.server_socket]
try:
while True:
# select 系统调用:监听哪些 socket 可读
# 第一个参数:读监听列表
# 第二个参数:写监听列表(这里用不到)
# 第三个参数:异常监听列表
# 第四个参数:超时(秒),None 表示永久阻塞
readable, _, errored = select.select(
read_sockets, [], read_sockets, 1.0
)
for sock in readable:
if sock is self.server_socket:
# 新连接到达
self._accept_client(read_sockets)
else:
# 已有连接上有数据可读
self._handle_client(sock, read_sockets)
# 处理异常连接
for sock in errored:
self._remove_client(sock, read_sockets)
except KeyboardInterrupt:
print("\nShutting down...")
finally:
self.server_socket.close()
def _accept_client(self, read_sockets: list):
"""接受新客户端连接"""
client_socket, addr = self.server_socket.accept()
client_socket.setblocking(False)
read_sockets.append(client_socket)
self.clients[client_socket] = b""
print(f"New connection from {addr}")
def _handle_client(self, client_socket: socket.socket, read_sockets: list):
"""处理客户端请求"""
try:
# 读取客户端发送的数据
data = client_socket.recv(4096)
if not data:
# 客户端断开连接
self._remove_client(client_socket, read_sockets)
return
# 追加到缓冲区
self.clients[client_socket] += data
# 尝试解析并执行命令(可能有多条命令粘包)
parser = RESPParser(client_socket)
parser.buffer = self.clients[client_socket]
while b"\r\n" in parser.buffer:
# 解析一条命令
try:
command = parser.parse()
except Exception as e:
# 协议解析出错,返回错误并断开
error_msg = f"-ERR protocol error: {str(e)}\r\n"
client_socket.sendall(error_msg.encode("utf-8"))
break
if isinstance(command, list):
# 完整命令
response = self._execute_command(command)
client_socket.sendall(response.encode("utf-8"))
elif isinstance(command, str):
# 内联命令(如 PING)
response = self._execute_command([command.upper()])
client_socket.sendall(response.encode("utf-8"))
# 更新缓冲区(解析过的部分已被 consume)
self.clients[client_socket] = parser.buffer
except ConnectionResetError:
self._remove_client(client_socket, read_sockets)
except Exception as e:
print(f"Error handling client: {e}")
self._remove_client(client_socket, read_sockets)
def _execute_command(self, command: list) -> str:
"""执行命令并返回 RESP 响应"""
if not command:
return "-ERR empty command\r\n"
cmd_name = command[0].upper()
args = [str(arg) for arg in command[1:]]
if cmd_name in self.commands:
try:
return self.commands[cmd_name](args)
except Exception as e:
return f"-ERR {str(e)}\r\n"
else:
return f"-ERR unknown command '{cmd_name}'\r\n"
def _remove_client(self, client_socket: socket.socket, read_sockets: list):
"""移除客户端连接"""
if client_socket in read_sockets:
read_sockets.remove(client_socket)
if client_socket in self.clients:
del self.clients[client_socket]
try:
client_socket.close()
except:
pass
# ==================== 启动服务器 ====================
if __name__ == "__main__":
server = RedisServer()
server.run()
2.2.4 AOF 持久化实现
Redis 提供两种持久化方式:RDB(快照)和 AOF(Append-Only File)。我们实现 AOF:
import os
import atexit
class AOFManager:
"""
AOF(Append-Only File)持久化
原理:将每个写命令追加到文件末尾,重启时重放日志恢复数据
优点:
- 数据丢失窗口小(可配置为每秒 fsync 或每次写都 fsync)
- 文件格式可读(就是 RESP 协议)
缺点:
- 文件会越来越大(需要重写机制压缩)
- 重启恢复慢(要重放所有命令)
"""
def __init__(self, filename: str = "appendonly.aof"):
self.filename = filename
self.file = None
self.aof_buffer = [] # 缓冲区(用于批量刷盘)
# 注册退出时刷盘
atexit.register(self.flush)
def open(self):
"""打开 AOF 文件(追加模式)"""
self.file = open(self.filename, "a", buffering=1) # 行缓冲
def append(self, command: list):
"""
将命令追加到 AOF 缓冲区
command: ["SET", "name", "Alice"] ->
写入 "*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$5\r\nAlice\r\n"
"""
resp_str = self._command_to_resp(command)
self.aof_buffer.append(resp_str)
# 简单策略:缓冲区满 100 条或每次都刷盘
# 生产环境通常用 "everysec"(每秒刷盘)
if len(self.aof_buffer) >= 100:
self.flush()
def flush(self):
"""将缓冲区写入文件并 fsync"""
if not self.file or not self.aof_buffer:
return
for cmd in self.aof_buffer:
self.file.write(cmd)
self.aof_buffer.clear()
# fsync:强制操作系统将缓冲区数据写入磁盘
# 这是保证持久化的关键系统调用
os.fsync(self.file.fileno())
def _command_to_resp(self, command: list) -> str:
"""将命令转换为 RESP 格式字符串"""
parts = []
parts.append(f"*{len(command)}\r\n")
for arg in command:
arg_str = str(arg)
parts.append(f"${len(arg_str)}\r\n{arg_str}\r\n")
return "".join(parts)
def load(self, server: RedisServer):
"""
从 AOF 文件恢复数据
实现方式:逐行解析 RESP,重放命令
"""
if not os.path.exists(self.filename):
return
print(f"Loading AOF file: {self.filename}")
with open(self.filename, "r") as f:
content = f.read()
# 简单解析:按 \\r\\n 分割
# 注意:这个解析器是简化版,生产环境需要完整的 RESP 解析
lines = content.split("\r\n")
i = 0
while i < len(lines):
if not lines[i]:
i += 1
continue
if lines[i].startswith("*"):
# 数组:读取所有元素
count = int(lines[i][1:])
command = []
i += 1
for _ in range(count):
if lines[i].startswith("$"):
# 批量字符串:跳过长度行,取下一行作为值
i += 1
command.append(lines[i])
else:
command.append(lines[i])
i += 1
# 执行命令(但不写入 AOF,避免恢复时又追加)
self._execute_without_aof(server, command)
else:
i += 1
print(f"AOF loaded, {len(server.string_db.data)} keys restored")
def _execute_without_aof(self, server: RedisServer, command: list):
"""执行命令但不触发 AOF 写入(用于 AOF 恢复时)"""
# 临时保存 append 方法
original_append = server.aof.append if hasattr(server, 'aof') else None
# 替换 append 为空操作
if hasattr(server, 'aof'):
server.aof.append = lambda cmd: None
# 执行命令
server._execute_command(command)
# 恢复 append 方法
if original_append:
server.aof.append = original_append
AOF 设计分析:
为什么 AOF 比 RDB 更安全?
RDB 是定期快照,宕机时丢失上次快照后的所有数据。AOF 可以配置为每秒 fsync,最多丢失 1 秒数据。AOF 文件越来越大怎么办?
Redis 提供BGREWRITEAOF命令,fork 一个子进程,遍历内存数据生成最小命令集,生成新的 AOF 文件。fsync 的性能影响?
fsync()是阻塞系统调用,每秒一次对性能影响小;每次写都 fsync 会严重影响吞吐量。
第三部分:从 build-your-own-x 到系统思维
3.1 造轮子的四个层次
通过 build-your-own-x 的学习,开发者会经历四个认知层次:
层次一:能跑就行(Make it work)
- 目标:实现基本功能
- 关注点:语法、API 调用
- 示例:实现一个能响应
SET/GET的内存 KV 存储
层次二:理解原理(Understand it)
- 目标:理解为什么这样设计
- 关注点:数据结构、算法复杂度、系统调用
- 示例:理解为什么 Redis 用 epoll 而不是多线程
层次三:工程权衡(Engineering trade-offs)
- 目标:在 constraints 下做设计决策
- 关注点:内存 vs CPU、一致性 vs 可用性、简洁 vs 灵活
- 示例:为什么 Redis 的 Hash 有两种内部编码?
层次四:创造新东西(Create new)
- 目标:基于理解创造更好的工具
- 关注点:痛点、创新、架构演进
- 示例:设计一个比 Redis 更适合某个场景的专用 KV 存储
3.2 系统思维的培养
build-your-own-x 的核心价值不在于"你会实现一个生产级 Redis",而在于培养系统思维:
- 分层思维:理解技术栈的每一层(从硬件到应用)
- 权衡思维:理解为什么没有"最好"的设计,只有"最适合当前场景"的设计
- 演进思维:理解技术是如何在需求变化中演进的(如 Redis 从单线程到支持多线程 I/O)
- 调试思维:当你自己实现过,你对第三方工具的 Bug 会有更深的洞察
第四部分:其他经典实现路径解析
除了 Redis,build-your-own-x 还提供了许多其他领域的实现指南。
4.1 从零实现 Docker
核心概念:
- namespace:进程隔离(PID、NET、MNT、UTS、IPC、USER)
- cgroup:资源限制(CPU、内存、IO)
- UnionFS:镜像分层存储
实现一个简单的容器:
# 1. 创建文件系统(用 busybox 作为 rootfs)
mkdir -p /tmp/mycontainer/rootfs
docker export $(docker create busybox) | tar -x -C /tmp/mycontainer/rootfs
# 2. 用 unshare 创建新的 namespace
unshare --mount --pid --fork --mount-proc chroot /tmp/mycontainer/rootfs /bin/sh
# 此时你在一个"容器"中,PID 1 是 /bin/sh,看不到宿主进程
Go 实现核心代码(使用 github.com/opencontainers/runc/libcontainer):
package main
import (
"fmt"
"os"
"os/exec"
"syscall"
)
func main() {
// 1. 创建子进程(用 clone 系统调用创建新的 namespace)
cmd := exec.Command("/proc/self/exe")
cmd.SysProcAttr = &syscall.SysProcAttr{
Cloneflags: syscall.CLONE_NEWUTS | // UTS namespace(主机名隔离)
syscall.CLONE_NEWPID | // PID namespace(进程 ID 隔离)
syscall.CLONE_NEWNS | // Mount namespace(挂载点隔离)
syscall.CLONE_NEWNET, // Network namespace(网络隔离)
}
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
fmt.Printf("Error starting container: %v\n", err)
os.Exit(1)
}
// 2. 在子进程中执行 /bin/sh
// (实际实现需要先在子进程中 mount proc、设置 hostname 等)
cmd.Wait()
}
4.2 从零实现 Git
Git 的核心是一个"内容寻址文件系统":
.git/
objects/ # 存储所有数据(blob、tree、commit)
xx/xxxxx... # 前2位作为目录,后38位作为文件名
refs/ # 引用(分支、标签)
heads/ # 分支:refs/heads/main -> commit hash
tags/ # 标签
HEAD # 当前分支的引用
index # 暂存区(二进制格式)
实现一个最简单的 git add 和 git commit:
import hashlib
import zlib
import os
import time
import json
class MiniGit:
"""
迷你版 Git 实现
核心概念:
1. Blob:文件内容(去重:相同内容只存一次)
2. Tree:目录结构(指向 blob 和其他 tree)
3. Commit:快照(指向 tree + 父 commit + 作者/提交信息)
4. Ref:分支是指向 commit 的指针
"""
def __init__(self, repo_path: str = "."):
self.repo_path = repo_path
self.git_dir = os.path.join(repo_path, ".git")
def init(self):
"""初始化仓库"""
os.makedirs(os.path.join(self.git_dir, "objects"), exist_ok=True)
os.makedirs(os.path.join(self.git_dir, "refs", "heads"), exist_ok=True)
# 创建 HEAD 文件
with open(os.path.join(self.git_dir, "HEAD"), "w") as f:
f.write("ref: refs/heads/main\n")
print(f"Initialized empty MiniGit repository in {self.git_dir}")
def _hash_object(self, data: bytes, obj_type: str) -> str:
"""
计算对象的 SHA-1 哈希并存储
Git 对象格式:
header = "<type> <size>\0"
content = header + data
hash = sha1(content)
存储 = zlib.compress(content)
"""
header = f"{obj_type} {len(data)}\0".encode("utf-8")
content = header + data
obj_hash = hashlib.sha1(content).hexdigest()
# 存储到 objects 目录
obj_dir = os.path.join(self.git_dir, "objects", obj_hash[:2])
obj_path = os.path.join(obj_dir, obj_hash[2:])
if not os.path.exists(obj_path):
os.makedirs(obj_dir, exist_ok=True)
with open(obj_path, "wb") as f:
f.write(zlib.compress(content))
return obj_hash
def add(self, file_path: str):
"""
将文件添加到暂存区(简化版:直接写入 objects)
实际 Git 还会更新 index 文件(二进制格式)
"""
with open(file_path, "rb") as f:
data = f.read()
obj_hash = self._hash_object(data, "blob")
print(f"Added {file_path} as blob {obj_hash}")
# 简化:用 .git/index 存储 "文件名 -> hash" 的映射
index_path = os.path.join(self.git_dir, "index")
index = {}
if os.path.exists(index_path):
with open(index_path, "r") as f:
index = json.load(f)
index[file_path] = obj_hash
with open(index_path, "w") as f:
json.dump(index, f)
def commit(self, message: str):
"""
创建提交
简化版:只创建一个 commit 对象,不处理 tree
"""
# 读取暂存区
index_path = os.path.join(self.git_dir, "index")
if not os.path.exists(index_path):
print("Nothing to commit")
return
with open(index_path, "r") as f:
index = json.load(f)
# 创建 tree 对象(简化:只存储文件名和 blob hash)
tree_content = "\n".join([f"{file} {hash}" for file, hash in index.items()]).encode("utf-8")
tree_hash = self._hash_object(tree_content, "tree")
# 创建 commit 对象
# 读取父 commit
head_ref = self._get_head_ref()
parent_line = ""
if head_ref:
parent_line = f"parent {head_ref}\n"
commit_content = f"""tree {tree_hash}
{parent_line}author Mini Git <minigit@example.com> {int(time.time())}
committer Mini Git <minigit@example.com> {int(time.time())}
{message}
""".encode("utf-8")
commit_hash = self._hash_object(commit_content, "commit")
# 更新 HEAD
with open(os.path.join(self.git_dir, "HEAD"), "w") as f:
f.write(f"ref: refs/heads/main\n")
with open(os.path.join(self.git_dir, "refs", "heads", "main"), "w") as f:
f.write(commit_hash)
print(f"Created commit {commit_hash}")
def _get_head_ref(self) -> str | None:
"""获取当前 HEAD 指向的 commit hash"""
head_path = os.path.join(self.git_dir, "HEAD")
if not os.path.exists(head_path):
return None
with open(head_path, "r") as f:
ref = f.read().strip()
if ref.startswith("ref: "):
ref_path = ref[5:] # 去掉 "ref: " 前缀
full_path = os.path.join(self.git_dir, ref_path)
if os.path.exists(full_path):
with open(full_path, "r") as f:
return f.read().strip()
return None
# 使用示例
if __name__ == "__main__":
git = MiniGit()
# 初始化
git.init()
# 创建测试文件
with open("test.txt", "w") as f:
f.write("Hello, MiniGit!")
# 添加并提交
git.add("test.txt")
git.commit("Initial commit")
第五部分:build-your-own-x 的学习方法论
5.1 如何选择适合自己的实现项目?
build-your-own-x 包含了 30+ 个领域,如何选择和排序?
阶段一:巩固基础(适合 0-2 年经验)
- 从零实现编程语言(如 Lisp 解释器)——理解 AST、求值策略
- 从零实现数据库(如 SQLite clone)——理解 B-tree、SQL 解析
- 从零实现操作系统(如 xv6)——理解系统调用、进程调度
阶段二:深入系统(适合 2-5 年经验)
- 从零实现 Docker——理解 Linux namespace/cgroup
- 从零实现 Redis——理解事件驱动、内存数据结构
- 从零实现分布式系统(如 Raft 共识算法)——理解一致性
阶段三:创造创新(适合 5+ 年经验)
- 基于已有理解,发现现有工具的痛点
- 设计一个更好的工具(或改进方案)
- 开源并迭代
5.2 学习时的常见误区
误区一:追求功能完整
正确做法:实现一个"能用"的 MVP,理解核心流程即可。不需要支持所有命令/功能。
误区二:不看现有实现
正确做法:先自己实现,再看生产级实现的源码,对比差异并思考"为什么"。
误区三:孤立学习
正确做法:将多个项目联系起来。例如,实现了 Redis 后,思考"如何用 Docker 部署 Redis?如何用 Git 管理 Redis 配置文件?"
第六部分:从 build-your-own-x 到技术影响力
6.1 为什么这个项目能获得 49 万 Star?
- 降低学习门槛:将"从零实现 XX"的优质资源聚合,省去开发者搜索时间
- 系统知识体系:覆盖从底层到应用的完整技术栈
- 社区驱动:任何开发者都可以提交 PR,添加新资源
- 长期价值:技术会过时,但"理解原理"的能力永不过时
6.2 如何基于 build-your-own-x 建立个人影响力?
- 写博客:记录你"从零实现 XX"的过程和心得
- 做分享:在公司或技术社区做深度分享
- 贡献教程:如果你发现某个领域的优质教程未被收录,可以向
build-your-own-x提交 PR - 创造新项目:基于你的实现经验,创造一个解决实际问题的工具
总结与展望
build-your-own-x 不仅是一个 GitHub 仓库,它代表了一种深度学习理念:
在这个 AI 辅助编程的时代,会用工具的人越来越多,但理解工具的人永远是稀缺的。
当你亲手实现过一个简化版 Redis,你对"为什么 Redis 是单线程的"、"Redis 的 Hash 为什么要有两种编码"这类问题的理解,远深于只看过官方文档的人。
未来展望:
随着 AI 编程助手(如 Claude、Cursor)的普及,"写代码"的门槛会越来越低。但"设计系统"的能力——理解权衡、做出合理架构决策——会变得越来越重要。
build-your-own-x 所倡导的"造轮子"精神,在未来不仅不会过时,反而会更有价值。
参考资料
- build-your-own-x GitHub 仓库
- Redis 设计与实现(黄健宏)
- Docker 容器与容器云(浙江大学)
- Pro Git(Scott Chacon)
- 深入理解计算机系统(CSAPP)
文章字数:约 15,000 字
本文是"从零构建技术基础设施"系列的第一篇。下一篇将深入剖析"从零实现 PostgreSQL"——理解关系型数据库的核心架构。