代码编织梦想

作者: LingJin 原文来源: https://tidb.net/blog/b93c16d8

内容概要

本文是 TiCDC 源码解读的第四篇,主要内容是讲述 TiCDC 中 Scheduler 模块的工作原理。主要内容如下:

  1. Scheduler 模块的工作机制
  2. 两阶段调度原理

Scheduler 模块介绍

Scheduler 是 Changefeed 内的一个重要模块,它主要负责两件事情:

  1. 将一个 Changefeed 所有需要被同步的表,分发到不同的 TiCDC 节点上进行同步工作,以达到负载均衡的目的。
  2. 维护每张表的同步进度,同时推进 Changefeed 的全局同步进度。

本次介绍的 Scheduler 相关代码都在 tiflow/cdc/scheduler/internal/v3 目录下,包含多个文件夹,具体如下:

  • Coordinator 运行在 Changefeed,是 Scheduler 的全局调度中心,负责发送表调度任务,维护全部同步状态。
  • Agent 运行在 Processor,它接收表调度任务,汇报当前节点上的表同步状态给 Coordinator。
  • Transport 是对底层 peer-2-peer 机制的封装,主要负责在 Coordinator 和 Agent 之间传递网络消息。
  • Member 主要是对集群中 Captures 状态的管理和维护。
  • Replication 负责管理每张表的同步状态。 ReplicationSet 记录了每张表的同步信息, ReplicationManager 负责管理所有的 ReplicationSet
  • Scheduler 实现了多种不同的调度规则,可以由 OpenAPI 触发。

下面我们详细介绍 Scheduler 模块的工作过程。

表 & 表调度任务 & 表同步单元

TiCDC 的任务是以表为单位,将数据同步到下游目标节点。所以对于一张表,可以通过如下形式来表示,该数据结构即刻画了一张表当前的同步进度。

type Table struct {
    TableID model.TableID
    Checkpoint uint64
    ResolvedTs uint64
}

Scheduler 主要是通过 Add Table / Remove Table / Move Table 三类表调度任务来平衡每个 TiCDC 节点上的正在同步的表数量。对于这三类任务,可以被简单地刻画为:

  • Add Table:「TableID, Checkpoint, CaptureID」,即在 CaptureID 所指代的 Capture 上从 Checkpoint 开始加载并且同步 TableID 所指代的表同步单元。
  • Remove Table:「TableID, CaptureID」,即从 CaptureID 所指代的 Capture 上移除 TableID 所指代的表同步单元。
  • Move Table:「TableID, Source CaptureID, Target CaptureID」,即将 TableID 所指代的表同步单元从 Source CaptureID 指代的 Capture 上挪动到 Target CaptureID 指代的 Capture 之上。

表同步单元主要负责对一张表进行数据同步工作,在 TiCDC 内这由 Table Pipeline 实现。它的基本结构如下所示:

每个 Processor 开始同步一张表,即会为这张表创建一个 Table Pipeline,该过程可以分成两个部分:

  • 加载表:创建 Table Pipeline,分配相关的系统资源。KV-Client 从上游 TiKV 拉取数据,经由 Puller 写入到 Sorter 中,但是此时不向下游目标数据系统写入数据。
  • 复制表:在加载表的前提下,启动 Mounter 和 Sink 开始工作,从 Sorter 中读取数据,并且写入到下游目标数据系统。

Processor 实现了 TableExecutor 接口,如下所示:

