-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathjob.go
89 lines (76 loc) · 1.92 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package job
import (
"context"
"errors"
"sync"
"time"
)
const (
// defaultConcurrency is the default number of concurrent workers.
defaultConcurrency = 5
)
// Sentinel errors exported by this package. The call sites that return them
// are outside this section of the file.
var (
// ErrQueueNotExist reports that a queue does not exist.
ErrQueueNotExist = errors.New("queue is not exists")
// ErrTimeout reports that something timed out.
ErrTimeout = errors.New("timeout")
// ErrTopicRegistered reports that a key has already been registered.
ErrTopicRegistered = errors.New("the key had been registered")
)
// Job bundles a set of workers with the back-off settings, counters and
// task callbacks declared below. It holds a sync.Once and a sync.WaitGroup,
// so a Job value must not be copied after first use; the methods in this
// file use pointer receivers.
type Job struct {
// Context held by the job.
ctx context.Context
// NOTE(review): presumably guards a one-time stop/shutdown path — the use
// site is not visible in this section.
stopOnce sync.Once
// Workers keyed by a string (presumably the queue/topic name — confirm
// against the registration code). processJob calls Run on each of them.
workers map[string]*WorkerWithFunc
// Wait group used to wait for the concurrently processing workers.
wg sync.WaitGroup
// Whether the job has been started.
running bool
// Time to sleep when in an abnormal state; Sleep grows this value and
// sleeps for it.
sleepy time.Duration
// Configured initial wait time; ResetSleep passes it to SetSleepy.
initSleepy time.Duration
// Configured upper bound for the wait time; Sleep clamps sleepy to it.
maxSleepy time.Duration
// Timeout for the channel timer.
timer time.Duration
// Whether the queue has been initialized.
isQueueInit bool
// Statistics counters.
pullCount int64
pullEmptyCount int64
pullErrCount int64
taskCount int64
taskErrCount int64
handleCount int64
handleErrCount int64
handlePanicCount int64
// Callbacks.
// Called when a task returns a failure.
taskErrCallback func(task *Task)
// Called when a task panics. The extra arguments carry the panic
// information to the callback, e.g. so it can be reported to Sentry.
taskPanicCallback func(task *Task, e ...interface{})
// Called before a task is handled.
taskBeforeCallback func(task *Task)
// Called after a task has been handled.
taskAfterCallback func(task *Task)
}
// processJob calls Run once on every worker stored in j.workers.
//
// The workers live in a map, so the order in which they are started is
// whatever order the range statement yields and must not be relied upon.
// Whether Run blocks or returns immediately is decided by WorkerWithFunc,
// which is defined outside this section.
func (j *Job) processJob() {
	for _, worker := range j.workers {
		worker.Run()
	}
}
// Sleep grows the job's back-off interval and then blocks the calling
// goroutine for the resulting duration.
//
// The interval is adjusted before sleeping:
//   - while twice j.sleepy is still strictly below j.maxSleepy, j.sleepy is
//     doubled;
//   - otherwise, if j.sleepy is below j.maxSleepy, it is raised to
//     j.maxSleepy;
//   - an interval already at or above j.maxSleepy is left unchanged.
//
// The wait uses time.Sleep, so it is not cut short by j.ctx being cancelled.
// See ResetSleep for returning to the initial interval once data is found.
func (j *Job) Sleep() {
	doubled := j.sleepy * 2
	switch {
	case doubled < j.maxSleepy:
		// Still room to grow exponentially.
		j.sleepy = doubled
	case j.sleepy < j.maxSleepy:
		// Doubling would reach or overshoot the cap: clamp to it.
		j.sleepy = j.maxSleepy
	}
	time.Sleep(j.sleepy)
}
// ResetSleep hands j.initSleepy to SetSleepy unless the current interval
// already equals it, in which case it does nothing.
//
// SetSleepy is defined outside this section; presumably it stores the given
// duration as the current back-off interval — confirm against its definition.
func (j *Job) ResetSleep() {
	if j.sleepy == j.initSleepy {
		return
	}
	j.SetSleepy(j.initSleepy)
}