2025-11-06 12:27:21

百亿级关系链系统架构设计:从单表到分布式的演进之路

百亿级关系链系统架构设计:从单表到分布式的演进之路

引言:一个被误解的问题

当我们谈论"百亿级关系链系统"时,很多人会认为核心挑战是"查询慢"。但真相是:查询慢不是问题,单机容量不够才是问题

想象一下微博的场景:5亿用户,每人平均200个关注关系,总计1000亿条数据。这些数据如果放在一张MySQL表里,会发生什么?

单表存储1000亿条记录:
- 数据文件:~50TB
- 索引文件:~20TB
- 总容量:~70TB

现实:单机MySQL最佳实践 < 1TB
差距:70倍

这篇文章将带你理解:从单表到分布式系统,架构演进的每一步都是被逼出来的,而数据冗余是分库分表带来的必然代价


第一部分:关系链的业务模型

1.1 两种关系类型

弱关系(单向关注)

  • 典型场景:微博、抖音、B站
  • 特点:A关注B,B不需要同意
  • 数据特征:非对称关系
用户A关注用户B:
┌─────┐  关注  ┌─────┐
│  A  │ ─────→ │  B  │
└─────┘        └─────┘

产生一条关系记录:
{follower: A, followee: B}

强关系(双向好友)

  • 典型场景:微信、QQ、LinkedIn
  • 特点:需要双方确认
  • 数据特征:对称关系
A和B互为好友:
┌─────┐  好友  ┌─────┐
│  A  │ ←────→ │  B  │
└─────┘        └─────┘

本质上是两条单向关系:
{follower: A, followee: B}
{follower: B, followee: A}

结论:从数据存储角度看,强关系 = 2条弱关系。因此我们以弱关系为基础来设计架构。

1.2 核心查询场景

场景1:正向查询(查关注列表)

-- 张三关注了哪些人? SELECT followee_id FROM relations WHERE follower_id = 123456; 业务场景: - 用户查看"我的关注"列表 - 推送关注对象的动态 - 推荐算法分析用户兴趣

场景2:反向查询(查粉丝列表)

-- 哪些人关注了李四? SELECT follower_id FROM relations WHERE followee_id = 789012; 业务场景: - 用户查看"我的粉丝"列表 - 内容分发(给所有粉丝推送) - 统计粉丝数、计算影响力

关键观察:这两个查询同等重要,都是高频操作。


第二部分:单表阶段 - 索引就够了

2.1 简单有效的单表设计

表结构

CREATE TABLE relations ( id BIGINT PRIMARY KEY AUTO_INCREMENT, follower_id BIGINT NOT NULL, -- 关注者(粉丝) followee_id BIGINT NOT NULL, -- 被关注者 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE KEY uk_relation (follower_id, followee_id), -- 防重复关注 KEY idx_follower (follower_id), -- 正向查询索引 KEY idx_followee (followee_id) -- 反向查询索引 ) ENGINE=InnoDB;

2.2 查询性能分析

B+树索引原理回顾

索引 idx_follower (follower_id):

           [根节点]
          /    |    \
    [中间层] [中间层] [中间层]
    /  |  \  /  |  \  /  |  \
  [叶节点数据块×N]

查询路径:根→中间层→叶节点
层级计算:log₁₀₀₀(记录数)

不同数据规模的性能

数据量 索引层级 单次查询耗时 说明
100万 3层 1-2ms 内存命中率高
1000万 4层 3-5ms 部分磁盘IO
5000万 4层 5-10ms 仍可接受
1亿 5层 20-50ms 开始变慢
10亿 5-6层 100-500ms 不可接受

实际测试数据(基于16GB内存MySQL服务器):

测试1:1000万条记录
SELECT followee_id FROM relations WHERE follower_id = ?;
平均耗时:3.2ms
QPS:~3000

测试2:1亿条记录
SELECT followee_id FROM relations WHERE follower_id = ?;
平均耗时:25ms
QPS:~400

测试3:10亿条记录
SELECT followee_id FROM relations WHERE follower_id = ?;
平均耗时:200ms(冷数据可达500ms)
QPS:~50

2.3 单表的物理瓶颈

问题1:存储容量

10亿条记录的实际占用(每行约100字节):
- 数据文件:100GB
- 主键索引:20GB
- follower_id索引:20GB
- followee_id索引:20GB
总计:~160GB

单机MySQL推荐:< 500GB(包含所有库表)

问题2:内存命中率

InnoDB Buffer Pool(假设16GB):
- 1000万记录:索引全部缓存,命中率>95%
- 1亿记录:索引部分缓存,命中率~70%
- 10亿记录:索引频繁换页,命中率<30%

命中率下降 → 磁盘IO增加 → 查询变慢

问题3:写入竞争

高并发写入时的锁竞争:
- 主键自增锁
- 辅助索引页锁
- 行锁

实测:单表超过5000万后,写入TPS从1万降到3000

问题4:运维困难

备份时间:
- 1000万记录:5分钟
- 1亿记录:1小时
- 10亿记录:10小时

DDL操作(加字段):
- 1000万记录:几秒
- 1亿记录:几分钟
- 10亿记录:几小时(需要停服)

2.4 单表的承载极限

经验值

保守估计:5000万条记录
- 数据+索引:~20GB
- 查询耗时:<10ms
- 写入TPS:>5000
- 运维可控:备份<30分钟

激进估计:1亿条记录
- 需要更好的硬件(SSD、32GB内存)
- 查询耗时:10-30ms
- 写入TPS:>3000
- 运维有挑战

危险区:>2亿条记录
- 查询明显变慢
- 写入性能下降
- 运维困难

结论:当数据量突破亿级时,必须考虑分库分表。


第三部分:分库分表的核心矛盾

3.1 为什么要分库分表?

核心目标

  1. 突破单机存储容量限制
  2. 通过水平扩展提升整体性能
  3. 分散读写压力到多台机器

分库分表原理

原来:1个库,1张表,100亿数据
      ↓
现在:1024个库,每库1张表,每表1000万数据

单表数据量:100亿 / 1024 ≈ 1000万
恢复到单表的黄金区间!

3.2 分库的关键决策:选择分库键

什么是分库键?

分库键(Sharding Key)决定了一条数据存储在哪个库:

def get_database(sharding_key): return sharding_key % DATABASE_COUNT # 示例 user_123456的数据 → 123456 % 1024 = 库#808 user_789012的数据 → 789012 % 1024 = 库#780

关键问题:我们有两个候选分库键

选项A:按 follower_id 分库(关注者ID)
选项B:按 followee_id 分库(被关注者ID)

只能选一个!这就是矛盾的根源。

3.3 方案A:按 follower_id 分库

分库规则

库编号 = follower_id % 1024

数据分布示例

用户A(123456) 关注 用户B(789012):
→ 存储在库#808(因为 123456 % 1024 = 808)

用户C(234567) 关注 用户B(789012):
→ 存储在库#807(因为 234567 % 1024 = 807)

用户A(123456) 关注 用户D(890123):
→ 存储在库#808(因为 123456 % 1024 = 808)

规律:同一个用户的所有"关注关系"都在同一个库。

查询效果分析

-- 查询1:用户A(123456)关注了谁? 步骤1: 计算库编号 = 123456 % 1024 = 808 步骤2: 连接库#808 步骤3: SELECT followee_id FROM relations WHERE follower_id = 123456 结果: ✓ 单库查询,5ms 图示: 应用 → 计算hash(123456) → 库#808 → 返回结果 (单次数据库调用)
-- 查询2:谁关注了用户B(789012)? 问题: 无法通过789012定位到具体的库(因为分库键是follower_id) 必须: 查询所有1024个库 步骤1: 向所有库发起查询 SELECT follower_id FROM relations WHERE followee_id = 789012 步骤2: 等待所有库返回结果 步骤3: 应用层合并结果 图示: 应用 ─┬→ 库#0 ─→ 可能有结果 ├→ 库#1 ─→ 可能有结果 ├→ 库#2 ─→ 可能有结果 ├→ ... └→ 库#1023 ─→ 可能有结果 (1024次数据库调用!) 结果: ✗ 扇出查询,1024次网络往返,5000ms

性能对比

查询类型 数据库调用次数 网络延迟 总耗时 可用性
正向查询(关注列表) 1次 5ms 5ms
反向查询(粉丝列表) 1024次 5ms×1024 5000ms+

3.4 方案B:按 followee_id 分库

结果完全相反

按 followee_id 分库:
- 反向查询(粉丝列表):单库查询 ✓ 5ms
- 正向查询(关注列表):扇出查询 ✗ 5000ms

3.5 核心矛盾的本质

数学表达

定义:
- 分库函数:f(key) → database_id
- 查询维度集合:Q = {正向查询, 反向查询}

约束:
f() 只能基于一个字段

结论:
|Q| = 2(两个查询维度)
能优化的维度 = 1
无法优化的维度 = 1(必须扇出查询)

图示理解

数据记录:{follower: A, followee: B}

如果按A分库:
- 查A的数据 → 定位到A所在的库 ✓
- 查B的数据 → B的数据分散在所有库 ✗

如果按B分库:
- 查B的数据 → 定位到B所在的库 ✓
- 查A的数据 → A的数据分散在所有库 ✗

