0%

分布式系统中常见一致性算法

分布式系统中常见一致性算法

分布式一致性理论

1.为什么要使用分布式

​ 随着大型网站的各种高并发访问、海量数据处理等场景越来越多,如何实现网站的高可用、易伸缩、可扩展、安全等目标就显得越来越重要。为了解决这样一系列问题,大型网站的架构也在不断发展。单体架构或者说集中式系统在处理大量并发或数据的业务场景中越来越显得捉襟见肘,此时分布式架构应运而生。

​ 谈到分布式系统,就不得不提到另一种集中式系统,集中式系统有一个大型的中央处理系统,中央处理系统时一台高性能、可扩充的计算机,所有的数据、运算、处理任务全部在中央计算机系统上完成。中央计算机连接多个终端,终端用来输入和输出,不具有数据处理能力。远程终端通过网络连接到中央计算机,它们得到的信息是一致的。我们日常生活中常用的ATM机等都是用的集中式系统

​ 这种系统架构的优点主要是1.数据容易备份,只需要把中央计算机上的数据进行备份即可 2.在开发业务相对简单的系统时,总费用较低,只需要中央计算机的功能强大,终端只需要简单便宜的设备。缺点也很明显,在面对体量较大的业务时,性能扩展的成本太高,因为系统的总算力往往只取决于中央计算机的性能,另外集中式系统采用的单体架构,各业务模块之间的耦合度较高,导致架构的可拓展性较低

​ 而在一个分布式系统中,一组独立的计算机展现给用户的是一个统一的整体,就好像是一个系统似的。系统拥有多种通用的物理和逻辑资源,可以动态的分配任务,分散的物理和逻辑资源通过计算机网络实现信息交换。系统中存在一个以全局的方式管理计算机资源的分布式操作系统。通常,对用户来说,分布式系统只有一个模型或范型。在操作系统之上有一层软件中间件(middleware)负责实现这个模型 (百度百科“分布式系统”)

​ 简言之,分布式系统中的数据存储、任务的处理分布在网络中的不同机器上,每台主机都是一个独立的系统,联网的目的是为了获取更多的资源、丰富的服务。

分布式系统具有如下的特点:

  • 高度的可靠性

数据分散存储在网络中的不同主机上,系统中存在数据冗余,当一台机器发生故障时,可以使用另一台主机的备份。

  • 均衡负载

每台主机可以缓存本地最常用的数据,不需要频繁地访问服务器,减轻了服务器的负担,减少了网络的流量。

服务器也可以对任务进行分配和优化,克服几种系统中央计算机资源紧张的瓶颈。

  • 满足不同的需要

用户可以根据自己的需要在自己的主机上安装不同的操作系统、应用软件,使用不同的服务,

不再像集中式计算机系统那样受限于中央计算机的功能。

但是,分布式系统因为网络的不确定性,节点故障等情况,会带来各种复杂的问题

2.分布式系统的问题

1.通信异常:从集中式向分布式演变过程中,必然会引入网络因素,而由于网络本身的不可靠性,因此也引入了额外的问题。分布式系统需要在各个节点之间进行网络通信,因此当网络通信设备故障就会导致无法顺利完成一次网络通信,就算各节点的网络通信正常,但是消息丢失和消息延时也是非常普遍的事情。

2.网络分区(脑裂):网络发生异常情况导致分布式系统中部分节点之间的网络延时不断增大,最终导致组成分布式系统的所有节点,只有部分节点能够正常通行,而另一些节点则不能。我们称这种情况叫做网络分区(脑裂),当网络分区出现时,分布式系统会出现多个局部小集群(多个小集群可能又会产生多个master节点),所以分布式系统要求这些小集群要能独立完成原本需要整个分布式系统才能完成的功能,这就对分布式一致性提出了非常大的挑战。

3.节点故障:节点宕机是分布式环境中的常态,每个节点都有可能会出现宕机或僵死的情况,并且每天都在发生。

4.三态:由于网络不可靠的原因,因此分布式系统的每一次请求,都存在特有的“三态”概念,即:成功,失败与超时。在集中式单机部署中,由于没有网络因素,所以程序的每一次调用都能得到“成功”或者“失败”的响应,但是在分布式系统中,网络不可靠,可能就会出现超时的情况。可能在消息发送时丢失或者在响应过程中丢失,当出现超时情况时,网络通信的发起方是无法确定当前请求是否被成功处理的,所以这也是分布式事务的难点。

