diff --git a/go/token-aggregator/service/service.go b/go/token-aggregator/service/service.go index 049522e..b02db0b 100644 --- a/go/token-aggregator/service/service.go +++ b/go/token-aggregator/service/service.go @@ -123,6 +123,8 @@ func (s *TokenAggregator) Run(pairContractAddress string) { break } + log.Infof("pairsSummary snapshot has not been created yet, sleeping for %d secs", s.settingsObj.TokenAggregatorSettings.RunIntervalSecs) + time.Sleep(time.Duration(s.settingsObj.TokenAggregatorSettings.RunIntervalSecs) * time.Second) } @@ -232,6 +234,7 @@ func (s *TokenAggregator) FetchAndFillTokenMetaData(pairContractAddr string) err tokenRefs := new(TokenDataRefs) // FIX: TOKEN Symbol and name not getting stored in tokenData. + s.tokenListLock.Lock() if _, ok := s.tokenList[token0Addr]; !ok { tokenData := new(models.TokenData) @@ -239,9 +242,7 @@ func (s *TokenAggregator) FetchAndFillTokenMetaData(pairContractAddr string) err tokenData.Name = pairContractMetadata.Token0Name tokenData.ContractAddress = token0Addr - s.tokenListLock.Lock() s.tokenList[token0Addr] = tokenData - s.tokenListLock.Unlock() tokenRefs.token0Ref = tokenData log.WithField("token0", tokenData).Debug("token0 Data") @@ -256,15 +257,14 @@ func (s *TokenAggregator) FetchAndFillTokenMetaData(pairContractAddr string) err tokenData.Name = pairContractMetadata.Token1Name tokenData.ContractAddress = token1Addr - s.tokenListLock.Lock() s.tokenList[token1Addr] = tokenData - s.tokenListLock.Unlock() tokenRefs.token1Ref = tokenData log.WithField("token1", tokenData).Debug("token1 Data") } tokenRefs.token1Ref = s.tokenList[token1Addr] + s.tokenListLock.Unlock() s.tokenPairMappingLock.Lock() s.tokenPairMapping[pairContractAddr] = tokenRefs @@ -295,6 +295,7 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error { tokensPairData, err := s.redisCache.FetchPairSummarySnapshot(context.Background(), curBlockHeight, s.settingsObj.PoolerNamespace) if err != nil { + log.WithError(err).Error("failed to fetch pairSummary snapshot") return err } @@ -366,6 +367,7 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error { beginBlockHeight24h := 0 beginTimeStamp24h := 0.0 + s.tokenListLock.Lock() for key, tokenData := range s.tokenList { tokenData.Price, err = s.redisCache.FetchTokenPriceAtBlockHeight(context.Background(), tokenData.ContractAddress, int64(tokenData.BlockHeight), s.settingsObj.PoolerNamespace) if err != nil { @@ -385,6 +387,7 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error { priceHistory, err := s.redisCache.FetchTokenPriceHistoryInRedis(context.Background(), fromTime, curTimeEpoch, tokenData.ContractAddress, s.settingsObj.PoolerNamespace) if err != nil { + s.tokenListLock.Unlock() return err } @@ -402,6 +405,7 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error { key, tokenData.Name, tokenData.BlockHeight) } } + s.tokenListLock.Unlock() err = s.CommitTokenSummaryPayload() if err != nil { @@ -428,7 +432,7 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error { tokenSummarySnapshotMeta, err = s.WaitAndFetchBlockHeightStatus(dagChainProjectID, tentativeBlockHeight) return err - }, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 5)) + }, backoff.NewExponentialBackOff()) if err != nil { log.WithError(err).Errorf("failed to fetch payloadCID at blockHeight %d", tentativeBlockHeight) s.ResetTokenData() @@ -460,12 +464,14 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error { lastSnapshotBlockHeight = curBlockHeight // prune TokenPrice ZSet as price already fetched for all tokens + s.tokenListLock.Lock() for _, tokenData := range s.tokenList { err = s.redisCache.PruneTokenPriceZSet(context.Background(), tokenData.ContractAddress, int64(tokenData.BlockHeight), s.settingsObj.PoolerNamespace) if err != nil { log.WithError(err).Error("failed to prune price zset") } } + s.tokenListLock.Unlock() return nil } @@ -519,7 +525,7 @@ func (s *TokenAggregator) FetchAndUpdateStatusOfOlderSnapshots(projectId string) updatedSnapshotMeta, err = s.WaitAndFetchBlockHeightStatus(projectId, int64(snapshotMeta.DAGHeight)) return err - }, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 5)) + }, backoff.NewExponentialBackOff()) if err != nil { log.WithError(err).Errorf("failed to fetch payloadCID at blockHeight %d", snapshotMeta.DAGHeight) @@ -556,6 +562,7 @@ func (s *TokenAggregator) FetchAndUpdateStatusOfOlderSnapshots(projectId string) } func (s *TokenAggregator) ResetTokenData() { + s.tokenListLock.Lock() for _, tokenData := range s.tokenList { tokenData.Liquidity = 0 tokenData.LiquidityUSD = 0 @@ -564,6 +571,7 @@ func (s *TokenAggregator) ResetTokenData() { tokenData.TradeVolume7d = 0 tokenData.TradeVolumeUSD7d = 0 } + s.tokenListLock.Unlock() } // CommitTokenSummaryPayload commits the token summary payload to the audit-protocol. @@ -578,11 +586,13 @@ func (s *TokenAggregator) CommitTokenSummaryPayload() error { var index int + s.tokenListLock.Lock() for _, tokenData := range s.tokenList { request.Payload.TokensData[index] = tokenData index += 1 } + s.tokenListLock.Unlock() body, err := json.Marshal(request) if err != nil {