北川广海の梦

北川广海の梦

zincsearch 写入document 过程

47
2023-08-15

zincsearch

zincsearch 是一个以 bluge 作为全文检索库的搜索引擎,基于 Go 编写,其最大的特点就是轻量级,对资源友好。相比 ElasticSearch 对机器的配置要求,zincsearch 更适合作为一个应用程序的检索,例如你可以很轻松的为自己的 博客、个人网站,基于 zincsearch 构建一个检索系统。并且它提供了与 ElasticSearch 完全兼容的 API,使用者可以很轻松的切换到 zincsearch。目前网站上关于其资料并不多,本文将分析一个 document 的写入,到其被持久化,最终被可搜索的过程。

源码地址:https://github.com/zincsearch/zincsearch

写入一个document

写入的 API

zincsearch 使用了 gin 作为 web api 框架,我们可以很轻松的找到创建 document 这个 API 的位置。在 pkg/routes/routes.go 文件,我们可以看到所有的 API。我们找到 document.CreateUpdate 方法:

func CreateUpdate(c *gin.Context) {
	indexName := c.Param("target")
	docID := c.Param("id") // ID for the document to be updated provided in URL path

	var err error
	var doc map[string]interface{}
	if err = zutils.GinBindJSON(c, &doc); err != nil {
		zutils.GinRenderJSON(c, http.StatusBadRequest, meta.HTTPResponseError{Error: err.Error()})
		return
	}

	update := false
	// If id field is present then use it, else create a new UUID and use it
	if id, ok := doc["_id"]; ok {
		docID = id.(string)
	}
	if docID == "" {
		docID = ider.Generate()
	} else {
		update = true
	}

	// If the index does not exist, then create it
	index, _, err := core.GetOrCreateIndex(indexName, "", 0)
	if err != nil {
		zutils.GinRenderJSON(c, http.StatusInternalServerError, meta.HTTPResponseError{Error: err.Error()})
		return
	}

	err = index.CreateDocument(docID, doc, update)
	if err != nil {
		zutils.GinRenderJSON(c, http.StatusInternalServerError, meta.HTTPResponseError{Error: err.Error()})
		return
	}
	zutils.GinRenderJSON(c, http.StatusOK, meta.HTTPResponseESID{
		Message:     "ok",
		ID:          docID,
		ESID:        docID,
		Index:       indexName,
		Version:     1,
		SeqNo:       0,
		PrimaryTerm: 0,
		Result:      "created",
	})
}

这个 就是 API 部分,它做的事情很简单,从请求中读取参数,然后判断 index 是否存在,如果不存在则创建。然后调用 index.CreateDocument 将文档保存。

// CreateDocument inserts or updates a document in the zinc index
func (index *Index) CreateDocument(docID string, doc map[string]interface{}, update bool) error {
	// metrics
	IncrMetricStatsByIndex(index.GetName(), "wal_request")

	// check WAL
	shard := index.GetShardByDocID(docID)
	if err := shard.OpenWAL(); err != nil {
		return err
	}

	secondShardID := ShardIDNeedLatest
	if update {
		secondShardID = ShardIDNeedUpdate
	}
	data, err := shard.CheckDocument(docID, doc, update, secondShardID)
	if err != nil {
		return err
	}

	return shard.wal.Write(data)
}

这里做先用 docID,通过哈希的方式,计算出 shardId,然后又根据是否 update,得到了第二个 shardId,这么做的原因,在 pkg/core/index_shards.go中的注释有描述:一个 index 的索引被分为了两层:第一层的数量就是 在创建 index的时候的 shard num,这个值是固定的。而第二层则是做出的优化,为了避免单个 shard 过大,所以在前面的基础上再次做了 shard,这可以提高性能。并且每个 second shard 的大小受环境变量 ZINC_SHARD_MAX_SIZE 控制。默认大小为 1GB。

之后是 shard.CheckDocument 方法,这个方法做的事情就是对document 的字段进行检查,并将前面的 second shard相关信息暂时写入 doc中,之后将doc序列化返回。之后,first shard将这个 doc的数据写入到 wal,此时这个document就被持久化保存了,用户的写入请求也会成功返回,但是这并不意味着这个 doc 此时能被搜索到。

从 WAL 到可搜索

上文提到,doc 被写入 WAL之后,写入请求就结束了,但是它并不是可搜索的。因为直到此时,我们的全文检索库都还没有登场。现在我们定位到 pkg/core/index_shards_wallist.go 文件:

func init() {
   ZINC_INDEX_SHARD_WAL_LIST.Shards = make(map[string]*IndexShard)
   go ZINC_INDEX_SHARD_WAL_LIST.ConsumeWAL()
}

在 init 函数,它启动了一个 ConsumeWAL goroutine,很显然,这就是文档真正被索引的过程了。

