代码编织梦想

在前面几篇中,我们介绍了etcd存储相关的内容,包括预写日志、mvcc、事务等,可以认为对etcd单节点的存储有了相对全面的认识。但是etcd是一个基于raft协议实现的cp模型的分布式存储,只了解其状态机的工作原理是不够的。本文我们就来介绍etcd中的raft模块的具体实现。

关于raft协议本身,这里不做介绍,建议直接阅读原论文,这里给出中文翻译版

最小实现原则

在介绍具体实现之前,我们先介绍一些软件设计上的内容。
etcd raft模块是基于开源的golang raft sdk实现。

该raft sdk基于最小实现原则,只实现了基本的功能,包括leader选举、日志处理、状态变更等逻辑,而raft运行所需要的存储层和传输层则依赖使用方自行实现。

其中存储层定义了storage接口用来管理raft log,同时提供了基本的实现raft.MemoryStorage,该实现是基于内存数组实现的非持久化的存储,在etcd系列的第一篇中提到过。用户也可以自行实现该接口,并作为参数传入。

raft节点间通信则完全依赖使用方实现,raft sdk没有做任何约束。该raft sdk仅通过channel对外输出要通信的消息,并对外提供方法来处理收到的消息。

该实现方式非常对我的胃口。我在工作中提供一些sdk给别的服务使用时,通常都会遵循最小实现原则。sdk中只实现基本的功能逻辑,sdk依赖的其他能力定义好接口,通过参数或者其他的方式进行注入。业务方在使用时,如果某项能力其本身已经具备,则只需要简单适配接口即可;如果不具备,则可以选择我提供相应实现。

相比于大而全的sdk实现方式,遵循最小实现原则的sdk实现方式可能会增加一些理解成本,但是不会引入冗余的依赖。同时,通过不同sdk的组合也可以更加灵活地对外提供丰富能力。

当然凡事不可一概而论,到底哪种方式更好还要看具体的场景。

说完设计原则,接下来会介绍具体的实现。raft sdk中按照分层的方式进行了实现,从底层到高层分别为raft -> rawNode -> node,我们会从底层开始依次介绍。

raft

raft对象是raft sdk的核心实现。其维护了raft节点的所有状态及参数,包括term、index、raft log、vote、peers state(leader对其他节点状态的追踪)、heartbeat、election timeout等raft必要的状态以及其他具体实现中的性能优化相关参数。同时,raft对象也实现了包括状态转换、日志追加、消息处理及发送等所有的raft节点所需要的方法。

raft的属性如下,我们挑选其中几个进行说明。

type raft struct {
   id uint64

   Term uint64
   Vote uint64

   readStates []ReadState

   // the log
   raftLog *raftLog

   maxMsgSize         uint64
   maxUncommittedSize uint64
   // TODO(tbg): rename to trk.
   prs tracker.ProgressTracker

   state StateType

   // isLearner is true if the local raft node is a learner.
   isLearner bool

   msgs []pb.Message

   // the leader id
   lead uint64
   // leadTransferee is id of the leader transfer target when its value is not zero.
   // Follow the procedure defined in raft thesis 3.10.
   leadTransferee uint64
   // Only one conf change may be pending (in the log, but not yet
   // applied) at a time. This is enforced via pendingConfIndex, which
   // is set to a value >= the log index of the latest pending
   // configuration change (if any). Config changes are only allowed to
   // be proposed if the leader's applied index is greater than this
   // value.
   pendingConfIndex uint64
   // an estimate of the size of the uncommitted tail of the Raft log. Used to
   // prevent unbounded log growth. Only maintained by the leader. Reset on
   // term changes.
   uncommittedSize uint64

   readOnly *readOnly

   // number of ticks since it reached last electionTimeout when it is leader
   // or candidate.
   // number of ticks since it reached last electionTimeout or received a
   // valid message from current leader when it is a follower.
   electionElapsed int

   // number of ticks since it reached last heartbeatTimeout.
   // only leader keeps heartbeatElapsed.
   heartbeatElapsed int

   checkQuorum bool
   preVote     bool

   heartbeatTimeout int
   electionTimeout  int
   // randomizedElectionTimeout is a random number between
   // [electiontimeout, 2 * electiontimeout - 1]. It gets reset
   // when raft changes its state to follower or candidate.
   randomizedElectionTimeout int
   disableProposalForwarding bool

   tick func()
   step stepFunc

   logger Logger

   // pendingReadIndexMessages is used to store messages of type MsgReadIndex
   // that can't be answered as new leader didn't committed any log in
   // current term. Those will be handled as fast as first log is committed in
   // current term.
   pendingReadIndexMessages []pb.Message
}
  • raftlog
    raftlog是用来存储日志的部分,其构造如下。日志被追加到raft模块中时首先被会被添加的unstable中,当etcd将unstable中的日志追加至wal中以后,raft会将对应的日志追加到storage中,并从unstable中清除。storage就是第一小节中介绍的raft sdk定义的存储层接口,这里采用了raft.MemoryStorage的实现。
