北川广海の梦

北川广海の梦

Dgraph Raft RPC实现分析

2023-06-20
Dgraph Raft RPC实现分析

Dgraph Raft RPC 实现分析

在raft集群中,各个节点的通信都是基于RPC实现的。各个节点需要通过rpc,实现消息的同步、心跳、快照传输等。etcd的raft库实现了核心算法。而具体的网络通信,需要应用自行实现。

etcd的raft example采用了http传输。dgraph则通过gRPC进行实现。开发者可以自行选择。

本文会分析一些Dgraph数据库raft的rpc实现的相关部分

心跳

raft协议中,leader需要定时向集群中的其他节点发送自己的心跳,以确保自己的领导地位。在raft论文中,这个心跳是通过消息发送完成的。也就是如果leader发送给follower的复制消息是空的,那么就代表这是一个心跳信息,防止follower成为新的节点。

但是这个心跳实际上是raft协议的部分,对于系统的更上层来说,是感受不到心跳的存在的。dgraph在实现过程中,通过gRPC,自行的对节点之间的网络连接进行了心跳检查维护。

具体做法:

service Raft {
  rpc Heartbeat(api.Payload) returns (stream HealthInfo) {}
  rpc IsPeer(RaftContext) returns (PeerResponse) {}
}

在系统启动后,dgraph节点会从zero集群获取到对等节点的信息,然后会尝试创建gRPC连接,作为一个peer对象。dgraph为每一个peer分配了一个goroutine来监控节点的健康状态:每隔固定的时间,通过Heartbeat这个stream rpc,接收对方的响应,并持续维护这个节点的最后响应时间。当这个节点超过一定时间没有响应时,就会进入重连状态,当重连成功后,会调用IsPeer这个rpc,确认连接是否恢复正常。之后会返回固定时间接收响应的状态。

而更上层的raft,所有的网络通信都是通过上面提到的dgrpah自行维护的网络连接来进行的。例如raft协议需要对某个节点发送一个message,就需要通过peer来进行实现。在这一层次只需要关心对应peer的状态:连接到底可不可用,而无需处理任何网络失败的问题。

快照

raft协议底层是通过WAL实现的,而日志的数量不可能永远持续增长下去,必须对其进行压缩,这个压缩的过程,就是生成快照的过程。如果数据库是一个类似redis的内存数据库,那么通过fork,即可生成。而dgraph采用了badgerDB存储引擎,它生成快照的做法是:系统维护了一个watermark: applyIndex,记录了有哪些log entry是已经apply到本地db的(log entry被大多数节点接受叫commited,被应用叫applied),当需要生成快照的时候,会遍历wal中的所有entry,直到applyIndex,找寻拥有最大的commitTS(badgerDB用于MVCC的时间戳)的log entry,并且会以这个log entry的index,作为snapshot对应的index。以这个最大的commitTS作为snapshot的TS。

并且,只有leader会生成快照,leader生成完毕后,通过raft message将这个snapshot发送给其他节点,其他节点会在apply这个消息的时候,根据消息中snapshot的index之前的log entry删除,并且将snapshot保存到Raft日志存储中。

可以看出,dgrpah的快照其实根本就没有包含完整的数据,只额外记录了一个TS。这样做的原因是因为Dgraph的存储引擎,badgerDB,TS其实就是一个事务的时间戳,也可以理解为数据库的版本号。由于Raft算法可以保证:拥有完整log entry的节点才会被选为leader,那么我们可以认为leader节点的badgerDB状态一定是正确的。所以leader生成快照并发送给follower之后,其他成员会通过rpc的方式向leader请求快照数据。这个时候leader就会通过badgerDB的stream,将数据写入到follower节点中。

这样做的好处是显而易见的,可以显著减少快照的体积,只在真正需要用到快照时,传输完整的数据,并且能保证各个节点上存储的快照的状态都是一致的。

消息发送

raft leader协议会将需要同步的log entry源源不断的发送给follower。Dgraph对此进行了优化,将发送给同一个节点的消息进行合并,批量发送,减小网络开销。

具体做法是:

  • 为每一个需要发送的节点维护一个channel,并且有一个专门的goroutine读取这个channel并发送消息
    
  • 当有消息需要发送时,主消息goroutine会读取上游传递来的消息,直到当前没有立即可读的消息,并将这些消息写入到为每一个节点准备的buffer中,这个buffer就是第一步初次合并
    
  • 随后将这些buffer分别写入节点对应的channel
    
  • 每个节点对应的goroutine读取到buffer后,会再次尝试读取,直到超过消息大小限制,或者没有立即可读的消息,此时会调用rpc,将消息发送出去
    

由以上流程可以看出,这是一个典型的reactor模型,go语言的channel非常适合这种编程模式。并且这种做法减少了网络rpc相关对象的创建与开销,优化了消息传输的性能。