Skip to content

Commit

Permalink
Merge pull request #20 from PowerLoom/fix/token-aggregator
Browse files Browse the repository at this point in the history
fix: maps lock deadlock (stuck forever) in token-aggregator service
  • Loading branch information
SwaroopH authored Apr 22, 2023
2 parents 2f8628f + b764f04 commit e1ca6f1
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions go/token-aggregator/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (s *TokenAggregator) Run(pairContractAddress string) {
break
}

log.Infof("pairsSummary snapshot has not been created yet, sleeping for %d secs", s.settingsObj.TokenAggregatorSettings.RunIntervalSecs)

time.Sleep(time.Duration(s.settingsObj.TokenAggregatorSettings.RunIntervalSecs) * time.Second)
}

Expand Down Expand Up @@ -232,16 +234,15 @@ func (s *TokenAggregator) FetchAndFillTokenMetaData(pairContractAddr string) err
tokenRefs := new(TokenDataRefs)

// FIX: TOKEN Symbol and name not getting stored in tokenData.
s.tokenListLock.Lock()
if _, ok := s.tokenList[token0Addr]; !ok {
tokenData := new(models.TokenData)

tokenData.Symbol = pairContractMetadata.Token0Symbol
tokenData.Name = pairContractMetadata.Token0Name
tokenData.ContractAddress = token0Addr

s.tokenListLock.Lock()
s.tokenList[token0Addr] = tokenData
s.tokenListLock.Unlock()

tokenRefs.token0Ref = tokenData
log.WithField("token0", tokenData).Debug("token0 Data")
Expand All @@ -256,15 +257,14 @@ func (s *TokenAggregator) FetchAndFillTokenMetaData(pairContractAddr string) err
tokenData.Name = pairContractMetadata.Token1Name
tokenData.ContractAddress = token1Addr

s.tokenListLock.Lock()
s.tokenList[token1Addr] = tokenData
s.tokenListLock.Unlock()

tokenRefs.token1Ref = tokenData
log.WithField("token1", tokenData).Debug("token1 Data")
}

tokenRefs.token1Ref = s.tokenList[token1Addr]
s.tokenListLock.Unlock()

s.tokenPairMappingLock.Lock()
s.tokenPairMapping[pairContractAddr] = tokenRefs
Expand Down Expand Up @@ -295,6 +295,7 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error {

tokensPairData, err := s.redisCache.FetchPairSummarySnapshot(context.Background(), curBlockHeight, s.settingsObj.PoolerNamespace)
if err != nil {
log.WithError(err).Error("failed to fetch pairSummary snapshot")
return err
}

Expand Down Expand Up @@ -366,6 +367,7 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error {
beginBlockHeight24h := 0
beginTimeStamp24h := 0.0

s.tokenListLock.Lock()
for key, tokenData := range s.tokenList {
tokenData.Price, err = s.redisCache.FetchTokenPriceAtBlockHeight(context.Background(), tokenData.ContractAddress, int64(tokenData.BlockHeight), s.settingsObj.PoolerNamespace)
if err != nil {
Expand All @@ -385,6 +387,7 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error {

priceHistory, err := s.redisCache.FetchTokenPriceHistoryInRedis(context.Background(), fromTime, curTimeEpoch, tokenData.ContractAddress, s.settingsObj.PoolerNamespace)
if err != nil {
s.tokenListLock.Unlock()
return err
}

Expand All @@ -402,6 +405,7 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error {
key, tokenData.Name, tokenData.BlockHeight)
}
}
s.tokenListLock.Unlock()

err = s.CommitTokenSummaryPayload()
if err != nil {
Expand All @@ -428,7 +432,7 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error {
tokenSummarySnapshotMeta, err = s.WaitAndFetchBlockHeightStatus(dagChainProjectID, tentativeBlockHeight)

return err
}, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 5))
}, backoff.NewExponentialBackOff())
if err != nil {
log.WithError(err).Errorf("failed to fetch payloadCID at blockHeight %d", tentativeBlockHeight)
s.ResetTokenData()
Expand Down Expand Up @@ -460,12 +464,14 @@ func (s *TokenAggregator) PrepareAndSubmitTokenSummarySnapshot() error {
lastSnapshotBlockHeight = curBlockHeight

// prune TokenPrice ZSet as price already fetched for all tokens
s.tokenListLock.Lock()
for _, tokenData := range s.tokenList {
err = s.redisCache.PruneTokenPriceZSet(context.Background(), tokenData.ContractAddress, int64(tokenData.BlockHeight), s.settingsObj.PoolerNamespace)
if err != nil {
log.WithError(err).Error("failed to prune price zset")
}
}
s.tokenListLock.Unlock()

return nil
}
Expand Down Expand Up @@ -519,7 +525,7 @@ func (s *TokenAggregator) FetchAndUpdateStatusOfOlderSnapshots(projectId string)
updatedSnapshotMeta, err = s.WaitAndFetchBlockHeightStatus(projectId, int64(snapshotMeta.DAGHeight))

return err
}, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 5))
}, backoff.NewExponentialBackOff())
if err != nil {
log.WithError(err).Errorf("failed to fetch payloadCID at blockHeight %d", snapshotMeta.DAGHeight)

Expand Down Expand Up @@ -556,6 +562,7 @@ func (s *TokenAggregator) FetchAndUpdateStatusOfOlderSnapshots(projectId string)
}

func (s *TokenAggregator) ResetTokenData() {
s.tokenListLock.Lock()
for _, tokenData := range s.tokenList {
tokenData.Liquidity = 0
tokenData.LiquidityUSD = 0
Expand All @@ -564,6 +571,7 @@ func (s *TokenAggregator) ResetTokenData() {
tokenData.TradeVolume7d = 0
tokenData.TradeVolumeUSD7d = 0
}
s.tokenListLock.Unlock()
}

// CommitTokenSummaryPayload commits the token summary payload to the audit-protocol.
Expand All @@ -578,11 +586,13 @@ func (s *TokenAggregator) CommitTokenSummaryPayload() error {

var index int

s.tokenListLock.Lock()
for _, tokenData := range s.tokenList {
request.Payload.TokensData[index] = tokenData

index += 1
}
s.tokenListLock.Unlock()

body, err := json.Marshal(request)
if err != nil {
Expand Down

0 comments on commit e1ca6f1

Please sign in to comment.