forked from langhuihui/monibuca
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransformer.go
141 lines (129 loc) · 3.41 KB
/
transformer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package m7s
import (
"context"
"slices"
"time"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
)
type (
ITransformer interface {
task.ITask
GetTransformJob() *TransformJob
}
Transformer = func() ITransformer
TransformJob struct {
task.Job
StreamPath string // 对应本地流
Config config.Transform // 对应目标流
Plugin *Plugin
Publisher *Publisher
Subscriber *Subscriber
Transformer ITransformer
}
DefaultTransformer struct {
task.Task
TransformJob TransformJob
}
TransformedMap struct {
StreamPath string
Target string
TransformJob *TransformJob
}
Transforms struct {
task.Work
util.Collection[string, *TransformedMap]
//PublishEvent chan *Publisher
}
// TransformsPublishEvent struct {
// task.ChannelTask
// Transforms *Transforms
// }
)
//func (t *TransformsPublishEvent) GetSignal() any {
// return t.Transforms.PublishEvent
//}
//
//func (t *TransformsPublishEvent) Tick(pub any) {
// incomingPublisher := pub.(*Publisher)
// for job := range t.Transforms.Search(func(m *TransformedMap) bool {
// return m.StreamPath == incomingPublisher.StreamPath
// }) {
// job.TransformJob.TransformPublished(incomingPublisher)
// }
//}
func (t *TransformedMap) GetKey() string {
return t.Target
}
func (r *DefaultTransformer) GetTransformJob() *TransformJob {
return &r.TransformJob
}
func (p *TransformJob) Subscribe() (err error) {
subConfig := p.Plugin.config.Subscribe
subConfig.SubType = SubscribeTypeTransform
p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.Transformer, p.StreamPath, subConfig)
if err == nil {
p.Transformer.Depend(p.Subscriber)
}
return
}
func (p *TransformJob) Publish(streamPath string) (err error) {
var conf = p.Plugin.GetCommonConf().Publish
conf.PubType = PublishTypeTransform
p.Publisher, err = p.Plugin.PublishWithConfig(context.WithValue(p.Transformer, Owner, p.Transformer), streamPath, conf)
if err == nil {
p.Publisher.OnDispose(func() {
if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser) {
p.Stop(p.Publisher.StopReason())
} else {
p.Transformer.Stop(p.Publisher.StopReason())
}
})
}
return
}
func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, streamPath string, conf config.Transform) *TransformJob {
p.Plugin = plugin
p.Config = conf
p.StreamPath = streamPath
p.Transformer = transformer
p.SetDescriptions(task.Description{
"streamPath": streamPath,
"conf": conf,
})
transformer.SetRetry(-1, time.Second*2)
plugin.Server.Transforms.AddTask(p, plugin.Logger.With("streamPath", streamPath))
return p
}
func (p *TransformJob) Start() (err error) {
s := p.Plugin.Server
if slices.ContainsFunc(p.Config.Output, func(to config.TransfromOutput) bool {
return s.Transforms.Has(to.Target)
}) {
return pkg.ErrTransformSame
}
for _, to := range p.Config.Output {
if to.Target != "" {
s.Transforms.Set(&TransformedMap{
StreamPath: to.StreamPath,
Target: to.Target,
TransformJob: p,
})
}
}
p.Info("transform +1", "count", s.Transforms.Length)
p.AddTask(p.Transformer, p.Logger)
return
}
//func (p *TransformJob) TransformPublished(pub *Publisher) {
//
//}
func (p *TransformJob) Dispose() {
transList := &p.Plugin.Server.Transforms
p.Info("transform -1", "count", transList.Length)
for _, to := range p.Config.Output {
transList.RemoveByKey(to.Target)
}
}