-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmysql_ch_es_test.go
241 lines (225 loc) · 6.23 KB
/
mysql_ch_es_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package test
import (
"github.com/enustah/db-canal/canal/multi_canal"
"github.com/enustah/db-canal/config"
"github.com/enustah/db-canal/driver"
"github.com/enustah/db-canal/hook"
"github.com/enustah/db-canal/register"
"github.com/enustah/db-canal/util"
"testing"
)
/*
mysql:
create table test(
id bigint,
uid bigint unsigned ,
test_int int,
test_uint int unsigned ,
test_str varchar(255),
test_chr char(5),
test_txt text,
test_float float,
test_double double,
test_dec DECIMAL,
test_enum enum('e1','e2','e3'),
test_bit bit(2),
test_date DATE,
test_time time,
test_datetime datetime,
test_timstp timestamp,
test_var_bin VARBINARY(255),
test_bin BINARY(5),
test_blob BLOB
);
insert into test values(
-1152921504606846976,1152921504606846976,-1073741824,1073741824,
'iamvarcahr','char','iamtext',
1.234,2.345,1024.4096,'e1',1,
'1989-06-04','23:59:59','1989-06-04 00:00:1','1989-06-04 00:00:1',
0x123456789,
0x01,
0x111111111111
)
---------------------------------------------------------------------
ch:
create table test(
id Int64,
uid UInt64 ,
test_int Int32,
test_uint UInt32,
test_str String,
test_chr String,
test_txt String,
test_float FLOAT,
test_double DOUBLE,
test_dec DOUBLE,
test_enum String,
test_bit Int16,
test_date DATE,
test_time DateTime,
test_datetime DateTime,
test_timstp DateTime,
test_var_bin String(255),
test_bin String,
test_blob String,
is_delete Bool
) engine=ReplacingMergeTree primary key (id,uid) order by (id,uid)
---------------------------------------------------------------------
es:
curl -X PUT -H "Content-type: application/json" http://172.17.0.2:9200/test --data '
{
"settings":{
"index":{
"number_of_shards":1,
"number_of_replicas":0
}
},
"mappings":{
"properties":{
"id":{
"type":"long"
},
"uid":{
"type":"unsigned_long"
},
"test_int":{
"type":"integer"
},
"test_uint":{
"type":"unsigned_long"
},
"test_str":{
"type":"keyword"
},
"test_chr":{
"type":"keyword"
},
"test_txt":{
"type":"keyword"
},
"test_float":{
"type":"float"
},
"test_double":{
"type":"double"
},
"test_dec":{
"type":"double"
},
"test_enum":{
"type":"keyword"
},
"test_bit":{
"type":"byte"
},
"test_date":{
"type":"date"
},
"test_time":{
"type":"date"
},
"test_datetime":{
"type":"date"
},
"test_timstp":{
"type":"date"
},
"test_var_bin":{
"type":"binary"
},
"test_bin":{
"type":"binary"
},
"test_blob":{
"type":"binary"
}
}
}
}
'
*/
// mysqlChEs is the YAML pipeline configuration parsed by TestMysqlChEs.
//
// It declares a single pipeline with one mysql_ingress source and two
// egresses: clickhouse_egress (with the clickhouseDelete and dataFilter hooks)
// and elasticsearch_egress (with a dataFilter hook, an id column mapping and a
// custom CA certificate). The addresses, credentials and save-point path are
// hard-coded; presumably they match a local test environment — adjust them
// before running the test elsewhere.
const mysqlChEs = `
config:
- ingress:
driver: mysql_ingress
dsn: "172.21.0.2:3306"
options:
username: "root"
password: "root"
tables:
- "test\\.test"
savePointFilePath: "/tmp/a"
canalConfig:
name: test_mysql
maxWaitTime: 1500
maxDataBatch: 100
retryOption:
maxInterval: 3000 # in ms
multiplier: 1.5
initialInterval: 1000 # in ms
egress:
- driver: clickhouse_egress
url: "tcp://172.17.0.3:9000?database=test&username=root&password=root"
hookChain:
- "clickhouseDelete(is_delete)"
- "dataFilter(test,test,id,>,50)"
- driver: elasticsearch_egress
url: "https://127.0.0.1:1234"
hookChain:
- "dataFilter(test,test,id,>,30)"
options:
idColumn:
test: id
numWorkers: 1
flushBytes: 10485760 # in bytes
flushInterval: 3s
tlsSni: a.ydx.com
caCert: |
-----BEGIN CERTIFICATE-----
MIIDETCCAfkCFCqvQvVj+0QeOkESQQqRIa6QHrl3MA0GCSqGSIb3DQEBCwUAMEUx
CzAJBgNVBAYTAkhLMQswCQYDVQQIDAJISzELMAkGA1UEBwwCSEsxDTALBgNVBAoM
BHJvb3QxDTALBgNVBAsMBHJvb3QwHhcNMjIwMzIzMDg1NzM4WhcNMzIwMzIwMDg1
NzM4WjBFMQswCQYDVQQGEwJISzELMAkGA1UECAwCSEsxCzAJBgNVBAcMAkhLMQ0w
CwYDVQQKDARyb290MQ0wCwYDVQQLDARyb290MIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEAx6Utqkix8MOF/pT3L4iT5nTtGU12QHbQpEaqBdE4JH0EhC17
r38wrXN1GcRVRJdHXVU4XduAmfjizVWbYBX4d3gkYCCjK6pwQBzE8mv2BgbQTFeX
ckks5CuejzM4wwbDifqcaj8BrURPJQY5sv2dB9vajzYi66Hf1py9bOfffMWwpc5G
SJkYJppyWt/6FMih7M2VcUeR1jFHeSxrHvnIX/fKobdEO4z2W0PVnaQBvHYRLkjf
KkRnOTiyXW3yo+sO/rWzzfsWVjAPf2qKlEQXJmyzSmHTcjO2FyFqQJ4+toiPTK9n
TzsUHnzD5rdwB+/wAvSIQF6GMheBoRFXvmjFxQIDAQABMA0GCSqGSIb3DQEBCwUA
A4IBAQA1IH7apGtxmKIHhlv2PuQnMDvOoPR6/DQfZWy7wHOYrTVTxc2FdmSb2mF8
PJrfGXDOYObnpXyuTc77AHsD965DedoEaRUJ+/c7U9deqXmPgUqK/ogWE/M0qb4l
kbd0S4cieYL+G8LSeBnt0hggxsEOK/PxahECMlc/gBf+VJzKDNz3sWOp1QxP3SVO
3IzEyrR1zjt5HXPVUqUrZeTrjs3Ctx/ZyCeO3Kwm33AMpmCYY9+nLjrtOjHT9E/c
tFYMXLVX+XixtK8mJ3C8uZF+dl8LsBfOONxlPo0WNSTR1iS26EgzrXFX1+hpm0Hl
pK2YeoFHpRIlsMWZazZ+s6zNrIw6
-----END CERTIFICATE-----
logLevel: "debug"
`
// TestMysqlChEs is a manual integration test that runs the pipeline described
// by the mysqlChEs YAML config.
//
// It registers the "clickhouseDelete" hook, parses the config, builds a
// multi-canal from the first config entry and starts it. If Run returns
// without error the test then blocks forever, so it never finishes on its
// own and has to be interrupted externally.
func TestMysqlChEs(t *testing.T) {
	// clickhouseDelete(column) rewrites events so deletes are recorded in a
	// status column instead of being applied as deletes: inserts get
	// column=0, deletes are turned into updates with column=1. Other event
	// kinds pass through untouched.
	util.Must(
		register.RegisterHook(
			func(ctx *hook.Ctx, args []interface{}) error {
				// The hook is declared with a single string argument below.
				// NOTE(review): presumably the hook framework validates args
				// against that declaration before calling — confirm, since
				// this assertion panics otherwise.
				deleteStatusColumn := args[0].(string)
				ctx.ForEach(func(data *driver.Data) (drop bool, stop bool) {
					switch data.Event {
					case driver.EventInsert:
						data.RawMap[deleteStatusColumn] = 0
					case driver.EventDelete:
						data.Event = driver.EventUpdate
						data.RawMap[deleteStatusColumn] = 1
					}
					// Never drop a row and never stop the iteration early.
					return false, false
				})
				return nil
			},
			"clickhouseDelete",
			[]hook.Arg{hook.ArgTypeStr},
		),
	)

	c, err := config.FromYaml(mysqlChEs)
	util.Must(err)
	// Fail with a readable message instead of an index-out-of-range panic
	// when the YAML yields no config entries.
	if len(c) == 0 {
		t.Fatal("config.FromYaml returned no config entries")
	}

	cc, err := multi_canal.NewMultiCanal(c[0])
	util.Must(err)
	util.Must(cc.Run())

	// Block forever so the canal keeps running.
	select {}
}