type raftLog struct {
   // storage contains all stable entries since the last snapshot.
   storage Storage

   // unstable contains all unstable entries and snapshot.
   // they will be saved into storage.
   unstable unstable

   // committed is the highest log position that is known to be in
   // stable storage on a quorum of nodes.
   committed uint64
   // applied is the highest log position that the application has
   // been instructed to apply to its state machine.
   // Invariant: applied <= committed
   applied uint64

   logger Logger

   // maxNextCommittedEntsSize is the maximum number aggregate byte size of the
   // messages returned from calls to nextCommittedEnts.
   maxNextCommittedEntsSize uint64
}

type unstable struct {
   // the incoming unstable snapshot, if any.
   snapshot *pb.Snapshot
   // all entries that have not yet been written to storage.
   entries []pb.Entry
   offset  uint64

   logger Logger
}
  • maxMsgSize
    批量处理是非常常见的优化手段。raft在日志同步时就采用了批量处理的方式,一条消息携带多条日志。同时为了防止消息过大,设置了maxMsgSize参数。
  • prs
    raft中使用tracker.ProgressTracker来记录follower的状态,包括next index、commited index、active等。单独拆了一个小模块出来。
  • msgs
    raft需要发送的消息,会追加至msgs保存,算是某种程度的异步发送。当上层调用模块空闲时,会主动获取msgs然后进行发送。前面讲了,日志的同步是批量处理。这里msg的处理是异步批量处理。异步批量处理是常见的很有效的性能优化的手段。
  • prevote
    prevote也是实现中的一个优化。
    在raft算法中,follower在变为candidate时,会立刻将自身的term加一并发起选举。如果选举失败则进入election timeout然后重复该过程。正常情况下可以保证一轮选举一定会选出leader。但是在异常情况会存在问题。比如网络分区的情况下,某些节点的term会一直增长。当网络通信恢复时,其term会比leader大,这会导致leadership转移。
    针对上述问题,raft提供了prevote参数。当prevote为true时,选举时并不会直接将term加一,而是先发起prevote。当能拿到大多数选票时再将term加一并发起真正选举。
  • tick和step
    raft节点状态的驱动主要有两个地方,或者说两个方法。这里状态要和raft算法中的状态机区分,是指raft节点本身的状态,包括日志、任期、索引、节点的交互等。刚说了,raft节点的状态驱动有两个地方:一个是本身的计时,是节点自身的状态驱动,随着计时节点会根据角色不同有发送心跳、角色变更、开启选举等不同的行为;另一个是对外暴露的接口,以响应使用方的请求,同样的,针对同一种请求,不同角色的raft节点行为并不相同。抽象出来就是tick和step方法。
    针对类似上面描述的不同角色下行为不同的情况,通常会将接口抽象出来,针对不同的角色或者状态分别实现,在角色变更时设置相应的行为。这也是常见的设计思路。

