Skip to content

Commit

Permalink
add some log
Browse files Browse the repository at this point in the history
  • Loading branch information
YangSen-qn committed Nov 18, 2024
1 parent 24367e5 commit 8b46ad7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
10 changes: 10 additions & 0 deletions iqshell/common/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,26 +104,32 @@ func (f *Flow) Start() {
workList := make([]*WorkInfo, 0, f.doWorkInfoListCount)
for {
hasMore, workInfo, err := f.WorkProvider.Provide()
log.DebugF("work producer get work, hasMore:%v, workInfo: %+v, err: %+v", hasMore, workInfo.Data, err)
if err != nil {
if err.Code == data.ErrorCodeParamMissing ||
err.Code == data.ErrorCodeLineHeader {
log.DebugF("work producer get work, skip:%s because:%s", workInfo.Data, err)
f.notifyWorkSkip(workInfo, nil, err)
} else {
log.DebugF("work producer get work fail, error:%s info:%s", err, workInfo.Data)
f.notifyWorkFail(workInfo, err)
}
continue
}

if workInfo == nil || workInfo.Work == nil {
if !hasMore {
log.Info("work producer get work completed")
break
} else {
log.Info("work producer get work fail: work in empty")
continue
}
}

// 检测 work 是否需要过
if skip, cause := f.shouldWorkSkip(workInfo); skip {
log.DebugF("work producer get work, skip:%s cause:%s", workInfo.Data, cause)
f.notifyWorkSkip(workInfo, nil, cause)
continue
}
Expand Down Expand Up @@ -192,11 +198,15 @@ func (f *Flow) Start() {

workCount := len(workList)

log.InfoF("work consumer get works, count:%s", workCount)

_ = f.limitAcquire(workCount)
// workRecordList 有数据则长度和 workList 长度相同
workRecordList, workErr := worker.DoWork(workList)
f.limitRelease(workCount)

log.InfoF("work consumer handle works, count:%s error:%+v", workCount, workErr)

if len(workRecordList) == 0 && workErr != nil {
log.ErrorF("Do Worker Error:%+v", workErr)
for _, workInfo := range workList {
Expand Down
15 changes: 12 additions & 3 deletions iqshell/storage/object/download/operations/work_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (w *workProvider) getWorkInfoFromFile() {
var keys []string
for {
if len(keys) == 300 {
w.getWorkInfoOfKeys(keys)
if !w.getWorkInfoOfKeys(keys) {
break
}
keys = nil
}

Expand Down Expand Up @@ -140,9 +142,9 @@ func (w *workProvider) getWorkInfoFromFile() {
}()
}

func (w *workProvider) getWorkInfoOfKeys(keys []string) {
func (w *workProvider) getWorkInfoOfKeys(keys []string) bool {
if len(keys) == 0 {
return
return true
}

operations := make([]batch.Operation, 0, len(keys))
Expand All @@ -155,6 +157,11 @@ func (w *workProvider) getWorkInfoOfKeys(keys []string) {
}
}

if len(operations) == 0 {
log.Error("get file info error: key invalid")
return false
}

results, err := batch.Some(operations)
if len(results) == len(operations) {
for i, result := range results {
Expand Down Expand Up @@ -203,6 +210,8 @@ func (w *workProvider) getWorkInfoOfKeys(keys []string) {
}
time.Sleep(10 * time.Second)
}

return true
}

func (w *workProvider) getWorkInfoFromBucket() {
Expand Down

0 comments on commit 8b46ad7

Please sign in to comment.