type TableExecutor interface {
        // AddTable add a new table with `startTs`
        // if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol.
        // if `isPrepare` is false, the 2nd phase.
        AddTable(
                ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,
        ) (done bool, err error)

        // IsAddTableFinished make sure the requested table is in the proper status
        IsAddTableFinished(tableID model.TableID, isPrepare bool) (done bool)

        // RemoveTable remove the table, return true if the table is already removed
        RemoveTable(tableID model.TableID) (done bool)
        // IsRemoveTableFinished convince the table is fully stopped.
        // return false if table is not stopped
        // return true and corresponding checkpoint otherwise.
        IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool)

        // GetAllCurrentTables should return all tables that are being run,
        // being added and being removed.
        //
        // NOTE: two subsequent calls to the method should return the same
        // result, unless there is a call to AddTable, RemoveTable, IsAddTableFinished
        // or IsRemoveTableFinished in between two calls to this method.
        GetAllCurrentTables() []model.TableID

        // GetCheckpoint returns the local checkpoint-ts and resolved-ts of
        // the processor. Its calculation should take into consideration all
        // tables that would have been returned if GetAllCurrentTables had been
        // called immediately before.
        GetCheckpoint() (checkpointTs, resolvedTs model.Ts)

        // GetTableStatus return the checkpoint and resolved ts for the given table
        GetTableStatus(tableID model.TableID) tablepb.TableStatus
}

在 Changefeed 的整个运行周期中,Scheduler 都处于工作状态,Agent 利用 Processor 提供的上述接口方法实现,实际地执行表调度任务,获取到表调度任务进行的程度,以及表同步单元当前的运行状态等,以供后续做出调度决策。

Coordinator & Agent

Scheduler 模块由 Coordinator 和 Agent 两部分组成。Coordinator 运行在 Changefeed 内,Agent 运行在 Processor 内,Coordinator 和 Agent 即是 Changefeed 和 Processor 之间的通信接口。二者使用 peer-2-peer 框架完成网络数据交换,该框架基于 gRPC 实现。下图展示了一个有 3 个 TiCDC 节点的集群中,一个 Changefeed 的 Scheduler 模块的通信拓扑情况。可以看到,Coordinator 和 Agent 之间会交换两类网络消息,消息格式由 Protobuf 定义,源代码位于 tiflow/cdc/scheduler/schedulepb

  • 第一类是 Heartbeat 消息,Coordinator 周期性地向 Agent 发送 HeartbeatRequest ,Agent 返回相应的 HeartbeatResponse ,该类消息主要目的是让 Coordinator 能够及时获取到所有表在不同 TiCDC 节点上的同步状态。
  • 第二类是 DispatchTable 消息,在有对表进行调度的需求的时候,Coordinator 向特定 Agent 发送 DispatchTableRequest ,后者返回 DispatchTableResponse ,用于及时同步每一张表的调度进展。

下面我们从消息传递的角度,分别看一下 Coordinator 和 Agent 的工作逻辑。

Coordinator 工作过程

Coordinator 会收到来自 Agent 的 HeartbeatReponse DispatchTableResponse 这两类消息。Coordinator 内的 CaptureM 负责维护 Capture 的状态,在每次接收到 HeartbeatResponse 之后,都会更新自身维护的 Captures 的状态,包括每个 Capture 当前的存活状态,Capture 上当前同步的所有表信息。同时也生成新的 HeartbeatRequest 消息,再次发送到所有 Agents。 ReplicationM 负责维护所有表的同步状态,它接收到 HeartbeatResponse DispatchTableResponse 之后,按照消息中记录的表信息,更新自己维护的这些表对应的同步状态。 CaptureM 提供了当前集群中存活的所有 Captures 信息, ReplicationM 则提供了所有表的同步状态信息, SchedulerM 以二者提供的信息为输入,以让每个 Capture 上的表同步单元数量尽可能均衡为目标,生成表调度任务,这些表调度任务会被 ReplicationM 进一步处理,生成 DispatchTableRequest ,然后发送到对应的 Agent。

Agent 工作过程

Agent 会从 Coordinator 收到 HeartbeatRequest DispatchTableRequest 这两类消息。对于前者,Agent 会收集当前运行在当前 TiCDC 节点上的所有表同步单元的运行状态,构造 HeartbeatRespone 。对于后者,则通过访问 Processor 来添加或者移除表同步单元,获取到表调度任务的执行进度,构造对应的 DispatchTableResponse ,最后发送到 Coordinator。

Changefeed 同步进度计算

一个 changefeed 内同步了多张表。对于每张表,有 Checkpoint ResolvedTs 来标识它的同步进度,Coordinator 通过 HeartbeatResponse 周期性地收集所有表的同步进度信息,然后就可以计算得到一个 Changefeed 的同步进度。具体计算方法如下:

