From e6fbdf523dc4151037f0ea8986a3ba48d8577a2a Mon Sep 17 00:00:00 2001 From: Peter Fern Date: Mon, 13 Sep 2021 16:32:21 +1000 Subject: [PATCH] fix(collector): Avoid race on upstream channel close, tidy sync points --- collector/zfs.go | 47 ++++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/collector/zfs.go b/collector/zfs.go index a2ae129..9510dc7 100644 --- a/collector/zfs.go +++ b/collector/zfs.go @@ -87,21 +87,15 @@ func (c *ZFS) Collect(ch chan<- prometheus.Metric) { wg.Add(len(c.Collectors)) // Synchonize after timeout event, ensuring no writers are still active when we return control. timeout := make(chan struct{}) - done := make(chan struct{}) - timeoutMutex := sync.Mutex{} - - // Upon exceeding deadline, send cached data for any metrics that have not already been reported. - go func() { - <-ctx.Done() - if err := ctx.Err(); err != nil && err != context.Canceled { - timeoutMutex.Lock() - c.cache.merge(cache) - cacheIndex := cache.index() - c.sendCached(ch, cacheIndex) - close(timeout) // assert timeout for flow control in other goroutines - timeoutMutex.Unlock() + finalized := make(chan struct{}) + finalize := func() { + select { + case <-finalized: + default: + close(finalized) } - }() + + } // Close the proxy channel upon collector completion. go func() { @@ -112,20 +106,17 @@ func (c *ZFS) Collect(ch chan<- prometheus.Metric) { // Cache metrics as they come in via the proxy channel, and ship them out if we've not exceeded the deadline. go func() { for metric := range proxy { - timeoutMutex.Lock() cache.add(metric) select { case <-timeout: - timeoutMutex.Unlock() - continue + finalize() default: ch <- metric.prometheus - timeoutMutex.Unlock() } } // Signal completion and update full cache. c.cache.replace(cache) - close(done) + cancel() // Notify next collection that we're ready to collect again c.ready <- struct{}{} }() @@ -145,6 +136,7 @@ func (c *ZFS) Collect(ch chan<- prometheus.Metric) { collector, err := state.factory(c.logger, c.client, strings.Split(*state.Properties, `,`)) if err != nil { _ = level.Error(c.logger).Log("Error instantiating collector", "collector", name, "err", err) + wg.Done() continue } go func(name string, collector Collector) { @@ -153,11 +145,20 @@ func (c *ZFS) Collect(ch chan<- prometheus.Metric) { }(name, collector) } - // Wait for either timeout or completion. - select { - case <-timeout: - case <-done: + // Wait for completion or timeout + <-ctx.Done() + err = ctx.Err() + if err == context.Canceled { + finalize() + } else if err != nil { + // Upon exceeding deadline, send cached data for any metrics that have not already been reported. + close(timeout) // assert timeout for flow control in other goroutines + c.cache.merge(cache) + cacheIndex := cache.index() + c.sendCached(ch, cacheIndex) } + // Ensure there are no in-flight writes to the upstream channel + <-finalized } // sendCached values that do not appear in the current cacheIndex.