上面介绍的这几种情况,都有可能导致分布式系统产生数据一致性的问题。我们在分布式系统中,经常或存在数据复制的需求,主要有以下两种原因

​ 高可用:将数据复制到分布式部署的多台机器中,可以消除单点故障,防止系统由于某台(些)机器宕机导致的不可用。

​ 性能:通过负载均衡技术,能够让分布在不同地方的数据副本全都对外提供服务。有效提高系统性能。

在分布式系统引入复制机制后,不同的数据节点之间由于网络延时等原因很容易产生数据不一致的情况。复制机制的目的是为了保证数据的一致性。但是数据复制面临的主要难题也是如何保证多个副本之间的数据一致性。

对分布式数据一致性简单的解释就是:当对集群中一个副本数据进行更新的同时,必须确保能够同步更新到其他副本,否则不同副本之间的数据将不再一致。举个例子来说就是:当客户端C1将系统中的一个值K由V1更新为V2,但是客户端C2读的是另一个还没有同步更新的副本,K的值依然是V1,这就导致了数据的不一致性。其中,常见的就是主从数据库之间的复制延时问题。 #### 3.CAP理论

​ 对于本地事务处理或者集中式的事务处理系统,我们可以采用ACID模型来保证数据的严格一致性(事务概念上的)。在分布式系统中,当我们要求分布式系统具有严格一致性时,很可能就需要牺牲掉系统的可用性。如何构建一个兼顾可用性和一致性的分布式系统成为无数工程师探讨的问题

CAP是Consistency、Availablity和Partition-tolerance的缩写。分别是指:

1.一致性(Consistency):每次读操作都能保证返回的是最新数据,在分布式系统中,如果能针对一个数据项的更新执行成功后,所有的用户都可以读到其最新的值,这样的系统就被认为具有严格的一致性。

2.可用性(Availablity):任何一个没有发生故障的节点,会在合理的时间内返回一个正常的结果,也就是对于用户的每一个请求总是能够在有限的时间内返回结果;

3.分区容忍性(Partition-torlerance):当节点间出现网络分区(不同节点处于不同的子网络,子网络之间是联通的,但是子网络之间是无法联通的,也就是被切分成了孤立的集群网络),照样可以提供满足一致性和可用性的服务,除非整个网络环境都发生了故障。

CAP理论指出:CAP三者只能取其二,不可兼得。

我们可以分析一下为什么会这样:

​ 首先,如果我们要使网络分区不存在,就必须将系统部署在单个节点上,因为网络总是会出现故障,分区总是存在的,所以当部署在单节点上,可以同时保证CP,但是这时候,就没什么意义了,这都不是分布式了,同时单点故障可能会发生,就不会保证A可用性。

​ 所以我们必须明确一点:对于分布式系统而言,分区容错性是必须要满足的,因为分区的出现时必然,也是必须要解决的问题。所以,P必须要保证,那么我们就要在C和A之间做权衡。

​ 有两个或以上节点时,当网络分区发生时,集群中两个节点不能相互通信(也就是说不能保证可用性A)。此时如果保证数据的一致性C,那么必然会有一个节点被标记为不可用的状态,违反了可用性A的要求,只能保证CP。

​ 反正,如果保证可用性A,即两个节点可以继续各自处理请求,那么由于网络不通不能同步数据,必然又会导致数据的不一致,只能保证AP。

可以不用想到底放弃P,保证CA的时候是怎么弄的,因为P总是存在的,放弃不了。另外,可用性、一致性也是我们一般系统必须要满足的,如何在可用性和一致性进行权衡,所以就出现了各种一致性的理论与算法。

分布式一致性协议

2PC (两阶段提交协议)

在分布式系统中,会有多个机器节点,因此需要一个 “协调者” ,而各个节点就是 “参与者”,协调者统一调度所有分布式节点的执行逻辑,这些被调度的分布式节点就是 “参与者”。

1
协调者拥有超时机制,即如果在一定时间内没有收到 cohort 的消息则默认失败
协调流程

阶段一

阶段一主要是询问参与者是否可以进行提交。

1
2
3
4
5
6
事务询问
协调者向所有的参与者询问,是否准备好了执行事务,并开始等待各参与者的响应。
执行事务
各参与者节点执行事务操作,并将 Undo 和 Redo 信息记入事务日志中
各参与者向协调者反馈事务询问的响应
如果参与者成功执行了事务操作,那么就反馈给协调者 Yes 响应,表示事务可以执行;如果参与者没有成功执行事务,就返回 No 给协调者,表示事务不可以执行。

