package pond

import (
	"context"
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"

	"github.com/alitto/pond/v2/internal/future"
	"github.com/alitto/pond/v2/internal/linkedbuffer"
)

const (
	DefaultQueueSize        = 0
	DefaultNonBlocking      = false
	LinkedBufferInitialSize = 1024
	LinkedBufferMaxCapacity = 100 * 1024
)

var (
	ErrQueueFull   = errors.New("queue is full")
	ErrQueueEmpty  = errors.New("queue is empty")
	ErrPoolStopped = errors.New("pool stopped")

	poolStoppedFuture = func() Task {
		future, resolve := future.NewFuture(context.Background())
		resolve(ErrPoolStopped)
		return future
	}()
)

// basePool is the base interface for all pool types.
type basePool interface {
	// Returns the number of worker goroutines that are currently active (executing a task) in the pool.
	RunningWorkers() int64

	// Returns the total number of tasks submitted to the pool since its creation.
	SubmittedTasks() uint64

	// Returns the number of tasks that are currently waiting in the pool's queue.
	WaitingTasks() uint64

	// Returns the number of tasks that have completed with an error.
	FailedTasks() uint64

	// Returns the number of tasks that have completed successfully.
	SuccessfulTasks() uint64

	// Returns the total number of tasks that have completed (either successfully or with an error).
	CompletedTasks() uint64

	// Returns the maximum concurrency of the pool.
	MaxConcurrency() int

	// Returns the size of the task queue.
	QueueSize() int

	// Returns true if the pool is non-blocking, meaning that it will not block when the task queue is full.
	// In a non-blocking pool, tasks that cannot be submitted to the queue will be dropped.
	// By default, pools are blocking, meaning that they will block when the task queue is full.
	NonBlocking() bool

	// Returns the context associated with this pool.
	Context() context.Context

	// Stops the pool and returns a future that can be used to wait for all pending tasks to complete.
	Stop() Task

	// Stops the pool and waits for all tasks to complete.
	StopAndWait()

	// Returns true if the pool has been stopped or its context has been cancelled.
	Stopped() bool
}
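
// Example (an illustrative sketch, not part of the original file):
// inspecting pool metrics and shutting down.
//
//	pool := NewPool(10)
//	pool.Submit(func() { /* do work */ })
//	fmt.Printf("running=%d waiting=%d completed=%d\n",
//		pool.RunningWorkers(), pool.WaitingTasks(), pool.CompletedTasks())
//	pool.StopAndWait() // stop accepting tasks and wait for the queue to drain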

// Represents a pool of goroutines that can execute tasks concurrently.
type Pool interface {
	basePool

	// Submits a task to the pool without waiting for it to complete.
	Go(task func()) error

	// Submits a task to the pool and returns a future that can be used to wait for the task to complete.
	Submit(task func()) Task

	// Submits a task that returns an error and returns a future that can be used to wait for the task
	// to complete and obtain the error it returned, if any.
	SubmitErr(task func() error) Task

	// Creates a new subpool with the specified maximum concurrency and options.
	NewSubpool(maxConcurrency int, options ...Option) Pool

	// Creates a new task group.
	NewGroup() TaskGroup

	// Creates a new task group with the specified context.
	NewGroupContext(ctx context.Context) TaskGroup
}
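
// Example (an illustrative sketch, not part of the original file): submitting
// tasks and receiving a task's error through the returned future.
//
//	pool := NewPool(5)
//	task := pool.SubmitErr(func() error {
//		return errors.New("boom")
//	})
//	if err := task.Wait(); err != nil {
//		// err is the error returned by the task
//	}
//	pool.StopAndWait()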

type pool struct {
	mutex               sync.Mutex
	parent              *pool
	ctx                 context.Context
	cancel              context.CancelCauseFunc
	nonBlocking         bool
	maxConcurrency      int
	closed              atomic.Bool
	workerCount         atomic.Int64
	workerWaitGroup     sync.WaitGroup
	submitWaiters       chan struct{}
	queueSize           int
	tasks               *linkedbuffer.LinkedBuffer[any]
	submittedTaskCount  atomic.Uint64
	successfulTaskCount atomic.Uint64
	failedTaskCount     atomic.Uint64
}

func (p *pool) Context() context.Context {
	return p.ctx
}

func (p *pool) Stopped() bool {
	return p.closed.Load() || p.ctx.Err() != nil
}

func (p *pool) MaxConcurrency() int {
	return p.maxConcurrency
}

func (p *pool) QueueSize() int {
	return p.queueSize
}

func (p *pool) NonBlocking() bool {
	return p.nonBlocking
}

func (p *pool) RunningWorkers() int64 {
	return p.workerCount.Load()
}

func (p *pool) SubmittedTasks() uint64 {
	return p.submittedTaskCount.Load()
}

func (p *pool) WaitingTasks() uint64 {
	return p.tasks.Len()
}

func (p *pool) FailedTasks() uint64 {
	return p.failedTaskCount.Load()
}

func (p *pool) SuccessfulTasks() uint64 {
	return p.successfulTaskCount.Load()
}

func (p *pool) CompletedTasks() uint64 {
	return p.successfulTaskCount.Load() + p.failedTaskCount.Load()
}

// worker executes the initial task it is given and then keeps pulling and
// executing tasks from the queue until the queue is empty or the pool
// context is cancelled.
func (p *pool) worker(task any) {
	defer p.workerWaitGroup.Done()

	var readTaskErr, err error
	for {
		_, err = invokeTask[any](task)

		p.updateMetrics(err)

		task, readTaskErr = p.readTask()
		if readTaskErr != nil {
			return
		}
	}
}

func (p *pool) Go(task func()) error {
	return p.submit(task)
}

func (p *pool) Submit(task func()) Task {
	return p.wrapAndSubmit(task)
}

func (p *pool) SubmitErr(task func() error) Task {
	return p.wrapAndSubmit(task)
}

// wrapAndSubmit wraps the task in a future bound to the pool's context and
// submits it. If the pool is stopped or submission fails, the returned
// future resolves with the corresponding error.
func (p *pool) wrapAndSubmit(task any) Task {
	if p.Stopped() {
		return poolStoppedFuture
	}

	ctx := p.Context()
	future, resolve := future.NewFuture(ctx)

	wrapped := wrapTask[struct{}, func(error)](task, resolve)

	if err := p.submit(wrapped); err != nil {
		resolve(err)
		return future
	}

	return future
}

// submit dispatches the task according to the pool's submission mode:
// non-blocking pools fail fast with ErrQueueFull, while blocking pools wait
// until a queue slot becomes available.
func (p *pool) submit(task any) error {
	if p.nonBlocking {
		return p.trySubmit(task)
	}
	return p.blockingSubmit(task)
}

// blockingSubmit keeps retrying the submission until it either succeeds or
// the pool's context is cancelled.
func (p *pool) blockingSubmit(task any) error {
	for {
		if err := p.trySubmit(task); err != ErrQueueFull {
			return err
		}

		// No space left in the queue, wait until a slot is released
		select {
		case <-p.ctx.Done():
			return p.ctx.Err()
		case <-p.submitWaiters:
			// Re-check the context before retrying, in case it was
			// cancelled while this goroutine was waiting
			select {
			case <-p.ctx.Done():
				return p.ctx.Err()
			default:
			}
		}
	}
}

// trySubmit attempts to enqueue the task without blocking. It returns
// ErrPoolStopped if the pool no longer accepts tasks and ErrQueueFull if the
// queue is bounded and full. If a worker slot is free, it also dispatches
// the task at the front of the queue to a new worker (or to the parent pool
// when this pool is a subpool).
func (p *pool) trySubmit(task any) error {
	// Check if the pool has been stopped
	if p.Stopped() {
		return ErrPoolStopped
	}

	done := p.ctx.Done()

	var poppedTask any
	var tasksLen int

	p.mutex.Lock()

	// Context was cancelled while waiting for the lock
	select {
	case <-done:
		p.mutex.Unlock()
		return p.ctx.Err()
	default:
	}

	tasksLen = int(p.tasks.Len())

	if p.queueSize > 0 && tasksLen >= p.queueSize {
		p.mutex.Unlock()
		return ErrQueueFull
	}

	if int(p.workerCount.Load()) < p.maxConcurrency {
		p.workerCount.Add(1)

		if tasksLen == 0 {
			// No idle workers and queue is empty, we can pop the task immediately
			poppedTask = task
		} else {
			// Push the task at the back of the queue
			p.tasks.Write(task)

			// Pop the front task
			poppedTask, _ = p.tasks.Read()
		}
	} else {
		// Push the task at the back of the queue
		p.tasks.Write(task)
	}

	p.mutex.Unlock()

	if poppedTask != nil {
		if p.parent == nil {
			p.workerWaitGroup.Add(1)

			// Launch a new worker
			go p.worker(poppedTask)
		} else {
			// Submit task to the parent pool
			p.subpoolSubmit(poppedTask)
		}
	}

	p.submittedTaskCount.Add(1)

	return nil
}

// subpoolSubmit runs the task on the parent pool. When the task finishes, it
// chains the next queued task (if any) so the subpool keeps at most
// maxConcurrency tasks in flight on the parent.
func (p *pool) subpoolSubmit(task any) error {
	p.workerWaitGroup.Add(1)

	return p.parent.submit(func() (output any, err error) {
		defer p.workerWaitGroup.Done()

		output, err = invokeTask[any](task)

		p.updateMetrics(err)

		// Attempt to submit the next task to the parent pool
		if task, err := p.readTask(); err == nil {
			p.subpoolSubmit(task)
		}

		return
	})
}

// readTask pops the next task from the queue. It returns an error if the
// pool context has been cancelled or the queue is empty, in which case the
// calling worker should exit.
func (p *pool) readTask() (task any, err error) {
	p.mutex.Lock()

	// Check if the pool context has been cancelled
	select {
	case <-p.ctx.Done():
		err = p.ctx.Err()
		p.mutex.Unlock()
		return
	default:
	}

	if p.tasks.Len() == 0 {
		// No more tasks in the queue, worker will exit
		p.workerCount.Add(-1)
		p.mutex.Unlock()
		err = ErrQueueEmpty
		return
	}

	task, _ = p.tasks.Read()

	p.mutex.Unlock()

	// Notify submit waiters that there is space in the queue to push more elements
	p.notifyPushWaiter()

	return
}

// notifyPushWaiter wakes up one goroutine (if any) blocked in blockingSubmit
// waiting for a free queue slot.
func (p *pool) notifyPushWaiter() {
	// Wake up one of the waiters (if any)
	select {
	case p.submitWaiters <- struct{}{}:
	default:
		return
	}
}

// updateMetrics records the outcome of a completed task.
func (p *pool) updateMetrics(err error) {
	if err != nil {
		p.failedTaskCount.Add(1)
	} else {
		p.successfulTaskCount.Add(1)
	}
}

func (p *pool) Stop() Task {
	return Submit(func() {
		// Stop accepting new tasks
		p.closed.Store(true)

		// Wait for all workers to finish executing all tasks (including the ones in the queue)
		p.workerWaitGroup.Wait()

		// Cancel the context with a pool stopped error to signal that the pool has been stopped
		p.cancel(ErrPoolStopped)
	})
}

func (p *pool) StopAndWait() {
	p.Stop().Wait()
}
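
// Example (an illustrative sketch, not part of the original file): Stop
// returns immediately with a future, while StopAndWait blocks until every
// queued task has completed.
//
//	stopped := pool.Stop() // returns without waiting
//	// ... do other work while the pool drains ...
//	stopped.Wait() // block until the pool has fully stopped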

func (p *pool) NewSubpool(maxConcurrency int, options ...Option) Pool {
	return newPool(maxConcurrency, p, options...)
}
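
// Example (an illustrative sketch, not part of the original file): a subpool
// that limits a subset of tasks to a fraction of the parent's concurrency.
//
//	pool := NewPool(10)
//	subpool := pool.NewSubpool(2) // uses at most 2 of the parent's 10 workers
//	subpool.Submit(func() { /* rate-limited work */ })
//	subpool.StopAndWait()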

func (p *pool) NewGroup() TaskGroup {
	return newTaskGroup(p, p.ctx)
}

func (p *pool) NewGroupContext(ctx context.Context) TaskGroup {
	return newTaskGroup(p, ctx)
}
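
// Example (an illustrative sketch, not part of the original file; the
// TaskGroup API is defined elsewhere in this package):
//
//	group := pool.NewGroup()
//	group.Submit(func() { /* task 1 */ })
//	group.Submit(func() { /* task 2 */ })
//	_ = group.Wait() // wait for both tasks to finish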

// newPool creates a pool, or a subpool when parent is not nil, validating
// and defaulting maxConcurrency before applying the supplied options.
func newPool(maxConcurrency int, parent *pool, options ...Option) *pool {
	if parent != nil {
		if maxConcurrency > parent.MaxConcurrency() {
			panic(fmt.Errorf("maxConcurrency cannot be greater than the parent pool's maxConcurrency (%d)", parent.MaxConcurrency()))
		}

		if maxConcurrency == 0 {
			maxConcurrency = parent.MaxConcurrency()
		}
	}

	if maxConcurrency == 0 {
		// 0 means unlimited concurrency
		maxConcurrency = math.MaxInt
	}

	if maxConcurrency <= 0 {
		panic(errors.New("maxConcurrency must be greater than 0"))
	}

	pool := &pool{
		ctx:            context.Background(),
		nonBlocking:    DefaultNonBlocking,
		maxConcurrency: maxConcurrency,
		queueSize:      DefaultQueueSize,
		submitWaiters:  make(chan struct{}),
	}

	if parent != nil {
		pool.parent = parent
		pool.ctx = parent.Context()
		pool.queueSize = parent.queueSize
		pool.nonBlocking = parent.nonBlocking
	}

	for _, option := range options {
		option(pool)
	}

	pool.ctx, pool.cancel = context.WithCancelCause(pool.ctx)
	pool.tasks = linkedbuffer.NewLinkedBuffer[any](LinkedBufferInitialSize, LinkedBufferMaxCapacity)

	return pool
}

// NewPool creates a new pool with the given maximum concurrency and options.
func NewPool(maxConcurrency int, options ...Option) Pool {
	return newPool(maxConcurrency, nil, options...)
}
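
// Example (an illustrative sketch, not part of the original file; the
// WithQueueSize and WithNonBlocking option constructors are assumed to be
// defined elsewhere in this package):
//
//	pool := NewPool(10, WithQueueSize(100), WithNonBlocking(true))
//	if err := pool.Go(func() { /* work */ }); err != nil {
//		// In a non-blocking pool with a bounded queue, err may be ErrQueueFull
//	}
//	pool.StopAndWait()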