Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execdetails: fix data race in the BasicRuntimeStats (#42338) #49698

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 42 additions & 35 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,20 +511,24 @@ func (p *Percentile[valueType]) Sum() float64 {
// String implements the RuntimeStats interface.
func (e *basicCopRuntimeStats) String() string {
if e.storeType == "tiflash" {
return fmt.Sprintf("time:%v, loops:%d, threads:%d, ", FormatDuration(time.Duration(e.consume)), e.loop, e.threads) + e.BasicRuntimeStats.tiflashScanContext.String()
return fmt.Sprintf("time:%v, loops:%d, threads:%d, ", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load(), e.threads) + e.BasicRuntimeStats.tiflashScanContext.String()
}
return fmt.Sprintf("time:%v, loops:%d", FormatDuration(time.Duration(e.consume)), e.loop)
return fmt.Sprintf("time:%v, loops:%d", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load())
}

// Clone implements the RuntimeStats interface.
func (e *basicCopRuntimeStats) Clone() RuntimeStats {
return &basicCopRuntimeStats{
BasicRuntimeStats: BasicRuntimeStats{loop: e.loop, consume: e.consume, rows: e.rows, tiflashScanContext: e.tiflashScanContext.Clone()},
stats := &basicCopRuntimeStats{
BasicRuntimeStats: BasicRuntimeStats{tiflashScanContext: e.tiflashScanContext.Clone()},
threads: e.threads,
storeType: e.storeType,
totalTasks: e.totalTasks,
procTimes: e.procTimes,
}
stats.loop.Store(e.loop.Load())
stats.consume.Store(e.consume.Load())
stats.rows.Store(e.rows.Load())
return stats
}

// Merge implements the RuntimeStats interface.
Expand All @@ -533,13 +537,13 @@ func (e *basicCopRuntimeStats) Merge(rs RuntimeStats) {
if !ok {
return
}
e.loop += tmp.loop
e.consume += tmp.consume
e.rows += tmp.rows
e.loop.Add(tmp.loop.Load())
e.consume.Add(tmp.consume.Load())
e.rows.Add(tmp.rows.Load())
e.threads += tmp.threads
e.totalTasks += tmp.totalTasks
if tmp.procTimes.Size() == 0 {
e.procTimes.Add(Duration(tmp.consume))
e.procTimes.Add(Duration(tmp.consume.Load()))
} else {
e.procTimes.MergePercentile(&tmp.procTimes)
}
Expand Down Expand Up @@ -575,11 +579,9 @@ func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.Execu
storeType: crs.storeType,
}
}
crs.stats[address].Merge(&basicCopRuntimeStats{
data := &basicCopRuntimeStats{
storeType: crs.storeType,
BasicRuntimeStats: BasicRuntimeStats{loop: int32(*summary.NumIterations),
consume: int64(*summary.TimeProcessedNs),
rows: int64(*summary.NumProducedRows),
BasicRuntimeStats: BasicRuntimeStats{
tiflashScanContext: TiFlashScanContext{
totalDmfileScannedPacks: summary.GetTiflashScanContext().GetTotalDmfileScannedPacks(),
totalDmfileSkippedPacks: summary.GetTiflashScanContext().GetTotalDmfileSkippedPacks(),
Expand All @@ -589,13 +591,17 @@ func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.Execu
totalDmfileReadTimeMs: summary.GetTiflashScanContext().GetTotalDmfileReadTimeMs(),
totalCreateSnapshotTimeMs: summary.GetTiflashScanContext().GetTotalCreateSnapshotTimeMs()}}, threads: int32(summary.GetConcurrency()),
totalTasks: 1,
})
}
data.BasicRuntimeStats.loop.Store(int32(*summary.NumIterations))
data.BasicRuntimeStats.consume.Store(int64(*summary.TimeProcessedNs))
data.BasicRuntimeStats.rows.Store(int64(*summary.NumProducedRows))
crs.stats[address].Merge(data)
}

// GetActRows return total rows of CopRuntimeStats.
func (crs *CopRuntimeStats) GetActRows() (totalRows int64) {
for _, instanceStats := range crs.stats {
totalRows += instanceStats.rows
totalRows += instanceStats.rows.Load()
}
return totalRows
}
Expand All @@ -605,8 +611,8 @@ func (crs *CopRuntimeStats) MergeBasicStats() (procTimes Percentile[Duration], t
totalTiFlashScanContext = TiFlashScanContext{}
for _, instanceStats := range crs.stats {
procTimes.MergePercentile(&instanceStats.procTimes)
totalTime += time.Duration(instanceStats.consume)
totalLoops += instanceStats.loop
totalTime += time.Duration(instanceStats.consume.Load())
totalLoops += instanceStats.loop.Load()
totalThreads += instanceStats.threads
totalTiFlashScanContext.Merge(instanceStats.tiflashScanContext)
totalTasks += instanceStats.totalTasks
Expand Down Expand Up @@ -751,28 +757,29 @@ func (context *TiFlashScanContext) Empty() bool {
// BasicRuntimeStats is the basic runtime stats.
type BasicRuntimeStats struct {
// executor's Next() called times.
loop int32
loop atomic.Int32
// executor consume time.
consume int64
consume atomic.Int64
// executor return row count.
rows int64
rows atomic.Int64
// executor extra infos
tiflashScanContext TiFlashScanContext
}

// GetActRows return total rows of BasicRuntimeStats.
func (e *BasicRuntimeStats) GetActRows() int64 {
return e.rows
return e.rows.Load()
}

// Clone implements the RuntimeStats interface.
func (e *BasicRuntimeStats) Clone() RuntimeStats {
return &BasicRuntimeStats{
loop: e.loop,
consume: e.consume,
rows: e.rows,
result := &BasicRuntimeStats{
tiflashScanContext: e.tiflashScanContext.Clone(),
}
result.loop.Store(e.loop.Load())
result.consume.Store(e.consume.Load())
result.rows.Store(e.rows.Load())
return result
}

// Merge implements the RuntimeStats interface.
Expand All @@ -781,9 +788,9 @@ func (e *BasicRuntimeStats) Merge(rs RuntimeStats) {
if !ok {
return
}
e.loop += tmp.loop
e.consume += tmp.consume
e.rows += tmp.rows
e.loop.Add(tmp.loop.Load())
e.consume.Add(tmp.consume.Load())
e.rows.Add(tmp.rows.Load())
e.tiflashScanContext.Merge(tmp.tiflashScanContext)
}

Expand All @@ -808,7 +815,7 @@ func (e *RootRuntimeStats) GetActRows() int64 {
if e.basic == nil {
return 0
}
return e.basic.rows
return e.basic.rows.Load()
}

// MergeStats merges stats in the RootRuntimeStats and return the stats suitable for display directly.
Expand All @@ -834,14 +841,14 @@ func (e *RootRuntimeStats) String() string {

// Record records executor's execution.
func (e *BasicRuntimeStats) Record(d time.Duration, rowNum int) {
atomic.AddInt32(&e.loop, 1)
atomic.AddInt64(&e.consume, int64(d))
atomic.AddInt64(&e.rows, int64(rowNum))
e.loop.Add(1)
e.consume.Add(int64(d))
e.rows.Add(int64(rowNum))
}

// SetRowNum sets the row num.
func (e *BasicRuntimeStats) SetRowNum(rowNum int64) {
atomic.StoreInt64(&e.rows, rowNum)
e.rows.Store(rowNum)
}

// String implements the RuntimeStats interface.
Expand All @@ -851,15 +858,15 @@ func (e *BasicRuntimeStats) String() string {
}
var str strings.Builder
str.WriteString("time:")
str.WriteString(FormatDuration(time.Duration(e.consume)))
str.WriteString(FormatDuration(time.Duration(e.consume.Load())))
str.WriteString(", loops:")
str.WriteString(strconv.FormatInt(int64(e.loop), 10))
str.WriteString(strconv.FormatInt(int64(e.loop.Load()), 10))
return str.String()
}

// GetTime get the int64 total time
func (e *BasicRuntimeStats) GetTime() int64 {
return e.consume
return e.consume.Load()
}

// RuntimeStatsColl collects executors's execution info.
Expand Down