// AdvanceCheckpoint tries to advance checkpoint and returns current checkpoint.
func (r *Manager) AdvanceCheckpoint(currentTables []model.TableID) (newCheckpointTs, newResolvedTs model.Ts) {
    newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
    for _, tableID := range currentTables {
        table, ok := r.tables[tableID]
        if !ok {
            // Can not advance checkpoint there is a table missing.
            return checkpointCannotProceed, checkpointCannotProceed
        }
        // Find the minimum checkpoint ts and resolved ts.
        if newCheckpointTs > table.Checkpoint.CheckpointTs {
            newCheckpointTs = table.Checkpoint.CheckpointTs
        }
        if newResolvedTs > table.Checkpoint.ResolvedTs {
            newResolvedTs = table.Checkpoint.ResolvedTs
        }
    }
    return newCheckpointTs, newResolvedTs
}

从上面的示例代码中我们可以看出,一个 Changefeed 的 Checkpoint 和 ResolvedTs,即是它同步的所有表的对应指标的最小值。Changefeed 的 Checkpoint 的意义是,它的所有表的同步进度都不小于该值,所有时间戳小于该值的数据变更事件已经被同步到了下游;ResolvedTs 指的是 TiCDC 当前已经捕获到了所有时间戳小于该值的数据变更事件。除此之外的一个重点是,只有当所有表都被分发到 Capture 上并且创建了对应的表同步单元之后,才可以推进同步进度。

以上从消息传递的角度对 Scheduler 模块基本工作原理的简单介绍。下面我们更加详细地聊一下 Scheduler 对表表度任务的处理机制。

两阶段调度原理

两阶段调度是 Scheduler 内部对表调度任务的执行原理,主要目的是降低 Move Table 操作对同步延迟的影响。

上图展示了将表 X 从 Agent-1 所在的 Capture 上挪动到 Agent-2 所在的 Capture 上的过程,具体如下:

  1. Coordinator 让 Agent-2 准备表 X 的数据。
  2. Agent-2 在准备好了数据之后,告知 Coordinator 这一消息。
  3. Coordinator 发送消息到 Agent-1,告知它移除表 X 的同步任务。
  4. Agent-1 在移除了表 X 的同步任务之后,告知 Coordinator 这一消息。
  5. Coordinator 再次发送消息到 Agent-2,开始向下游复制表 X 的数据。
  6. Agent-2 再次发送消息到 Coordinator,告知表 X 正处于复制数据到下游的状态。

上述过程的重点是在将一张表从原节点上移除之前,先在目标节点上分配相关的资源,准备需要被同步的数据。准备数据的过程,往往颇为耗时,这是引起挪动表过程耗时长的主要原因。两阶段调度机制通过提前在目标节点上准备表数据,同时保证其他节点上有该表的同步单元正在向下游复制数据,保证了该表一直处于同步状态,这样可以减少整个挪动表过程的时间开销,降低对同步延迟的影响。

Replication set 状态转换过程

在上文中讲述的两阶段调度挪动表的基本过程中,可以看到在 Agent-2 执行了前两步之后,表 X 在 Agent-1 和 Agent-2 的 Capture 之上,均存在表同步单元。不同点在于,Agent-1 此时正在复制表,Agent-2 此时只是加载表。

Coordinator 使用 ReplicationSet 来跟踪一张表在多个 Capture 上的表同步单元的状态,并以此维护了该表真实的同步状态。基本定义如下:

// ReplicationSet is a state machine that manages replication states.
type ReplicationSet struct {
    TableID    model.TableID
    State      ReplicationSetState
    Primary model.CaptureID
    Secondary model.CaptureID
    Checkpoint tablepb.Checkpoint
    ...
}

TableID 唯一地标识了一张表, State 则记录了当前该 ReplicationSet 所处的状态, Primary 记录了当前正在复制该表的 Capture 的 ID,而 Secondary 则记录了当前已经加载了该表,但是尚未同步数据的 Capture 的 ID, Checkpoint 则记录了该表当前的同步状态。