介绍完属性,接下来再介绍相关的方法。对于方法,同样不会进行非常细节的介绍。因为相关方法里涉及到大量raft算法的逻辑实现,建议还是去看raft算法。我们会简单介绍主要方法的功能,然后关注一些在具体实现上的优化思路。

下面是raft发送消息相关的方法,最底层是send方法。我把send方法的具体实现贴了出来。可以看到,send方法只是将消息追加到msgs列表,以此实现异步批量处理。异步批量处理是常见的优化手段,可以极大的提升系统的吞吐和性能。但是在使用异步处理时必须要有所限制,必须对等待处理的消息的批次进行限制。
在send方法基础上,封装了sendAppend方法、sendHeartbeat方法,分别对指定的节点发送日志追加消息、发送心跳,以及在sendAppend和sendHeartbeat基础上封装广播方法。

func (r *raft) send(m pb.Message) {
    // 省略了参数校验
    r.msgs = append(r.msgs, m)
}

func (r *raft) sendAppend(to uint64) {}

func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {}

func (r *raft) sendHeartbeat(to uint64, ctx []byte) {}

func (r *raft) bcastAppend() {}

func (r *raft) bcastHeartbeat() {}

func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {}

下面是状态变化相关的方法。状态变化的方法比较简单,这里不做展开。只是在具体实现时增加了prevote的状态,这个在前面已经提到过。

func (r *raft) becomeFollower(term uint64, lead uint64) {}

func (r *raft) becomeCandidate() {}

func (r *raft) becomePreCandidate() {}

func (r *raft) becomeLeader() {}

func (r *raft) hup(t CampaignType) {}

func (r *raft) campaign(t CampaignType) {}

下面是状态驱动的方法。前面也提到,raft节点的状态分别受自身的时钟驱动以及外界请求驱动。

时钟驱动来说,leader会在时钟驱动下发送心跳以及检查qurom;follower及candidate则在时钟驱动下进行状态转换并发起选举。同样,raft也实现了不同角色响应外界请求的方法。

// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {}

// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {}

func (r *raft) Step(m pb.Message) error {}

func stepLeader(r *raft, m pb.Message) erro {}

func stepCandidate(r *raft, m pb.Message) error {}

func stepFollower(r *raft, m pb.Message) error {}

RawNode

RawNode是在raft基础上的封装,其中最主要的一点我认为就是ready的封装。

// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {
   raft       *raft
   prevSoftSt *SoftState
   prevHardSt pb.HardState
}

ready和advance是raft节点和状态机的交互机制。前面多次提到,raft的实现采用了异步批量处理。状态机会主动调用ready方法,获取等待处理的数据,并在处理完成后调用advance方法通知raft节点相应内容已经处理完成。
先看下ready中都包含哪些数据。ready中包含了的数据有:

  • unstable的日志条目,在etcd将其写入wal后,raft才会认为相应的日志为stable;
  • 已经commit但是尚未apply的日志,apply后raft节点会更新applied状态;
  • 待发送的msgs;
  • softstate和hardstate,分别包括raft节点的状态、以及term、index、vote;
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
   rd := Ready{
      Entries:          r.raftLog.unstableEntries(),
      CommittedEntries: r.raftLog.nextCommittedEnts(),
      Messages:         r.msgs,
   }
   if softSt := r.softState(); !softSt.equal(prevSoftSt) {
      rd.SoftState = softSt
   }
   if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
      rd.HardState = hardSt
   }
   if r.raftLog.unstable.snapshot != nil {
      rd.Snapshot = *r.raftLog.unstable.snapshot
   }
   if len(r.readStates) != 0 {
      rd.ReadStates = r.readStates
   }
   rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
   return rd
}

状态机在相应处理后会调用advance通知raft节点。