阶段二

阶段二会根据阶段一的投票结果执行两种操作:执行事务提交回滚事务

1
2
如果所有的节点在阶段一都成功返回了 yes,则执行事务提交
如果接收到了一个 no,则执行事务回滚

其中分为两种情况:

1.执行事务提交步骤如下:

1
2
3
4
发送提交请求:协调者向所有参与者发出 commit 请求。
事务提交:参与者收到 commit 请求后,会正式执行事务提交操作,并在完成提交之后释放整个事务执行期间占用的事务资源。
反馈事务提交结果:参与者在完成事务提交之后,向协调者发送 Ack 信息。
协调者接收到所有参与者反馈的 Ack 信息后,完成事务。

2.中断事务步骤如下:

1
2
3
4
发送回滚请求:协调者向所有参与者发出 Rollback 请求。
事务回滚:参与者接收到 Rollback 请求后,会利用其在阶段一种记录的 Undo 信息来执行事务回滚操作,并在完成回滚之后释放在整个事务执行期间占用的资源。
反馈事务回滚结果:参与者在完成事务回滚之后,想协调者发送 Ack 信息。
中断事务:协调者接收到所有参与者反馈的 Ack 信息后,完成事务中断。
优缺点

优点

原理简单,实现方便

缺点

同步阻塞:

  • 在二阶段提交的过程中,所有的节点都在等待其他节点的响应,无法进行其他操作。这种同步阻塞极大的限制了分布式系统的性能。

单点问题:

  • 协调者在整个二阶段提交过程中很重要,如果协调者在提交阶段出现问题,那么整个流程将无法运转,更重要的是:其他参与者将会处于一直锁定事务资源的状态中,而无法继续完成事务操作

数据不一致

  • 假设当协调者向所有的参与者发送 commti 请求之后,发生了局部网络异常或者是协调者在尚未发送完所有 commit 请求之前自身发生了崩溃,导致最终只有部分参与者收到了 commit 请求。这将导致严重的数据不一致问题。

过于保守:

  • 如果在二阶段提交的提交询问阶段中,参与者出现故障而导致协调者始终无法获取到所有参与者的响应信息的话,这时协调者只能依靠其自身的超时机制来判断是否需要中断事务,显然,这种策略过于保守。换句话说,二阶段提交协议没有设计较为完善的容错机制,任意一个节点是失败都会导致整个事务的失败

3PC(三阶段提交协议)

​ 三阶段提交协议在协调者和参与者中都引入超时机制,并且把两阶段提交协议的第一个阶段拆分成了两步:询问,然后再锁资源,最后真正提交。

协调流程:

阶段一: CanCommit

  • 事务询问:
    • 协调者向所有的参与者发送一个包含事务内容的 canCommit 请求,询问是否可以执行事务提交操作,并开始等待各参与者的响应。
  • 各参与者向协调者反馈事务询问的响应:
    • 参与者接收来自协调者的 canCommit 请求,如果参与者认为自己可以顺利执行事务,就返回 Yes,否则反馈 No 响应。

阶段二:preCommit

协调者在得到所有参与者的响应之后,会根据结果执行2种操作:执行事务预提交,或者中断事务

执行事务预提交分为 3 个步骤:

  1. 发送预提交请求:协调者向所有参与者节点发出 preCommit 的请求,并进入 prepared 状态。
  2. 事务预提交:参与者受到 preCommit 请求后,会执行事务操作,对应 2PC 中的 “执行事务”,也会 Undo 和 Redo 信息记录到事务日志中。
  3. 各参与者向协调者反馈事务执行的结果:如果参与者成功执行了事务,就反馈 Ack 响应,同时等待指令:提交(commit)终止(abor)

中断事务也分为2个步骤:

  1. 发送中断请求:协调者向所有参与者节点发出 abort 请求 。
  2. 中断事务:参与者如果收到 abort 请求或者超时了,都会中断事务。

阶段三:doCommit

该阶段做真正的提交,同样也会出现两种情况:

执行提交

  1. 发送提交请求:进入这一阶段,如果协调者正常工作,并且接收到了所有协调者的 Ack 响应,那么协调者将从 “预提交” 状态变为 “提交” 状态,并向所有的参与者发送 doCommit 请求 。
  2. 事务提交:参与者收到 doCommit 请求后,会正式执行事务提交操作,并在完成之后释放在整个事务执行期间占用的事务资源。
  3. 反馈事务提交结果:参与者完成事务提交后,向协调者发送 Ack 消息。
  4. 完成事务:协调者接收到所有参与者反馈的 Ack 消息后,完成事务。

中断事务

假设有任何参与者反馈了 no 响应,或者超时了,就中断事务。

  1. 发送中断请求:协调者向所有的参与者节点发送 abort 请求。
  2. 事务回滚:参与者接收到 abort 请求后,会利用其在二阶段记录的 undo 信息来执行事务回滚操作,并在完成回滚之后释放整个事务执行期间占用的资源。
  3. 反馈事务回滚结果:参与者在完成事务回滚之后,想协调者发送 Ack 消息。
  4. 中断事务:协调者接收到所有的 Ack 消息后,中断事务。

tips:一旦进入阶段三,可能会出现 2 种故障:

  • 协调者出现问题
  • 协调者和参与者之间的网络故障

一段出现了任一一种情况,最终都会导致参与者无法收到 doCommit 请求或者 abort 请求,针对这种情况,参与者都会在等待超时之后,继续进行事务提交

优点

  • 减少了参与者的阻塞范围(第一个阶段是不阻塞的)
  • 能够在单点故障后继续达成一致(2PC 在提交阶段会出现此问题,而 3PC 会根据协调者的状态进行回滚或者提交)

缺点

如果参与者收到了 preCommit 消息后,出现了网络分区,那么参与者等待超时后,都会进行事务的提交,这必然会出现事务不一致的问题

分布式一致性算法

paxos 一致性算法

paxos 算法保证了一致性

在一个分布式系统中,有一组的 process,每个 process 都可以提出一个 value,consensus 算法就是用来从这些 values 里选定一个最终 value。如果没有 value 被提出来,那么就没有 value 被选中;如果有1个 value 被选中,那么所有的 process 都应该被通知到。

在 2PC 或者 3PC 中,如果协调者宕机了,整个系统就宕机了,这个时候就需要引用多个协调者,paxos 就是用来协调协调者的协议。

算法的提出与证明

首先将议员的角色分为 proposersacceptors,和 learners(允许身兼数职)。proposers 提出提案,提案信息包括提案编号和提议的 valueacceptor 收到提案后可以接受(accept)提案,若提案获得多数派(majority)的 acceptors 的接受,则称该提案被批准(chosen);learners 只能“学习”被批准的提案。划分角色后,就可以更精确的定义问题:

  1. 决议(value)只有在被 proposers 提出后才能被批准(未经批准的决议称为“提案(proposal)”);
  2. 在一次 Paxos 算法的执行实例中,只批准(chosen)一个 value
  3. learners 只能获得被批准(chosen)的 value

通过不断加强上述3个约束(主要是第二个)获得了 Paxos 算法。

批准 value 的过程中,首先 proposersvalue 发送给 acceptors,之后 acceptorsvalue 进行接受(accept)。为了满足只批准一个 value 的约束,要求经“多数派(majority)”接受的 value 成为正式的决议(称为“批准”决议)。这是因为无论是按照人数还是按照权重划分,两组“多数派”至少有一个公共的 acceptor,如果每个 acceptor 只能接受一个 value,约束 2 就能保证。

于是产生了一个显而易见的新约束:

P1:一个 acceptor 必须接受(accept)第一次收到的提案。

注意 P1 是不完备的。如果恰好一半 acceptor 接受的提案具有 value A,另一半接受的提案具有 value B,那么就无法形成多数派,无法批准任何一个 value

约束 2 并不要求只批准一个提案,暗示可能存在多个提案。只要提案的 value 是一样的,批准多个提案不违背约束 2。于是可以产生约束 P2:

P2:一旦一个具有 value v 的提案被批准(chosen),那么之后批准(chosen)的提案必须具有 value v

note

  • 通过某种方法可以为每个提案分配一个编号,在提案之间建立一个全序关系,所谓“之后”都是指所有编号更大的提案

如果 P1 和 P2 都能够保证,那么约束 2 就能够保证。

批准一个 value 意味着多个 acceptor 接受(accept)了该 value。因此,可以对 P2 进行加强:

P2a:一旦一个具有 value v 的提案被批准(chosen),那么之后任何 acceptor 再次接受(accept)的提案必须具有 value v。(只是接受,还没有经过 proposer 批准)

由于通信是异步的,P2a 和 P1 会发生冲突。如果一个 value 被批准后,一个 proposer 和一个 acceptor 从休眠中苏醒,前者提出一个具有新的 value 的提案。根据 P1,后者应当接受,根据 P2a,则不应当接受,这种场景下 P2a 和 P1 有矛盾。于是需要换个思路,转而对 proposer 的行为进行约束:

P2b:一旦一个具有 value v 的提案被批准(chosen),那么以后任何 proposer 提出的提案必须具有 value v

由于 acceptor 能接受的提案都必须由 proposer 提出,所以 P2b 蕴涵了 P2a,是一个更强的约束。

但是根据 P2b 难以提出实现手段。因此需要进一步加强 P2b。

假设一个编号为 mvalue v 已经获得批准(chosen),来看看在什么情况下对任何编号为 nn > m)的提案都含有 value v。因为 m 已经获得批准(chosen),显然存在一个 acceptors 的多数派 C,他们都接受(accept)了 v。考虑到任何多数派都和 C 具有至少一个公共成员,可以找到一个蕴涵 P2b 的约束 P2c:

P2c:如果一个编号为 n 的提案具有 value v,那么存在一个多数派,要么他们中所有人都没有接受(accept)编号小于 n 的任何提案,要么他们已经接受(accept)的所有编号小于 n 的提案中编号最大的那个提案具有 value v

如果一个没有 chose 过任何 proposer 提案的 acceptorprepare 过程中接受了一个 proposer 针对提案 n 的问题,但是在开始对 n 进行投票前,又接受(accept)了编号小于n的另一个提案(例如 n-1),如果 n-1n 具有不同的 value,这个投票就会违背 P2c。因此在 prepare过程中,acceptor 进行的回答同时也应包含承诺:不会再接受(accept)编号小于 n 的提案。这是对 P1 的加强:

P1a:当且仅当 acceptor 没有回应过编号大于 nprepare 请求时,acceptor 接受(accept)编号为n的提案。

算法流程

通过一个决议分为两个阶段:

prepare 阶段

proposer 选择一个提案编号 n 并将 prepare 请求发送给acceptors中的一个多数派;

acceptor 收到 prepare 消息后,如果提案的编号大于它已经回复的所有 prepare 消息(回复消息表示接受 accept),则 acceptor 将自己上次接受的提案回复给 proposer,并承诺不再回复小于 n 的提案;

批准阶段