在对表进行调度的过程中,一个 ReplicationSet 会处于多种状态。如下图所示:

  • Absent 表示没有任何一个节点加载了该表的同步单元。
  • Prepare 可能出现在两种情况。第一种是表正处于 Absent 状态,调用 Add Table 在某一个 Capture 上开始加载该表。第二种情况是需要将正在被同步的表挪动到其他节点上,发起 Move Table 请求,在目标节点上加载表。
  • Commit 指的是在至少一个节点上,已经准备好了可以同步到下游的数据。
  • Replicating 指的是有且只有一个节点正在复制该表的数据到下游目标系统。
  • Removing 说明当前只有一个节点上加载了表的同步单元,并且当前正在停止向下游同步数据,同时释放该同步单元。一般发生在上游执行了 Drop table 的情况。在一张表被完全移除之后,即再次回到 Absent 状态。

下面假设存在一张表 table-0,它在被调度时发生的各种情况。首先考虑如何将表 X 加载到 Agent-0 所在的 Capture 之上,并且向下游复制数据。

首先 table-0 处于 Absent 状态,此时发起 Add Table 调度任务,让 Agent-0 从 checkpoint = 5 开始该表的同步工作, Agent-0 会创建相应的表同步单元,和上游 TiKV 集群中的 Regions 建立网络连接,拉取数据。当准备好了可以向下游同步的数据之后, Agent-0 告知 Coordinator 该表同步单元当前已经处于 Prepared 状态。Coordinator 会根据该消息,将该 ReplicationSet Prepare 切换到 Commit 状态,然后发起第二条消息到 Agent-0 ,让它开始从 checkpoint = 5 从下游开始同步数据。当 Agent-0 完成相关操作,返回响应到 Coordinator 之后,Coordinator 再次更新 table-0 的 ReplicationSet ,进入到 Replicating 状态。

再来看一下移除表 table-0 的过程,如上所示。最开始正处于 Replicating 状态,并且在 Capture-0 上同步。Coordinator 向 Agent-0 发送 Remove Table 请求, Agent-0 通过 Processor 来取消该表的同步单元,释放相关的资源,待所有资源释放完毕之后,返回消息到 Coordinator,告知该表当前已经没有被同步了,同时带有最后同步的 Checkpoint。在 Agent-0 正在取消表的过程中,Coordinator 和 Agent-0 之间依旧有保持通过 Heartbeat 进行状态通知,Coordinator 可以及时地知道当前表 t = 0 正处于 Removing 状态,在后续收到表已经被完全取消的消息之后,则从 Removing 切换到 Absent 状态。

最后再来看一下 Move Table ,它本质上是先在目标节点 Add Table ,然后在原节点上 Remove Table

如上图所示,首先假设 table-0 正在 capture-0 上被同步,处于 Replicating 状态,现在需要将 table-0 从 capture-0 挪动到 capture-1。首先 Coordinator 将 ReplicationSet 的状态从 Replicating 转移到 Prepare ,同时向 Agent-1 发起添加 table-0 的请求, Agent-1 加载完了该表的同步单元之后,会告诉 Coordinator 这一消息,此时 Coordinator 会再次更新 table-0 到 Commit 状态。此时可以知道表 table-0 目前正在 capture-0 上被同步,在 agent-1 上也已经有了它的同步单元和可同步数据。Coordinator 再向 Agent-0 上发送 Remove Table Agent-0 收到调度指示之后,停止并且释放表 table-0 的同步单元,再向 Coordinator 返回执行结果。Coordinator 在得知 capture-0 上已经没有该表的同步单元之后,将 Primary 从 capture-0 修改为 capture-1,告知 Agent-1 开始向下游同步表 table-0 的数据,Coordinator 在收到从 Agent-1 传来的响应之后,再次更新 table-0 的 状态为 Replicating

从上面三种调度操作中,可以看到 Coordinator 维护的 ReplicationSet 记录了整个调度过程中,一张表的同步状态,它由从 Agent 处收到的各种消息来驱动状态的改变。同时可以看到消息中还有 Checkpoint 和 Resolved Ts 在不断更新。Coordinator 在处理收到的 Checkpoint 和 ResolvedTs 时,保证二者均不会发生会退。

