Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync common ancestor improvements #9211

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ protected ForwardSyncService createForwardSyncService() {
syncConfig.getForwardSyncMaxPendingBatches(),
syncConfig.getForwardSyncMaxBlocksPerMinute(),
syncConfig.getForwardSyncMaxBlobSidecarsPerMinute(),
syncConfig.getForwardSyncMaxDistanceFromHead(),
spec);
} else {
LOG.info("Using single peer sync");
Expand All @@ -210,6 +211,7 @@ protected ForwardSyncService createForwardSyncService() {
blobSidecarManager,
blockBlobSidecarsTrackersPool,
syncConfig.getForwardSyncBatchSize(),
syncConfig.getForwardSyncMaxDistanceFromHead(),
spec);
}
return forwardSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.OptionalInt;
import tech.pegasys.teku.networking.eth2.P2PConfig;

public class SyncConfig {
Expand Down Expand Up @@ -42,6 +43,7 @@ public class SyncConfig {
private final int forwardSyncMaxPendingBatches;
private final int forwardSyncMaxBlocksPerMinute;
private final int forwardSyncMaxBlobSidecarsPerMinute;
private final OptionalInt forwardSyncMaxDistanceFromHead;

private SyncConfig(
final boolean isEnabled,
Expand All @@ -52,7 +54,8 @@ private SyncConfig(
final int forwardSyncBatchSize,
final int forwardSyncMaxPendingBatches,
final int forwardSyncMaxBlocksPerMinute,
final int forwardSyncMaxBlobSidecarsPerMinute) {
final int forwardSyncMaxBlobSidecarsPerMinute,
final OptionalInt forwardSyncMaxDistanceFromHead) {
this.isEnabled = isEnabled;
this.isMultiPeerSyncEnabled = isMultiPeerSyncEnabled;
this.reconstructHistoricStatesEnabled = reconstructHistoricStatesEnabled;
Expand All @@ -62,6 +65,7 @@ private SyncConfig(
this.forwardSyncMaxPendingBatches = forwardSyncMaxPendingBatches;
this.forwardSyncMaxBlocksPerMinute = forwardSyncMaxBlocksPerMinute;
this.forwardSyncMaxBlobSidecarsPerMinute = forwardSyncMaxBlobSidecarsPerMinute;
this.forwardSyncMaxDistanceFromHead = forwardSyncMaxDistanceFromHead;
}

public static Builder builder() {
Expand Down Expand Up @@ -104,6 +108,10 @@ public int getForwardSyncMaxBlobSidecarsPerMinute() {
return forwardSyncMaxBlobSidecarsPerMinute;
}

public OptionalInt getForwardSyncMaxDistanceFromHead() {
return forwardSyncMaxDistanceFromHead;
}

public static class Builder {
private Boolean isEnabled;
private Boolean isMultiPeerSyncEnabled = DEFAULT_MULTI_PEER_SYNC_ENABLED;
Expand All @@ -115,6 +123,7 @@ public static class Builder {
private Integer forwardSyncMaxBlocksPerMinute = DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE;
private Integer forwardSyncMaxBlobSidecarsPerMinute =
DEFAULT_FORWARD_SYNC_MAX_BLOB_SIDECARS_PER_MINUTE;
private OptionalInt forwardSyncMaxDistanceFromHead = OptionalInt.empty();

private Builder() {}

Expand All @@ -129,7 +138,8 @@ public SyncConfig build() {
forwardSyncBatchSize,
forwardSyncMaxPendingBatches,
forwardSyncMaxBlocksPerMinute,
forwardSyncMaxBlobSidecarsPerMinute);
forwardSyncMaxBlobSidecarsPerMinute,
forwardSyncMaxDistanceFromHead);
}

private void initMissingDefaults() {
Expand Down Expand Up @@ -175,6 +185,15 @@ public Builder forwardSyncMaxPendingBatches(final Integer forwardSyncMaxPendingB
return this;
}

public Builder forwardSyncMaxDistanceFromHead(final Integer forwardSyncMaxDistanceFromHead) {
if (forwardSyncMaxDistanceFromHead == null) {
this.forwardSyncMaxDistanceFromHead = OptionalInt.empty();
} else {
this.forwardSyncMaxDistanceFromHead = OptionalInt.of(forwardSyncMaxDistanceFromHead);
}
return this;
}

public Builder forwardSyncMaxBlocksPerMinute(final Integer forwardSyncMaxBlocksPerMinute) {
checkNotNull(forwardSyncMaxBlocksPerMinute);
this.forwardSyncMaxBlocksPerMinute = forwardSyncMaxBlocksPerMinute;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import java.util.OptionalInt;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
Expand Down Expand Up @@ -46,9 +47,15 @@ public class MultipeerCommonAncestorFinder {
}

public static MultipeerCommonAncestorFinder create(
final RecentChainData recentChainData, final EventThread eventThread, final Spec spec) {
final RecentChainData recentChainData,
final EventThread eventThread,
final OptionalInt maxDistanceFromHeadReached,
final Spec spec) {
return new MultipeerCommonAncestorFinder(
recentChainData, new CommonAncestor(recentChainData), eventThread, spec);
recentChainData,
new CommonAncestor(recentChainData, maxDistanceFromHeadReached),
eventThread,
spec);
}

public SafeFuture<UInt64> findCommonAncestor(final TargetChain targetChain) {
Expand All @@ -62,23 +69,23 @@ public SafeFuture<UInt64> findCommonAncestor(final TargetChain targetChain) {
}

return findCommonAncestor(latestFinalizedSlot, targetChain)
.thenPeek(ancestor -> LOG.trace("Found common ancestor at slot {}", ancestor));
.thenPeek(ancestor -> LOG.info("Found common ancestor at slot {}", ancestor));
}

private SafeFuture<UInt64> findCommonAncestor(
final UInt64 latestFinalizedSlot, final TargetChain targetChain) {
eventThread.checkOnEventThread();
final SyncSource source1 = targetChain.selectRandomPeer().orElseThrow();
final Optional<SyncSource> source2 = targetChain.selectRandomPeer(source1);
// Only one peer available, just go with it's common ancestor
// Only one peer available, just go with its common ancestor
final SafeFuture<UInt64> source1CommonAncestor =
commonAncestorFinder.getCommonAncestor(
source1, latestFinalizedSlot, targetChain.getChainHead().getSlot());
if (source2.isEmpty()) {
LOG.trace("Finding common ancestor from one peer");
LOG.debug("Finding common ancestor from one peer");
return source1CommonAncestor;
}
LOG.trace("Finding common ancestor from two peers");
LOG.debug("Finding common ancestor from two peers");
// Two peers available, so check they have the same common ancestor
return source1CommonAncestor
.thenCombineAsync(
Expand All @@ -90,8 +97,7 @@ private SafeFuture<UInt64> findCommonAncestor(
eventThread)
.exceptionally(
error -> {
LOG.debug("Failed to find common ancestor. Starting sync from finalized slot", error);
return latestFinalizedSlot;
throw new RuntimeException("Failed to find common ancestor.", error);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.beacon.sync.forward.multipeer;

import java.util.OptionalInt;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.beacon.sync.events.SyncingStatus;
import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService;
Expand Down Expand Up @@ -77,6 +78,7 @@ public static MultipeerSyncService create(
final int maxPendingBatches,
final int maxBlocksPerMinute,
final int maxBlobSidecarsPerMinute,
final OptionalInt maxDistanceFromHeadReached,
final Spec spec) {
final EventThread eventThread = new AsyncRunnerEventThread("sync", asyncRunnerFactory);
final SettableLabelledGauge targetChainCountGauge =
Expand All @@ -99,7 +101,8 @@ public static MultipeerSyncService create(
eventThread, blobSidecarManager, new PeerScoringConflictResolutionStrategy()),
batchSize,
maxPendingBatches,
MultipeerCommonAncestorFinder.create(recentChainData, eventThread, spec),
MultipeerCommonAncestorFinder.create(
recentChainData, eventThread, maxDistanceFromHeadReached, spec),
timeProvider);
final SyncController syncController =
new SyncController(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand Down Expand Up @@ -156,7 +157,8 @@ private InProgressSync startSync(final SyncTarget syncTarget) {
syncResult.finishAsync(
this::onSyncComplete,
error -> {
LOG.error("Sync process failed to complete");
LOG.error(
"Sync process failed to complete: {}", ExceptionUtil.getMessageOrSimpleName(error));
LOG.debug("Error encountered during sync", error);
onSyncComplete(SyncResult.FAILED);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import java.util.OptionalInt;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
Expand All @@ -38,15 +39,21 @@ public class CommonAncestor {

private final RecentChainData recentChainData;
private final int maxAttempts;
private final OptionalInt maxDistanceFromHead;

public CommonAncestor(final RecentChainData recentChainData) {
this(recentChainData, DEFAULT_MAX_ATTEMPTS);
public CommonAncestor(
final RecentChainData recentChainData, final OptionalInt maxDistanceFromHead) {
this(recentChainData, DEFAULT_MAX_ATTEMPTS, maxDistanceFromHead);
}

@VisibleForTesting
CommonAncestor(final RecentChainData recentChainData, final int maxAttempts) {
CommonAncestor(
final RecentChainData recentChainData,
final int maxAttempts,
final OptionalInt maxDistanceFromHead) {
this.recentChainData = recentChainData;
this.maxAttempts = maxAttempts;
this.maxDistanceFromHead = maxDistanceFromHead;
}

public SafeFuture<UInt64> getCommonAncestor(
Expand Down Expand Up @@ -74,12 +81,15 @@ private SafeFuture<UInt64> getCommonAncestor(
final UInt64 firstRequestedSlot,
final UInt64 firstNonFinalSlot,
final int attempt) {
final UInt64 lastSlot = firstRequestedSlot.plus(BLOCK_COUNT_PER_ATTEMPT);

if (maxDistanceFromHeadReached(lastSlot)) {
return SafeFuture.failedFuture(new RuntimeException("Max distance from head reached"));
}
if (attempt >= maxAttempts || firstRequestedSlot.isLessThanOrEqualTo(firstNonFinalSlot)) {
return SafeFuture.completedFuture(firstNonFinalSlot);
}

final UInt64 lastSlot = firstRequestedSlot.plus(BLOCK_COUNT_PER_ATTEMPT);

LOG.debug("Sampling ahead from {} to {}.", firstRequestedSlot, lastSlot);

final BestBlockListener blockResponseListener = new BestBlockListener(recentChainData);
Expand All @@ -95,7 +105,15 @@ private SafeFuture<UInt64> getCommonAncestor(
__ ->
blockResponseListener
.getBestSlot()
.map(SafeFuture::completedFuture)
.<SafeFuture<UInt64>>map(
bestSlot -> {
if (maxDistanceFromHeadReached(bestSlot)) {
return SafeFuture.failedFuture(
new RuntimeException("Max distance from head reached"));
} else {
return SafeFuture.completedFuture(bestSlot);
}
})
.orElseGet(
() ->
getCommonAncestor(
Expand All @@ -106,6 +124,15 @@ private SafeFuture<UInt64> getCommonAncestor(
attempt + 1)));
}

private boolean maxDistanceFromHeadReached(final UInt64 slot) {
if (maxDistanceFromHead.isEmpty()) {
return false;
}
final UInt64 oldestAcceptedSlotFromHead =
recentChainData.getHeadSlot().minusMinZero(maxDistanceFromHead.getAsInt());
return slot.isLessThan(oldestAcceptedSlotFromHead);
}

private static class BestBlockListener implements RpcResponseListener<SignedBeaconBlock> {
private final RecentChainData recentChainData;
private Optional<UInt64> bestSlot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class PeerSync {
private final AsyncRunner asyncRunner;
private final Counter blockImportSuccessResult;
private final Counter blockImportFailureResult;
private final OptionalInt maxDistanceFromHeadReached;

private final AtomicInteger throttledRequestCount = new AtomicInteger(0);

Expand All @@ -89,6 +91,7 @@ public PeerSync(
final BlobSidecarManager blobSidecarManager,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final int batchSize,
final OptionalInt maxDistanceFromHeadReached,
final MetricsSystem metricsSystem) {
this.spec = recentChainData.getSpec();
this.asyncRunner = asyncRunner;
Expand All @@ -105,6 +108,7 @@ public PeerSync(
this.blockImportSuccessResult = blockImportCounter.labels("imported");
this.blockImportFailureResult = blockImportCounter.labels("rejected");
this.batchSize = UInt64.valueOf(batchSize);
this.maxDistanceFromHeadReached = maxDistanceFromHeadReached;
this.minSlotsToProgressPerRequest = this.batchSize.dividedBy(4);
}

Expand Down Expand Up @@ -155,7 +159,8 @@ private SafeFuture<PeerSyncResult> executeSync(
if (!findCommonAncestor) {
return SafeFuture.completedFuture(startSlot);
}
CommonAncestor ancestor = new CommonAncestor(recentChainData);
CommonAncestor ancestor =
new CommonAncestor(recentChainData, maxDistanceFromHeadReached);
return ancestor.getCommonAncestor(peer, startSlot, status.getHeadSlot());
})
.thenCompose(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.beacon.sync.forward.singlepeer;

import java.util.OptionalInt;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
Expand All @@ -34,6 +35,7 @@ public static ForwardSyncService create(
final BlobSidecarManager blobSidecarManager,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final int batchSize,
final OptionalInt maxDistanceFromHeadReached,
final Spec spec) {
final SyncManager syncManager =
SyncManager.create(
Expand All @@ -45,6 +47,7 @@ public static ForwardSyncService create(
blockBlobSidecarsTrackersPool,
metricsSystem,
batchSize,
maxDistanceFromHeadReached,
spec);
return new SinglePeerSyncService(syncManager, recentChainData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Comparator;
import java.util.HashSet;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -94,6 +95,7 @@ public static SyncManager create(
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final MetricsSystem metricsSystem,
final int batchSize,
final OptionalInt maxDistanceFromHeadReached,
final Spec spec) {
final PeerSync peerSync =
new PeerSync(
Expand All @@ -103,6 +105,7 @@ public static SyncManager create(
blobSidecarManager,
blockBlobSidecarsTrackersPool,
batchSize,
maxDistanceFromHeadReached,
metricsSystem);
return new SyncManager(asyncRunner, network, recentChainData, peerSync, spec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void shouldUseLatestFinalizedSlotWhenMultipleSourcesDisagree() {
}

@Test
void shouldUseLatestFinalizedSlotWhenOneSourceFailsToFindCommonAncestor() {
void shouldFailWhenOneSourceFailsToFindCommonAncestor() {
final TargetChain chain =
chainWith(
new SlotAndBlockRoot(UInt64.valueOf(10_000), dataStructureUtil.randomBytes32()),
Expand All @@ -167,7 +167,7 @@ void shouldUseLatestFinalizedSlotWhenOneSourceFailsToFindCommonAncestor() {

source1CommonAncestor.completeExceptionally(new RuntimeException("Doh!"));
source2CommonAncestor.complete(UInt64.valueOf(1485));
assertThat(result).isCompletedWithValue(finalizedSlot);
assertThat(result).isCompletedExceptionally();
}

private SafeFuture<UInt64> findCommonAncestor(final TargetChain chain) {
Expand Down
Loading