当一个 proposer 收到了多数 acceptorsprepare 的回复后,就进入批准阶段。它要向回复 prepare 请求的 acceptors 发送 accept 请求,包括编号 n 和根据 P2c 决定的 value (如果根据 P2c 没有已经接受的 value,那么它可以自由决定 value。 在不违背自己向其他 proposer 的承诺的前提下, acceptor 收到 accept 请求后即批准这个请求。

这个过程在任何时候中断都可以保证正确性。例如如果一个 proposer 发现已经有其他 proposers 提出了编号更高的提案,则有必要中断这个过程。因此为了优化,在上述 prepare 过程中,如果一个 acceptor 发现存在一个更高编号的提案,则需要通知 proposer,提醒其中断这次提案。

在这之后,提议者还需要做一件事,就是告知D,E,被决定的决议已经是什么了。即可。

这个过程叫 LearnDE 被称为 Learner.

paxos 总结
  • 在一个决议提议的过程中,其他决议会被否决。如上 E 的提议被否决,不会被应用,意味着更多的网络io,意味着更多的冲突。
  • 每一个服务器都可以作为提议者或者接收者
  • 不保证决议的顺序性

zookeeper(zab)

对于 paxos 来说,每一个议案都要经过不同节点的提出,并且讨论,在提出一个议案的阶段,另外的提议会被否决,导致了性能的低下。

ZAB 协议是为分布式协调服务 Zookeeper 专门设计的一种支持 崩溃恢复原子广播 协议。

基于该协议,Zookeeper 实现了一种 主备模式 的系统架构来保持集群中各个副本之间数据一致性。具体如下图所示:

即只有一个 proposal 可以提出提议,其他的进程都只能复制决议。

所有客户端写入数据都是写入到 主进程(称为 Leader)中,然后,由 Leader 复制到备份进程(称为 Follower)中。从而保证数据一致性。

note

  • 通过 TCP 协议的 FIFO 特性进行网络通信,能够能容易地把证消息广播过程种消息接收与发送地顺序性。(同时,使用 TCP 可以感受到 commit 是否被对方机器接收)
消息广播

ZAB 协议的消息广播过程使用的是一个原子广播协议,类似一个 二阶段提交过程。但是只需要 Follower 有一半以上返回 Ack 信息就可以执行提交,大大减小了同步阻塞。也提高了可用性。

对于客户端发送的写请求,全部由 Leader 接收,Leader 将请求封装成一个事务 Proposal,将其发送给所有 Follwer ,然后,根据所有 Follwer 的反馈,如果超过半数成功响应,则执行 commit 操作(先提交自己,再发送 commit 给所有 Follwer)。

流程如下:

  • Leader 接收到消息请求后,将消息赋予一个全局唯一的 64 位自增 id,叫做:zxid,通过 zxid 的大小比较即可实现因果有序这一特性。
  • Leader 通过先进先出队列(会给每个 follower 都创建一个队列,保证发送的顺序性)(通过 TCP 协议来实现,以此实现了全局有序这一特性)将带有 zxid 的消息作为一个提案(proposal)分发给所有 follower
  • follower 接收到 proposal,先将 proposal 写到本地事务日志,写事务成功后再向 leader 回一个 ACK
  • leader 接收到过半ACKs 后,leader 就向所有 follower 发送 COMMIT 命令,同意会在本地执行该消息。
  • follower 收到消息的 COMMIT 命令时,就会执行该消息
崩溃恢复

Leader 挂了之后,ZAB 协议就自动进入崩溃恢复模式,选举出新的 Leader,并完成数据同步,然后退出崩溃恢复模式进入消息广播模式。

可能 Leader 遇到如下异常情况:

  1. 假设一个事务在 Leader 上提交了,并且过半 Follow 都响应 ACK 了,但是 Leadercommit 消息发出后就挂了
  2. 假设一个事务在 Leader 提交了之后,Leader 就挂掉了。 要保证如果发生上述 2 种情况,数据还能保持一致性,那么 ZAB 协议选举算法必须确保已经提交的 proposal发送过 commit 消息),在 Follow 上也必须完成提交;并且丢弃已经被跳过的事务 proposal

第一种情况 主要是当 leader 收到合法数量 followerACKs 后,就向各个 follower 广播 COMMIT 命令,同时也会在本地执行 COMMIT 并向连接的客户端返回「成功」。但是如果在各个 follower 在收到 COMMIT 命令前 leader 就挂了,导致剩下的服务器并没有执行都这条消息。

为了实现已经被处理的消息不能丢这个目的,Zab 的恢复模式使用了以下的策略:

  • 选举拥有

    1
    proposal

    最大值(即

    1
    zxid

    最大) 的节点作为新的

    1
    leader
    • 由于所有提案被 COMMIT 之前必须有合法数量的 follower ACK,即必须有合法数量的服务器的事务日志上有该提案的 proposal,因此,只要有合法数量的节点正常工作,就必然有一个节点保存了所有被 COMMIT 消息的 proposal 状态。
  • 新的 leader 将自己事务日志proposal 但未 COMMIT 的消息处理。

  • 新的 leaderfollower 建立先进先出的队列, 先将自身有而 follower 没有的 proposal 发送给 follower,再将这些 proposalCOMMIT 命令发送给 follower,以保证所有的 follower 都保存了所有的 proposal、所有的 follower 都处理了所有的消息。通过以上策略,能保证已经被处理的消息不会丢

第二种情况主要是当 leader 接收到消息请求生成 proposal 后就挂了,其他 follower 并没有收到此 proposal,因此经过恢复模式重新选了 leader 后,这条消息是被跳过的(其他机器日志中没有这一条记录,但是他的日志中有这一条记录)。 此时,之前挂了的 leader 重新启动并注册成了 follower,他保留了被跳过消息的 proposal 状态,与整个系统的状态是不一致的,需要将其删除

Zab 通过巧妙的设计 zxid 来实现这一目的。一个 zxid 是 64 位,高 32 是纪元(epoch)编号,每经过一次 leader 选举产生一个新的 leader,新 leader 会将 epoch + 1。低 32 位是消息计数器,每接到一个消息,则 $lo^{32} + 1$,新 leader 选举后这个值重置为 0。这样设计的好处是旧的 leader 挂了后重启,它不会被选举为 leader,因为此时它的 zxid 肯定小于当前的新 leader。当旧的 leader 作为 follower 接入新的 leader 后,新的 leader 会让它将所有的拥有旧的 epoch 未被 COMMITproposal 清除。

