# 一、什么是 Raft算法
Raft
适用于一个管理日志一致性的协议,相比于Paxos
协议Raft
更易于理解和去实现它。为了提高理解性,Raft
将一致性算法分为了几个部分,包括领导选取leader selection
、日志复制log replication
、安全safety
,并且使用了更强的一致性来减少了必须需要考虑的状态。
Raft
算法将Server
划分为3种状态,或者也可以称作角色:
【1】Leader:负责Client
交互和log
复制,同一时刻系统中最多存在1个。
【2】Follower:被动响应请求RPC
,从不主动发起请求RPC
。
【3】Candidate:一种临时的角色,只存在于Leader
的选举阶段,某个节点想要变成Leader
,那么就发起投票请求,同时自己变成Candidate
。如果选举成功,则变为Candidate
,否则退回为Follower
状态或者说角色的流转如下:

在Raft
中,问题分解为:领导选取、日志复制、安全和成员变化。
复制状态机通过复制日志来实现
日志:每台机器保存一份日志,日志来自于客户端的请求,包含一系列的命令
状态机:状态机会按顺序执行这些命令
一致性模型:分布式环境下,保证多机的日志是一致的,这样回放到状态机中的状态是一致的
# 二、Raft算法选主流程
Raft
中有Term
的概念,Term
类比中国历史上的朝代更替,Raft
算法将时间划分成为任意不同长度的任期term
。
选举流程:
1、Follower
增加当前的term
,转变为Candidate
。
2、Candidate
投票给自己,并发送RequestVote RPC
给集群中的其他服务器。
3、收到 RequestVote
的服务器,在同一term
中只会按照先到先得投票给至多一个Candidate
。且只会投票给log
至少和自身一样新的Candidate
。

