-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsync.go
458 lines (401 loc) · 19.6 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
package main
import (
"context"
"strconv"
"strings"
"time"
"github.com/olivere/elastic"
"github.com/shopspring/decimal"
"github.com/btcsuite/btcd/btcjson"
)
// ROLLBACKHEIGHT is the number of blocks to roll back before resyncing.
const ROLLBACKHEIGHT = 5
// Sync compares the header height reported by bitcoind with the highest block
// height stored in the Elasticsearch "block" index and triggers a rollback and
// resync when bitcoind is ahead, or when both are at the same height but the
// block hashes differ.
//
// It returns false only when the max-height aggregation fails with an error
// other than "query max agg error"; all other non-fatal paths return true.
func (esClient *elasticClientAlias) Sync(btcClient bitcoinClientAlias) bool {
	chainInfo, err := btcClient.GetBlockChainInfo()
	if err != nil {
		sugar.Fatal("Get info error: ", err.Error())
	}

	maxHeight, err := esClient.MaxAgg("height", "block", "block")
	if err != nil {
		// This particular error leads to a reset of the sync state rather than
		// being reported as a failure.
		if err.Error() == "query max agg error" {
			btcClient.ReSetSync(chainInfo.Headers, esClient)
			return true
		}
		sugar.Warn(strings.Join([]string{"Query max aggration error:", err.Error()}, " "))
		return false
	}

	var storedHeight float64 = *maxHeight
	gap := chainInfo.Headers - int32(storedHeight)

	if gap > 0 {
		// bitcoind is ahead of the database.
		esClient.RollbackAndSync(storedHeight, int(ROLLBACKHEIGHT), btcClient)
	} else if gap == 0 {
		// Same height: make sure both sides agree on the block hash.
		storedBest, err := esClient.QueryEsBlockByHeight(context.TODO(), chainInfo.Headers)
		if err != nil {
			sugar.Fatal("Can't query best block in es")
		}
		nodeBest, err := btcClient.getBlock(chainInfo.Headers)
		if err != nil {
			sugar.Fatal("Can't query block from bitcoind")
		}
		if storedBest.Hash != nodeBest.Hash {
			esClient.RollbackAndSync(storedHeight, int(ROLLBACKHEIGHT), btcClient)
		}
	} else {
		// The database is ahead of bitcoind.
		sugar.Fatal("bitcoind best height block less than max block in database , something wrong")
	}
	return true
}
// RollbackAndSync restarts the dump from `size` blocks below height `from`
// (clamped to height 1) up to the header height currently reported by
// bitcoind. The block document at the starting height must exist in
// Elasticsearch; otherwise a fatal error is logged through sugar.Fatal.
func (esClient *elasticClientAlias) RollbackAndSync(from float64, size int, btcClient bitcoinClientAlias) {
	startHeight := int32(1)
	if target := int(from) - size; target > 0 {
		startHeight = int32(target)
	}

	startID := strconv.FormatInt(int64(startHeight), 10)
	startDoc, err := esClient.Get().Index("block").Type("block").Id(startID).Do(context.Background())
	if err != nil {
		sugar.Fatal("Query SyncBeginRecord error")
	}

	chainInfo, err := btcClient.GetBlockChainInfo()
	if err != nil {
		sugar.Fatal("Get info error: ", err.Error())
	}

	if startDoc.Found {
		// Step the database back by `size` blocks, then sync forward again.
		btcClient.dumpToES(startHeight, chainInfo.Headers, size, esClient)
	} else {
		sugar.Fatal("can't get begin block, need to be resync")
	}
}
// dumpToES fetches every block with height in [from, end) from bitcoind and
// writes its transaction data and block document to Elasticsearch, logging the
// time spent on each block.
func (btcClient *bitcoinClientAlias) dumpToES(from, end int32, size int, elasticClient *elasticClientAlias) {
	height := from
	for height < end {
		startedAt := time.Now()

		block, err := btcClient.getBlock(height)
		if err != nil {
			sugar.Fatal("Get block error: ", err.Error())
		}

		// The following address has easily recognisable transaction data; its
		// 2009 transactions at
		// https://blockchain.info/address/12cbQLTFMXRnSzktFkuoG3eHoMeFtpTu3S
		// can be used to test and verify the sync logic.
		elasticClient.RollBackAndSyncTx(from, height, size, block)
		elasticClient.RollBackAndSyncBlock(from, height, size, block)

		sugar.Info("Dump block ", block.Height, " ", block.Hash, " dumpBlockTimeElapsed ", time.Since(startedAt))
		height++
	}
}
// RollBackAndSyncTx syncs the tx, vout and balance data of `block`. For heights
// up to from+size+1 the block's data is rolled back first.
func (esClient *elasticClientAlias) RollBackAndSyncTx(from, height int32, size int, block *btcjson.GetBlockVerboseResult) {
	ctx := context.Background()

	// During a rollback, the vout, balance and tx data of the block at
	// "ES best height + 1" must be rolled back as well, hence size+1.
	needsRollback := height <= from+int32(size+1)
	if needsRollback {
		esClient.RollbackTxVoutBalanceByBlock(ctx, block)
	}

	esClient.syncTxVoutBalance(ctx, block)
}
// RollBackAndSyncBlock indexes the document for `block` under its height as
// document id. For heights up to from+size, any existing document with that id
// is deleted first; a 404 from the delete is tolerated.
func (esClient *elasticClientAlias) RollBackAndSyncBlock(from, height int32, size int, block *btcjson.GetBlockVerboseResult) {
	ctx := context.Background()
	docID := strconv.FormatInt(int64(height), 10)

	if height <= from+int32(size) {
		if _, delErr := esClient.Delete().Index("block").Type("block").Id(docID).Refresh("true").Do(ctx); delErr != nil {
			// A missing document is not an error for a rollback.
			if delErr.Error() != "elastic: Error 404 (Not Found)" {
				sugar.Fatal("Delete block docutment error: ", delErr.Error())
			}
		}
	}

	doc := blockWithTxDetail(block)
	if _, idxErr := esClient.Index().Index("block").Type("block").Id(docID).BodyJson(doc).Do(ctx); idxErr != nil {
		sugar.Fatal(strings.Join([]string{"Dump block docutment error", idxErr.Error()}, " "))
	}
}
// syncTxVoutBalance writes everything derived from the transactions of `block`
// to Elasticsearch: new vout documents, "used" markers on the vouts spent by
// the block's vins, balance updates for all affected addresses, balancejournal
// entries ("sync+" for vouts, "sync-" for vins) and the tx documents.
func (esClient *elasticClientAlias) syncTxVoutBalance(ctx context.Context, block *btcjson.GetBlockVerboseResult) {
	var (
		spentBalances   []Balance
		createdBalances []Balance
		createdJournal  []AddressWithAmountAndTxid
		spentJournal    []AddressWithAmountAndTxid
		spentAddrs      []interface{} // every address referenced by a vin in the block
		createdAddrs    []interface{} // every address referenced by a vout in the block
		txDocs          []*esTx
	)
	bulk := esClient.Bulk()

	// TODO: too slow, needs optimization.
	for _, tx := range block.Tx {
		var (
			outTotal  decimal.Decimal
			inTotal   decimal.Decimal
			vinField  []AddressWithValueInTx
			voutField []AddressWithValueInTx
		)

		for _, vout := range tx.Vout {
			// Vouts that could not be queued for indexing are skipped entirely.
			if !esClient.syncVout(vout, tx, bulk) {
				continue
			}

			// Accumulate the total output amount of the transaction.
			outTotal = outTotal.Add(decimal.NewFromFloat(vout.Value))

			fieldPart, addrPart, balancePart, journalPart := parseTxVout(vout, tx.Txid)
			voutField = append(voutField, fieldPart...) // vouts field of the tx document
			createdAddrs = append(createdAddrs, addrPart...)
			createdBalances = append(createdBalances, balancePart...)
			createdJournal = append(createdJournal, journalPart...)
		}

		// Look up, by the tx's vins, the vout documents (with ids) stored in ES.
		spentVouts := esClient.QueryVoutWithVinsOrVoutsUnlimitSize(ctx, indexedVinsFun(tx.Vin))
		for _, spent := range spentVouts {
			// Accumulate the total input amount of the transaction.
			inTotal = inTotal.Add(decimal.NewFromFloat(spent.Vout.Value))
			esClient.updateVoutUsed(tx.Txid, spent, bulk)

			fieldPart, addrPart, balancePart, journalPart := parseESVout(spent, tx.Txid)
			vinField = append(vinField, fieldPart...)
			spentAddrs = append(spentAddrs, addrPart...)
			spentBalances = append(spentBalances, balancePart...)
			spentJournal = append(spentJournal, journalPart...)
		}

		txDocs = append(txDocs, esTxFun(tx, block.Hash, vinField, voutField, inTotal, outTotal))
	}

	// Vin balances are handled before vout balances; keep this order.
	esClient.syncVinsBalance(ctx, spentAddrs, spentBalances)

	// Update (add) or insert the balances of the vout addresses.
	esClient.syncVoutsBalance(ctx, createdAddrs, createdBalances, bulk)

	resp, err := bulk.Refresh("true").Do(ctx)
	if err != nil {
		sugar.Fatal("bulk request error: ", err.Error())
	}
	resp.Created()
	resp.Updated()
	resp.Indexed()

	// Journal entries for vouts (balance added).
	esClient.BulkInsertBalanceJournal(ctx, createdJournal, "sync+")
	// Journal entries for vins (balance subtracted).
	esClient.BulkInsertBalanceJournal(ctx, spentJournal, "sync-")
	// Transaction documents.
	esClient.BulkInsertTxes(ctx, txDocs)
}
// BulkInsertBalanceJournal queues one balancejournal document per entry of
// balancesWithID on a bulk processor (5 workers, 40000 actions per batch) and
// closes the processor before returning.
//
// ope is the operation label stored with each journal entry (callers in this
// file pass "sync+", "sync-", "rollback+" or "rollback-").
func (esClient *elasticClientAlias) BulkInsertBalanceJournal(ctx context.Context, balancesWithID []AddressWithAmountAndTxid, ope string) {
	p, err := esClient.BulkProcessor().Name("BulkInsertBalanceJournal").Workers(5).BulkActions(40000).Do(ctx)
	if err != nil {
		sugar.Fatal("es BulkProcessor error: BulkInsertBalanceJournal, ", err.Error())
	}
	// Register the close right after creation so the processor is released on
	// every exit path, and surface a failing Close instead of dropping it.
	// NOTE(review): Close is presumably what flushes still-queued requests;
	// confirm against the olivere/elastic version in use.
	defer func() {
		if closeErr := p.Close(); closeErr != nil {
			sugar.Warn("es BulkProcessor close error: BulkInsertBalanceJournal, ", closeErr.Error())
		}
	}()

	for _, balanceID := range balancesWithID {
		newBalanceJournal := newBalanceJournalFun(balanceID.Address, ope, balanceID.Txid, balanceID.Amount)
		insertBalanceJournal := elastic.NewBulkIndexRequest().Index("balancejournal").Type("balancejournal").Doc(newBalanceJournal)
		p.Add(insertBalanceJournal)
	}
}
// BulkInsertTxes queues every transaction document in esTxs for indexing into
// the "tx" index on a bulk processor (5 workers, 40000 actions per batch) and
// closes the processor before returning.
func (esClient *elasticClientAlias) BulkInsertTxes(ctx context.Context, esTxs []*esTx) {
	p, err := esClient.BulkProcessor().Name("tx").Workers(5).BulkActions(40000).Do(ctx)
	if err != nil {
		sugar.Fatal("es BulkProcessor error: BulkInsertTxes, ", err.Error())
	}
	// Register the close right after creation so the processor is released on
	// every exit path, and surface a failing Close instead of dropping it.
	// NOTE(review): Close is presumably what flushes still-queued requests;
	// confirm against the olivere/elastic version in use.
	defer func() {
		if closeErr := p.Close(); closeErr != nil {
			sugar.Warn("es BulkProcessor close error: BulkInsertTxes, ", closeErr.Error())
		}
	}()

	for _, tx := range esTxs {
		insertTx := elastic.NewBulkIndexRequest().Index("tx").Type("tx").Doc(tx)
		p.Add(insertTx)
	}
}
// syncVoutsBalance adds the amounts received through the block's vouts to the
// balances of the receiving addresses. For an address that already has a
// balance document an update is queued on bulkRequest; otherwise a new balance
// document is queued. The caller executes bulkRequest.
func (esClient *elasticClientAlias) syncVoutsBalance(ctx context.Context, voutAddresses []interface{}, voutAddressWithAmountSlice []Balance, bulkRequest *elastic.BulkService) {
	// De-duplicated vout addresses of the block with the summed amount each
	// one received.
	deposits := calculateUniqueAddressWithSumForVinOrVout(voutAddresses, voutAddressWithAmountSlice)

	storedBalances, err := esClient.BulkQueryBalanceUnlimitSize(ctx, voutAddresses...)
	if err != nil {
		sugar.Fatal("Query balance related with vouts address error: ", err.Error())
	}

	for _, deposit := range deposits {
		matched := false

		for _, stored := range storedBalances {
			if deposit.Address != stored.Balance.Address {
				continue
			}
			// Existing balance: add the deposit to the stored amount.
			sum := deposit.Amount.Add(decimal.NewFromFloat(stored.Balance.Amount))
			amount, _ := sum.Float64()
			update := elastic.NewBulkUpdateRequest().Index("balance").Type("balance").Id(stored.ID).
				Doc(map[string]interface{}{"amount": amount})
			bulkRequest.Add(update)
			matched = true
			break
		}

		if matched {
			continue
		}

		// No balance document for this address yet: insert one.
		amount, _ := deposit.Amount.Float64()
		fresh := &Balance{
			Address: deposit.Address,
			Amount:  amount,
		}
		insert := elastic.NewBulkIndexRequest().Index("balance").Type("balance").Doc(fresh)
		bulkRequest.Add(insert).Refresh("true")
	}
}
// syncVinsBalance subtracts the amounts spent by the block's vins from the
// balances of the spending addresses and executes the resulting bulk update
// immediately, using the caller's ctx.
//
// vinAddresses holds every address referenced by a vin in the block (with
// duplicates); vinAddressWithAmountSlice holds the matching address/amount
// pairs.
func (esClient *elasticClientAlias) syncVinsBalance(ctx context.Context, vinAddresses []interface{}, vinAddressWithAmountSlice []Balance) {
	// De-duplicated vin addresses of the block with the summed amount each one
	// spent (balance type).
	UniqueVinAddressesWithSumWithdraw := calculateUniqueAddressWithSumForVinOrVout(vinAddresses, vinAddressWithAmountSlice)
	bulkQueryVinBalance, err := esClient.BulkQueryBalanceUnlimitSize(ctx, vinAddresses...)
	if err != nil {
		sugar.Fatal("Query balance related with vin error: ", err.Error())
	}
	vinBalancesWithIDs := bulkQueryVinBalance

	// The number of de-duplicated vin addresses in the block must equal the
	// number of balance documents returned by ES. A mismatch means the balance
	// type holds duplicate records for some address, in which case the data
	// should be resynced. TODO
	UniqueVinAddresses := removeDuplicatesForSlice(vinAddresses...)
	if len(UniqueVinAddresses) != len(vinBalancesWithIDs) {
		sugar.Fatal("There are duplicate records in balances type")
	}

	bulkUpdateVinBalanceRequest := esClient.Bulk()
	// update(sub) balances related to vins addresses
	// len(vinAddressWithSumWithdraw) == len(vinBalancesWithIDs)
	for _, vinAddressWithSumWithdraw := range UniqueVinAddressesWithSumWithdraw {
		for _, vinBalanceWithID := range vinBalancesWithIDs {
			if vinAddressWithSumWithdraw.Address == vinBalanceWithID.Balance.Address {
				balance := decimal.NewFromFloat(vinBalanceWithID.Balance.Amount).Sub(vinAddressWithSumWithdraw.Amount)
				amount, _ := balance.Float64()
				updateVinBalcne := elastic.NewBulkUpdateRequest().Index("balance").Type("balance").Id(vinBalanceWithID.ID).
					Doc(map[string]interface{}{"amount": amount})
				bulkUpdateVinBalanceRequest.Add(updateVinBalcne).Refresh("true")
				break
			}
		}
	}

	// Balances of vin addresses must be updated before balances of vout
	// addresses: when an address in a transaction's vins also appears in its
	// vouts (the usual change output), the vin amount has to be subtracted
	// first and the vout amount added afterwards.
	if bulkUpdateVinBalanceRequest.NumberOfActions() != 0 {
		// Use the caller's context rather than context.TODO() so the request
		// follows the same context as the queries above.
		bulkUpdateVinBalanceResp, err := bulkUpdateVinBalanceRequest.Refresh("true").Do(ctx)
		if err != nil {
			sugar.Fatal("update vin balance error: ", err.Error())
		}
		bulkUpdateVinBalanceResp.Updated()
	}
}
// syncVout builds the vout document for `vout` of transaction `tx` and queues
// it for indexing on bulkRequest. It reports false, queuing nothing, when the
// document cannot be built.
func (esClient *elasticClientAlias) syncVout(vout btcjson.Vout, tx btcjson.TxRawResult, bulkRequest *elastic.BulkService) bool {
	doc, err := newVoutFun(vout, tx.Vin, tx.Txid)
	if err == nil {
		// Queue the new vout document on the shared bulk request.
		indexReq := elastic.NewBulkIndexRequest().Index("vout").Type("vout").Doc(doc)
		bulkRequest.Add(indexReq).Refresh("true")
		return true
	}
	return false
}
// updateVoutUsed queues an update on bulkRequest that sets the "used" field of
// the given vout document to the spending transaction id `txid`.
func (esClient *elasticClientAlias) updateVoutUsed(txid string, voutWithID VoutWithID, bulkRequest *elastic.BulkService) {
	// NOTE(review): VinIndex is filled from the vout's own Voutindex — confirm
	// this is the intended value.
	spentBy := voutUsed{Txid: txid, VinIndex: voutWithID.Vout.Voutindex}
	patch := map[string]interface{}{"used": spentBy}

	updateReq := elastic.NewBulkUpdateRequest().Index("vout").Type("vout").Id(voutWithID.ID).Doc(patch)
	bulkRequest.Add(updateReq).Refresh("true")
}
// RollbackTxVoutBalanceByBlock undoes what syncing `block` wrote to
// Elasticsearch:
//   - the block's tx documents are deleted by block hash;
//   - vouts spent by the block's vins get their "used" field reset to nil and
//     the spent amounts are added back to the owning balances;
//   - vouts created by the block are deleted and their amounts are subtracted
//     from the owning balances;
//   - "rollback-" / "rollback+" balancejournal entries are written.
//
// The returned error is always nil; failures are reported through sugar.Fatal.
func (esClient *elasticClientAlias) RollbackTxVoutBalanceByBlock(ctx context.Context, block *btcjson.GetBlockVerboseResult) error {
	bulkRequest := esClient.Bulk()
	var (
		vinAddresses                      []interface{} // All addresses related with vins in a block
		voutAddresses                     []interface{} // All addresses related with vouts in a block
		vinAddressWithAmountSlice         []Balance
		voutAddressWithAmountSlice        []Balance
		voutAddressWithAmountAndTxidSlice []AddressWithAmountAndTxid
		vinAddressWithAmountAndTxidSlice  []AddressWithAmountAndTxid
		UniqueVinAddressesWithSumWithdraw []*AddressWithAmount // de-duplicated vin addresses of the block with their summed withdrawn amount
		UniqueVoutAddressesWithSumDeposit []*AddressWithAmount // de-duplicated vout addresses of the block with their summed deposited amount
		vinBalancesWithIDs                []*BalanceWithID
		voutBalancesWithIDs               []*BalanceWithID
	)

	// rollback: delete txs in es by block hash
	if e := esClient.DeleteEsTxsByBlockHash(ctx, block.Hash); e != nil {
		sugar.Fatal("rollback block err: ", block.Hash, " fail to delete")
	}

	for _, tx := range block.Tx {
		// Vins whose vout "used" field in ES is nil need no balance rollback.
		// NOTE(review): the query error is deliberately ignored here; an empty
		// result is treated the same as "already rolled back".
		voutWithIDSliceForVins, _ := esClient.QueryVoutsByUsedFieldAndBelongTxID(ctx, tx.Vin, tx.Txid)

		// An empty voutWithIDSliceForVins means this tx was already rolled back.
		for _, voutWithID := range voutWithIDSliceForVins {
			// rollback: update vout's used to nil
			updateVoutUsedField := elastic.NewBulkUpdateRequest().Index("vout").Type("vout").Id(voutWithID.ID).
				Doc(map[string]interface{}{"used": nil})
			bulkRequest.Add(updateVoutUsedField).Refresh("true")

			_, vinAddressesTmp, vinAddressWithAmountSliceTmp, vinAddressWithAmountAndTxidSliceTmp := parseESVout(voutWithID, tx.Txid)
			vinAddresses = append(vinAddresses, vinAddressesTmp...)
			vinAddressWithAmountSlice = append(vinAddressWithAmountSlice, vinAddressWithAmountSliceTmp...)
			vinAddressWithAmountAndTxidSlice = append(vinAddressWithAmountAndTxidSlice, vinAddressWithAmountAndTxidSliceTmp...)
		}

		// get es vouts with id in elasticsearch by tx vouts
		indexVouts := indexedVoutsFun(tx.Vout, tx.Txid)
		// Only addresses of vouts that still exist (not yet deleted) need their
		// balance rolled back.
		voutWithIDSliceForVouts := esClient.QueryVoutWithVinsOrVoutsUnlimitSize(ctx, indexVouts)
		for _, voutWithID := range voutWithIDSliceForVouts {
			// rollback: delete vout
			deleteVout := elastic.NewBulkDeleteRequest().Index("vout").Type("vout").Id(voutWithID.ID)
			bulkRequest.Add(deleteVout).Refresh("true")

			_, voutAddressesTmp, voutAddressWithAmountSliceTmp, voutAddressWithAmountAndTxidSliceTmp := parseESVout(voutWithID, tx.Txid)
			voutAddresses = append(voutAddresses, voutAddressesTmp...)
			voutAddressWithAmountSlice = append(voutAddressWithAmountSlice, voutAddressWithAmountSliceTmp...)
			voutAddressWithAmountAndTxidSlice = append(voutAddressWithAmountAndTxidSlice, voutAddressWithAmountAndTxidSliceTmp...)
		}
	}

	// Sum, per address, the amounts withdrawn by the vins of all transactions
	// in the block (balance type).
	// NOTE(review): the sync path queries balances with
	// BulkQueryBalanceUnlimitSize while this path uses BulkQueryBalance —
	// confirm the latter cannot truncate the result.
	UniqueVinAddressesWithSumWithdraw = calculateUniqueAddressWithSumForVinOrVout(vinAddresses, vinAddressWithAmountSlice)
	bulkQueryVinBalance, err := esClient.BulkQueryBalance(ctx, vinAddresses...)
	if err != nil {
		sugar.Fatal("Rollback: query vin balance error: ", err.Error())
	}
	vinBalancesWithIDs = bulkQueryVinBalance

	// Sum, per address, the amounts deposited by the vouts of all transactions
	// in the block (balance type).
	UniqueVoutAddressesWithSumDeposit = calculateUniqueAddressWithSumForVinOrVout(voutAddresses, voutAddressWithAmountSlice)
	bulkQueryVoutBalance, err := esClient.BulkQueryBalance(ctx, voutAddresses...)
	if err != nil {
		sugar.Fatal("Rollback: query vout balance error: ", err.Error())
	}
	voutBalancesWithIDs = bulkQueryVoutBalance

	// rollback: add to addresses related to vins addresses
	// Only balances of addresses found through a non-nil "used" field of the
	// vout type are rolled back.
	bulkUpdateVinBalanceRequest := esClient.Bulk()
	// update(add) balances related to vins addresses
	// len(vinAddressWithSumWithdraw) == len(vinBalancesWithIDs)
	for _, vinAddressWithSumWithdraw := range UniqueVinAddressesWithSumWithdraw {
		for _, vinBalanceWithID := range vinBalancesWithIDs {
			if vinAddressWithSumWithdraw.Address == vinBalanceWithID.Balance.Address {
				balance := decimal.NewFromFloat(vinBalanceWithID.Balance.Amount).Add(vinAddressWithSumWithdraw.Amount)
				amount, _ := balance.Float64()
				updateVinBalance := elastic.NewBulkUpdateRequest().Index("balance").Type("balance").Id(vinBalanceWithID.ID).
					Doc(map[string]interface{}{"amount": amount})
				bulkUpdateVinBalanceRequest.Add(updateVinBalance).Refresh("true")
				break
			}
		}
	}
	if bulkUpdateVinBalanceRequest.NumberOfActions() != 0 {
		bulkUpdateVinBalanceResp, e := bulkUpdateVinBalanceRequest.Refresh("true").Do(ctx)
		if e != nil {
			// Report the error of this bulk call. The outer `err` is nil at
			// this point, so calling err.Error() here would panic.
			sugar.Fatal("Rollback: update vin balance error: ", e.Error())
		}
		bulkUpdateVinBalanceResp.Updated()
	}

	// update(sub) balances related to vouts addresses
	// len(voutAddressWithSumDeposit) >= len(voutBalanceWithID)
	// Only addresses of vouts that still existed need their balance rolled back.
	for _, voutAddressWithSumDeposit := range UniqueVoutAddressesWithSumDeposit {
		for _, voutBalanceWithID := range voutBalancesWithIDs {
			if voutAddressWithSumDeposit.Address == voutBalanceWithID.Balance.Address {
				balance := decimal.NewFromFloat(voutBalanceWithID.Balance.Amount).Sub(voutAddressWithSumDeposit.Amount)
				amount, _ := balance.Float64()
				updateVoutBalance := elastic.NewBulkUpdateRequest().Index("balance").Type("balance").Id(voutBalanceWithID.ID).
					Doc(map[string]interface{}{"amount": amount})
				bulkRequest.Add(updateVoutBalance).Refresh("true")
				break
			}
		}
	}
	if bulkRequest.NumberOfActions() != 0 {
		bulkResp, err := bulkRequest.Refresh("true").Do(ctx)
		if err != nil {
			sugar.Fatal("Rollback: bulkRequest do error: ", err.Error())
		}
		bulkResp.Updated()
		bulkResp.Deleted()
		bulkResp.Indexed()
	}

	// bulk add balancejournal doc (rollback vout: sub balance)
	esClient.BulkInsertBalanceJournal(ctx, voutAddressWithAmountAndTxidSlice, "rollback-")
	// bulk add balancejournal doc (rollback vin: add balance)
	esClient.BulkInsertBalanceJournal(ctx, vinAddressWithAmountAndTxidSlice, "rollback+")
	return nil
}