Skip to content

Commit

Permalink
Merge pull request #72 from KANIOYH/hotreloading
Browse files Browse the repository at this point in the history
feature:support hot reloading through a multi-version InvertedDB.
  • Loading branch information
CocaineCong authored Dec 22, 2024
2 parents 0bb2ddf + 5d8d96b commit b2cbfa0
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 21 deletions.
189 changes: 171 additions & 18 deletions app/search_engine/repository/storage/inverted_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ package storage
import (
"context"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/pkg/errors"

Expand All @@ -35,38 +40,186 @@ type KvInfo struct {
Value []byte
}

// NOTE(review): the next line appears to be the pre-change declaration kept by
// the diff rendering; the declaration after it is the post-change one — confirm.
var GlobalInvertedDB []*InvertedDB
// GlobalInvertedDB is the package-level inverted-DB manager. InitInvertedDB
// assigns it.
var GlobalInvertedDB *InvertedDBManager

// InvertedDB groups the handles kept for one inverted-index file.
// Fields are filled positionally elsewhere in this file, so their order matters.
type InvertedDB struct {
file *os.File // file handle for the index path
db *bolt.DB // bolt database opened on the index path
offset int64 // UpdateFromRedis stores the file's Stat().Size() here
}

// InvertedDBManager manages multiple inverted DBs through versions.
// currentVersion is the newest version.
// versionSet records the versions that are still in use, keyed by version id.
type InvertedDBManager struct {
currentVersion *Version
versionSet map[int64]*Version
rwMutex sync.RWMutex // held by the manager's methods around currentVersion/versionSet access
}

// Version holds the information for one version of the inverted-DB set.
type Version struct {
versionId int64 // UpdateFromRedis assigns previous id + 1
dbTable map[string]*InvertedDB // file path -> inverted DB
oldFiles []string // paths in this version that the following version no longer lists
dbs []*InvertedDB // inverted DBs of this version; returned by Ref
ref atomic.Int64 // incremented by Ref, decremented by Unref
}

// InitInvertedDB initializes the inverted-index store: it builds a manager
// holding an empty version 0, calls UpdateFromRedis on it, starts the
// background cleaner goroutine, and stores the manager in GlobalInvertedDB.
// NOTE(review): the three lines that follow (old signature, dbs, filePath)
// appear to be pre-change lines left in by the diff rendering — confirm.
func InitInvertedDB(ctx context.Context) []*InvertedDB {
dbs := make([]*InvertedDB, 0)
filePath, _ := redis.ListInvertedPath(ctx, redis.InvertedIndexDbPathKeys)
func InitInvertedDB(ctx context.Context) {
// Create a dummy version (id 0, no DBs).
version := Version{
versionId: 0,
dbTable: make(map[string]*InvertedDB),
dbs: make([]*InvertedDB, 0),
}
manager := InvertedDBManager{
currentVersion: &version,
versionSet: make(map[int64]*Version),
}
manager.versionSet[0] = &version
manager.UpdateFromRedis(ctx)
// cleanTime is mainly here for tests; it is read from ctx as an int.
cleanTime, ok := ctx.Value("cleanTime").(int)
if !ok {
// Not set: run the asynchronous cleanup every 30 minutes (value is in seconds).
go manager.backgroundCleaner(30 * 60)
} else {
go manager.backgroundCleaner(cleanTime)
}
GlobalInvertedDB = &manager
}

// UpdateFromRedis builds a new Version from the current list of index file
// paths and makes it the manager's current version. DBs already present in the
// current version's table are reused; paths not present are opened. Paths in
// the current table that the new list omits are recorded as the outgoing
// version's oldFiles.
func (m *InvertedDBManager) UpdateFromRedis(ctx context.Context) {
newTable := make(map[string]*InvertedDB)
// Mock redis channel used for testing; this part can be dropped once testing is done.
mockRedisChan, ok := ctx.Value("mockRedisChan").(chan []string)
var filePath []string
if !ok {
filePath, _ = redis.ListInvertedPath(ctx, redis.InvertedIndexDbPathKeys)
} else {
filePath = <-mockRedisChan
}
// Build into newTable off to the side, so only a read lock is needed and
// online serving is not affected.
m.rwMutex.RLock()
// Create the new version.
version := Version{
versionId: m.currentVersion.versionId + 1,
dbs: make([]*InvertedDB, 0),
}
version.ref.Store(0)

// For files that are not mapped yet, build an inverted DB.
// Reuse the DBs that are still in use.
currentDBTable := m.currentVersion.dbTable
for _, file := range filePath {
// NOTE(review): the next three lines appear to be pre-change lines left in
// by the diff rendering — confirm.
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
log.LogrusObj.Error(err)
_, exist := currentDBTable[file]
if !exist {
// NOTE(review): each error below is only logged; f, stat and db are still
// used afterwards, so stat may be nil when stat.Size() is evaluated — confirm.
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
log.LogrusObj.Error(err)
}
stat, err := f.Stat()
if err != nil {
log.LogrusObj.Error(err)
}
db, err := bolt.Open(file, 0600, nil)
if err != nil {
log.LogrusObj.Error(err)
}
// NOTE(review): this writes into the current version's map while only the
// read lock is held.
currentDBTable[file] = &InvertedDB{f, db, stat.Size()}
}
// Add the inverted DB pointer to the new version.
idb := currentDBTable[file]
newTable[file] = idb
version.dbs = append(version.dbs, idb)
}

// Find the files the new version no longer uses and record them.
oldFiles := make([]string, 0)
for file := range currentDBTable {
// The old file is not present in newTable.
if _, exist := newTable[file]; !exist {
oldFiles = append(oldFiles, file)
}
// NOTE(review): the next three lines appear to be pre-change lines left in
// by the diff rendering — confirm.
stat, err := f.Stat()
if err != nil {
log.LogrusObj.Error(err)
}
// Set the new version's dbTable.
version.dbTable = newTable
m.rwMutex.RUnlock()

// Take the write lock to switch versions; this affects online serving, so
// keep the work done under the write lock to a minimum.
m.rwMutex.Lock()
// Attach the files the new version no longer uses to the (outgoing) current version.
m.versionSet[m.currentVersion.versionId].oldFiles = oldFiles
m.versionSet[version.versionId] = &version
m.currentVersion = &version
m.rwMutex.Unlock()
}

// backgroundCleaner asynchronously cleans up inverted DBs that are no longer
// used; rebuilding is not expected to be a frequent operation.
// cleanTime is the interval between cleanup passes, in seconds. The loop
// returns when one of the registered OS signals arrives.
func (m *InvertedDBManager) backgroundCleaner(cleanTime int) {
// Listen for signals so the goroutine can exit gracefully.
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
for {
// Use a timer to check every cleanTime seconds whether old versions need cleaning.
timer := time.NewTimer(time.Duration(cleanTime) * time.Second)
// NOTE(review): this defer sits inside the loop, so the deferred Stop calls
// only run when the function returns, one per iteration.
defer timer.Stop()
select {
case <-sig:
return
case <-timer.C:
m.cleanOldVersion()
}
// NOTE(review): the next three lines appear to be pre-change lines left in
// by the diff rendering — confirm.
db, err := bolt.Open(file, 0600, nil)
if err != nil {
log.LogrusObj.Error(err)
}
}

// cleanOldVersion closes the DBs listed in oldFiles of every version that is
// older than the current one and has a zero reference count, then removes
// those versions from versionSet.
func (m *InvertedDBManager)cleanOldVersion() {
oldIds := make([]int64, 0)
m.rwMutex.RLock()
if len(m.versionSet) > 1 {
// Old versions exist: close the DBs that are no longer needed.
for id, version := range m.versionSet {
// Why can a version with ref == 0 (other than the newest) be cleaned directly?
// Because an old version can never be used again; later requests always
// use the newest version.
if id < m.currentVersion.versionId && version.ref.Load() == 0 {
for _, file := range version.oldFiles {
if idb, exist := version.dbTable[file]; exist {
idb.Close()
}
}
// Remember the id; it is removed from the version set afterwards.
oldIds = append(oldIds, id)
}
}
// NOTE(review): the next line appears to be a pre-change line left in by the
// diff rendering — confirm.
dbs = append(dbs, &InvertedDB{f, db, stat.Size()})
}
// NOTE(review): the next two lines appear to be pre-change lines left in by
// the diff rendering — confirm.
if len(filePath) == 0 {
return nil
m.rwMutex.RUnlock()
// Removing old versions from the version set requires the write lock.
m.rwMutex.Lock()
for _, oldId := range oldIds {
delete(m.versionSet, oldId)
}
// NOTE(review): the next two lines appear to be pre-change lines left in by
// the diff rendering — confirm.
GlobalInvertedDB = dbs
return nil
m.rwMutex.Unlock()
}

// Ref is the only way to obtain the inverted DBs. It returns the DB slice of
// the current version together with that version's id, and bumps the
// version's reference count by one. Pass the returned id to Unref when done.
func (m *InvertedDBManager) Ref() ([]*InvertedDB, int64) {
	m.rwMutex.RLock()
	defer m.rwMutex.RUnlock()

	// Pin the current version while the read lock is held.
	cur := m.currentVersion
	id, list := cur.versionId, cur.dbs
	cur.ref.Add(1)
	return list, id
}

// Unref releases a reference previously taken with Ref. It must be called
// once the caller has finished with the DBs Ref returned.
//
// versionId is the id returned by the matching Ref call. If no version with
// that id is present in versionSet (for example the id is wrong, or the
// version was already removed after a duplicate Unref), the call is a no-op
// instead of dereferencing a nil *Version.
func (m *InvertedDBManager) Unref(versionId int64) {
	m.rwMutex.RLock()
	defer m.rwMutex.RUnlock()

	version, ok := m.versionSet[versionId]
	if !ok || version == nil {
		// A missing key yields a nil pointer; calling ref.Add on it would panic.
		return
	}
	version.ref.Add(-1)
}

// NewInvertedDB 新建一个inverted
Expand Down
63 changes: 61 additions & 2 deletions app/search_engine/repository/storage/inverted_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package storage
import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/CocaineCong/tangseng/config"
)
Expand All @@ -37,10 +39,67 @@ func TestGetInvertedInfo(t *testing.T) {
fmt.Println(p)
}

// getMsg produces the file names fed to the mock redis channel: testDir
// followed by each integer in the half-open range [start, end). The result is
// an empty, non-nil slice when the range is empty.
func getMsg(testDir string, start, end int) []string {
	names := make([]string, 0)
	id := start
	for id < end {
		names = append(names, fmt.Sprintf("%s%d", testDir, id))
		id++
	}
	return names
}

// TestInitInvertedDB drives the manager through two versions using a mock
// redis channel and a 2-second cleaner interval, checking the current version
// id, the reference count after Ref, and that the cleaner leaves only one
// version in versionSet.
func TestInitInvertedDB(t *testing.T) {
testDir := "/tmp/ts/TestInvertedDBManager/"
err := os.MkdirAll(testDir, 0755)
if err != nil {
t.Fatalf("Failed to create test directory: %v", err)
}
defer os.RemoveAll(testDir) // make sure the directory is removed when the test ends
mockRedisChan := make(chan []string, 10)
ctx := context.Background()
ctx = context.WithValue(ctx, "cleanTime", 2)
ctx = context.WithValue(ctx, "mockRedisChan", mockRedisChan)

// Send data into the channel.
mockRedisChan <- getMsg(testDir, 3, 10)

InitInvertedDB(ctx)
// NOTE(review): the next two lines appear to be pre-change lines left in by
// the diff rendering — confirm.
for _, v := range GlobalInvertedDB {
fmt.Println(v)

// Sleep 3 seconds so the background cleaner has removed version-0.
time.Sleep(3 * time.Second)
if len(GlobalInvertedDB.versionSet) != 1 {
t.Errorf("Expected %v, but got %v", 1, len(GlobalInvertedDB.versionSet))
}
if GlobalInvertedDB.currentVersion.versionId != 1 {
t.Errorf("Expected %v, but got %v", 1, GlobalInvertedDB.currentVersion.versionId)
}
// Take a reference on the current version, then create a new version.
_, oldversionId := GlobalInvertedDB.Ref()
if GlobalInvertedDB.currentVersion.ref.Load() != 1 {
t.Errorf("Expected %v, but got %v", 1, GlobalInvertedDB.currentVersion.ref.Load())
}
mockRedisChan <- getMsg(testDir, 6, 12)
GlobalInvertedDB.UpdateFromRedis(ctx)

/*
At this point there are two versions
current ↓
| version-1,ref:1 | version-2,ref:0 |
*/
// NOTE(review): the check compares against 2 but the message reports 1 as the
// expected value.
if GlobalInvertedDB.currentVersion.versionId != 2 {
t.Errorf("Expected %v, but got %v", 1, GlobalInvertedDB.currentVersion.versionId)
}

// Drop the reference; that version will be released asynchronously.
GlobalInvertedDB.Unref(oldversionId)

// Sleep 3 seconds so the background cleaner has removed version-1.
time.Sleep(3 * time.Second)
// Only the newest version remains now.
// NOTE(review): the check compares against 2 but the message reports 1 as the
// expected value.
if GlobalInvertedDB.currentVersion.versionId != 2 {
t.Errorf("Expected %v, but got %v", 1, GlobalInvertedDB.currentVersion.versionId)
}
if len(GlobalInvertedDB.versionSet) != 1 {
t.Errorf("Expected %v, but got %v", 1, len(GlobalInvertedDB.versionSet))
}
}
4 changes: 3 additions & 1 deletion app/search_engine/service/recall/recall.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ func (r *Recall) searchDoc(ctx context.Context, tokens []string) (recalls []int6
func fetchPostingsByToken(token string) (postingsList []*types.PostingsList, err error) {
// 遍历存储index的地方,token对应的doc Id 全部取出
postingsList = make([]*types.PostingsList, 0, 1e6)
for _, inverted := range storage.GlobalInvertedDB {
dbs, versionId := storage.GlobalInvertedDB.Ref()
defer storage.GlobalInvertedDB.Unref(versionId)
for _, inverted := range dbs {
docIds, errx := inverted.GetInverted([]byte(token))
if errx != nil {
err = errors.WithMessage(err, "getInverted error")
Expand Down

0 comments on commit b2cbfa0

Please sign in to comment.