Improve getAllValidators timing
metachris committed Apr 26, 2023
1 parent e18b4a4 commit cfe9c9c
Showing 2 changed files with 79 additions and 58 deletions.
5 changes: 5 additions & 0 deletions common/utils.go
@@ -32,6 +32,11 @@ var (
ErrIncorrectLength = errors.New("incorrect length")
)

// SlotPos returns the slot's position in the epoch (1-based, i.e. 1..32)
func SlotPos(slot uint64) uint64 {
return (slot % SlotsPerEpoch) + 1
}

func makeRequest(ctx context.Context, client http.Client, method, url string, payload any) (*http.Response, error) {
var req *http.Request
var err error
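For a concrete sense of what the new helper returns, here is a small, self-contained Go example (illustrative only, not part of the commit) that mirrors SlotPos, assuming the mainnet value SlotsPerEpoch = 32:

package main

import "fmt"

const SlotsPerEpoch = 32 // mainnet value, matching the 1..32 range in the doc comment

// SlotPos mirrors common.SlotPos from the diff above.
func SlotPos(slot uint64) uint64 {
	return (slot % SlotsPerEpoch) + 1
}

func main() {
	for _, slot := range []uint64{64, 67, 83, 95} {
		fmt.Printf("slot %d -> position %d in its epoch\n", slot, SlotPos(slot))
	}
	// Output:
	// slot 64 -> position 1 in its epoch
	// slot 67 -> position 4 in its epoch
	// slot 83 -> position 20 in its epoch
	// slot 95 -> position 32 in its epoch
}

Positions 4 and 20 are the slots the housekeeper targets for validator updates, as shown in the second file below.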
132 changes: 74 additions & 58 deletions services/housekeeper/housekeeper.go
Expand Up @@ -44,6 +44,9 @@ type Housekeeper struct {

headSlot uberatomic.Uint64

lastValdatorUpdateSlot uberatomic.Uint64
lastValdatorIsUpdating uberatomic.Bool

proposersAlreadySaved map[uint64]string // to avoid repeating redis writes
}
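The two new fields act as a single-flight guard (lastValdatorIsUpdating) and a high-water mark of the last successful update (lastValdatorUpdateSlot) for updateKnownValidators, shown further down in this diff. A minimal sketch of the same guard pattern using the standard library's sync/atomic (illustrative only; names and setup are not from the repository):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var isUpdating atomic.Bool // zero value is false

// runOnce skips the call if another update is already in flight,
// mirroring the Swap/Store guard used in updateKnownValidators.
func runOnce(id int) {
	if wasUpdating := isUpdating.Swap(true); wasUpdating {
		fmt.Printf("goroutine %d: update already in progress, skipping\n", id)
		return
	}
	defer isUpdating.Store(false)

	fmt.Printf("goroutine %d: running the update\n", id)
	time.Sleep(50 * time.Millisecond) // simulate the expensive beacon-node query
}

func main() {
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			runOnce(id)
		}(i)
	}
	wg.Wait()
}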

@@ -79,14 +82,8 @@ func (hk *Housekeeper) Start() (err error) {
go hk.updateBlockBuildersInRedis()
go hk.updateValidatorRegistrationsInRedis()

// Start the periodic task loops
go hk.periodicTaskUpdateKnownValidators()
go hk.periodicTaskLogValidators()
go hk.periodicTaskUpdateBuilderStatusInRedis()

// Process the current slot
headSlot := bestSyncStatus.HeadSlot
hk.processNewSlot(headSlot)
hk.processNewSlot(bestSyncStatus.HeadSlot)

// Start regular slot updates
c := make(chan beaconclient.HeadEventData)
@@ -97,55 +94,23 @@ func (hk *Housekeeper) Start() (err error) {
}
}

func (hk *Housekeeper) periodicTaskLogValidators() {
for {
numRegisteredValidators, err := hk.db.NumRegisteredValidators()
if err == nil {
hk.log.WithField("numRegisteredValidators", numRegisteredValidators).Infof("registered validators: %d", numRegisteredValidators)
} else {
hk.log.WithError(err).Error("failed to get number of registered validators")
}

activeValidators, err := hk.redis.GetActiveValidators()
if err == nil {
hk.log.WithField("numActiveValidators", len(activeValidators)).Infof("active validators: %d", len(activeValidators))
} else {
hk.log.WithError(err).Error("failed to get number of active validators")
}

time.Sleep(common.DurationPerEpoch / 2)
}
}

func (hk *Housekeeper) periodicTaskUpdateKnownValidators() {
for {
hk.log.Debug("periodicTaskUpdateKnownValidators start")
hk.updateKnownValidators()
hk.log.Debug("periodicTaskUpdateKnownValidators done")

// Wait half an epoch
time.Sleep(common.DurationPerEpoch / 2)
}
}

func (hk *Housekeeper) periodicTaskUpdateBuilderStatusInRedis() {
for {
// builders, err := hk.da
time.Sleep(common.DurationPerEpoch / 2)
}
}

func (hk *Housekeeper) processNewSlot(headSlot uint64) {
prevHeadSlot := hk.headSlot.Load()
if headSlot <= prevHeadSlot {
return
}
hk.headSlot.Store(headSlot)

// kick off a possible validator update
go hk.updateKnownValidators()

log := hk.log.WithFields(logrus.Fields{
"headSlot": headSlot,
"headSlotPos": common.SlotPos(headSlot),
"prevHeadSlot": prevHeadSlot,
})

// Print any missed slots
if prevHeadSlot > 0 {
for s := prevHeadSlot + 1; s < headSlot; s++ {
log.WithField("missedSlot", s).Warnf("missed slot: %d", s)
Expand All @@ -154,52 +119,103 @@ func (hk *Housekeeper) processNewSlot(headSlot uint64) {

// Update proposer duties
go hk.updateProposerDuties(headSlot)
go func() {
err := hk.redis.SetStats(datastore.RedisStatsFieldLatestSlot, headSlot)
if err != nil {
log.WithError(err).Error("failed to set stats")
}
}()

hk.headSlot.Store(headSlot)
// Set headSlot in redis (for the website)
err := hk.redis.SetStats(datastore.RedisStatsFieldLatestSlot, headSlot)
if err != nil {
log.WithError(err).Error("failed to set stats")
}

currentEpoch := headSlot / common.SlotsPerEpoch
log.WithFields(logrus.Fields{
"epoch": currentEpoch,
"slotStartNextEpoch": (currentEpoch + 1) * common.SlotsPerEpoch,
}).Infof("updated headSlot to %d", headSlot)
}

// updateKnownValidators queries the full list of known validators from the beacon node
// and stores it in redis. For the CL client this is an expensive operation and takes a bunch
// of resources. This is why we schedule the requests for slot 4 and 20 of every epoch,
// 6 seconds into the slot (on suggestion of @potuz). It's also run once at startup.
func (hk *Housekeeper) updateKnownValidators() {
// Ensure there's only one at a time
if isUpdating := hk.lastValdatorIsUpdating.Swap(true); isUpdating {
return
}
defer hk.lastValdatorIsUpdating.Store(false)

// Load data and prepare logs
headSlot := hk.headSlot.Load()
headSlotPos := common.SlotPos(headSlot) // 1-based position in epoch (32 slots, 1..32)
lastUpdateSlot := hk.lastValdatorUpdateSlot.Load()
log := hk.log.WithFields(logrus.Fields{
"headSlot": headSlot,
"headSlotPos": headSlotPos,
"lastUpdateSlot": lastUpdateSlot,
"method": "updateKnownValidators",
})
log.Debug("updateKnownValidators init")

// Abort if we already had this slot
if headSlot <= lastUpdateSlot {
return
}

// Minimum amount of slots between updates
slotsSinceLastUpdate := headSlot - lastUpdateSlot
if slotsSinceLastUpdate < 6 {
return
}

// Force update after a longer time since last successful update
forceUpdate := slotsSinceLastUpdate > 32

// Proceed only if forced, or on slot-position 4 or 20
if !forceUpdate && headSlotPos != 4 && headSlotPos != 20 {
return
}

// Wait for 6s into the slot
time.Sleep(6 * time.Second)

//
// Execute update now
//
// Query beacon node for known validators
hk.log.Debug("Querying validators from beacon node... (this may take a while)")
log.Info("Querying validators from beacon node... (this may take a while)")
timeStartFetching := time.Now()
validators, err := hk.beaconClient.GetStateValidators(beaconclient.StateIDHead) // head is fastest
if err != nil {
hk.log.WithError(err).Error("failed to fetch validators from all beacon nodes")
log.WithError(err).Error("failed to fetch validators from all beacon nodes")
return
}

numValidators := len(validators)
log := hk.log.WithField("numKnownValidators", numValidators)
log.WithField("durationFetchValidators", time.Since(timeStartFetching).Seconds()).Infof("received validators from beacon-node")
log = log.WithField("numKnownValidators", numValidators)
log.WithField("durationFetchValidatorsMs", time.Since(timeStartFetching).Milliseconds()).Infof("received validators from beacon-node")

// Store total number of validators
err = hk.redis.SetStats(datastore.RedisStatsFieldValidatorsTotal, fmt.Sprint(numValidators))
if err != nil {
log.WithError(err).Error("failed to set stats for RedisStatsFieldValidatorsTotal")
}

// At this point, consider the update successful
hk.lastValdatorUpdateSlot.Store(headSlot)

// Update Redis with validators
log.Debug("Writing to Redis...")
timeStartWriting := time.Now()

printCounter := len(hk.proposersAlreadySaved) == 0 // only on first round
// This process can take very long, that's why it prints a log line every 10k validators
printCounter := len(hk.proposersAlreadySaved) == 0 // only do this on service startup

i := 0
newValidators := 0
for _, validator := range validators {
i++
if printCounter && i%10000 == 0 {
hk.log.Debugf("writing to redis: %d / %d", i, numValidators)
log.Debugf("writing to redis: %d / %d", i, numValidators)
}

// avoid resaving if index->pubkey mapping is the same
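The scheduling rules from the updateKnownValidators doc comment boil down to a small decision function: skip slots that were already handled, require at least 6 slots since the last update, run only at slot positions 4 and 20, and force an update once more than a full epoch has passed. The sketch below reproduces that gate (illustrative only; the function name and standalone setup are hypothetical, not the relay's code):

package main

import "fmt"

const SlotsPerEpoch = 32

// slotPos is the 1-based position of a slot within its epoch (1..32).
func slotPos(slot uint64) uint64 {
	return (slot % SlotsPerEpoch) + 1
}

// shouldUpdateKnownValidators mirrors the gating logic in the diff above:
// at most once per slot, at least 6 slots apart, on slot positions 4 and 20,
// or forced after more than a full epoch without a successful update.
func shouldUpdateKnownValidators(headSlot, lastUpdateSlot uint64) bool {
	if headSlot <= lastUpdateSlot {
		return false
	}
	slotsSinceLastUpdate := headSlot - lastUpdateSlot
	if slotsSinceLastUpdate < 6 {
		return false
	}
	forceUpdate := slotsSinceLastUpdate > 32
	pos := slotPos(headSlot)
	return forceUpdate || pos == 4 || pos == 20
}

func main() {
	fmt.Println(shouldUpdateKnownValidators(67, 64))  // false: only 3 slots since last update
	fmt.Println(shouldUpdateKnownValidators(83, 64))  // true: slot position 20
	fmt.Println(shouldUpdateKnownValidators(90, 64))  // false: not at position 4 or 20
	fmt.Println(shouldUpdateKnownValidators(100, 64)) // true: forced, more than 32 slots since last update
}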