结尾

以上就是本文的全部内容。希望在阅读上面的内容之后,读者能够对 TiCDC 的 Scheduler 模块的工作原理有一个基本的了解。

TiCDC 源码解读系列文章阅读:

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/TiDBer/article/details/129729430

【征文大赛】tidb 社区第二届征文大赛,一次性带走社区全部新周边,还有bose 降噪耳机、倍轻松按摩仪等你拿!-爱代码爱编程

回顾过去 去年我们在 2022年3月14日-2022年5月31日进行了【征文大赛】TiDB 社区专栏第一届征文大赛,快来一次性集齐所有周边吧! 在活动过程中, 征文大赛共有 45 位小伙伴参与,有103篇优质文章输出 @代晓磊_Mars 荣获一等奖! @Gin @Jellybean @数据小黑 荣获

搭建阿里云 tidb 的灾备,让我安欣睡个好觉-爱代码爱编程

作者: anxin 原文来源: https://tidb.net/blog/07ee8049 云原生数据库TiDB 上阿里云了,依托的平台是阿里云计算巢,阿里云计算巢是一个服务管理平台,一方面方便第三方开发商交付服务,另一方面可以更充分的保障用户的信息安全和使用体验,两周前我申请到了试用,对于这次试用活动,PingCAP的

物理机安装 tikv 时 raid 卡在线配置方式-爱代码爱编程

作者: pepezzzz 原文来源: https://tidb.net/blog/308842f7 Raid 配置的规划 安装 TiDB 集群的物理机配置如下: 组件 配置描述 CPU 2 * Intel Xeon Gold 5218R(2.1GHz, 20Core) 内存 384GB

region is unavailable的排查总结-爱代码爱编程

作者: h5n1 原文来源: https://tidb.net/blog/07c99ed0 1 region访问基本流程 tidb在访问key数据时需要获取key所在region的分布信息,在tidb 侧有一个region cache存储region信息,包含region key范围、leade

对tidb监控方式的一点点研究-爱代码爱编程

作者: buddyyuan 原文来源: https://tidb.net/blog/c591a993 TiDB 的告警极其复杂,在 tidb 软件的代码中集成了 prometheus 类库。这样当 tidb 运行的时候,就可以使用 10080 状态端口,把一些 tidb server 运行的 mertis 暴露出来。此时

手把手教你改 sysbench 代码-爱代码爱编程

作者: GangShen 原文来源: https://tidb.net/blog/a8c2981d sysbench 原始代码介绍 安装完 sysbench 之后,可以在 /usr/share/sysbench 目录下看到一些 .lua 的脚本,这些脚本就是 sysbench 程序在进行 oltp 压测

dr-autosync tidb 集群的计划内和计划外切换验证步骤-爱代码爱编程

作者: pepezzzz 原文来源: https://tidb.net/blog/0cdd8bd0 环境准备 集群名称和版本 tidb 集群: tidb-h 版本:v6.1.0 集群拓扑:两中心部署 Dr-Autosync 集群 数据副本:五副本 + 一 Learne

云数据库tidb免费试用初体验-爱代码爱编程

作者: qabel12 原文来源: https://tidb.net/blog/d8d33e4d 日常登录墨天轮无意中发现有云数据库TiDB免费体验活动,正好趁这个机会动手实践体验一下。具体如下: 墨天轮邀您免费体验云数据库 TiDB ,还有 Switch、AirPods 等你拿! - 墨天轮 (modb.pro

将tidb各服务组件混布到物理机集群和k8s环境-爱代码爱编程

作者: lqbyz 原文来源: https://tidb.net/blog/95f35f0c 前提条件 K8S集群外的服务器节点和K8S集群内的Pod网络必须保持互通(本文采用将物理机节点加入K8S集群然后打污点并驱逐该服务器里边的pod的方式来实现) K8S机器外的服务器节点必须可以通过添加解析的方式来

ticdc 源码解读(6)- ticdc puller 模块介绍-爱代码爱编程

