zincsearch 写入document 过程
zincsearch
zincsearch 是一个以 bluge 作为全文检索库的搜索引擎,基于 Go 编写,其最大的特点就是轻量级,对资源友好。相比 ElasticSearch 对机器的配置要求,zincsearch 更适合作为一个应用程序的检索,例如你可以很轻松的为自己的 博客、个人网站,基于 zincsearch 构建一个检索系统。并且它提供了与 ElasticSearch 完全兼容的 API,使用者可以很轻松的切换到 zincsearch。目前网站上关于其资料并不多,本文将分析一个 document 的写入,到其被持久化,最终被可搜索的过程。
源码地址:https://github.com/zincsearch/zincsearch
写入一个document
写入的 API
zincsearch 使用了 gin 作为 web api 框架,我们可以很轻松的找到创建 document 这个 API 的位置。在 pkg/routes/routes.go 文件,我们可以看到所有的 API。我们找到 document.CreateUpdate
方法:
func CreateUpdate(c *gin.Context) {
indexName := c.Param("target")
docID := c.Param("id") // ID for the document to be updated provided in URL path
var err error
var doc map[string]interface{}
if err = zutils.GinBindJSON(c, &doc); err != nil {
zutils.GinRenderJSON(c, http.StatusBadRequest, meta.HTTPResponseError{Error: err.Error()})
return
}
update := false
// If id field is present then use it, else create a new UUID and use it
if id, ok := doc["_id"]; ok {
docID = id.(string)
}
if docID == "" {
docID = ider.Generate()
} else {
update = true
}
// If the index does not exist, then create it
index, _, err := core.GetOrCreateIndex(indexName, "", 0)
if err != nil {
zutils.GinRenderJSON(c, http.StatusInternalServerError, meta.HTTPResponseError{Error: err.Error()})
return
}
err = index.CreateDocument(docID, doc, update)
if err != nil {
zutils.GinRenderJSON(c, http.StatusInternalServerError, meta.HTTPResponseError{Error: err.Error()})
return
}
zutils.GinRenderJSON(c, http.StatusOK, meta.HTTPResponseESID{
Message: "ok",
ID: docID,
ESID: docID,
Index: indexName,
Version: 1,
SeqNo: 0,
PrimaryTerm: 0,
Result: "created",
})
}
这个 就是 API 部分,它做的事情很简单,从请求中读取参数,然后判断 index 是否存在,如果不存在则创建。然后调用 index.CreateDocument
将文档保存。
// CreateDocument inserts or updates a document in the zinc index
func (index *Index) CreateDocument(docID string, doc map[string]interface{}, update bool) error {
// metrics
IncrMetricStatsByIndex(index.GetName(), "wal_request")
// check WAL
shard := index.GetShardByDocID(docID)
if err := shard.OpenWAL(); err != nil {
return err
}
secondShardID := ShardIDNeedLatest
if update {
secondShardID = ShardIDNeedUpdate
}
data, err := shard.CheckDocument(docID, doc, update, secondShardID)
if err != nil {
return err
}
return shard.wal.Write(data)
}
这里做先用 docID,通过哈希的方式,计算出 shardId,然后又根据是否 update,得到了第二个 shardId,这么做的原因,在 pkg/core/index_shards.go中的注释有描述:一个 index 的索引被分为了两层:第一层的数量就是 在创建 index的时候的 shard num,这个值是固定的。而第二层则是做出的优化,为了避免单个 shard 过大,所以在前面的基础上再次做了 shard,这可以提高性能。并且每个 second shard 的大小受环境变量 ZINC_SHARD_MAX_SIZE
控制。默认大小为 1GB。
之后是 shard.CheckDocument 方法,这个方法做的事情就是对document 的字段进行检查,并将前面的 second shard相关信息暂时写入 doc中,之后将doc序列化返回。之后,first shard将这个 doc的数据写入到 wal,此时这个document就被持久化保存了,用户的写入请求也会成功返回,但是这并不意味着这个 doc 此时能被搜索到。
从 WAL 到可搜索
上文提到,doc 被写入 WAL之后,写入请求就结束了,但是它并不是可搜索的。因为直到此时,我们的全文检索库都还没有登场。现在我们定位到 pkg/core/index_shards_wallist.go 文件:
func init() {
ZINC_INDEX_SHARD_WAL_LIST.Shards = make(map[string]*IndexShard)
go ZINC_INDEX_SHARD_WAL_LIST.ConsumeWAL()
}
在 init 函数,它启动了一个 ConsumeWAL goroutine,很显然,这就是文档真正被索引的过程了。
func (t *IndexShardWALList) ConsumeWAL() {
indexes := make(map[string]*Index)
eg := &errgroup.Group{}
eg.SetLimit(config.Global.Shard.GoroutineNum)
tick := time.NewTicker(config.Global.WalSyncInterval)
for range tick.C {
shardClosed := make(chan string, t.Len())
indexUpdated := make(chan string, t.Len())
for _, shard := range t.List() {
shard := shard
indexes[shard.GetIndexName()] = shard.root
eg.Go(func() error {
select {
case <-shard.close:
shardClosed <- shard.GetShardName()
return nil
default:
// continue
}
updated := shard.ConsumeWAL()
if updated {
indexUpdated <- shard.GetIndexName()
}
return nil
})
}
_ = eg.Wait()
close(shardClosed)
close(indexUpdated)
// check shard closed
for name := range shardClosed {
t.Remove(name)
}
// update index stats
for name := range indexUpdated {
index, ok := indexes[name]
if !ok {
continue
}
_ = index.UpdateMetadata()
size := index.GetWALSize()
if size == uint64(index.GetShardNum()) {
size = 0
}
index.UpdateWALSize(size)
stats := index.GetStats()
SetMetricStatsByIndex(name, "doc_num", float64(atomic.LoadUint64(&stats.DocNum)))
SetMetricStatsByIndex(name, "storage_size", float64(atomic.LoadUint64(&stats.StorageSize)/1024/1024)) // convert to MB
delete(indexes, name)
}
// force gc
runtime.GC()
}
}
可以看到有一个ticker,根据配置的固定时间,会执行一次 shard.ConsumeWAL,这是不是很像 ES?ES有一个延迟1s的问题,就是写入数据之后,需要等1s左右才可以搜索到,其实和这里是一样的,ES同样是先将数据写入了内存缓冲区,并没有flush,所以数据是不可搜的。
接下来,我们再进入shard.ConsumeWAL
方法:
// ConsumeWAL consume WAL for index returns if there is any data updated
func (s *IndexShard) ConsumeWAL() bool {
if err := s.wal.Sync(); err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.Sync()")
}
var err error
var entry []byte
var minID, maxID, startID uint64
maxID, err = s.wal.LastIndex()
if err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.LastIndex()")
return false
}
// read last committed ID
_, minID, err = s.readRedoLog(RedoActionWrite)
if err != nil && err.Error() != errors.ErrNotFound.Error() {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.readRedoLog()")
return false
}
if minID == maxID {
return false // no new entries
}
log.Debug().Str("index", s.GetIndexName()).Str("shard", s.GetID()).Uint64("minID", minID).Uint64("maxID", maxID).Msg("consume wal begin")
// limit max batch size
if maxID-minID > MaxBatchSize {
maxID = minID + MaxBatchSize
}
batch := blugeindex.NewBatch()
docs := make(walMergeDocs)
minID++
for startID = minID; minID <= maxID; minID++ {
entry, err = s.wal.Read(minID)
if err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.Read()")
return false
}
doc := make(map[string]interface{})
err = json.Unmarshal(entry, &doc)
if err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.entry.Unmarshal()")
return false
}
docs.AddDocument(doc)
if docs.MaxShardLen() >= config.Global.BatchSize {
if err = s.writeRedoLog(RedoActionRead, startID, minID); err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Str("stage", "read").Msg("consume wal.redolog.Write()")
return false
}
if err = docs.WriteTo(s, batch, false); err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.docs.WriteTo()")
return false
}
if err = s.writeRedoLog(RedoActionWrite, startID, minID); err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Str("stage", "write").Msg("consume wal.redolog.Write()")
return false
}
// Reset startID to nextID
startID = minID + 1
}
}
minID-- // need reduce one, because the next loop add one
// check if there is any docs to write
if docs.MaxShardLen() > 0 {
if err = s.writeRedoLog(RedoActionRead, startID, minID); err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Str("stage", "read").Msg("consume wal.redolog.Write()")
return false
}
if err := docs.WriteTo(s, batch, false); err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.docs.WriteTo()")
return false
}
if err = s.writeRedoLog(RedoActionWrite, startID, minID); err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Str("stage", "write").Msg("consume wal.redolog.Write()")
return false
}
}
log.Debug().Str("index", s.GetIndexName()).Str("shard", s.GetID()).Uint64("minID", minID).Uint64("maxID", maxID).Msg("consume wal end")
// Truncate log
if err = s.wal.TruncateFront(minID); err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Uint64("id", minID).Msg("consume wal.Truncate()")
return true
}
// check shards
if err = s.CheckShards(); err != nil {
log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume index.CheckShards()")
return true
}
// update metadata
s.root.UpdateMetadataByShard(s.GetID())
return true
}
这里主要是一个 正确性的保证,通过 redolog,记录了哪些 log entry没有被消费,真正的写入方法是:docs.WriteTo
// WriteTo write documents to index and sync to disk
// need split by shards
// need merge actions by docID
func (w *walMergeDocs) WriteTo(shard *IndexShard, batch *blugeindex.Batch, rollback bool) error {
var err error
for shardID := range *w {
if !rollback {
err = w.WriteToShard(shard, shardID, batch)
} else {
err = w.WriteToShardRollback(shard, shardID, batch)
}
if err != nil {
return err
}
batch.Reset()
}
w.Reset()
return nil
}
func (w *walMergeDocs) WriteToShard(shard *IndexShard, shardID int64, batch *blugeindex.Batch) error {
docs, ok := (*w)[shardID]
if !ok {
return nil
}
var writer *bluge.Writer
otherWriters := make([]*bluge.Writer, 0)
otherBatch := blugeindex.NewBatch()
if shardID == ShardIDNeedLatest {
shardID = shard.GetLatestShardID()
}
if shardID >= 0 {
w, err := shard.GetWriter(shardID)
if err != nil {
return err
}
writer = w
} else {
ws, err := shard.GetWriters() // get all shard
if err != nil {
return err
}
writer = ws[len(ws)-1]
otherWriters = append(otherWriters, ws...)
otherWriters = otherWriters[:len(ws)-1]
}
var firstAction, lastAction string
for _, doc := range docs {
// str, err := json.Marshal(doc.data)
// fmt.Printf("%s, %v, %v\n", str, err, doc.actions)
bdoc, err := shard.BuildBlugeDocumentFromJSON(doc.docID, doc.data)
if err != nil {
return err
}
firstAction = doc.actions[0]
switch firstAction {
case meta.ActionTypeInsert:
if len(doc.actions) == 1 {
batch.Insert(bdoc)
} else {
lastAction = doc.actions[len(doc.actions)-1]
switch lastAction {
case meta.ActionTypeInsert:
batch.Insert(bdoc)
case meta.ActionTypeUpdate:
batch.Insert(bdoc)
case meta.ActionTypeDelete:
// noop
}
}
case meta.ActionTypeUpdate:
if len(doc.actions) == 1 {
batch.Update(bdoc.ID(), bdoc)
otherBatch.Delete(bdoc.ID())
} else {
lastAction = doc.actions[len(doc.actions)-1]
switch lastAction {
case meta.ActionTypeInsert:
batch.Update(bdoc.ID(), bdoc)
otherBatch.Delete(bdoc.ID())
case meta.ActionTypeUpdate:
batch.Update(bdoc.ID(), bdoc)
otherBatch.Delete(bdoc.ID())
case meta.ActionTypeDelete:
batch.Delete(bdoc.ID())
otherBatch.Delete(bdoc.ID())
}
}
case meta.ActionTypeDelete:
if len(doc.actions) == 1 {
batch.Delete(bdoc.ID())
otherBatch.Delete(bdoc.ID())
} else {
lastAction = doc.actions[len(doc.actions)-1]
switch lastAction {
case meta.ActionTypeInsert:
batch.Update(bdoc.ID(), bdoc)
otherBatch.Delete(bdoc.ID())
case meta.ActionTypeUpdate:
batch.Update(bdoc.ID(), bdoc)
otherBatch.Delete(bdoc.ID())
case meta.ActionTypeDelete:
batch.Delete(bdoc.ID())
otherBatch.Delete(bdoc.ID())
}
}
default:
return fmt.Errorf("walMergeDocs: invalid action type [%s]", firstAction)
}
}
if err := writer.Batch(batch); err != nil {
return err
}
for _, writer := range otherWriters {
if err := writer.Batch(otherBatch); err != nil {
return err
}
}
return nil
}
这里是真正调用 bluge 的地方,通过其提供的 API,将 document 写入到全文检索。WriteToShard 则对 second shard 进行了解析,如果能够确定其存在的 second shard id,那么直接通过 对应的 shard id 拿到 bluge writer 即可操作了。否则对于新增操作,则直接操作最后一个 shard,或者是 ShardIDNeedUpdate 这种 second shard id为-2的情况,则也是操作最后一个shard,但是需要将其他 shard 中次文档的数据都删除。
而那一大串的 switch case,则是对操作类型进行判断,如果操作是 先新增,后删除,那么可以直接跳过,否则其他情况,需要按照逻辑处理。
至此,一个document的写入和索引就完毕了,用户可以通过 search 接口,搜索到刚写入的document了。
- 0
- 0
-
分享