这是一个二选一的困境!

3.6 扇出查询为什么不可接受?

性能问题

假设:
- 库数量:1024
- 单库查询延迟:5ms
- 网络往返延迟:2ms

扇出查询总耗时:
= max(所有库的查询时间) + 网络延迟
= 5ms + 2ms × 1024(串行)或 5ms(并行但受连接数限制)
= 实际约100-500ms(考虑连接池、网络抖动)

vs 单库查询:5ms

性能差距:20-100倍

稳定性问题

场景:某一个库出现慢查询

单库查询:
库#808响应慢 → 只影响落在该库的用户

扇出查询:
库#808响应慢 → 影响所有用户的反向查询
(因为必须等待所有库返回)

雪崩效应:1个库拖垮整个系统

资源消耗问题

QPS:10000次/秒(反向查询)
库数量:1024

总数据库连接数:10000 × 1024 = 1024万
即使用连接池,也需要维护海量连接

而单库查询只需要:10000 个连接

实际案例

微博场景:
- 用户量:5亿
- 日活:2亿
- 峰值QPS:"查看粉丝"功能 100万/秒

如果用扇出查询:
- 数据库总调用量:100万 × 1024 = 10.24亿次/秒
- 单库需承受:10.24亿 / 1024 = 100万QPS
- 单库极限:约1万QPS

结论:完全不可行!

第四部分:数据冗余 - 不得已的选择

4.1 解决方案:存储两份数据

核心思想

既然一个分库键只能优化一个查询维度,那就存储两份数据,用两个不同的分库键。

架构设计

关注表(guanzhu):
- 分库键:follower_id
- 优化查询:"X关注了谁"
- 库编号 = follower_id % 1024

粉丝表(fensi):
- 分库键:followee_id
- 优化查询:"谁关注了X"
- 库编号 = followee_id % 1024

数据示例

业务操作:用户A(123456) 关注 用户B(789012)