note

  • 脑裂:1 个集群如果发生了网络故障,很可能出现 1 个集群分成了两部分,而这两个部分都不知道对方是否存活,不知道到底是网络问题还是直接机器 down 了,所以这两部分都要选举 1 个 Leader ,而一旦两部分都选出了 Leader, 并且网络又恢复了,那么就会出现两个 Brain 的情况,整个集群的行为不一致了。
  • Zab 使用 Majority Quorums 的方式解决脑裂,一定要超过半数才能选举出来 leader
zab 总结
  • Zk 集群中的 client 和任意一个节点建立 TCP 长连接,完成所有的交互动作(虽然所有的交互都是 节点转发到 leader
  • Zk 中的读请求,直接由连接的节点处理,不需要经过 leader这种模式可能导致读取到的数据是过时的,但是可以保证一定是半数节点之前确认过的数据
  • Zk有 sync() 方法,可以保证读取到最新的数据。

raft 协议

Raft是用于管理复制日志的一致性算法,raft 协议也是一个主备模型,有一个唯一的 leader 控制任务的提交。

如下是一个 raft 协议中每一个节点可能存在的状态,主要分为领袖群众候选人

raft 最关键的一个概念是任期,每一个 leader 都有自己的任期,必须在任期内发送心跳信息给 follower 来延长自己的任期。

Leader 选举过程
  • 所有节点初始状态都是 Follower 角色
  • 超时时间(每一个节点会自定义一个超时时间)内没有收到 Leader 的请求则转换为 Candidate 进行选举
  • 每一个 Candidate 首先会投给自己,如果没有一个 Candidate 得到了过半的票数,则每一个 Candidate 随机休息一段时间(Election timeout
  • 如果某一 Candidate 休息结束,则发送信息,如果其他 Candidate 仍然在休息,就只能投给当前 Candidate
  • Candidate 收到大多数节点的选票则转换为 Leader;发现 Leader 或者收到更高任期的请求则转换为 Follower
  • Leader 在收到更高任期的请求后转换为 Follower
数据同步过程

Raft 协议强依赖 Leader 节点的可用性来确保集群数据的一致性数据的流向只能从 Leader 节点向 Follower 节点转移。当 Client 向集群 Leader 节点提交数据后,Leader 节点接收到的数据处于未提交状态(Uncommitted,接着 Leader 节点会并发向所有 Follower 节点复制数据并等待接收响应,确保至少集群中超过半数节点已接收到数据后再向 Client 确认数据已接收。一旦向 Client 发出数据接收 Ack 响应后,表明此时数据状态进入已提交(Committed),Leader 节点再向 Follower 节点发通知告知该数据状态已提交。

在数据同步阶段,可能出现七种情况:

  • 数据到达

    1
    Leader

    节点前,

    1
    Leader

    挂掉。

    • 这个阶段 Leader 挂掉不影响一致性。
  • 数据到达

    1
    Leader

    节点,但未复制到

    1
    Follower

    节点,

    1
    Leader

    挂掉。

    • 这个阶段 Leader 挂掉,数据属于未提交状态,Client 不会收到 Ack 会认为超时失败可安全发起重试Follower 节点上没有该数据,重新选主后 Client 重试重新提交可成功。原来的 Leader 节点恢复后作为 Follower 加入集群重新从当前任期的新 Leader 处同步数据,强制保持和 Leader 数据一致。
  • 数据到达

    1
    Leader

    节点,成功复制到

    1
    Follower

    所有节点,但还未向

    1
    Leader

    响应接收,

    1
    Leader

    挂掉。

    • 这个阶段 Leader 挂掉,虽然数据在 Follower 节点处于未提交状态(Uncommitted)但保持一致,重新选出 Leader 后可完成数据提交,此时 Client 由于不知到底提交成功没有,可重试提交。针对这种情况 Raft 要求 RPC 请求实现幂等性,也就是要实现内部去重机制。(类似于 zab)
  • 数据到达

    1
    Leader

    节点,成功复制到

    1
    Follower

    部分节点,但还未向

    1
    Leader

    响应接收,

    1
    Leader

    挂掉。

    • 这个阶段 Leader 挂掉,数据在 Follower 节点处于未提交状态(Uncommitted)且不一致,Raft 协议要求投票只能投给拥有最新数据的节点。所以拥有最新数据的节点会被选为 Leader 再强制同步数据到 Follower,数据不会丢失并最终一致。
  • 数据到达

    1
    Leader

    节点,成功复制到

    1
    Follower

    所有或多数节点,数据在

    1
    Leader

    处于已提交状态,但在

    1
    Follower

    处于未提交状态,,

    1
    Leader

    挂掉。

    • 这个阶段 Leader 挂掉,重新选出新 Leader 后的处理流程和阶段 3 一样。
  • 网络分区导致的脑裂情况,出现双

    1
    Leader
    • 原先的 Leader 独自在一个区,向它提交数据不可能复制到多数节点所以永远提交不成功。向新的 Leader 提交数据可以提交成功,网络恢复后旧的 Leader 发现集群中有更新任期(Term)的新 Leader 则自动降级为 Follower 并从新 Leader 处同步数据达成集群数据一致。

(另)一致性哈希算法java实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import java.util.*;

public class ConsistentHashing {
// 物理节点
private Set<String> physicalNodes = new TreeSet<String>() {
{
add("192.168.1.101");
add("192.168.1.102");
add("192.168.1.103");
add("192.168.1.104");
}
};

//虚拟节点
private final int VIRTUAL_COPIES = 1000000; // 物理节点至虚拟节点的复制倍数
private TreeMap<Long, String> virtualNodes = new TreeMap<>(); // 哈希值 => 物理节点

// 32位的 Fowler-Noll-Vo 哈希算法
// https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function
private static Long FNVHash(String key) {
final int p = 16777619;
Long hash = 2166136261L;
for (int idx = 0, num = key.length(); idx < num; ++idx) {
hash = (hash ^ key.charAt(idx)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;

if (hash < 0) {
hash = Math.abs(hash);
}
return hash;
}

// 根据物理节点,构建虚拟节点映射表
public ConsistentHashing() {
for (String nodeIp : physicalNodes) {
addPhysicalNode(nodeIp);
}
}

// 添加物理节点
public void addPhysicalNode(String nodeIp) {
for (int idx = 0; idx < VIRTUAL_COPIES; ++idx) {
long hash = FNVHash(nodeIp + "#" + idx);
virtualNodes.put(hash, nodeIp);
}
}

// 删除物理节点
public void removePhysicalNode(String nodeIp) {
for (int idx = 0; idx < VIRTUAL_COPIES; ++idx) {
long hash = FNVHash(nodeIp + "#" + idx);
virtualNodes.remove(hash);
}
}

// 查找对象映射的节点
public String getObjectNode(String object) {
long hash = FNVHash(object);
SortedMap<Long, String> tailMap = virtualNodes.tailMap(hash); // 所有大于 hash 的节点
Long key = tailMap.isEmpty() ? virtualNodes.firstKey() : tailMap.firstKey();
return virtualNodes.get(key);
}

// 统计对象与节点的映射关系
public void dumpObjectNodeMap(String label, int objectMin, int objectMax) {
// 统计
Map<String, Integer> objectNodeMap = new TreeMap<>(); // IP => COUNT
for (int object = objectMin; object <= objectMax; ++object) {
String nodeIp = getObjectNode(Integer.toString(object));
Integer count = objectNodeMap.get(nodeIp);
objectNodeMap.put(nodeIp, (count == null ? 0 : count + 1));
}

// 打印
double totalCount = objectMax - objectMin + 1;
System.out.println("======== " + label + " ========");
for (Map.Entry<String, Integer> entry : objectNodeMap.entrySet()) {
long percent = (int) (100 * entry.getValue() / totalCount);
System.out.println("IP=" + entry.getKey() + ": RATE=" + percent + "%");
}
}

public static void main(String[] args) {
ConsistentHashing ch = new ConsistentHashing();

// 初始情况
ch.dumpObjectNodeMap("初始情况", 0, 65536);

// 删除物理节点
ch.removePhysicalNode("192.168.1.103");
ch.dumpObjectNodeMap("删除物理节点", 0, 65536);

// 添加物理节点
ch.addPhysicalNode("192.168.1.108");
ch.dumpObjectNodeMap("添加物理节点", 0, 65536);
}
}