-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathjob_queue.go
72 lines (62 loc) · 1.86 KB
/
job_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package job
import (
"context"
"github.com/navi-tt/job/internal/queue"
)
// GetQueueByTopic returns the queue served by the worker registered for
// topic, or nil when no worker is registered under that topic.
func (j *Job) GetQueueByTopic(topic string) queue.Queue {
	if worker, found := j.workers[topic]; found {
		return worker.Queue()
	}
	return nil
}
// Enqueue wraps a raw message in a Task built by GenTask and enqueues it on
// the queue for topic via EnqueueWithTask. Any extra args are forwarded
// unchanged.
func (j *Job) Enqueue(ctx context.Context, topic string, message string, args ...interface{}) (bool, error) {
	return j.EnqueueWithTask(ctx, topic, GenTask(topic, message), args...)
}
// EnqueueWithTask encodes task with JsonEncode and enqueues the result on the
// queue registered for topic.
//
// If task.Topic is empty it is set to topic before encoding. Extra args are
// forwarded unchanged to the queue's Enqueue.
//
// It returns (false, ErrQueueNotExist) when no queue is registered for topic,
// and (false, err) when the task cannot be encoded; in both cases nothing is
// enqueued. Otherwise it returns whatever the queue's Enqueue returns.
func (j *Job) EnqueueWithTask(ctx context.Context, topic string, task Task, args ...interface{}) (bool, error) {
	q := j.GetQueueByTopic(topic)
	if q == nil {
		return false, ErrQueueNotExist
	}

	if task.Topic == "" {
		task.Topic = topic
	}

	// Surface encoding failures instead of enqueueing whatever string
	// JsonEncode returned alongside the error.
	s, err := JsonEncode(task)
	if err != nil {
		return false, err
	}
	return q.Enqueue(ctx, topic, s, args...)
}
// EnqueueRaw enqueues message on the queue for topic exactly as given,
// without wrapping it in a Task structure. It returns
// (false, ErrQueueNotExist) when no queue is registered for topic.
func (j *Job) EnqueueRaw(ctx context.Context, topic string, message string, args ...interface{}) (bool, error) {
	if target := j.GetQueueByTopic(topic); target != nil {
		return target.Enqueue(ctx, topic, message, args...)
	}
	return false, ErrQueueNotExist
}
// BatchEnqueue wraps each raw message in a Task built by GenTask, preserving
// order, and enqueues the whole batch for topic via BatchEnqueueWithTask.
func (j *Job) BatchEnqueue(ctx context.Context, topic string, messages []string, args ...interface{}) (bool, error) {
	batch := make([]Task, 0, len(messages))
	for _, msg := range messages {
		batch = append(batch, GenTask(topic, msg))
	}
	return j.BatchEnqueueWithTask(ctx, topic, batch, args...)
}
// BatchEnqueueWithTask encodes every task with JsonEncode and enqueues the
// encoded payloads, in order, on the queue registered for topic.
//
// A task whose Topic is empty is encoded with Topic set to topic; the
// assignment is made on the loop's copy, so the elements of tasks are not
// written to. Extra args are forwarded unchanged to the queue's BatchEnqueue.
//
// It returns (false, ErrQueueNotExist) when no queue is registered for topic,
// and (false, err) as soon as any task fails to encode; in both cases nothing
// is enqueued. Otherwise it returns whatever the queue's BatchEnqueue returns.
func (j *Job) BatchEnqueueWithTask(ctx context.Context, topic string, tasks []Task, args ...interface{}) (bool, error) {
	q := j.GetQueueByTopic(topic)
	if q == nil {
		return false, ErrQueueNotExist
	}

	arr := make([]string, len(tasks))
	for k, task := range tasks {
		if task.Topic == "" {
			task.Topic = topic
		}
		// Abort the whole batch on the first encoding failure rather than
		// submitting a payload that JsonEncode reported as invalid.
		s, err := JsonEncode(task)
		if err != nil {
			return false, err
		}
		arr[k] = s
	}
	return q.BatchEnqueue(ctx, topic, arr, args...)
}