func (t *IndexShardWALList) ConsumeWAL() {

   indexes := make(map[string]*Index)
   eg := &errgroup.Group{}
   eg.SetLimit(config.Global.Shard.GoroutineNum)
   tick := time.NewTicker(config.Global.WalSyncInterval)
   for range tick.C {
      shardClosed := make(chan string, t.Len())
      indexUpdated := make(chan string, t.Len())
      for _, shard := range t.List() {
         shard := shard
         indexes[shard.GetIndexName()] = shard.root
         eg.Go(func() error {
            select {
            case <-shard.close:
               shardClosed <- shard.GetShardName()
               return nil
            default:
               // continue
            }
            updated := shard.ConsumeWAL()
            if updated {
               indexUpdated <- shard.GetIndexName()
            }
            return nil
         })
      }
      _ = eg.Wait()
      close(shardClosed)
      close(indexUpdated)

      // check shard closed
      for name := range shardClosed {
         t.Remove(name)
      }

      // update index stats
      for name := range indexUpdated {
         index, ok := indexes[name]
         if !ok {
            continue
         }

         _ = index.UpdateMetadata()
         size := index.GetWALSize()
         if size == uint64(index.GetShardNum()) {
            size = 0
         }
         index.UpdateWALSize(size)

         stats := index.GetStats()
         SetMetricStatsByIndex(name, "doc_num", float64(atomic.LoadUint64(&stats.DocNum)))
         SetMetricStatsByIndex(name, "storage_size", float64(atomic.LoadUint64(&stats.StorageSize)/1024/1024)) // convert to MB

         delete(indexes, name)
      }

      // force gc
      runtime.GC()
   }
}

可以看到有一个ticker,根据配置的固定时间,会执行一次 shard.ConsumeWAL,这是不是很像 ES?ES有一个延迟1s的问题,就是写入数据之后,需要等1s左右才可以搜索到,其实和这里是一样的,ES同样是先将数据写入了内存缓冲区,并没有flush,所以数据是不可搜的。

接下来,我们再进入shard.ConsumeWAL 方法:

// ConsumeWAL consume WAL for index returns if there is any data updated
func (s *IndexShard) ConsumeWAL() bool {
   if err := s.wal.Sync(); err != nil {
      log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.Sync()")
   }

   var err error
   var entry []byte
   var minID, maxID, startID uint64
   maxID, err = s.wal.LastIndex()
   if err != nil {
      log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.LastIndex()")
      return false
   }
   // read last committed ID
   _, minID, err = s.readRedoLog(RedoActionWrite)
   if err != nil && err.Error() != errors.ErrNotFound.Error() {
      log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.readRedoLog()")
      return false
   }
   if minID == maxID {
      return false // no new entries
   }
   log.Debug().Str("index", s.GetIndexName()).Str("shard", s.GetID()).Uint64("minID", minID).Uint64("maxID", maxID).Msg("consume wal begin")

   // limit max batch size
   if maxID-minID > MaxBatchSize {
      maxID = minID + MaxBatchSize
   }

   batch := blugeindex.NewBatch()
   docs := make(walMergeDocs)
   minID++
   for startID = minID; minID <= maxID; minID++ {
      entry, err = s.wal.Read(minID)
      if err != nil {
         log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.Read()")
         return false
      }

      doc := make(map[string]interface{})
      err = json.Unmarshal(entry, &doc)
      if err != nil {
         log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.entry.Unmarshal()")
         return false
      }
      docs.AddDocument(doc)
      if docs.MaxShardLen() >= config.Global.BatchSize {
         if err = s.writeRedoLog(RedoActionRead, startID, minID); err != nil {
            log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Str("stage", "read").Msg("consume wal.redolog.Write()")
            return false
         }
         if err = docs.WriteTo(s, batch, false); err != nil {
            log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.docs.WriteTo()")
            return false
         }
         if err = s.writeRedoLog(RedoActionWrite, startID, minID); err != nil {
            log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Str("stage", "write").Msg("consume wal.redolog.Write()")
            return false
         }
         // Reset startID to nextID
         startID = minID + 1
      }
   }

   minID-- // need reduce one, because the next loop add one

   // check if there is any docs to write
   if docs.MaxShardLen() > 0 {
      if err = s.writeRedoLog(RedoActionRead, startID, minID); err != nil {
         log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Str("stage", "read").Msg("consume wal.redolog.Write()")
         return false
      }
      if err := docs.WriteTo(s, batch, false); err != nil {
         log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume wal.docs.WriteTo()")
         return false
      }
      if err = s.writeRedoLog(RedoActionWrite, startID, minID); err != nil {
         log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Str("stage", "write").Msg("consume wal.redolog.Write()")
         return false
      }
   }
   log.Debug().Str("index", s.GetIndexName()).Str("shard", s.GetID()).Uint64("minID", minID).Uint64("maxID", maxID).Msg("consume wal end")

   // Truncate log
   if err = s.wal.TruncateFront(minID); err != nil {
      log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Uint64("id", minID).Msg("consume wal.Truncate()")
      return true
   }

   // check shards
   if err = s.CheckShards(); err != nil {
      log.Error().Err(err).Str("index", s.GetIndexName()).Str("shard", s.GetID()).Msg("consume index.CheckShards()")
      return true
   }

   //  update metadata
   s.root.UpdateMetadataByShard(s.GetID())
   return true
}

