-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathjob_control.go
124 lines (108 loc) · 2.56 KB
/
job_control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package job
import (
"context"
"github.com/navi-tt/job/internal/queue"
"time"
)
func New() *Job {
j := new(Job)
j.ctx = context.Background()
j.workers = make(map[string]*WorkerWithFunc)
j.sleepy = time.Millisecond * 10
j.initSleepy = time.Millisecond * 10
j.maxSleepy = time.Millisecond * 10
j.timer = time.Millisecond * 30
return j
}
func (j *Job) Start() {
if j.running {
return
}
j.running = true
j.processJob()
}
/**
* 暂停Job
*/
func (j *Job) Stop() {
j.running = false
}
/**
* 等待队列任务消费完成,可设置超时时间返回
* @param timeout 如果小于0则默认10秒
*/
func (j *Job) WaitStop(timeout time.Duration) error {
var err error
j.stopOnce.Do(func() {
ch := make(chan struct{})
j.Sleep()
if timeout <= 0 {
timeout = time.Second * 10
}
go func() {
j.wg.Wait()
close(ch)
}()
select {
case <-ch:
return
case <-time.After(timeout):
err = ErrTimeout
return
}
})
return err
}
func (j *Job) AddFunc(q queue.Queue, topic string, f func(context.Context, *Task), size int, args ...interface{}) error {
wp, err := j.NewWorkerWithFunc(q, topic, f, size, args)
if err != nil {
return err
}
return j.AddWorkerWithFunc(wp)
}
func (j *Job) AddWorkerWithFunc(w *WorkerWithFunc) error {
if _, ok := j.workers[w.Topic()]; ok {
return ErrTopicRegistered
}
j.workers[w.Topic()] = w
return nil
}
//获取统计数据
func (j *Job) Stats() map[string]int64 {
return map[string]int64{
"pull": j.pullCount,
"pull_err": j.pullErrCount,
"pull_empty": j.pullEmptyCount,
"task": j.taskCount,
"task_err": j.taskErrCount,
"handle": j.handleCount,
"handle_err": j.handleErrCount,
"handle_panic": j.handlePanicCount,
}
}
//设置休眠的时间 -- 碰到异常或者空消息等情况
func (j *Job) SetSleepy(sleepy time.Duration, args ...time.Duration) {
j.sleepy = sleepy
j.initSleepy = sleepy
if len(args) > 0 {
j.maxSleepy = args[0]
} else {
j.maxSleepy = sleepy
}
}
//在通道传递数据时的阻塞超时时间
func (j *Job) SetTimer(timer time.Duration) {
j.timer = timer
}
//设置任务处理前回调函数
func (j *Job) RegisterTaskBeforeCallback(f func(task *Task)) {
j.taskBeforeCallback = f
}
//设置任务处理后回调函数
func (j *Job) RegisterTaskAfterCallback(f func(task *Task)) {
j.taskAfterCallback = f
}
//设置任务panic回调函数:回调函数自己确保panic不会上报,否则会导致此topic的队列worker停止
func (j *Job) RegisterTaskPanicCallback(f func(task *Task, e ...interface{})) {
j.taskPanicCallback = f
}