作者: sdojjy 原文来源: https://tidb.net/blog/0df5ff04 分享概要 本文是 TiCDC 源码解读的第六篇, 主要是 TiCDC 中的 Puller 模块介绍,TiCDC 中的 Puller 通过创建 KV-Client 向 TiKV 发送 ChangeDataReq

tidb sql调优案例之避免tiflash帮倒忙-爱代码爱编程

原文来源: https://tidb.net/blog/cf272d4c 背景 早上收到某系统的告警tidb节点挂掉无法访问,情况十万火急。登录中控机查了一下display信息,4个TiDB、Prometheus、Grafana全挂了,某台机器hang死无法连接,经过快速重启后集群恢复,经排查后是昨天上线的

ticdc 源码解读(5)-- ticdc ddl 事件处理逻辑 与 filter 实现介绍-爱代码爱编程

作者: asddongmen 原文来源: https://tidb.net/blog/fd6142e9 内容概要 本文是 TiCDC 源码解读的第五篇,本文将会介绍 TiCDC 对 DDL 的处理方式和 Filter 功能的实现(基于 TiCDC v6.5.0 版本 代码) ,文章将会围绕以下 4 个问

携程 x tidb丨应对全球业务海量数据增长,一栈式 htap 实现架构革新-爱代码爱编程

作者: TiDB社区小助手 原文来源: https://tidb.net/blog/2ad1f3c9 导读 携程作为全球领先的一站式旅行平台,旗下拥有携程旅行网、去哪儿网、Skyscanner 等品牌。携程旅行网向超过 9000 万会员提供酒店预订、酒店点评及特价酒店查询、机票预订、飞机票查询、时刻表、

基于阿里云数据库tidb的性能压测初体验-爱代码爱编程

作者: arron 原文来源: https://tidb.net/blog/3be2792c 基于阿里云数据库TiDB的性能压测初体验 申请阿里云TiDB地址: https://market.aliyun.com/isv-pingcap 的过程,申请和部署过程非常简单直观,按提示一步步来即可,这里就忽略

基于 ticdc 的 tidb 复制集群的计划内和计划外切换验证步骤-爱代码爱编程

作者: pepezzzz 原文来源: https://tidb.net/blog/5244f816 环境准备 集群名称和版本 上游 tidb 集群: tidb-h 下游 tidb 集群: tidb-cdc 版本:v6.5.0 CDC 专用用户:cdcuser

tidb容器化的管理利器--tidb operator-爱代码爱编程

作者: lqbyz 原文来源: https://tidb.net/blog/c4a9caaf 简介 TiDB Operator是 Kubernetes 上的 TiDB 集群自动运维系统,提供包括部署、升级、扩缩容、备份恢复、配置变更的 TiDB 全生命周期管理。借助 TiDB Operator,TiDB

tidb 的事务和一致性校验工具 bank-爱代码爱编程

作者: pepezzzz 原文来源: https://tidb.net/blog/f80362b4 背景 在分布式数据库的选型和测试过程中,通常需要关注分布式事务在高可用场景下的一致性和 RPO=0 的容灾技术实现。分布式事务需要能影响多张表的多条记录,实现多表事务和跨节点高可用的验证。 B

监控告警处理之tidb-爱代码爱编程

作者: Soysauce520 原文来源: https://tidb.net/blog/7b595188 背景: **监控面板中tidb\_server\_critical\_error\_total不为0,存在其它数值,表明binlog写入失败,会影响下游系统同步,需要修正。**

文盘rust -- 安全连接 tidb/mysql-爱代码爱编程

作者: jiashiwen 原文来源: https://tidb.net/blog/fb18088e notice"Rust is a trademark of the Mozilla Foundation in the US and other countries." 最近在折腾rust与数据库集成,为了

通过tidb operator为已有tidb集群部署异构集群-爱代码爱编程

作者: lqbyz 原文来源: https://tidb.net/blog/21dee5a3 本文档介绍如何为已有的的tidb集群再部署一个不同服务组件构建的集群。 异构集群是与已有 TiDB 集群不同配置的节点构成的集群。 适用场景 适用于基于已有的 TiDB 集群需要创建一个差异化配置的实例