关注表存储(库#808 = 123456 % 1024):
┌──────────┬────────────┬─────────────┐
│ id       │ follower   │ followee    │
├──────────┼────────────┼─────────────┤
│ 10086    │ 123456     │ 789012      │
└──────────┴────────────┴─────────────┘

粉丝表存储(库#780 = 789012 % 1024):
┌──────────┬────────────┬─────────────┐
│ id       │ user_id    │ fan_id      │
├──────────┼────────────┼─────────────┤
│ 20188    │ 789012     │ 123456      │
└──────────┴────────────┴─────────────┘

本质:同一份业务数据,用不同的维度组织了两次

4.2 查询路由

正向查询(查关注列表):

def get_following_list(user_id): # 计算库编号 db_id = user_id % 1024 # 连接关注表 db = get_guanzhu_database(db_id) # 查询 result = db.query( "SELECT followee FROM guanzhu WHERE follower = ?", user_id ) return result 执行流程: 用户123456查关注列表 → 计算 123456 % 1024 = 808 → 连接 guanzhu库#808 → 查询 WHERE follower=1234565ms返回

反向查询(查粉丝列表):

def get_fans_list(user_id): # 计算库编号 db_id = user_id % 1024 # 连接粉丝表 db = get_fensi_database(db_id) # 查询 result = db.query( "SELECT fan_id FROM fensi WHERE user_id = ?", user_id ) return result 执行流程: 用户789012查粉丝列表 → 计算 789012 % 1024 = 780 → 连接 fensi库#780 → 查询 WHERE user_id=7890125ms返回

完美解决

两个查询都变成单库查询:
- 正向查询:guanzhu表,5ms
- 反向查询:fensi表,5ms

性能一致,没有扇出查询!

4.3 代价分析

空间代价

原方案:
- 单份数据:100亿条记录
- 存储空间:10TB

冗余方案:
- 两份数据:200亿条记录(100亿×2)
- 存储空间:20TB

空间增加:2倍

维护代价

原方案:
- 写入一次
- 保证一份数据的一致性

冗余方案:
- 写入两次
- 保证两份数据的一致性(这是核心挑战!)

收益

性能提升:
- 查询延迟:从5000ms降到5ms(1000倍提升)
- 系统吞吐:从1000 QPS提升到100万 QPS(1000倍提升)
- 稳定性:从依赖所有库到只依赖单库(质的飞跃)

投入产出比:
- 投入:2倍存储 + 一致性复杂度
- 产出:1000倍性能提升

显然值得!

4.4 底层规律总结

规律1:分库分表的本质约束

分布式系统的基本定律:
数据分片 ⇒ 查询局部性

一条数据只能存在一个位置
一个查询要定位数据,必须知道数据在哪

当查询条件不包含分库键 ⇒ 无法定位 ⇒ 必须扫描所有分片

规律2:数据冗余的本质

数据冗余 = 用空间换时间 + 用存储换查询灵活性

本质:同一份业务数据,按不同维度组织多次存储
目的:让每个查询维度都能快速定位数据

公式:
冗余份数 = 高频查询维度数量

规律3:不可能三角

           查询维度多样性
                 /\
                /  \
               /    \
              /      \
             /________\
      单库查询         存储不冗余

在分布式系统中,三者不可兼得:
- 要查询多样性 + 单库查询 → 必须冗余
- 要查询多样性 + 不冗余 → 必须扇出查询
- 要单库查询 + 不冗余 → 查询维度受限

第五部分:数据冗余的三种实现方案

现在我们知道了"为什么要冗余",接下来解决"怎么冗余"。

5.1 核心挑战:一致性问题

问题场景

用户A关注用户B,需要写入两个表:
1. guanzhu表(库#808)
2. fensi表(库#780)

这是两次独立的数据库操作,不在一个事务中!

可能的问题:
- 写入guanzhu成功,写入fensi失败 → 数据不一致
- 两次写入之间有延迟 → 短时间不一致
- 网络分区导致部分写入 → 数据丢失

一致性要求

理想:强一致性(两个表实时同步)
现实:最终一致性(短时间不一致可接受,但最终要一致)

业务影响分析:
- 不一致窗口:1秒以内 → 用户基本无感知 ✓
- 不一致窗口:1分钟 → 用户可能投诉 ✗
- 永久不一致 → 数据错误,严重问题 ✗✗✗

目标:追求最终一致性,尽量缩短不一致窗口

5.2 方案一:服务同步冗余

架构图

┌─────────────┐
│  应用服务   │
└──────┬──────┘
       │
       ├──→ guanzhu库#808 (写入1)
       │         ↓ 成功
       │
       └──→ fensi库#780   (写入2)
                 ↓ 成功

       ← 返回成功给用户

代码实现

def follow(follower_id, followee_id): # 计算两个库的编号 guanzhu_db_id = follower_id % 1024 fensi_db_id = followee_id % 1024 # 连接两个库 guanzhu_db = get_guanzhu_database(guanzhu_db_id) fensi_db = get_fensi_database(fensi_db_id) try: # 写入关注表 guanzhu_db.execute( "INSERT INTO guanzhu (follower, followee) VALUES (?, ?)", follower_id, followee_id ) # 写入粉丝表 fensi_db.execute( "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)", followee_id, follower_id ) return {"success": True} except Exception as e: # 如果任何一步失败,需要回滚已写入的数据 # 但这里没有分布式事务,回滚很困难 return {"success": False, "error": str(e)}

执行时序

T0:   应用收到请求
T5:   写入guanzhu表完成(5ms)
T10:  写入fensi表完成(5ms)
T10:  返回成功给用户

用户感知延迟:10ms

优点

1. 实现简单
   - 代码逻辑直观
   - 不需要额外组件

2. 一致性较强
   - 两次写入都成功才返回
   - 用户感知的是一致的结果

3. 问题可及时发现
   - 写入失败立即返回错误
   - 便于排查问题

严重缺陷

缺陷1:延迟叠加

正常情况:
写入1:5ms
写入2:5ms
总延迟:10ms(可接受)

异常情况:
写入1:5ms
写入2:数据库慢查询,3000ms
总延迟:3005ms(用户超时)

而且:两次写入是串行的,总延迟 = sum(所有写入)

缺陷2:可用性绑定

guanzhu库#808 正常,fensi库#780 故障

结果:
- 所有涉及这两个库的写入全部失败
- 可用性 = min(所有涉及库的可用性)
- 单库可用性99.9%,涉及2个库 → 可用性约99.8%

随着库数量增加,可用性指数级下降:
- 2个库:99.8%
- 4个库:99.6%
- 10个库:99%

缺陷3:回滚困难

场景:
T0: 写入guanzhu成功
T1: 写入fensi失败(网络超时)

问题:
1. 第一次写入已经提交,无法回滚
2. 如果补偿性删除guanzhu的记录,可能删除失败
3. 最终导致数据不一致

这就是分布式事务的经典问题

缺陷4:雪崩风险

某个库出现抖动(慢查询、磁盘IO高)
→ 涉及该库的所有请求变慢
→ 应用服务线程池耗尽
→ 其他正常请求也无法处理
→ 整个系统雪崩

实际案例

某社交平台早期架构:
- 用同步冗余
- 峰值QPS:5万/秒

某天:
- fensi库#123磁盘故障,响应变慢
- 涉及该库的请求从5ms变成3秒
- 应用服务器线程池(500线程)快速耗尽
- 整体QPS从5万降到1000
- 用户大规模投诉

后果:紧急切换到异步方案

适用场景

仅适合:
- 数据量小(< 1000万)
- 并发低(QPS < 1000)
- 对一致性要求极高
- 团队技术实力有限

示例:企业内部系统、小型社交应用早期

5.3 方案二:服务异步冗余

核心思想

关注表是核心数据,必须同步写入。粉丝表是冗余数据,可以异步写入。

架构图

┌─────────────┐
│  应用服务   │
└──┬────┬─────┘
   │    │
   │    └──→ 消息队列(Kafka/RocketMQ)
   │              │
   │              ↓
   │         ┌────────────┐
   │         │ 冗余服务   │
   │         └─────┬──────┘
   │               │
   ↓               ↓
guanzhu库      fensi库
(同步写入)     (异步写入)

详细流程

步骤1:应用服务处理请求
┌──────────┐
│ 用户请求 │
└────┬─────┘
     ↓
┌────────────────┐
│ 1. 写guanzhu表 │ ← 同步,5ms
└────┬───────────┘
     ↓
┌────────────────┐
│ 2. 发MQ消息    │ ← 同步,1ms
└────┬───────────┘
     ↓
┌────────────────┐
│ 3. 返回成功    │ ← 总延迟:6ms
└────────────────┘

步骤2:冗余服务异步处理
┌────────────────┐
│ MQ消费者监听   │
└────┬───────────┘
     ↓
┌────────────────┐
│ 4. 消费消息    │ ← 异步,延迟100ms
└────┬───────────┘
     ↓
┌────────────────┐
│ 5. 写fensi表   │ ← 异步,5ms
└────────────────┘

代码实现

# 应用服务代码 def follow(follower_id, followee_id): guanzhu_db_id = follower_id % 1024 guanzhu_db = get_guanzhu_database(guanzhu_db_id) try: # 1. 写入主表(关注表) guanzhu_db.execute( "INSERT INTO guanzhu (follower, followee, created_at) VALUES (?, ?, NOW())", follower_id, followee_id ) # 2. 发送消息到队列 message = { "action": "follow", "follower_id": follower_id, "followee_id": followee_id, "timestamp": int(time.time()) } kafka.send("relation_sync_topic", message) # 3. 立即返回成功 return {"success": True} except Exception as e: # 只要主表写入成功,就认为操作成功 # 消息队列的失败可以通过重试机制保证 return {"success": False, "error": str(e)} # 冗余服务代码 def consume_message(): for message in kafka.consume("relation_sync_topic"): action = message["action"] follower_id = message["follower_id"] followee_id = message["followee_id"] if action == "follow": # 计算fensi表的库编号 fensi_db_id = followee_id % 1024 fensi_db = get_fensi_database(fensi_db_id) # 写入粉丝表(带重试机制) retry_count = 0 while retry_count < 3: try: fensi_db.execute( "INSERT INTO fensi (user_id, fan_id, created_at) VALUES (?, ?, NOW())", followee_id, follower_id ) break # 成功则跳出重试 except Exception as e: retry_count += 1 if retry_count >= 3: # 写入失败队列,人工处理 dead_letter_queue.send(message) time.sleep(1) # 重试前等待

时序图(详细版):

用户    应用    MQ      消费者   guanzhu  fensi
 │       │      │        │        │       │
 ├──────→│      │        │        │       │  T0: 发起关注
 │       ├─────→│        │        │       │  T1: 写guanzhu
 │       │←─────┤        │        │       │  T6: 写入成功
 │       ├─────────────→ │        │       │  T7: 发MQ消息
 │←──────┤      │        │        │       │  T8: 返回成功(用户感知延迟8ms)
 │       │      │        │        │       │
 │       │      ├───────→│        │       │  T108: 消费消息(延迟100ms)
 │       │      │        ├───────────────→│  T109: 写fensi
 │       │      │        │←───────────────┤  T114: 写入成功
 │       │      │        │        │       │
 │       │      │        │        │       │  不一致窗口:T6-T114 = 108ms

核心特点

特点1:用户延迟降低

同步方案:10ms(两次数据库写入)
异步方案:6ms(一次数据库 + 一次MQ)

延迟降低:40%

特点2:可用性解耦

guanzhu库故障 → 用户无法关注 ✗(核心功能受影响)
fensi库故障 → 用户可以关注 ✓(冗余功能异步补偿)

可用性 = guanzhu库可用性(99.9%)
不再受fensi库影响

特点3:最终一致性

T0:    用户A关注B,guanzhu表已写入
T0-T1: 查"A关注了谁"→ 能看到B ✓
T0-T1: 查"谁关注了B"→ 看不到A ✗(不一致窗口)
T1:    fensi表写入完成
T1后:  两个查询结果一致 ✓

不一致窗口:通常100-500ms

特点4:天然削峰

峰值场景:
用户请求:10万QPS
MQ缓冲:积压10万消息
消费者:按自己的节奏处理,如5000 TPS

数据库压力:
- 同步方案:guanzhu库和fensi库都承受10万QPS
- 异步方案:guanzhu库承受10万QPS,fensi库只承受5000 TPS

fensi库压力降低:20倍

优点总结

1. 性能提升
   - 用户延迟降低40%
   - 系统吞吐量提升(解除fensi库瓶颈)

2. 可用性提升
   - 核心功能不受冗余库影响
   - 单库故障影响面缩小

3. 削峰填谷
   - MQ缓冲流量
   - 保护数据库

4. 易于扩展
   - 消费者可以水平扩展
   - 独立调整冗余速度

缺点总结

1. 最终一致性
   - 存在不一致窗口(通常<1秒)
   - 需要容忍短时间的数据不一致

2. 系统复杂度增加
   - 引入消息队列
   - 需要管理消费者服务
   - 需要监控消息积压

3. 消息可能丢失
   - MQ本身可能丢消息(虽然概率很小)
   - 需要补偿机制

4. 调试困难
   - 异步链路长,问题定位复杂
   - 需要完善的日志和监控

适用场景

✓ 数据量:> 1亿
✓ 并发:QPS > 1万
✓ 一致性要求:可接受秒级延迟
✓ 团队能力:能驾驭消息队列

这是互联网大厂的标准方案

5.4 方案三:线下异步冗余

核心思想

应用服务完全不关心冗余逻辑,通过捕获数据库的binlog来触发冗余写入。

什么是binlog?

MySQL的binlog(Binary Log):
- 记录了所有数据变更操作(INSERT、UPDATE、DELETE)
- 主要用途:主从复制、数据恢复

示例binlog记录:
{
  "database": "guanzhu_db_808",
  "table": "guanzhu",
  "type": "INSERT",
  "data": {
    "follower": 123456,
    "followee": 789012,
    "created_at": "2025-11-06 10:30:00"
  },
  "timestamp": 1699260600
}

架构图

┌─────────────┐
│  应用服务   │ ← 完全不感知冗余逻辑
└──────┬──────┘
       │
       ↓
  guanzhu库#808
       │
       ├→ binlog输出
       ↓
┌──────────────┐
│ Canal/Maxwell│ ← binlog解析工具
└──────┬───────┘
       │
       ↓
  消息队列(Kafka)
       │
       ↓
┌──────────────┐
│  冗余服务    │ ← 独立部署的服务
└──────┬───────┘
       │
       ↓
  fensi库#780

详细流程

步骤1:用户操作
用户A关注B
  ↓
应用服务
  ↓
写入guanzhu表(库#808)
  ↓
MySQL记录binlog

步骤2:binlog捕获(实时)
Canal监听guanzhu库#808的binlog
  ↓
解析binlog:
  {
    "操作": "INSERT",
    "表": "guanzhu",
    "数据": {"follower": 123456, "followee": 789012}
  }
  ↓
转换为业务消息:
  {
    "action": "follow",
    "follower_id": 123456,
    "followee_id": 789012
  }
  ↓
发送到Kafka

步骤3:冗余写入(异步)
冗余服务消费Kafka消息
  ↓
写入fensi表(库#780)

代码实现

# 应用服务代码(完全无感知) def follow(follower_id, followee_id): guanzhu_db_id = follower_id % 1024 guanzhu_db = get_guanzhu_database(guanzhu_db_id) # 只写主表,不管冗余 guanzhu_db.execute( "INSERT INTO guanzhu (follower, followee) VALUES (?, ?)", follower_id, followee_id ) return {"success": True} # Canal配置(伪代码) canal_config = { "源数据库": "guanzhu_db_*", # 监听所有guanzhu库 "binlog位置": "自动记录", "过滤规则": "只监听guanzhu表的INSERT和DELETE", "输出": "Kafka topic: guanzhu_binlog" } # 冗余服务代码 def consume_binlog_message(): for message in kafka.consume("guanzhu_binlog"): event_type = message["type"] follower = message["data"]["follower"] followee = message["data"]["followee"] fensi_db_id = followee % 1024 fensi_db = get_fensi_database(fensi_db_id) if event_type == "INSERT": # 关注 → 写入粉丝表 fensi_db.execute( "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)", followee, follower ) elif event_type == "DELETE": # 取消关注 → 删除粉丝表 fensi_db.execute( "DELETE FROM fensi WHERE user_id=? AND fan_id=?", followee, follower )

时序图

用户   应用   guanzhu  Canal   Kafka  冗余服务  fensi
 │      │       │       │       │       │       │
 ├─────→│       │       │       │       │       │  T0: 关注请求
 │      ├──────→│       │       │       │       │  T1: 写guanzhu
 │      │       ├──→binlog      │       │       │  T6: 记录binlog
 │←─────┤       │       │       │       │       │  T6: 返回成功
 │      │       │       │       │       │       │
 │      │       │       ├──────→│       │       │  T56: Canal读取binlog(延迟50ms)
 │      │       │       │       ├──────→│       │  T106: 消息到达(延迟50ms)
 │      │       │       │       │       ├──────→│  T111: 写fensi(延迟5ms)
 │      │       │       │       │       │       │
 不一致窗口:T6 - T111 = 105ms(通常100-500ms)

核心优势

优势1:业务完全解耦

应用代码:
- 只关心核心业务逻辑
- 不需要管冗余、不需要管MQ
- 代码简洁、维护容易

冗余逻辑:
- 独立服务,独立迭代
- 可以随时增加新的冗余表
- 不影响主流程

优势2:自动捕获所有变更

同步/异步方案的风险:
- 代码漏写MQ消息 → 冗余数据丢失
- 异常分支没有发消息 → 数据不一致

binlog方案:
- 所有数据库变更都被binlog记录
- 自动保证不会遗漏
- 即使代码有bug,冗余也会执行

优势3:天然支持多种冗余

场景:需要将关系链数据同步到多个目标
- fensi表(粉丝维度)
- ES(搜索引擎,支持模糊查询)
- Redis(热点数据缓存)
- 数据仓库(离线分析)

实现:
同一份binlog → 多个消费者并行处理

  binlog
    │
    ├→ 冗余服务1 → fensi表
    ├→ 冗余服务2 → Elasticsearch
    ├→ 冗余服务3 → Redis
    └→ 冗余服务4 → 数据仓库

无需修改应用代码!

优势4:历史数据自动修复

场景:发现fensi表有历史脏数据

传统方案:
- 写脚本扫描guanzhu表
- 对比fensi表
- 手动修复

binlog方案:
- 重置Canal的binlog位置到历史时间点
- Canal自动重放历史binlog
- 冗余服务自动重建fensi表

严重缺点

缺点1:一致性延迟最大

延迟来源:
1. binlog写入延迟:0-10ms
2. Canal读取延迟:10-50ms(轮询间隔)
3. Kafka传输延迟:10-50ms
4. 消费者处理延迟:10-100ms

总延迟:100-500ms(典型值)

vs 服务异步方案:50-200ms

差距:2-3倍

缺点2:系统复杂度最高

新增组件:
- Canal/Maxwell(binlog解析)
- Kafka(消息队列)
- 冗余服务集群
- 监控告警系统

运维成本:
- 需要监控binlog延迟
- 需要监控Canal服务状态
- 需要处理binlog位置丢失
- 需要处理数据库主从切换

缺点3:binlog依赖风险

问题场景:
1. 数据库binlog过期被删除
   → Canal丢失部分数据
   → 冗余数据缺失

2. 主从切换导致binlog位置变化
   → Canal可能重复消费或漏消费
   → 数据重复或丢失

3. binlog格式变化(MySQL升级)
   → Canal解析失败
   → 冗余中断

缺点4:调试和追踪困难

问题定位链路:
用户反馈 → 检查应用日志 → 检查数据库 → 检查binlog
→ 检查Canal → 检查Kafka → 检查冗余服务

链路长度:6个环节
每个环节都可能出问题

vs 同步方案:2个环节
vs 异步方案:3个环节

适用场景

✓ 数据量:> 10亿
✓ 对实时性要求不高(秒级延迟可接受)
✓ 需要多种冗余目标(如ES、Redis、数仓)
✓ 团队技术实力强(能驾驭复杂系统)
✓ 业务逻辑复杂,不希望应用代码承担冗余责任

典型案例:
- 阿里巴巴:使用Canal同步数据到搜索引擎
- 美团:使用Maxwell同步数据到数据仓库
- 字节跳动:使用Flink CDC同步多种目标

5.5 三种方案对比总结

维度 服务同步冗余 服务异步冗余 线下异步冗余
用户延迟 10ms 6ms 6ms
一致性窗口 0(强一致) 100-200ms 100-500ms
可用性 差(绑定所有库) 好(核心库决定) 好(核心库决定)
实现复杂度
运维复杂度
新增冗余成本 高(改代码) 中(改代码) 低(加消费者)
数据完整性 可能丢失 可能丢失 自动保证
适用数据量 < 1000万 1000万-100亿 > 10亿
典型QPS < 1000 1000-100万 任意

选型决策树

开始
 │
 ├─ 数据量 < 1000万?
 │   └─ 是 → 服务同步冗余(简单够用)
 │
 ├─ 需要多种冗余目标?(ES、Redis、数仓等)
 │   └─ 是 → 线下异步冗余(一次投入,持续受益)
 │
 ├─ 对实时性要求极高?(< 100ms)
 │   └─ 是 → 服务异步冗余(可控延迟)
 │
 └─ 其他情况 → 服务异步冗余(性价比最高)

第六部分:最终一致性保障机制

无论选择哪种冗余方案,都无法保证100%的实时一致性。因此需要建立最终一致性保障机制

6.1 为什么会出现不一致?

异步冗余方案的失败场景

场景1:消息丢失
T0: 应用发送MQ消息
T1: MQ服务器重启,消息丢失
结果:guanzhu表有数据,fensi表没有

场景2:消费失败
T0: 消费者读取消息
T1: 写入fensi表时,数据库连接超时
T2: 消费者重试3次全部失败
T3: 消息被丢弃(进入死信队列)
结果:数据不一致

场景3:网络分区
T0: 消费者写入fensi表
T1: 网络分区,写入超时
T2: 消费者认为失败,重试
T3: 实际上第一次写入成功了
结果:fensi表有重复数据

场景4:程序Bug
T0: 代码逻辑错误,某种边界情况下不发MQ消息
结果:数据永久不一致

binlog方案的失败场景

场景1:binlog过期
T0: Canal服务停止7天
T7: 数据库binlog保留期只有3天,部分binlog被删除
结果:丢失部分变更

场景2:主从切换
T0: 主库故障,切换到从库
T1: Canal的binlog位置在新主库上不连续
结果:可能重复消费或漏消费

场景3:解析失败
T0: MySQL升级,binlog格式变化
T1: Canal无法解析新格式
结果:冗余中断

6.2 影响分析

数据不一致的业务影响

案例1:粉丝数不匹配
- guanzhu表:用户A有1000个关注
- fensi表统计:用户们的粉丝总数 = 950个A
- 差异:50条数据不一致
- 影响:粉丝数显示错误、推荐算法偏差

案例2:内容分发遗漏
- 用户B发布内容
- 系统查fensi表,找到999个粉丝
- 实际guanzhu表有1000人关注B
- 结果:1个用户看不到B的内容

案例3:数据分析错误
- 运营分析关注关系图
- 基于不一致的数据得出错误结论
- 影响业务决策

容忍度分析

用户视角:
- 不一致窗口 < 1秒:完全无感知 ✓
- 不一致窗口 1-10秒:偶尔发现,可接受 ✓
- 不一致窗口 > 1分钟:明显感知,影响体验 ✗
- 永久不一致:数据错误,严重问题 ✗✗✗

业务视角:
- 比例 < 0.01%:可接受(百万分之一)
- 比例 0.01%-0.1%:需要监控
- 比例 > 0.1%:需要优化
- 比例 > 1%:严重问题

6.3 方案一:全量扫描修复

核心思想:定期扫描所有数据,对比两个表,发现并修复不一致。

实现逻辑

def full_scan_repair(): """ 全量扫描修复 执行频率:每天凌晨2点 """ print("开始全量扫描...") # 遍历所有guanzhu库 for db_id in range(1024): guanzhu_db = get_guanzhu_database(db_id) # 查询该库的所有关系 relations = guanzhu_db.query( "SELECT follower, followee FROM guanzhu" ) # 对每条关系,检查fensi表 for relation in relations: follower = relation['follower'] followee = relation['followee'] # 计算fensi表的库编号 fensi_db_id = followee % 1024 fensi_db = get_fensi_database(fensi_db_id) # 检查fensi表是否存在对应记录 exists = fensi_db.query( "SELECT 1 FROM fensi WHERE user_id=? AND fan_id=?", followee, follower ) if not exists: # 不存在,补写数据 fensi_db.execute( "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)", followee, follower ) log_repair(f"修复: {follower}{followee}") print("全量扫描完成")

执行流程图

┌──────────────────────────────────────┐
│ 定时任务:每天凌晨2点执行            │
└─────────────┬────────────────────────┘
              │
    ┌─────────▼─────────┐
    │ 扫描guanzhu库#0   │
    │ 读取100万条记录   │
    └─────────┬─────────┘
              │
         ┌────▼────┐
         │ 对每条:  │
         │ 检查fensi│
         │ 不存在?  │
         │ → 补写   │
         └────┬────┘
              │
    ┌─────────▼─────────┐
    │ 扫描guanzhu库#1   │
    └─────────┬─────────┘
              │
             ...
              │
    ┌─────────▼─────────┐
    │ 扫描guanzhu库#1023│
    └───────────────────┘
              │
    ┌─────────▼─────────┐
    │ 扫描完成,生成报告│
    │ - 扫描记录数      │
    │ - 修复记录数      │
    │ - 耗时            │
    └───────────────────┘

性能估算

假设:
- 总数据量:100亿条
- 单库数据量:100亿 / 1024 ≈ 1000万条
- 每条记录检查:0.5ms(查guanzhu + 查fensi)

单线程耗时:
100亿 × 0.5ms = 5,000,000秒 ≈ 58天

优化方案1:并发扫描
- 1024个库并发扫描
- 耗时:58天 / 1024 ≈ 1.4小时

优化方案2:批量查询
- 每次查1000条,减少网络往返
- 每批耗时:从500ms降到50ms
- 总耗时:1.4小时 / 10 ≈ 8分钟

实际实现:
使用64个工作线程,每个线程负责16个库
预计耗时:1-2小时

代码优化版本

from concurrent.futures import ThreadPoolExecutor import time def scan_and_repair_batch(db_id): """ 扫描单个数据库并修复 使用批量查询优化 """ guanzhu_db = get_guanzhu_database(db_id) repair_count = 0 scan_count = 0 # 批量读取guanzhu表 offset = 0 batch_size = 1000 while True: relations = guanzhu_db.query( f"SELECT follower, followee FROM guanzhu LIMIT {batch_size} OFFSET {offset}" ) if not relations: break scan_count += len(relations) # 按fensi库分组 fensi_groups = {} for rel in relations: fensi_db_id = rel['followee'] % 1024 if fensi_db_id not in fensi_groups: fensi_groups[fensi_db_id] = [] fensi_groups[fensi_db_id].append(rel) # 批量检查每个fensi库 for fensi_db_id, group in fensi_groups.items(): fensi_db = get_fensi_database(fensi_db_id) # 构建批量查询 conditions = " OR ".join([ f"(user_id={rel['followee']} AND fan_id={rel['follower']})" for rel in group ]) existing = fensi_db.query( f"SELECT user_id, fan_id FROM fensi WHERE {conditions}" ) existing_set = {(r['user_id'], r['fan_id']) for r in existing} # 找出缺失的记录 missing = [ rel for rel in group if (rel['followee'], rel['follower']) not in existing_set ] # 批量插入 if missing: values = ",".join([ f"({rel['followee']}, {rel['follower']})" for rel in missing ]) fensi_db.execute( f"INSERT INTO fensi (user_id, fan_id) VALUES {values}" ) repair_count += len(missing) offset += batch_size return {"db_id": db_id, "scanned": scan_count, "repaired": repair_count} def full_scan_with_concurrency(): """ 并发全量扫描 """ start_time = time.time() # 使用64个线程并发扫描 with ThreadPoolExecutor(max_workers=64) as executor: futures = [executor.submit(scan_and_repair_batch, db_id) for db_id in range(1024)] results = [f.result() for f in futures] # 统计结果 total_scanned = sum(r['scanned'] for r in results) total_repaired = sum(r['repaired'] for r in results) elapsed = time.time() - start_time print(f"扫描完成:") print(f" 总记录数: {total_scanned:,}") print(f" 修复记录数: {total_repaired:,}") print(f" 不一致比例: {total_repaired/total_scanned*100:.4f}%") print(f" 耗时: {elapsed/60:.1f}分钟")

优点

1. 简单可靠
   - 逻辑清晰,易于实现
   - 不依赖复杂组件

2. 全面覆盖
   - 扫描所有数据,不会遗漏
   - 能发现任何原因导致的不一致

3. 自动修复
   - 发现问题立即修复
   - 无需人工介入

严重缺陷

1. 不一致窗口长
   - 每天执行一次 → 最长24小时的不一致窗口
   - 用户可能一整天看到错误数据

2. 资源消耗大
   - 需要扫描所有数据
   - 占用大量数据库IO和CPU
   - 可能影响业务高峰期(即使在凌晨)

3. 扩展性差
   - 数据量翻倍 → 扫描时间翻倍
   - 当数据量达到千亿级,即使优化也需要数小时

4. 无法定位根因
   - 只知道数据不一致,不知道为什么
   - 无法修复导致不一致的代码bug

适用场景

✓ 数据量:< 10亿
✓ 对一致性要求不高(天级延迟可接受)
✓ 作为兜底方案(配合其他方案使用)

示例:
- 中小型社交应用
- 企业内部系统
- 数据分析系统(非实时)

6.4 方案二:增量扫描修复

核心思想:只扫描最近变化的数据,而不是全量扫描。

实现原理

关键:在guanzhu表增加字段
ALTER TABLE guanzhu ADD COLUMN updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;

每小时扫描:
SELECT * FROM guanzhu WHERE updated_at > NOW() - INTERVAL 1 HOUR;

执行流程

定时任务:每小时执行
  │
  ├→ 找出过去1小时修改的记录
  │   SELECT * FROM guanzhu
  │   WHERE updated_at > '2025-11-06 09:00:00'
  │
  ├→ 对每条记录,检查fensi表
  │   - 存在且一致 → 跳过
  │   - 不存在 → 补写
  │   - 存在但不一致 → 修复
  │
  └→ 生成修复报告

代码实现

def incremental_scan_repair(): """ 增量扫描修复 执行频率:每小时 """ # 计算扫描时间范围 end_time = datetime.now() start_time = end_time - timedelta(hours=1) total_scanned = 0 total_repaired = 0 # 遍历所有guanzhu库 for db_id in range(1024): guanzhu_db = get_guanzhu_database(db_id) # 只查询最近1小时的变更 relations = guanzhu_db.query( """ SELECT follower, followee, updated_at FROM guanzhu WHERE updated_at BETWEEN ? AND ? """, start_time, end_time ) total_scanned += len(relations) # 检查并修复 for rel in relations: fensi_db_id = rel['followee'] % 1024 fensi_db = get_fensi_database(fensi_db_id) exists = fensi_db.query( "SELECT 1 FROM fensi WHERE user_id=? AND fan_id=?", rel['followee'], rel['follower'] ) if not exists: fensi_db.execute( "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)", rel['followee'], rel['follower'] ) total_repaired += 1 # 记录指标 metrics.record("incremental_scan", { "scanned": total_scanned, "repaired": total_repaired, "ratio": total_repaired / total_scanned if total_scanned > 0 else 0 })

性能对比

假设:
- 总数据量:100亿
- 每小时新增/修改:100万条(峰值)
- 平均每小时:50万条

全量扫描:
- 扫描量:100亿
- 耗时:1-2小时(并发优化后)

增量扫描:
- 扫描量:100万(峰值)
- 比例:100万 / 100亿 = 0.01%
- 耗时:1-2小时 × 0.01% = 3-7秒

性能提升:1000倍

不一致窗口缩短

全量扫描:
- 执行频率:每天1次
- 最长窗口:24小时

增量扫描:
- 执行频率:每小时1次
- 最长窗口:1小时

窗口缩短:24倍

优点

1. 高效
   - 只扫描变更数据,效率提升1000倍
   - 资源消耗低

2. 频繁执行
   - 可以每小时甚至每10分钟执行一次
   - 不一致窗口从天级降到小时级

3. 资源消耗可控
   - 即使在业务高峰期执行也不会影响性能

缺陷

1. 依赖updated_at字段
   - 需要保证该字段准确更新
   - 如果字段更新失败,会漏扫描

2. 无法发现历史遗留问题
   - 只检查最近变更
   - 历史不一致数据无法发现

解决方案:
- 增量扫描(每小时)+ 全量扫描(每周)
- 双重保障

完整方案

def comprehensive_scan_strategy(): """ 综合扫描策略 """ # 增量扫描:每小时 schedule.every(1).hour.do(incremental_scan_repair) # 全量扫描:每周日凌晨 schedule.every().sunday.at("02:00").do(full_scan_repair) # 实时监控:持续运行 schedule.every(10).seconds.do(monitor_consistency_metrics)

监控指标

关键指标:
1. 扫描记录数
   - 每小时扫描多少条
   - 趋势分析(判断是否异常)

2. 修复记录数
   - 每小时修复多少条
   - 计算不一致比例

3. 修复比例
   - 修复数 / 扫描数
   - 正常:< 0.1%
   - 警告:0.1% - 1%
   - 告警:> 1%

4. 扫描耗时
   - 监控性能变化
   - 及时发现数据库性能问题

6.5 方案三:实时核对(最佳方案)

核心思想

每次写入时,同时发送一条"核对消息",延迟几秒后检查两个表是否一致。

架构图

用户操作
  │
  ↓
┌─────────────┐
│  应用服务   │
└──┬────┬─────┘
   │    │
   │    ├→ MQ消息1:冗余指令(立即处理)
   │    │   topic: relation_sync
   │    │
   │    └→ MQ消息2:核对任务(延迟5秒处理)
   │        topic: consistency_check
   │        delay: 5 seconds
   │
   ↓
guanzhu表

并行处理:
消费者1:消费 relation_sync → 写fensi表
消费者2:消费 consistency_check → 核对一致性

详细流程

T0: 用户A关注B
    ├→ 写入guanzhu表
    ├→ 发MQ消息1: {"action": "sync", "follower": A, "followee": B}
    └→ 发MQ消息2: {"action": "check", "follower": A, "followee": B, "delay": 5s}

T0-T5: 冗余服务消费消息1,写入fensi表(正常情况下完成)

T5: 核对服务消费消息2
    ├→ 查询guanzhu表: A→B 存在? 是
    ├→ 查询fensi表: B←A 存在? 是
    └→ 结论:一致 ✓

T5: 如果fensi表不存在
    ├→ 告警:发现不一致
    ├→ 自动修复:写入fensi表
    └→ 记录日志:便于排查根因

代码实现

# 应用服务代码 def follow(follower_id, followee_id): guanzhu_db_id = follower_id % 1024 guanzhu_db = get_guanzhu_database(guanzhu_db_id) # 1. 写入主表 guanzhu_db.execute( "INSERT INTO guanzhu (follower, followee) VALUES (?, ?)", follower_id, followee_id ) # 2. 发送冗余消息(立即处理) kafka.send("relation_sync", { "action": "sync", "follower": follower_id, "followee": followee_id, "timestamp": time.time() }) # 3. 发送核对消息(延迟5秒) kafka.send("consistency_check", { "action": "check", "follower": follower_id, "followee": followee_id, "timestamp": time.time() }, delay=5) # 延迟5秒投递 return {"success": True} # 冗余服务代码 def sync_consumer(): for message in kafka.consume("relation_sync"): follower = message["follower"] followee = message["followee"] fensi_db_id = followee % 1024 fensi_db = get_fensi_database(fensi_db_id) try: fensi_db.execute( "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)", followee, follower ) except Exception as e: # 写入失败,记录日志 logger.error(f"Sync failed: {follower}{followee}, error: {e}") # 不重试,等待核对服务发现并修复 # 核对服务代码 def check_consumer(): for message in kafka.consume("consistency_check"): follower = message["follower"] followee = message["followee"] # 查询guanzhu表 guanzhu_db_id = follower % 1024 guanzhu_db = get_guanzhu_database(guanzhu_db_id) guanzhu_exists = guanzhu_db.query( "SELECT 1 FROM guanzhu WHERE follower=? AND followee=?", follower, followee ) # 查询fensi表 fensi_db_id = followee % 1024 fensi_db = get_fensi_database(fensi_db_id) fensi_exists = fensi_db.query( "SELECT 1 FROM fensi WHERE user_id=? AND fan_id=?", followee, follower ) # 核对结果 if guanzhu_exists and not fensi_exists: # 不一致:guanzhu有,fensi没有 logger.warning(f"Inconsistency detected: {follower}{followee}") # 自动修复 fensi_db.execute( "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)", followee, follower ) # 发送告警 alert("数据不一致", { "follower": follower, "followee": followee, "missing_in": "fensi" }) # 记录指标 metrics.increment("consistency_repair") elif not guanzhu_exists and fensi_exists: # 不一致:fensi有,guanzhu没有(异常情况) logger.error(f"Orphan data in fensi: {followee}{follower}") # 这种情况很少见,可能是误删除,需要人工介入 alert("数据异常", { "type": "orphan_in_fensi", "user_id": followee, "fan_id": follower }) elif guanzhu_exists and fensi_exists: # 一致,无需处理 metrics.increment("consistency_check_pass") else: # 都不存在(可能是已经取消关注) pass

延迟时间选择

核对延迟的选择逻辑:

过短(如1秒):
- 冗余服务可能还没处理完
- 误报率高

过长(如60秒):
- 不一致窗口长
- 用户可能已经感知

最佳实践(5秒):
- 统计数据:99%的冗余操作在3秒内完成
- 5秒延迟能覆盖99.9%的情况
- 用户基本无感知

动态调整:
- 监控P99延迟
- 如果P99超过5秒 → 调整为P99 + 2秒

核对覆盖率

问题:是否每个操作都要核对?

方案1:100%覆盖
- 优点:发现所有不一致
- 缺点:核对消息量 = 业务消息量,成本高

方案2:采样核对(如10%)
- 优点:成本降低90%
- 缺点:只能发现10%的问题

推荐方案:分级策略
- 核心用户(大V):100%核对
- 普通用户:10%采样核对
- 配合增量扫描:发现剩余90%的问题

代码实现:
if is_vip_user(follower) or is_vip_user(followee):
    # VIP用户,100%核对
    kafka.send("consistency_check", message, delay=5)
elif random.random() < 0.1:
    # 普通用户,10%核对
    kafka.send("consistency_check", message, delay=5)

优势总结

1. 不一致窗口最短
   - 实时发现:5-10秒
   - vs 增量扫描:1小时
   - vs 全量扫描:1天

2. 自动修复
   - 发现即修复,无需人工介入
   - 修复延迟:秒级

3. 根因分析
   - 可以统计哪些原因导致不一致
   - 指导代码优化

4. 精准告警
   - 实时告警,不会漏报
   - 可以定位到具体的用户和操作

5. 灵活控制
   - 可以调整核对延迟
   - 可以调整核对比例
   - 可以针对不同用户采用不同策略

成本分析

假设:
- 业务QPS:10万/秒
- 核对比例:20%(VIP 100% + 普通用户 10%采样)

核对服务负载:
- 核对QPS:10万 × 20% = 2万/秒
- 每次核对:2次数据库查询(guanzhu + fensi)
- 数据库查询:4万/秒

对比:
- 全量扫描:每天1次,1-2小时,高峰期影响
- 增量扫描:每小时1次,几秒,影响小
- 实时核对:持续运行,负载平滑

结论:实时核对的资源消耗可控,且负载平滑

完整架构

┌─────────────────────────────────────┐
│         三层一致性保障体系           │
└─────────────────────────────────────┘

第一层:实时核对(秒级发现和修复)
  ├─ 覆盖率:VIP 100% + 普通用户 10%
  ├─ 延迟:5-10秒
  └─ 目标:发现和修复90%的问题

第二层:增量扫描(小时级兜底)
  ├─ 频率:每小时
  ├─ 覆盖率:100%(最近1小时变更)
  └─ 目标:发现实时核对遗漏的10%问题

第三层:全量扫描(周级终极保障)
  ├─ 频率:每周
  ├─ 覆盖率:100%(全部数据)
  └─ 目标:发现历史遗留问题

监控体系:
  ├─ 实时监控:不一致率、修复率
  ├─ 告警:不一致率超过阈值
  └─ 报表:每日/每周一致性报告

第七部分:实际案例与演进路线

7.1 初创期(用户 < 100万)

业务特点

- 用户量:10万
- 日活:5万
- 关系总数:500万
- QPS:峰值1000

架构选择

数据存储:
┌──────────────────┐
│  MySQL单库       │
│  ├─ relations表  │ ← 单表足够
│  ├─ idx_follower │ ← 索引优化查询
│  └─ idx_followee │
└──────────────────┘

查询性能:
- 正向查询:3-5ms
- 反向查询:3-5ms

完全不需要:
✗ 分库分表
✗ 数据冗余
✗ 消息队列
✗ 一致性检测

只需要:
✓ 单表 + 索引
✓ 主从复制(高可用)
✓ 定期备份

代码示例

# 简单的单表实现 class RelationService: def follow(self, follower_id, followee_id): db.execute( "INSERT INTO relations (follower, followee) VALUES (?, ?)", follower_id, followee_id ) def get_following(self, user_id): return db.query( "SELECT followee FROM relations WHERE follower = ?", user_id ) def get_fans(self, user_id): return db.query( "SELECT follower FROM relations WHERE followee = ?", user_id )

7.2 成长期(用户 100万-5000万)

业务特点

- 用户量:1000万
- 日活:500万
- 关系总数:50亿
- QPS:峰值5万

遇到的问题

问题1:单表容量告急
- relations表:50亿条记录
- 数据+索引:~500GB
- 查询变慢:50-100ms
- 写入TPS下降:从1万降到3000

问题2:备份困难
- 备份时间:8小时
- 影响业务

问题3:DDL操作困难
- 加字段需要锁表数小时

架构升级

升级1:引入分库分表
┌─────────┐
│ 应用    │
└────┬────┘
     │
┌────▼────────────────────┐
│  分库中间件(ShardingJDBC)│
└────┬────────────────────┘
     │
     ├→ guanzhu_db_0  (按follower_id分)
     ├→ guanzhu_db_1
     ├→ ...
     └→ guanzhu_db_63  (64个库)

每库存储:50亿 / 64 ≈ 8000万条
查询性能恢复:5-10ms

升级2:引入数据冗余
guanzhu表:按follower_id分库(64库)
fensi表:按followee_id分库(64库)

升级3:异步冗余
应用 → guanzhu表 + MQ → 冗余服务 → fensi表

代码演进

# 分库分表后的代码 class RelationService: def __init__(self): self.sharding = ShardingClient(db_count=64) def follow(self, follower_id, followee_id): # 计算guanzhu库 guanzhu_db = self.sharding.get_db("guanzhu", follower_id) # 写入主表 guanzhu_db.execute( "INSERT INTO guanzhu (follower, followee) VALUES (?, ?)", follower_id, followee_id ) # 发送冗余消息 kafka.send("relation_sync", { "action": "follow", "follower": follower_id, "followee": followee_id }) def get_following(self, user_id): guanzhu_db = self.sharding.get_db("guanzhu", user_id) return guanzhu_db.query( "SELECT followee FROM guanzhu WHERE follower = ?", user_id ) def get_fans(self, user_id): fensi_db = self.sharding.get_db("fensi", user_id) return fensi_db.query( "SELECT fan_id FROM fensi WHERE user_id = ?", user_id )

一致性保障

方案:增量扫描
- 频率:每小时
- 覆盖:最近1小时变更
- 修复:自动补写

监控:
- 不一致率:< 0.01%
- 告警阈值:> 0.1%

7.3 成熟期(用户 > 1亿)

业务特点

- 用户量:5亿
- 日活:2亿
- 关系总数:1000亿
- QPS:峰值100万

架构全貌

┌───────────────────────────────────────────┐
│              用户请求                      │
└───────────────┬───────────────────────────┘
                │
┌───────────────▼───────────────────────────┐
│          网关层(Nginx)                   │
│  - 限流                                   │
│  - 熔断                                   │
│  - 路由                                   │
└───────────────┬───────────────────────────┘
                │
┌───────────────▼───────────────────────────┐
│       应用服务集群(1000+实例)            │
└─┬──────────┬──────────┬──────────────────┘
  │          │          │
  │          │          └→ MQ (Kafka集群)
  │          │                │
  │          │                ├→ 冗余服务集群
  │          │                ├→ 核对服务集群
  │          │                └→ 分析服务
  │          │
  │          └→ Redis集群(热点数据缓存)
  │              - 大V用户的粉丝列表
  │              - 最近关注关系
  │
  └→ 数据库集群
     ├→ guanzhu_db (1024个库)
     └→ fensi_db (1024个库)

分库策略优化

初期:64个库,每库15亿条
问题:仍然太大

优化:1024个库,每库1亿条
- 查询性能:稳定在5-10ms
- 扩容灵活:可以继续拆分到2048库

缓存策略

问题:大V用户的粉丝查询频繁
- 周杰伦:1亿粉丝
- 每秒被查询:10万次
- 数据库压力巨大

解决:多级缓存
L1: 应用本地缓存(Caffeine)
  - 容量:10万条
  - 命中率:50%
  - 延迟:< 1ms

L2: Redis集群
  - 容量:热点用户的全量粉丝
  - 命中率:40%
  - 延迟:1-3ms

L3: 数据库
  - 兜底查询
  - 命中率:10%
  - 延迟:5-10ms

总体效果:
- 90%请求被缓存承接
- 数据库压力降低:10倍

一致性保障完整方案

三层保障:

L1: 实时核对(秒级)
  - VIP用户:100%覆盖
  - 普通用户:20%采样
  - 发现率:95%
  - 修复延迟:5-10秒

L2: 增量扫描(小时级)
  - 频率:每30分钟
  - 覆盖:100%最近变更
  - 发现率:4.9%(剩余的5%)
  - 修复延迟:30分钟

L3: 全量扫描(周级)
  - 频率:每周日凌晨
  - 覆盖:100%全量数据
  - 发现率:0.1%(历史遗留)
  - 修复延迟:7天

综合效果:
- 不一致率:< 0.001%(百万分之一)
- 平均修复延迟:< 1分钟

监控体系

关键指标:
1. 业务指标
   - 关注成功率:> 99.99%
   - 查询成功率:> 99.99%
   - P99延迟:< 20ms

2. 一致性指标
   - 实时核对:不一致率、修复率
   - 增量扫描:扫描量、修复量
   - 全量扫描:总不一致数

3. 系统指标
   - 数据库QPS、慢查询
   - MQ积压量
   - 缓存命中率

告警规则:
- 不一致率 > 0.1%:P0告警,立即处理
- 查询延迟P99 > 50ms:P1告警,30分钟内处理
- MQ积压 > 100万:P2告警,1小时内处理

第八部分:底层规律与设计哲学

8.1 分布式系统的根本约束

CAP定理在关系链系统中的体现

CAP定理:
- C(Consistency):一致性
- A(Availability):可用性
- P(Partition Tolerance):分区容错性

在分布式环境下,三者只能选其二

关系链系统的选择:
      一致性(C)
        /  \
       /    \
      /  选择\
     /   AP   \
    /__________\
 可用性(A)  分区容错(P)
      ↑        ↑
      必选     必选

解释:
- P(分区容错):必选
  → 分库分表必然导致网络分区
  → 无法避免

- A(可用性):必选
  → 社交产品核心竞争力
  → 用户无法容忍服务不可用

- C(一致性):妥协
  → 降级为最终一致性
  → 通过补偿机制保证

数据冗余的数学本质

问题模型:
- 数据集 D = {d1, d2, ..., dn}
- 查询维度 Q = {q1, q2, ..., qm}
- 分片函数 f: D → {s1, s2, ..., sk} (k个分片)

约束:
一个分片函数只能基于一个字段
即:f 只能优化一个查询维度

定理:
当 |Q| > 1 时,为了让所有查询都是单分片查询,
需要的数据副本数 = |Q|

证明:
反证法:假设只需 r < |Q| 个副本
则至少存在一个查询维度 q_i 没有对应的副本
该维度的查询必须扫描所有分片(扇出查询)
与"所有查询都是单分片查询"矛盾

结论:
冗余份数 = 查询维度数量
(这是分布式系统的数学必然)

8.2 一致性的本质

强一致性 vs 最终一致性

强一致性:
- 定义:任何时刻读取的数据都是最新的
- 实现:分布式事务(2PC、3PC、Paxos)
- 代价:性能下降、可用性降低

最终一致性:
- 定义:经过一段时间后,数据会达到一致状态
- 实现:异步复制 + 补偿机制
- 收益:性能高、可用性高

为什么选择最终一致性?

成本分析:
强一致性:
- 每次写入需要2PC,延迟 > 100ms
- 任何一个参与方故障,整体不可用
- QPS:< 1万

最终一致性:
- 写入延迟:5-10ms
- 单点故障不影响整体
- QPS:> 100万

性价比:
最终一致性的性能是强一致性的 100倍
而不一致窗口只有几秒,用户基本无感知

选择:显然是最终一致性

不一致的容忍度

人类感知时间:
- < 100ms:无感知(实时)
- 100ms - 1s:轻微感知(可接受)
- 1s - 10s:明显感知(有点慢)
- > 10s:不可接受(太慢了)

社交场景的容忍度:
关注操作:
- 我关注了某人 → 立即能看到 ✓
- 对方的粉丝列表 → 1秒后更新 ✓(对方不会立即查看)
- 粉丝数统计 → 10秒后更新 ✓(统计数据允许延迟)

结论:
只要核心操作(自己的视角)是实时的
冗余数据(他人的视角)几秒延迟完全可接受

8.3 系统演进的规律

系统复杂度 = f(数据规模, 查询需求)

阶段1:单表(数据量 < 5000万)
复杂度:★☆☆☆☆
核心:SQL + 索引

阶段2:分库分表(数据量 5000万-10亿)
复杂度:★★★☆☆
核心:分片路由

阶段3:数据冗余(数据量 > 10亿,查询维度 > 1)
复杂度:★★★★☆
核心:异步复制 + 一致性保障

阶段4:多级缓存 + 实时计算(数据量 > 100亿,QPS > 100万)
复杂度:★★★★★
核心:缓存、消息队列、流式计算

规律:
复杂度不可避免地增加,但每一步都是被数据规模逼出来的
不要过度设计,够用就好

奥卡姆剃刀原则

"如无必要,勿增实体"

反例:
用户量只有10万,就引入:
- 1024个分库
- Kafka集群
- 实时核对系统
- 多级缓存

结果:
- 系统复杂,维护困难
- 成本高昂(服务器、人力)
- 收益极小(性能过剩)

正确做法:
1. 评估当前规模和未来1年增长
2. 选择满足需求的最简单方案
3. 预留扩展空间(如分库从64开始,而不是4)
4. 等问题出现再优化

8.4 设计决策框架

决策矩阵

数据量 查询维度 一致性要求 推荐方案
< 1000万 任意 任意 单表 + 索引
1000万-1亿 1个 任意 分库单表
1000万-1亿 多个 强一致 分库 + 同步冗余
1000万-1亿 多个 最终一致 分库 + 异步冗余
> 1亿 多个 最终一致 分库 + 异步冗余 + 实时核对
> 10亿 多个 最终一致 分库 + 异步冗余 + binlog同步

成本效益分析公式

总成本 = 开发成本 + 运维成本 + 资源成本

开发成本:
- 简单方案(单表):1人周
- 中等方案(分库+异步):2人月
- 复杂方案(全套):6人月

运维成本:
- 简单方案:1人维护
- 中等方案:2-3人维护
- 复杂方案:5+人团队

资源成本:
- 简单方案:1台数据库
- 中等方案:100台数据库 + MQ
- 复杂方案:1000台数据库 + MQ + 缓存 + ...

收益:
- 性能提升倍数
- 可支撑的用户规模

决策:
ROI = 收益 / 总成本
选择ROI最高的方案

总结:关键洞察

核心认知

  1. 数据冗余不是为了解决"查询慢"

    • 单表 + 索引就能解决查询性能问题
    • 真正的问题是:单表容量瓶颈 → 必须分库 → 产生查询定位问题
    • 数据冗余是分库后的必然选择
  2. 分库的本质矛盾

    • 一个分库键只能优化一个查询维度
    • 多查询维度 → 必须多份冗余或扇出查询
    • 扇出查询不可接受 → 只能选择冗余
  3. 最终一致性是分布式系统的必然选择

    • 强一致性代价太大(性能、可用性)
    • 最终一致性通过补偿机制保证可靠性
    • 核心是缩短不一致窗口
  4. 系统设计要随业务演进

    • 不要过度设计
    • 够用就好,预留扩展空间
    • 等问题出现再优化

设计模式

数据冗余三段论:
1. 为什么冗余?→ 分库后的查询定位问题
2. 怎么冗余?→ 根据一致性要求选择同步/异步
3. 如何保证一致?→ 三层保障机制(实时+增量+全量)

一致性保障三层防护:
L1: 实时核对(秒级)→ 覆盖95%问题
L2: 增量扫描(小时级)→ 覆盖4.9%问题
L3: 全量扫描(周级)→ 覆盖0.1%问题
综合:99.999%可靠性

实践建议

初创团队:
- 单表够用,不要盲目分库
- 关注业务增长,而不是技术炫技

成长团队:
- 提前规划分库(按2年增长预估)
- 选择异步冗余(性价比最高)
- 建立监控和告警

成熟团队:
- 完善一致性保障机制
- 多级缓存优化性能
- 持续优化架构

这篇文章的核心价值在于:揭示了百亿级关系链系统背后的底层规律,而不是简单地罗列技术方案。希望能帮助你真正理解分布式系统的设计本质。


第九部分:工程化补充与修订(实践更稳的落地指南)

本节对上文方案作工程化增强与勘误,便于直接落地到大规模生产环境。

9.1 分片与扩容

  • 虚拟分片(virtual shards)与一致性哈希:避免 user_id % N 带来的全量重分布,支持平滑扩容与弹性缩容。
  • 热点拆分:对“大V/热点用户”采用 per-user buckets(如 user_id + bucket_id)横向切分,缓解单分片热点写入/读取放大。
  • 分片元数据中心:路由表动态下发(灰度/回滚/限流),支持按用户、按桶精细迁移。

9.2 冗余写入的可靠性(强烈建议 Outbox/CDC)

  • 服务异步冗余的改进:使用 Outbox(事务外发表)+ 消费端幂等(去重键)替代“写主表+发MQ”的两步式,确保事件不丢失且可重放。
  • CDC 方案说明:
    • MySQL 可用 Canal/Maxwell/Debezium;
    • PostgreSQL 采用逻辑复制/Decoding(wal2json、pgoutput、Debezium PG Connector)。
  • 去重与幂等:事件携带全局去重键(如 follower_id:followee_id:op:ts),消费端 UPSERT/ON CONFLICT DO NOTHING,保障多次投递不产生日志性脏写。

9.3 计数一致性

  • 粉丝数/关注数建议单独计数表(或列)维护:写路径做幂等增减(基于事件去重),并提供定期校准任务(对照关系表重算,差异阈值内自动修正)。

9.4 查询与分页

  • 使用基于游标的 seek-based pagination(如按 created_at,id 复合索引)替代 deep offset,提高大页性能与稳定性。
  • 明确排序语义(按关注时间/最近互动等),相应建立复合索引,避免回表与排序放大。

9.5 缓存与热点保护

  • 多级缓存(进程内 + Redis)+ 订阅失效;对热点 Key 采用局部互斥/请求合并,使用负缓存降低穿透。
  • 对粉丝全量列表的缓存应分片或分段缓存,配合版本号/范围刷新,避免一次性大 Key 失效的抖动。

9.6 主键与索引

  • 主键使用紧凑且有序的 ID(雪花/Time-UUID/ULID)以减少索引分裂;
  • 谨慎新增二级索引,关注索引体积与写放大;按查询模式设计复合索引(如 (follower_id, created_at) / (user_id, created_at))。

9.7 一致性检测与修复链路

  • 实时核对建议基于“单源事件流”延迟校验,避免“双消息(指令 + 核对)”乱序;
  • 增量校验优先基于 CDC 流位置/事件时间窗口,而非仅依赖 last_modified_time 字段;
  • 全量校验的性能估算应以“行/秒吞吐 + I/O 带宽 + 并发度”做容量规划,避免过于理想的 CPU 级估算。

9.8 可靠性与 SLO(勘误)

  • 上文关于“整体可靠性叠加趋近 100%”的表述过于理想化,实际链路存在相关性与共同失效域;
  • 建议以端到端 SLO 管理:
    • 写入成功率(关注/取关)≥ 99.99%
    • 冗余完成延迟 P99 ≤ 若干秒
    • 不一致率 ≤ 0.01% 并在 T≤1h 内校准至 ≤ 0.001%
    • 修复任务的滞后/积压有硬阈值与自动降级(背压、限流、只保核心写)。

9.9 安全合规与治理

  • 拉黑/隐私:写路径与读路径均应过滤封禁关系;
  • 账户删除/合规清理(GDPR 等):提供级联清除与异步彻底擦除;
  • 写入限流与风控:针对异常批量关注行为做速率限制与熔断;
  • 审计与可观测:事件全链路追踪(trace_id)、积压/失败可见、可重放工具链。

9.10 演进与回放能力

  • 以可回放事件日志作为“事实来源”,任何冗余表(粉丝维度、索引表、缓存)都能从事件流快速重建;
  • 灰度迁移:虚拟分片与 per-user bucket 级别的细粒度迁移/回滚,配合双写、对账与切换窗口监控。

以上补充能在不改变核心思路的前提下,显著提升可用性、可扩展性与可运维性,使方案更贴近实际大规模生产环境。


第十部分:本地验证与实验指南(含实测结果)

本项目提供可在本机快速验证“异步冗余与双表冗余收益”的实验工具,支持调节规模与并发,并输出 p50/p95/p99 与复制落地延迟、积压峰值等指标。

10.1 端到端基准(异步 vs 同步双写 + 查询)

启动数据库:

docker compose up -d postgres

运行(可调规模与并发):

# N: 关注操作次数;CONC: 并发;PAGE: 查询页大小
N=20000 CONC=8 PAGE=50 go run cmd/relbench/main.go

输出示例字段说明:

  • Async follow latency: 主路径(关注主表写 + 入队)延迟的总时长/均值及 p50/p95/p99
  • Sync (2 writes): 同步双写总时长/均值
  • Query fans/following: 单次分页查询延迟
  • Replication landing: 复制落地耗时分位、队列峰值(maxQueue)、排空时长(drain)

本地一次实测结果(N=20000,CONC=1,PAGE=50,Apple M3 Max + Docker Postgres):

Async follow latency total: 3.891583667s, per op: 194.579µs
Sync (2 writes) total: 6.060609417s, per op: 303.03µs
Query fans(50) latency: 614µs
Query following(50) latency: 835.542µs

解读:

  • 异步冗余将关键路径写延迟较同步双写下降约 35%(本地环境下),趋势符合“写一次 + 入队”优于“双写”的预期;
  • 双表冗余使粉丝/关注两类查询均为单库命中,延迟处于子毫秒级(本地);
  • 注意该结果为本地小规模样例,用于验证相对收益与方向性,非容量上限评估。

10.2 扇出查询对比(单库单查 vs 64 分片扇出)

运行:

# SHARDS: 扇出分片数;REPEAT: 重复次数
SHARDS=64 REPEAT=100 go run cmd/fanoutbench/main.go

说明:

  • 单次查询:针对单分片(单库)的一次命中;
  • 扇出查询:并发请求 64 个分片(模拟 64 库),测量整体完成时间;
  • 该实验旨在直观展示“多分片扇出”的额外开销,支持快速本地演示扇出的不可接受性。

一次本地实测结果(SHARDS=64,REPEAT=100):

Single-shard query: avg=91.655µs p95=141.375µs p99=592.791µs
Fan-out 64-shard queries: avg=45.164845ms p95=64.492708ms p99=103.007667ms

解读:单分片查询处于百微秒量级;64 分片扇出后,平均延迟上升到数十毫秒,p99 超 100ms,直观体现了扇出带来的网络与并发开销,验证“双表冗余+单分片命中”的必要性。

10.4 环境说明(PostgreSQL)

  • 本项目默认数据库为 PostgreSQL(docker-compose 使用 postgres:15-alpine)。
  • 文中关于 MySQL binlog/Canal/Maxwell 的描述用于说明思路;在 PostgreSQL 上应使用逻辑复制/Decoding:如 wal2json、pgoutput 或 Debezium PostgreSQL Connector(详见第 9.2 节“CDC 方案说明”)。
  • 配置在 config/config.yaml,端到端与基准工具均以 Postgres 为默认目标环境。

10.3 代码仓库

项目地址:

https://github.com/d60-Lab/RelationGraph

有兴趣的同学可按 10.1/10.2 的命令自行复现,并调整 N/CONC/SHARDS 等参数进行扩展测试。

本文链接:http://blog.go2live.cn/post/relation-design.html

-- EOF --