关于Raft
更详细的描述,可以查看这里,从分布式一致性到共识机制(二)Raft算法
# 三、Nacos中的 CP一致性
Spring Cloud Alibaba Nacos
在1.0.0
正式支持AP
和CP
两种一致性协议,其中CP
一致性协议实现,是基于简化的Raft
的CP
一致性。
如何实现 Raft算法: Nacos server
在启动时,会通过RunningConfig.onApplicationEvent()
方法调用RaftCore.init()
方法。
启动选举
public static void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
// 启动Notifier,轮询Datums,通知RaftListener
executor.submit(notifier);
// 获取Raft集群节点,更新到PeerSet中
peers.add(NamingProxy.getServers());
long start = System.currentTimeMillis();
// 从磁盘加载Datum和term数据进行数据恢复
RaftStore.load();
Loggers.RAFT.info("cache loaded, peer count: {}, datum count: {}, current term: {}",
peers.size(), datums.size(), peers.getTerm());
while (true) {
if (notifier.tasks.size() <= 0) {
break;
}
Thread.sleep(1000L);
System.out.println(notifier.tasks.size());
}
Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
GlobalExecutor.register(new MasterElection()); // Leader选举
GlobalExecutor.register1(new HeartBeat()); // Raft心跳
GlobalExecutor.register(new AddressServerUpdater(), GlobalExecutor.ADDRESS_SERVER_UPDATE_INTERVAL_MS);
if (peers.size() > 0) {
if (lock.tryLock(INIT_LOCK_TIME_SECONDS, TimeUnit.SECONDS)) {
initialized = true;
lock.unlock();
}
} else {
throw new Exception("peers is empty.");
}
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
在init
方法主要做了如下几件事:
【1】获取Raft
集群节点peers.add(NamingProxy.getServers())
;
【2】Raft
集群数据恢复RaftStore.load()
;
【3】Raft
选举GlobalExecutor.register(new MasterElection())
;
【4】Raft
心跳GlobalExecutor.register(new HeartBeat())
;
【5】Raft
发布内容
【6】Raft
保证内容一致性
选举流程: 其中,raft
集群内部节点间是通过暴露的Restful
接口,代码在RaftController
中。RaftController
控制器是Raft
集群内部节点间通信使用的,具体的信息如下
POST HTTP://{ip:port}/v1/ns/raft/vote : 进行投票请求
POST HTTP://{ip:port}/v1/ns/raft/beat : Leader向Follower发送心跳信息
GET HTTP://{ip:port}/v1/ns/raft/peer : 获取该节点的RaftPeer信息
PUT HTTP://{ip:port}/v1/ns/raft/datum/reload : 重新加载某日志信息
POST HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据并存入
DELETE HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据删除操作
GET HTTP://{ip:port}/v1/ns/raft/datum : 获取该节点存储的数据信息
GET HTTP://{ip:port}/v1/ns/raft/state : 获取该节点的状态信息{UP or DOWN}
POST HTTP://{ip:port}/v1/ns/raft/datum/commit : Follower节点接收Leader传来得到数据存入操作
DELETE HTTP://{ip:port}/v1/ns/raft/datum : Follower节点接收Leader传来的数据删除操作
GET HTTP://{ip:port}/v1/ns/raft/leader : 获取当前集群的Leader节点信息
GET HTTP://{ip:port}/v1/ns/raft/listeners : 获取当前Raft集群的所有事件监听者
RaftPeerSet
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
心跳机制: Raft
中使用心跳机制来触发Leader
选举。心跳定时任务是在GlobalExecutor
中,通过GlobalExecutor.register(new HeartBeat())
注册心跳定时任务,具体操作包括:
【1】重置Leader
节点的heart timeout
、election timeout
【2】sendBeat()
发送心跳包
public class HeartBeat implements Runnable {
@Override
public void run() {
try {
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {
return;
}
local.resetHeartbeatDue();
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
简单说明了下Nacos
中的Raft
一致性实现,更详细的流程,可以下载源码,查看RaftCore
进行了解。源码可以通过以下地址检出:
git clone https://github.com/alibaba/nacos.git

# 四、Nacos AP 实现
AP协议: Distro协议。Distro
是阿里巴巴的私有协议,目前流行的Nacos
服务管理框架就采用了Distro
协议。Distro
协议被定位为临时数据的一致性协议 :该类型协议, 不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session
会话, 该会话只要存在,数据就不会丢失 。
Distro
协议保证写必须永远是成功的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。
Distro协议具有以下特点:
【1】专门为了注册中心而创造出的协议;
【2】客户端与服务端有两个重要的交互,服务注册与心跳发送;
【3】客户端以服务为维度向服务端注册,注册后每隔一段时间向服务端发送一次心跳,心跳包需要带上注册服务的全部信息,在客户端看来,服务端节点对等,所以请求的节点是随机的;
【4】客户端请求失败则换一个节点重新发送请求;
【5】服务端节点都存储所有数据,但每个节点只负责其中一部分服务,在接收到客户端的“写”(注册、心跳、下线等)请求后,服务端节点判断请求的服务是否为自己负责,如果是,则处理,否则交由负责的节点处理;
【6】每个服务端节点主动发送健康检查到其他节点,响应的节点被该节点视为健康节点;
【7】服务端在接收到客户端的服务心跳后,如果该服务不存在,则将该心跳请求当做注册请求来处理;
【8】服务端如果长时间未收到客户端心跳,则下线该服务;
【9】负责的节点在接收到服务注册、服务心跳等写请求后将数据写入后即返回,后台异步地将数据同步给其他节点;
【10】节点在收到读请求后直接从本机获取后返回,无论数据是否为最新。
Distro
协议服务端节点发现使用寻址机制来实现服务端节点的管理。在Nacos
中,寻址模式有三种:

【1】单机模式: StandaloneMemberLookup
【2】文件模式: FileConfigMemberLookup
利用监控cluster.conf
文件的变动实现节点的管理。核心代码如下:

【3】服务器模式: AddressServerMemberLookup
使用地址服务器存储节点信息,服务端节点定时拉取信息进行管理

核心代码:

初始全量同步: Distro
协议节点启动时会从其他节点全量同步数据。在Nacos
中,整体流程如下:

【1】启动一个定时任务线程DistroLoadDataTask
加载数据,调用load()
方法加载数据
【2】调用loadAllDataSnapshotFromRemote()
方法从远程机器同步所有的数据
【3】从namingProxy
代理获取所有的数据data
● 构造http
请求,调用httpGet
方法从指定的server
获取数据
● 从获取的结果result
中获取数据bytes
【4】处理数据processData
● 从data
反序列化出datumMap
● 把数据存储到dataStore
,也就是本地缓存dataMap
● 监听器不包括key
,就创建一个空的service
,并且绑定监听器
【5】监听器listener
执行成功后,就更新data store
核心代码如下:




增量同步: 新增数据使用异步广播同步:
【1】DistroProtocol
使用sync()
方法接收增量数据
【2】向其他节点发布广播任务:调用distroTaskEngineHolder
发布延迟任务
【3】调用DistroDelayTaskProcessor.process()
方法进行任务投递:将延迟任务转换为异步变更任务
【4】执行变更任务DistroSyncChangeTask.run()
方法:向指定节点发送消息
● 调用DistroHttpAgent.syncData()
方法发送数据
● 调用NamingProxy.syncData()
方法发送数据
【5】异常任务调用handleFailedTask()
方法进行处理
● 调用DistroFailedTaskHandler
处理失败任务
● 调用DistroHttpCombinedKeyTaskFailedHandler
将失败任务重新投递成延迟任务
核心代码如下:




# 五、总结
Distro
协议是阿里的私有协议,但是对外开源框架只有Nacos
。所有我们只能从Nacos
中一窥Distro
协议。Distro
协议是一个比较简单的最终一致性协议。整体由节点寻址、数据全量同步、异步增量同步、定时上报client所有信息、心跳探活其他节点等组成。
本文中的
Nacos
源码版本为Nacos 1.3.2
,属于优化过的源码,抽象出一致性协议抽象接口,和JRaft
共用节点寻址模式。