func (r *raft) advance(rd Ready) {
   r.reduceUncommittedSize(rd.CommittedEntries)
   if newApplied := rd.appliedCursor(); newApplied > 0 {
      r.raftLog.appliedTo(newApplied)

      if r.prs.Config.AutoLeave && newApplied >= r.pendingConfIndex && r.state == StateLeader {
         m, err := confChangeToMsg(nil)
         if err != nil {
            panic(err)
         }
         if err := r.Step(m); err != nil {
            r.logger.Debugf("not initiating automatic transition out of joint configuration %s: %v", r.prs.Config, err)
         } else {
            r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
         }
      }
   }

   if len(rd.Entries) > 0 {
      e := rd.Entries[len(rd.Entries)-1]
      if r.id == r.lead {
         _ = r.Step(pb.Message{From: r.id, Type: pb.MsgAppResp, Index: e.Index})
      }
      r.raftLog.stableTo(e.Index, e.Term)
   }
   if !IsEmptySnap(rd.Snapshot) {
      r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
   }
}

Node

node仅是在rawNode上封装了一些chan用来做交互,不做介绍。

// node is the canonical implementation of the Node interface
type node struct {
   propc      chan msgWithResult
   recvc      chan pb.Message
   confc      chan pb.ConfChangeV2
   confstatec chan pb.ConfState
   readyc     chan Ready
   advancec   chan struct{}
   tickc      chan struct{}
   done       chan struct{}
   stop       chan struct{}
   status     chan chan Status

   rn *RawNode
}

以上即是对raft部分的介绍,主要侧重在raft sdk的代码设计以及性能优化方面。一些技术细节以及连接层等没有提及,后面会再开一篇补充说明。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/shanxiaoshuai/article/details/129629700

etcd源码分析-raft实现_xxb249的博客-爱代码爱编程

        从本篇开始介绍Raft,集群中最核心内容是保证数据一致性,那么如何保证数据一致性?在业界有很多算法、协议,例如:Paxos,Raft。         Raft协议相比之前协议Paxos等,算是年轻协议,而且Raft协议比较简单,容易实现。   一、Raft基础 1.1 状态机 Raft简单就在于它的状态机。由上图可知,状态机状

etcd-raft 2.3.7 raft peer间的交互通信流程_fas_2019的博客-爱代码爱编程

在写etcd-raft的leader选举的那篇文章时,每次牵扯到消息接收和发送都用”接收到消息”、“把消息发送”出去这样的字眼给代替了,感觉有那么一点的别扭,这节主要描述下etcd-raft的消息发送和接收。 先附上一

etcd系列-----raft协议:理论基础篇(三)-------快照-爱代码爱编程

通过前面的描述可知, 随着客户端与集群不断地交互,每个节点上的日志记录会不断增加,但是服务器的空间都是有限的,日志量不能无限制地增长。另外,在节点重启时会重放日志记录,如果日志记录过多,则需要花费较长的时间完成重放操作。这就需要压缩和清除机制来减少日志量,从而避免上述情况。定期生成快照是最常见也是最简单的压缩方法。在创建快照文件时,会将整个节点的状态进行序

etcd系列-----raft协议:重要数据结构介绍(Entry、Message、storage、unstable)-爱代码爱编程

一些基础结构体介绍: Entry Entry记录:在前面介绍Raft协议时提到,节点之间传递的是消息(Message), 每条消息中可以携带多条Entry记录,每条Entry记录对应一个独立的操作。 在Entry中其中封装了如下信息:     :Term( uint64类型): 该Entry所在的任期号。     : Index ( uint64类型)

etcd系列-----raft协议:重要数据结构介绍(raftlog、raft)-爱代码爱编程

raftlog结构体 Raft 协议中日志复制部分的核心就是在集群中各个节点之间完成日志的复制,因此在raft模块的实现中使用raftLog结构来管理节点上的日志, 它依赖于前面介绍的Storage接口和unstable结构体。 raftLog中主要字段的含义与功能 storage ( Storage类型):实际上就是前面介绍的MemoryStorag

etcd系列-----raft协议:Node结构-爱代码爱编程

