

Dgraph Database: Raft WAL Implementation



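Distributed systems can be roughly divided into two kinds: stateful and stateless. Stateless systems are the simplest. A multi-node web API that provides a computation service, for example, can be set up easily behind a load balancer, and nodes can be added or removed at will while the service stays available.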

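Stateful services, such as distributed storage, are far more complicated. Because hardware and networks are unreliable, the nodes' states cannot be guaranteed to stay consistent, so special mechanisms are needed. Raft is a consensus algorithm proposed by Diego Ongaro and John Ousterhout in the paper "In Search of an Understandable Consensus Algorithm" (first circulated in 2013, published at USENIX ATC 2014). Its defining feature is that it is easy to understand. This article does not go into the algorithm itself, but it uses many of the concepts the paper introduces; please refer to the Raft paper for those.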



Dgraph's Raft WAL Implementation

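Raft starts from the replicated state machine model and keeps nodes in sync by replicating a log, so that the whole system reaches a consistent state. Any system that implements Raft must therefore be able to manage a WAL (write-ahead log). etcd's raft library abstracts this log storage into an interface, Storage, which application developers implement themselves: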

type Storage interface {
	// InitialState returns the saved HardState and ConfState information.
	InitialState() (pb.HardState, pb.ConfState, error)
	// Entries returns a slice of log entries in the range [lo,hi).
	// MaxSize limits the total size of the log entries returned, but
	// Entries returns at least one entry if any.
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via Entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later.
	Snapshot() (pb.Snapshot, error)
}



// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)

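InitialState returns the node's current state as recovered from the stored WAL. It consists of a HardState and a ConfState: HardState holds the current term, the vote cast in that term, and the commit index; ConfState holds the Raft configuration, such as cluster membership information.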

// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)

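Entries takes a half-open index range [lo, hi) and returns the log entries in it. maxSize caps the total size of the returned entries, but at least one entry is returned if any exist. If the requested range is not available, an error is returned.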
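
To make the [lo, hi) and maxSize rules concrete, here is a minimal in-memory sketch; it is not Dgraph's or etcd's code. It assumes a simplified Entry type, keeps a dummy entry at ents[0] (as etcd's MemoryStorage does) to record the last compacted index, measures size as len(Data) rather than the encoded protobuf size, and uses hypothetical errCompacted/errUnavailable errors named after etcd's ErrCompacted and ErrUnavailable.

```go
package main

import (
	"errors"
	"fmt"
)

// Entry is a simplified, hypothetical stand-in for raftpb.Entry.
type Entry struct {
	Index, Term uint64
	Data        []byte
}

// Hypothetical errors, named after etcd raft's ErrCompacted and ErrUnavailable.
var (
	errCompacted   = errors.New("requested index is compacted")
	errUnavailable = errors.New("requested entry is unavailable")
)

// memLog stores entries in a slice. ents[0] is a dummy entry recording the
// index and term of the last entry folded into the snapshot.
type memLog struct {
	ents []Entry
}

func (m *memLog) firstIndex() uint64 { return m.ents[0].Index + 1 }
func (m *memLog) lastIndex() uint64 { return m.ents[0].Index + uint64(len(m.ents)) - 1 }

// entries returns the entries in [lo, hi). It stops before the running size
// (here: total len(Data)) exceeds maxSize, but always returns at least one
// entry when the range is non-empty.
func (m *memLog) entries(lo, hi, maxSize uint64) ([]Entry, error) {
	if lo > hi {
		return nil, fmt.Errorf("invalid range [%d, %d)", lo, hi)
	}
	if lo < m.firstIndex() {
		return nil, errCompacted
	}
	if hi > m.lastIndex()+1 {
		return nil, errUnavailable
	}
	offset := m.ents[0].Index
	var out []Entry
	var size uint64
	for _, e := range m.ents[lo-offset : hi-offset] {
		size += uint64(len(e.Data))
		if len(out) > 0 && size > maxSize {
			break
		}
		out = append(out, e)
	}
	return out, nil
}

func main() {
	// Dummy entry at index 10, so entries 11..13 are still available.
	m := &memLog{ents: []Entry{
		{Index: 10, Term: 2},
		{Index: 11, Term: 2, Data: []byte("aaaa")},
		{Index: 12, Term: 3, Data: []byte("bbbb")},
		{Index: 13, Term: 3, Data: []byte("cccc")},
	}}
	es, err := m.entries(11, 14, 6)
	fmt.Println(len(es), err) // 1 <nil>: adding entry 12 would exceed maxSize
	_, err = m.entries(5, 12, 100)
	fmt.Println(err) // requested index is compacted
}
```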

// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)

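Term returns the term of the log entry at index i. i must lie in [FirstIndex()-1, LastIndex()]: the term of the entry just before FirstIndex is kept for log matching even after the rest of that entry has been compacted away.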
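
The range rule for Term is easiest to see with the same dummy-entry layout. The sketch below is hypothetical (simplified Entry, in-memory slice, errors named after etcd's); it shows why index FirstIndex()-1 is still answerable: it is the dummy entry whose term was retained when earlier entries were compacted.

```go
package main

import (
	"errors"
	"fmt"
)

// Entry is a simplified, hypothetical stand-in for raftpb.Entry.
type Entry struct{ Index, Term uint64 }

// Hypothetical errors, named after etcd raft's ErrCompacted and ErrUnavailable.
var (
	errCompacted   = errors.New("requested index is compacted")
	errUnavailable = errors.New("requested entry is unavailable")
)

// memLog keeps ents[0] as a dummy entry: its Index/Term are those of the last
// entry compacted into the snapshot.
type memLog struct{ ents []Entry }

func (m *memLog) FirstIndex() uint64 { return m.ents[0].Index + 1 }
func (m *memLog) LastIndex() uint64 { return m.ents[0].Index + uint64(len(m.ents)) - 1 }

// Term answers for i in [FirstIndex()-1, LastIndex()]. i == FirstIndex()-1
// hits the dummy entry, whose term is retained for log matching.
func (m *memLog) Term(i uint64) (uint64, error) {
	if i < m.FirstIndex()-1 {
		return 0, errCompacted
	}
	if i > m.LastIndex() {
		return 0, errUnavailable
	}
	return m.ents[i-m.ents[0].Index].Term, nil
}

func main() {
	m := &memLog{ents: []Entry{{Index: 10, Term: 2}, {Index: 11, Term: 3}}}
	term, err := m.Term(10)
	fmt.Println(term, err) // 2 <nil>: the dummy entry still answers
	_, err = m.Term(9)
	fmt.Println(err) // requested index is compacted
}
```

Raft's consistency check compares the term at the index immediately preceding newly appended entries, which is why that boundary index has to stay answerable after compaction.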

// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
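
LastIndex returns the index of the last entry in the log. FirstIndex returns the index of the first entry that may still be read through Entries; older entries have already been folded into the latest snapshot.
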
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
Snapshot() (pb.Snapshot, error)
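
Snapshot returns the most recent snapshot. If the snapshot is temporarily unavailable, it returns ErrSnapshotTemporarilyUnavailable, so the Raft state machine knows that Storage needs time to prepare it and calls Snapshot again later.
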
In short, a Storage implementation must provide all of the methods above. Next, let's look at the concrete data structures Dgraph uses.


type DiskStorage struct {
	dir  string
	elog trace.EventLog

	meta *metaFile
	wal  *wal
	lock sync.Mutex
}

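This struct is Dgraph's implementation of the Storage interface. dir is the storage directory, and elog is used for performance tracing. meta holds the Raft metadata and wal manages the log entries; both are analyzed in detail below.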



// metaFile stores the RAFT metadata (e.g RAFT ID, snapshot, hard state).
type metaFile struct {
	// (fields not shown in this excerpt)
}


type MetaInfo int

const (
	RaftId MetaInfo = iota
	GroupId
	CheckpointIndex
	SnapshotIndex
	SnapshotTerm
)

// getOffset returns offsets in wal.meta file.
func getOffset(info MetaInfo) int {
	switch info {
	case RaftId:
		return 0
	case GroupId:
		return 8
	case CheckpointIndex:
		return 16
	case SnapshotIndex:
		return snapshotIndex
	case SnapshotTerm:
		return snapshotIndex + 8
	default:
		panic("Invalid info: " + fmt.Sprint(info))
	}
}

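From this code we can see that the meta file is accessed as a byte array at fixed offsets. It holds the following information:
RaftId: the ID of this node within the current cluster.
GroupId: the ID of the Raft group.
CheckpointIndex: the most recently applied index, used for recovery after a restart.
HardState: not covered by getOffset; stored at offset 512.
SnapshotIndex and SnapshotTerm: the index and term of the log entry covered by the latest snapshot.
Snapshot data: also not covered by getOffset; stored at snapshotIndex+16, right after SnapshotIndex and SnapshotTerm.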
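
The offset scheme in getOffset can be sketched with encoding/binary over a plain byte slice. This is an illustration under stated assumptions, not Dgraph's code: the value of snapshotIndex (1024), the big-endian byte order, and the buffer size are hypothetical choices, and a real implementation would back the bytes with the wal.meta file instead of an in-memory slice.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

type MetaInfo int

const (
	RaftId MetaInfo = iota
	GroupId
	CheckpointIndex
	SnapshotIndex
	SnapshotTerm
)

// Layout constants. hardStateOffset (512) comes from the article; the value
// of snapshotIndex and the buffer size are hypothetical.
const (
	hardStateOffset    = 512
	snapshotIndex      = 1024
	snapshotDataOffset = snapshotIndex + 16 // right after index and term
	metaSize           = 4096
)

func getOffset(info MetaInfo) int {
	switch info {
	case RaftId:
		return 0
	case GroupId:
		return 8
	case CheckpointIndex:
		return 16
	case SnapshotIndex:
		return snapshotIndex
	case SnapshotTerm:
		return snapshotIndex + 8
	default:
		panic("Invalid info: " + fmt.Sprint(info))
	}
}

// metaFile wraps an in-memory buffer here; a real implementation would back
// it with the wal.meta file. Big-endian encoding is an assumption.
type metaFile struct{ data []byte }

// SetUint writes v as 8 bytes at the offset assigned to info.
func (m *metaFile) SetUint(info MetaInfo, v uint64) {
	binary.BigEndian.PutUint64(m.data[getOffset(info):], v)
}

// Uint reads the 8-byte value stored at the offset assigned to info.
func (m *metaFile) Uint(info MetaInfo) uint64 {
	return binary.BigEndian.Uint64(m.data[getOffset(info):])
}

func main() {
	m := &metaFile{data: make([]byte, metaSize)}
	m.SetUint(RaftId, 1)
	m.SetUint(CheckpointIndex, 42)
	m.SetUint(SnapshotIndex, 40)
	m.SetUint(SnapshotTerm, 3)
	fmt.Println(m.Uint(RaftId), m.Uint(CheckpointIndex), m.Uint(SnapshotIndex), m.Uint(SnapshotTerm))
	// 1 42 40 3
	fmt.Println(hardStateOffset, snapshotDataOffset) // 512 1040
}
```

Fixed offsets make every field a constant-time read or write with no parsing, at the cost of reserving space for each region up front.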



type wal struct {
	// files is the list of all log files ordered in ascending order by the first
	// index in the file. current is the file currently being written to, and is
	// added to files only after it is full.
	files   []*logFile
	current *logFile
	// nextEntryIdx is the index of the next entry to write to. When this value exceeds
	// maxNumEntries the file will be rotated.
	nextEntryIdx int
	// dir is the directory to use to store files.
	dir string
}

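The wal struct manages all of the log files:
files lists every on-disk log file, ordered by the first index each file contains.
current is the file currently being written to. It does not appear in the files slice; it is added there only once it is full.
nextEntryIdx is the slot in current where the next entry will be written; when it exceeds maxNumEntries, the file is rotated.
dir is the directory that holds the log files.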
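
The rules in the struct comments (current joins files only when full; nextEntryIdx triggers rotation) amount to a simple rotation step. The following sketch keeps files in memory and uses a hypothetical maxNumEntries of 3 so the rotation is visible; Dgraph's real logFile lives on disk, and its actual maxNumEntries value is not shown in this article.

```go
package main

import "fmt"

// maxNumEntries is a hypothetical, deliberately tiny per-file capacity.
const maxNumEntries = 3

type Entry struct{ Index, Term uint64 }

// logFile is a hypothetical in-memory stand-in for an on-disk log file.
type logFile struct {
	fid     int64
	entries []Entry
}

type wal struct {
	files        []*logFile // full files, ascending by first index
	current      *logFile   // file being written; not in files
	nextEntryIdx int        // slot in current for the next entry
}

// addEntry writes e into current, rotating first if current has no free slot.
func (l *wal) addEntry(e Entry) {
	if l.nextEntryIdx >= maxNumEntries {
		l.files = append(l.files, l.current) // current is full: retire it
		l.current = &logFile{fid: l.current.fid + 1}
		l.nextEntryIdx = 0
	}
	l.current.entries = append(l.current.entries, e)
	l.nextEntryIdx++
}

func main() {
	l := &wal{current: &logFile{fid: 1}}
	for i := uint64(1); i <= 7; i++ {
		l.addEntry(Entry{Index: i, Term: 1})
	}
	fmt.Println(len(l.files), l.current.fid, l.nextEntryIdx) // 2 3 1
}
```

In this sketch only current is ever appended to; everything in files is full and already ordered by first index.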

wal provides a set of methods for accessing entries, for example:

Locate the entry with a given Raft index:
func (l *wal) seekEntry(raftIndex uint64) (entry, error)

Append a batch of Raft entries to the log:
func (l *wal) AddEntries(entries []raftpb.Entry) error

Return the index of the first log entry managed by the wal (DiskStorage's FirstIndex simply calls this method):

func (l *wal) firstIndex() uint64
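
Because files is sorted by each file's first index, a lookup by Raft index can binary-search the files. The sketch below shows one way to write seekEntry and firstIndex with sort.Search, plus a DiskStorage.FirstIndex that delegates to wal.firstIndex under the lock, as described above. It is a simplified, hypothetical version: files live in memory, each file is assumed to hold contiguous indexes, errNotFound is made up, and Dgraph's real methods may differ (for example in how they account for snapshots).

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

type Entry struct{ Index, Term uint64 }

// logFile is a hypothetical in-memory file holding contiguous indexes.
type logFile struct{ entries []Entry }

func (f *logFile) firstIndex() uint64 { return f.entries[0].Index }

type wal struct {
	files   []*logFile // full files, ascending by first index
	current *logFile
}

var errNotFound = errors.New("entry not found") // hypothetical error

// allFiles returns files plus a non-empty current, still in ascending order.
// The three-index slice stops append from writing into files' backing array.
func (l *wal) allFiles() []*logFile {
	fs := l.files[:len(l.files):len(l.files)]
	if l.current != nil && len(l.current.entries) > 0 {
		fs = append(fs, l.current)
	}
	return fs
}

// firstIndex returns the first index held by the wal, or 0 if it is empty.
func (l *wal) firstIndex() uint64 {
	fs := l.allFiles()
	if len(fs) == 0 {
		return 0
	}
	return fs[0].firstIndex()
}

// seekEntry binary-searches for the first file starting after raftIndex;
// the entry, if present, lives in the file just before it.
func (l *wal) seekEntry(raftIndex uint64) (Entry, error) {
	fs := l.allFiles()
	i := sort.Search(len(fs), func(j int) bool { return fs[j].firstIndex() > raftIndex })
	if i == 0 {
		return Entry{}, errNotFound
	}
	f := fs[i-1]
	off := raftIndex - f.firstIndex()
	if off >= uint64(len(f.entries)) {
		return Entry{}, errNotFound
	}
	return f.entries[off], nil
}

// DiskStorage here keeps only the fields this sketch needs.
type DiskStorage struct {
	wal  *wal
	lock sync.Mutex
}

// FirstIndex delegates to wal.firstIndex under the lock.
func (s *DiskStorage) FirstIndex() (uint64, error) {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.wal.firstIndex(), nil
}

func main() {
	w := &wal{
		files: []*logFile{
			{entries: []Entry{{5, 1}, {6, 1}, {7, 2}}},
			{entries: []Entry{{8, 2}, {9, 2}, {10, 3}}},
		},
		current: &logFile{entries: []Entry{{11, 3}}},
	}
	s := &DiskStorage{wal: w}
	fi, _ := s.FirstIndex()
	e, err := w.seekEntry(9)
	fmt.Println(fi, e.Term, err) // 5 2 <nil>
	_, err = w.seekEntry(12)
	fmt.Println(err) // entry not found
}
```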

Each file in files, as well as current, is a logFile:
type logFile struct {
	fid int64

	registry *badger.KeyRegistry
	dataKey  *pb.DataKey
	baseIV   []byte
}
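
fid identifies the file. registry, dataKey and baseIV come from Badger's encryption support and are used to encrypt and decrypt the file's contents when encryption at rest is enabled.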