这里主要是一个 正确性的保证,通过 redolog,记录了哪些 log entry没有被消费,真正的写入方法是:docs.WriteTo

// WriteTo write documents to index and sync to disk
// need split by shards
// need merge actions by docID
func (w *walMergeDocs) WriteTo(shard *IndexShard, batch *blugeindex.Batch, rollback bool) error {
	var err error
	for shardID := range *w {
		if !rollback {
			err = w.WriteToShard(shard, shardID, batch)
		} else {
			err = w.WriteToShardRollback(shard, shardID, batch)
		}
		if err != nil {
			return err
		}
		batch.Reset()
	}
	w.Reset()
	return nil
}

func (w *walMergeDocs) WriteToShard(shard *IndexShard, shardID int64, batch *blugeindex.Batch) error {
	docs, ok := (*w)[shardID]
	if !ok {
		return nil
	}
	var writer *bluge.Writer
	otherWriters := make([]*bluge.Writer, 0)
	otherBatch := blugeindex.NewBatch()
	if shardID == ShardIDNeedLatest {
		shardID = shard.GetLatestShardID()
	}
	if shardID >= 0 {
		w, err := shard.GetWriter(shardID)
		if err != nil {
			return err
		}
		writer = w
	} else {
		ws, err := shard.GetWriters() // get all shard
		if err != nil {
			return err
		}
		writer = ws[len(ws)-1]
		otherWriters = append(otherWriters, ws...)
		otherWriters = otherWriters[:len(ws)-1]
	}
	var firstAction, lastAction string
	for _, doc := range docs {
		// str, err := json.Marshal(doc.data)
		// fmt.Printf("%s, %v, %v\n", str, err, doc.actions)
		bdoc, err := shard.BuildBlugeDocumentFromJSON(doc.docID, doc.data)
		if err != nil {
			return err
		}
		firstAction = doc.actions[0]
		switch firstAction {
		case meta.ActionTypeInsert:
			if len(doc.actions) == 1 {
				batch.Insert(bdoc)
			} else {
				lastAction = doc.actions[len(doc.actions)-1]
				switch lastAction {
				case meta.ActionTypeInsert:
					batch.Insert(bdoc)
				case meta.ActionTypeUpdate:
					batch.Insert(bdoc)
				case meta.ActionTypeDelete:
					// noop
				}
			}
		case meta.ActionTypeUpdate:
			if len(doc.actions) == 1 {
				batch.Update(bdoc.ID(), bdoc)
				otherBatch.Delete(bdoc.ID())
			} else {
				lastAction = doc.actions[len(doc.actions)-1]
				switch lastAction {
				case meta.ActionTypeInsert:
					batch.Update(bdoc.ID(), bdoc)
					otherBatch.Delete(bdoc.ID())
				case meta.ActionTypeUpdate:
					batch.Update(bdoc.ID(), bdoc)
					otherBatch.Delete(bdoc.ID())
				case meta.ActionTypeDelete:
					batch.Delete(bdoc.ID())
					otherBatch.Delete(bdoc.ID())
				}
			}
		case meta.ActionTypeDelete:
			if len(doc.actions) == 1 {
				batch.Delete(bdoc.ID())
				otherBatch.Delete(bdoc.ID())
			} else {
				lastAction = doc.actions[len(doc.actions)-1]
				switch lastAction {
				case meta.ActionTypeInsert:
					batch.Update(bdoc.ID(), bdoc)
					otherBatch.Delete(bdoc.ID())
				case meta.ActionTypeUpdate:
					batch.Update(bdoc.ID(), bdoc)
					otherBatch.Delete(bdoc.ID())
				case meta.ActionTypeDelete:
					batch.Delete(bdoc.ID())
					otherBatch.Delete(bdoc.ID())
				}
			}
		default:
			return fmt.Errorf("walMergeDocs: invalid action type [%s]", firstAction)
		}
	}

	if err := writer.Batch(batch); err != nil {
		return err
	}
	for _, writer := range otherWriters {
		if err := writer.Batch(otherBatch); err != nil {
			return err
		}
	}
	return nil
}

这里是真正调用 bluge 的地方,通过其提供的 API,将 document 写入到全文检索。WriteToShard 则对 second shard 进行了解析,如果能够确定其存在的 second shard id,那么直接通过 对应的 shard id 拿到 bluge writer 即可操作了。否则对于新增操作,则直接操作最后一个 shard,或者是 ShardIDNeedUpdate 这种 second shard id为-2的情况,则也是操作最后一个shard,但是需要将其他 shard 中次文档的数据都删除。

而那一大串的 switch case,则是对操作类型进行判断,如果操作是 先新增,后删除,那么可以直接跳过,否则其他情况,需要按照逻辑处理。

至此,一个document的写入和索引就完毕了,用户可以通过 search 接口,搜索到刚写入的document了。