Node接口 raft结构体及各种消息处理流程的分析可以看出,结构体raft实现了Raft协议中最核心的内容, 它也是整个rft模块的核心, 但是它并没有实现网络传输、 持久化存储(注意与存储Entry记录的raftLog进行区分〉等功能,也没有对外提供简单易用的APi.在raft模块中,结构体Node表示集群中的一个节点,它是在结构体raft之上的一层

etcd系列-----raft保数据一致性手段之WAL日志-爱代码爱编程

WAL (Write-ahead logging)是etcd实现一致性的重要手段之一。一条Entry记录的大致流程:     (1)当客户端向etcd 集群发送了一次请求之后,请求中的封装Entry记录会先被交给raft模块进行处理,raft模块会先将Entry记录保存到raftLog.unstable中。     (2)raft模块将该Entry记录封装

etcd系列-----集大成者etcdserver 模块-爱代码爱编程

1、raftNode 相关 raftNode是充当raft模块与上层模块之间交互的桥梁     term ( uint64类型): 当前节点己应用Entry记录的最大任期号,term、 index和lead三个宇段的读写都是原子操作。     index ( uint64类型): 当前节点中己应用Entry记录的最大索引值。     lead ( uin

ETCD 源码学习--Raft 中 progress 的 inFlight 实现(九)-爱代码爱编程

首先需要搞清什么是 inFlight,inFlight 在 Raft 中存储的是已发送给 Follower 的 MsgApp 消息,但没有收到 MsgAppResp 的消息 Index  值。简单的说就是 Leader 发送一个消息给 Follower,Leader 在对应的 Follower 状态维护结构(progress)中,将这个消息的 ID 记录在

ETCD 源码学习--Raft log 的实现(十)-爱代码爱编程

在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习。 Raft log 主要有两部分组成,一是已提交但未被上层模块处理的 unstable 消息,另一部分是已被上层处理的 stable 消息。   主要文件 /raft/log.go  raft log 对

etcd-raft-存储分析-爱代码爱编程

etcd raft介绍 etcd raft是目前使用最广泛的raft库,如果想深入了解raft请直接阅读论文 “In Search of an Understandable Consensus Algorithm”(https://raft.github.io/raft.pdf), etcd raft在etcd, Kubernetes, Docker

ETCD 源码学习--Raft 提案消息的旅程(十二)-爱代码爱编程

在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习。 消息的来源主要有两种,一个是终端的API请求,另个一是节点内部自动发起的消息,比如心跳。而不同消息类型的处理已经在前面的文章中说明,这里不再累赘,本文只介绍一个提案消息发送到 Leader 节点直到被节点应用的

etcd中Raft协议-爱代码爱编程

文章目录 前言Raft协议Leader选举日志复制网络分区的场景日志压缩与快照其他技术点linearizable语义只读请求PreVote状态Leader节点转移 前言 文章内容来自:《etcd技术内幕》 — 百里燊,感兴趣的读者可以去读一下。Paxos 算法和 Raft 算法: Paxos 算法诞生于 1990 年,这是一种解决分布式系统一

etcd基本概念学习-爱代码爱编程

目录 1、简介2、常见功能3、应用场景3.1 配置中心3.2 分布式锁3.3 leader选举组件3.4 服务注册与服务发现3.5 消息订阅和发布3.6 负载均衡4、和同类产品的对比4.1 etcd vs redis4.2 etcd vs consul4.3 etcd vs zookeeper5、架构6、数据存储6.1 预写式日志(WAL)7、节点

etcd-爱代码爱编程

本文基于etcd 3.1.10版本所写 etcd-raft的主要模块 etcd-raft 采用极简的设计理念,仅实现核心 raft 算法;将网络通信与持久化操作交给应用层来实现,即应用层负责实现 raft 节点

etcd-爱代码爱编程

背景 Raft 算法在斯坦福 Diego Ongaro 和 John Ousterhout 于 2013 年发表的《In Search of an Understandable Consensus Algorithm》中提

分布式etcd、raft、opentracing、jaeger_jaegertracing和etcd的关系-爱代码爱编程

ETCD   ETCD是一种开源分布式键值存储主要用在服务发现上,服务发现 (Service Discovery) 要解决的是分布式系统中最常见的问题之一,即在同一个分布式集群中的进程或服务如何才能找到对方并建立连接。,从