diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java index 564d97e2f6e..80be79f806c 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java @@ -197,6 +197,7 @@ protected ForwardSyncService createForwardSyncService() { syncConfig.getForwardSyncMaxPendingBatches(), syncConfig.getForwardSyncMaxBlocksPerMinute(), syncConfig.getForwardSyncMaxBlobSidecarsPerMinute(), + syncConfig.getForwardSyncMaxDistanceFromHead(), spec); } else { LOG.info("Using single peer sync"); @@ -210,6 +211,7 @@ protected ForwardSyncService createForwardSyncService() { blobSidecarManager, blockBlobSidecarsTrackersPool, syncConfig.getForwardSyncBatchSize(), + syncConfig.getForwardSyncMaxDistanceFromHead(), spec); } return forwardSync; diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java index 051baf9d734..36bb5fe93a0 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java @@ -15,6 +15,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +import java.util.OptionalInt; import tech.pegasys.teku.networking.eth2.P2PConfig; public class SyncConfig { @@ -42,6 +43,7 @@ public class SyncConfig { private final int forwardSyncMaxPendingBatches; private final int forwardSyncMaxBlocksPerMinute; private final int forwardSyncMaxBlobSidecarsPerMinute; + private final OptionalInt forwardSyncMaxDistanceFromHead; private SyncConfig( final boolean isEnabled, @@ -52,7 +54,8 @@ private SyncConfig( final int forwardSyncBatchSize, final int forwardSyncMaxPendingBatches, final int forwardSyncMaxBlocksPerMinute, - final int forwardSyncMaxBlobSidecarsPerMinute) { + final int forwardSyncMaxBlobSidecarsPerMinute, + final OptionalInt forwardSyncMaxDistanceFromHead) { this.isEnabled = isEnabled; this.isMultiPeerSyncEnabled = isMultiPeerSyncEnabled; this.reconstructHistoricStatesEnabled = reconstructHistoricStatesEnabled; @@ -62,6 +65,7 @@ private SyncConfig( this.forwardSyncMaxPendingBatches = forwardSyncMaxPendingBatches; this.forwardSyncMaxBlocksPerMinute = forwardSyncMaxBlocksPerMinute; this.forwardSyncMaxBlobSidecarsPerMinute = forwardSyncMaxBlobSidecarsPerMinute; + this.forwardSyncMaxDistanceFromHead = forwardSyncMaxDistanceFromHead; } public static Builder builder() { @@ -104,6 +108,10 @@ public int getForwardSyncMaxBlobSidecarsPerMinute() { return forwardSyncMaxBlobSidecarsPerMinute; } + public OptionalInt getForwardSyncMaxDistanceFromHead() { + return forwardSyncMaxDistanceFromHead; + } + public static class Builder { private Boolean isEnabled; private Boolean isMultiPeerSyncEnabled = DEFAULT_MULTI_PEER_SYNC_ENABLED; @@ -115,6 +123,7 @@ public static class Builder { private Integer forwardSyncMaxBlocksPerMinute = DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE; private Integer forwardSyncMaxBlobSidecarsPerMinute = DEFAULT_FORWARD_SYNC_MAX_BLOB_SIDECARS_PER_MINUTE; + private OptionalInt forwardSyncMaxDistanceFromHead = OptionalInt.empty(); private Builder() {} @@ -129,7 +138,8 @@ public SyncConfig build() { forwardSyncBatchSize, forwardSyncMaxPendingBatches, forwardSyncMaxBlocksPerMinute, - forwardSyncMaxBlobSidecarsPerMinute); + forwardSyncMaxBlobSidecarsPerMinute, + forwardSyncMaxDistanceFromHead); } private void initMissingDefaults() { @@ -175,6 +185,15 @@ public Builder forwardSyncMaxPendingBatches(final Integer forwardSyncMaxPendingB return this; } + public Builder forwardSyncMaxDistanceFromHead(final Integer forwardSyncMaxDistanceFromHead) { + if (forwardSyncMaxDistanceFromHead == null) { + this.forwardSyncMaxDistanceFromHead = OptionalInt.empty(); + } else { + this.forwardSyncMaxDistanceFromHead = OptionalInt.of(forwardSyncMaxDistanceFromHead); + } + return this; + } + public Builder forwardSyncMaxBlocksPerMinute(final Integer forwardSyncMaxBlocksPerMinute) { checkNotNull(forwardSyncMaxBlocksPerMinute); this.forwardSyncMaxBlocksPerMinute = forwardSyncMaxBlocksPerMinute; diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerCommonAncestorFinder.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerCommonAncestorFinder.java index ed928d78999..c36f3fa879d 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerCommonAncestorFinder.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerCommonAncestorFinder.java @@ -15,6 +15,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Optional; +import java.util.OptionalInt; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; @@ -46,9 +47,15 @@ public class MultipeerCommonAncestorFinder { } public static MultipeerCommonAncestorFinder create( - final RecentChainData recentChainData, final EventThread eventThread, final Spec spec) { + final RecentChainData recentChainData, + final EventThread eventThread, + final OptionalInt maxDistanceFromHeadReached, + final Spec spec) { return new MultipeerCommonAncestorFinder( - recentChainData, new CommonAncestor(recentChainData), eventThread, spec); + recentChainData, + new CommonAncestor(recentChainData, maxDistanceFromHeadReached), + eventThread, + spec); } public SafeFuture findCommonAncestor(final TargetChain targetChain) { @@ -62,7 +69,7 @@ public SafeFuture findCommonAncestor(final TargetChain targetChain) { } return findCommonAncestor(latestFinalizedSlot, targetChain) - .thenPeek(ancestor -> LOG.trace("Found common ancestor at slot {}", ancestor)); + .thenPeek(ancestor -> LOG.info("Found common ancestor at slot {}", ancestor)); } private SafeFuture findCommonAncestor( @@ -70,15 +77,15 @@ private SafeFuture findCommonAncestor( eventThread.checkOnEventThread(); final SyncSource source1 = targetChain.selectRandomPeer().orElseThrow(); final Optional source2 = targetChain.selectRandomPeer(source1); - // Only one peer available, just go with it's common ancestor + // Only one peer available, just go with its common ancestor final SafeFuture source1CommonAncestor = commonAncestorFinder.getCommonAncestor( source1, latestFinalizedSlot, targetChain.getChainHead().getSlot()); if (source2.isEmpty()) { - LOG.trace("Finding common ancestor from one peer"); + LOG.debug("Finding common ancestor from one peer"); return source1CommonAncestor; } - LOG.trace("Finding common ancestor from two peers"); + LOG.debug("Finding common ancestor from two peers"); // Two peers available, so check they have the same common ancestor return source1CommonAncestor .thenCombineAsync( @@ -90,8 +97,7 @@ private SafeFuture findCommonAncestor( eventThread) .exceptionally( error -> { - LOG.debug("Failed to find common ancestor. Starting sync from finalized slot", error); - return latestFinalizedSlot; + throw new RuntimeException("Failed to find common ancestor.", error); }); } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java index 6082e13cfa2..62d95bb9a90 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.beacon.sync.forward.multipeer; +import java.util.OptionalInt; import org.hyperledger.besu.plugin.services.MetricsSystem; import tech.pegasys.teku.beacon.sync.events.SyncingStatus; import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService; @@ -77,6 +78,7 @@ public static MultipeerSyncService create( final int maxPendingBatches, final int maxBlocksPerMinute, final int maxBlobSidecarsPerMinute, + final OptionalInt maxDistanceFromHeadReached, final Spec spec) { final EventThread eventThread = new AsyncRunnerEventThread("sync", asyncRunnerFactory); final SettableLabelledGauge targetChainCountGauge = @@ -99,7 +101,8 @@ public static MultipeerSyncService create( eventThread, blobSidecarManager, new PeerScoringConflictResolutionStrategy()), batchSize, maxPendingBatches, - MultipeerCommonAncestorFinder.create(recentChainData, eventThread, spec), + MultipeerCommonAncestorFinder.create( + recentChainData, eventThread, maxDistanceFromHeadReached, spec), timeProvider); final SyncController syncController = new SyncController( diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java index bdec36e9cab..30794f6229f 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java @@ -24,6 +24,7 @@ import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; +import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil; import tech.pegasys.teku.infrastructure.subscribers.Subscribers; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.storage.client.RecentChainData; @@ -156,7 +157,8 @@ private InProgressSync startSync(final SyncTarget syncTarget) { syncResult.finishAsync( this::onSyncComplete, error -> { - LOG.error("Sync process failed to complete"); + LOG.error( + "Sync process failed to complete: {}", ExceptionUtil.getMessageOrSimpleName(error)); LOG.debug("Error encountered during sync", error); onSyncComplete(SyncResult.FAILED); }, diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/CommonAncestor.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/CommonAncestor.java index 0ca2697b8d0..6ca1a223418 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/CommonAncestor.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/CommonAncestor.java @@ -15,6 +15,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Optional; +import java.util.OptionalInt; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -38,15 +39,21 @@ public class CommonAncestor { private final RecentChainData recentChainData; private final int maxAttempts; + private final OptionalInt maxDistanceFromHead; - public CommonAncestor(final RecentChainData recentChainData) { - this(recentChainData, DEFAULT_MAX_ATTEMPTS); + public CommonAncestor( + final RecentChainData recentChainData, final OptionalInt maxDistanceFromHead) { + this(recentChainData, DEFAULT_MAX_ATTEMPTS, maxDistanceFromHead); } @VisibleForTesting - CommonAncestor(final RecentChainData recentChainData, final int maxAttempts) { + CommonAncestor( + final RecentChainData recentChainData, + final int maxAttempts, + final OptionalInt maxDistanceFromHead) { this.recentChainData = recentChainData; this.maxAttempts = maxAttempts; + this.maxDistanceFromHead = maxDistanceFromHead; } public SafeFuture getCommonAncestor( @@ -74,12 +81,15 @@ private SafeFuture getCommonAncestor( final UInt64 firstRequestedSlot, final UInt64 firstNonFinalSlot, final int attempt) { + final UInt64 lastSlot = firstRequestedSlot.plus(BLOCK_COUNT_PER_ATTEMPT); + + if (maxDistanceFromHeadReached(lastSlot)) { + return SafeFuture.failedFuture(new RuntimeException("Max distance from head reached")); + } if (attempt >= maxAttempts || firstRequestedSlot.isLessThanOrEqualTo(firstNonFinalSlot)) { return SafeFuture.completedFuture(firstNonFinalSlot); } - final UInt64 lastSlot = firstRequestedSlot.plus(BLOCK_COUNT_PER_ATTEMPT); - LOG.debug("Sampling ahead from {} to {}.", firstRequestedSlot, lastSlot); final BestBlockListener blockResponseListener = new BestBlockListener(recentChainData); @@ -95,7 +105,15 @@ private SafeFuture getCommonAncestor( __ -> blockResponseListener .getBestSlot() - .map(SafeFuture::completedFuture) + .>map( + bestSlot -> { + if (maxDistanceFromHeadReached(bestSlot)) { + return SafeFuture.failedFuture( + new RuntimeException("Max distance from head reached")); + } else { + return SafeFuture.completedFuture(bestSlot); + } + }) .orElseGet( () -> getCommonAncestor( @@ -106,6 +124,15 @@ private SafeFuture getCommonAncestor( attempt + 1))); } + private boolean maxDistanceFromHeadReached(final UInt64 slot) { + if (maxDistanceFromHead.isEmpty()) { + return false; + } + final UInt64 oldestAcceptedSlotFromHead = + recentChainData.getHeadSlot().minusMinZero(maxDistanceFromHead.getAsInt()); + return slot.isLessThan(oldestAcceptedSlotFromHead); + } + private static class BestBlockListener implements RpcResponseListener { private final RecentChainData recentChainData; private Optional bestSlot; diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSync.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSync.java index 93c9a577a21..5aeb9acbb77 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSync.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSync.java @@ -17,6 +17,7 @@ import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.OptionalInt; import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -77,6 +78,7 @@ public class PeerSync { private final AsyncRunner asyncRunner; private final Counter blockImportSuccessResult; private final Counter blockImportFailureResult; + private final OptionalInt maxDistanceFromHeadReached; private final AtomicInteger throttledRequestCount = new AtomicInteger(0); @@ -89,6 +91,7 @@ public PeerSync( final BlobSidecarManager blobSidecarManager, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, final int batchSize, + final OptionalInt maxDistanceFromHeadReached, final MetricsSystem metricsSystem) { this.spec = recentChainData.getSpec(); this.asyncRunner = asyncRunner; @@ -105,6 +108,7 @@ public PeerSync( this.blockImportSuccessResult = blockImportCounter.labels("imported"); this.blockImportFailureResult = blockImportCounter.labels("rejected"); this.batchSize = UInt64.valueOf(batchSize); + this.maxDistanceFromHeadReached = maxDistanceFromHeadReached; this.minSlotsToProgressPerRequest = this.batchSize.dividedBy(4); } @@ -155,7 +159,8 @@ private SafeFuture executeSync( if (!findCommonAncestor) { return SafeFuture.completedFuture(startSlot); } - CommonAncestor ancestor = new CommonAncestor(recentChainData); + CommonAncestor ancestor = + new CommonAncestor(recentChainData, maxDistanceFromHeadReached); return ancestor.getCommonAncestor(peer, startSlot, status.getHeadSlot()); }) .thenCompose( diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/SinglePeerSyncServiceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/SinglePeerSyncServiceFactory.java index adc26e82898..20984b8a31a 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/SinglePeerSyncServiceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/SinglePeerSyncServiceFactory.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.beacon.sync.forward.singlepeer; +import java.util.OptionalInt; import org.hyperledger.besu.plugin.services.MetricsSystem; import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService; import tech.pegasys.teku.infrastructure.async.AsyncRunner; @@ -34,6 +35,7 @@ public static ForwardSyncService create( final BlobSidecarManager blobSidecarManager, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, final int batchSize, + final OptionalInt maxDistanceFromHeadReached, final Spec spec) { final SyncManager syncManager = SyncManager.create( @@ -45,6 +47,7 @@ public static ForwardSyncService create( blockBlobSidecarsTrackersPool, metricsSystem, batchSize, + maxDistanceFromHeadReached, spec); return new SinglePeerSyncService(syncManager, recentChainData); } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/SyncManager.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/SyncManager.java index 984b35a9b65..971b33112c5 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/SyncManager.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/SyncManager.java @@ -21,6 +21,7 @@ import java.util.Comparator; import java.util.HashSet; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -94,6 +95,7 @@ public static SyncManager create( final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, final MetricsSystem metricsSystem, final int batchSize, + final OptionalInt maxDistanceFromHeadReached, final Spec spec) { final PeerSync peerSync = new PeerSync( @@ -103,6 +105,7 @@ public static SyncManager create( blobSidecarManager, blockBlobSidecarsTrackersPool, batchSize, + maxDistanceFromHeadReached, metricsSystem); return new SyncManager(asyncRunner, network, recentChainData, peerSync, spec); } diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerCommonAncestorFinderTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerCommonAncestorFinderTest.java index 27ab2180496..fed7c775fd8 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerCommonAncestorFinderTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerCommonAncestorFinderTest.java @@ -144,7 +144,7 @@ void shouldUseLatestFinalizedSlotWhenMultipleSourcesDisagree() { } @Test - void shouldUseLatestFinalizedSlotWhenOneSourceFailsToFindCommonAncestor() { + void shouldFailWhenOneSourceFailsToFindCommonAncestor() { final TargetChain chain = chainWith( new SlotAndBlockRoot(UInt64.valueOf(10_000), dataStructureUtil.randomBytes32()), @@ -167,7 +167,7 @@ void shouldUseLatestFinalizedSlotWhenOneSourceFailsToFindCommonAncestor() { source1CommonAncestor.completeExceptionally(new RuntimeException("Doh!")); source2CommonAncestor.complete(UInt64.valueOf(1485)); - assertThat(result).isCompletedWithValue(finalizedSlot); + assertThat(result).isCompletedExceptionally(); } private SafeFuture findCommonAncestor(final TargetChain chain) { diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/CommonAncestorTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/CommonAncestorTest.java index 3208dff0b96..b9166a0594d 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/CommonAncestorTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/CommonAncestorTest.java @@ -24,6 +24,7 @@ import static tech.pegasys.teku.beacon.sync.forward.singlepeer.CommonAncestor.SLOTS_TO_JUMP_BACK_EXPONENTIAL_BASE; import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture; +import java.util.OptionalInt; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -31,7 +32,11 @@ public class CommonAncestorTest extends AbstractSyncTest { - private final CommonAncestor commonAncestor = new CommonAncestor(recentChainData, 4); + private final int maxHeadDistance = 2; + private final CommonAncestor commonAncestor = + new CommonAncestor(recentChainData, 4, OptionalInt.empty()); + private final CommonAncestor commonAncestorWithMaxHeadDistance = + new CommonAncestor(recentChainData, 4, OptionalInt.of(maxHeadDistance)); @Test void shouldNotSearchCommonAncestorWithoutSufficientLocalData() { @@ -174,6 +179,75 @@ void shouldStopSearchingIfFirstNonFinalSlotIsReached() { .isCompletedWithValue(firstNonFinalSlot); } + @Test + void shouldCompleteExceptionallyIfMaxHeadDistanceIsReached() { + final UInt64 firstNonFinalSlot = UInt64.valueOf(1000); + + final UInt64 currentLocalHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(21)); + final UInt64 currentRemoteHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(20)); + final UInt64 syncStartSlot = currentRemoteHead.minus(BLOCK_COUNT_PER_ATTEMPT.minus(1)); + final SafeFuture requestFuture = new SafeFuture<>(); + when(peer.requestBlocksByRange(eq(syncStartSlot), eq(BLOCK_COUNT_PER_ATTEMPT), any())) + .thenReturn(requestFuture); + final PeerStatus status = + withPeerHeadSlot( + currentRemoteHead, + spec.computeEpochAtSlot(currentRemoteHead), + dataStructureUtil.randomBytes32()); + when(recentChainData.getHeadSlot()).thenReturn(currentLocalHead); + when(recentChainData.containsBlock(any())).thenReturn(true).thenReturn(false); + + final SafeFuture futureSlot = + commonAncestorWithMaxHeadDistance.getCommonAncestor( + peer, firstNonFinalSlot, status.getHeadSlot()); + + assertThat(futureSlot.isDone()).isFalse(); + + verifyAndRespond(syncStartSlot, requestFuture); + + assertThatSafeFuture(futureSlot) + .isCompletedExceptionallyWithMessage("Max distance from head reached"); + } + + @Test + void shouldStopSearchingAndReturnExceptionallyIfMaxHeadDistanceIsReached() { + final UInt64 firstNonFinalSlot = dataStructureUtil.randomUInt64(); + + final UInt64 currentLocalHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(21)); + final UInt64 currentRemoteHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(20)); + final UInt64 syncStartSlot = currentRemoteHead.minus(BLOCK_COUNT_PER_ATTEMPT.minus(1)); + final SafeFuture requestFuture = new SafeFuture<>(); + when(peer.requestBlocksByRange(eq(syncStartSlot), eq(BLOCK_COUNT_PER_ATTEMPT), any())) + .thenReturn(requestFuture); + final PeerStatus status = + withPeerHeadSlot( + currentRemoteHead, + spec.computeEpochAtSlot(currentRemoteHead), + dataStructureUtil.randomBytes32()); + + when(recentChainData.getHeadSlot()).thenReturn(currentLocalHead); + when(recentChainData.containsBlock(any())).thenReturn(false); + + final SafeFuture futureSlot = + commonAncestorWithMaxHeadDistance.getCommonAncestor( + peer, firstNonFinalSlot, status.getHeadSlot()); + + assertThat(futureSlot.isDone()).isFalse(); + + verify(peer) + .requestBlocksByRange( + eq(syncStartSlot), + eq(BLOCK_COUNT_PER_ATTEMPT), + blockResponseListenerArgumentCaptor.capture()); + + requestFuture.complete(null); + + assertThatSafeFuture( + commonAncestorWithMaxHeadDistance.getCommonAncestor( + peer, firstNonFinalSlot, status.getHeadSlot())) + .isCompletedExceptionallyWithMessage("Max distance from head reached"); + } + private void verifyAndRespond( final UInt64 syncStartSlot, final SafeFuture requestFutureAttempt) { verify(peer) diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSyncTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSyncTest.java index 505cd195228..04ae2590d5e 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSyncTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSyncTest.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.OptionalInt; import java.util.concurrent.CancellationException; import java.util.function.Supplier; import org.apache.tuweni.bytes.Bytes; @@ -100,6 +101,7 @@ public void setUp() { blobSidecarManager, blockBlobSidecarsTrackersPool, FORWARD_SYNC_BATCH_SIZE.intValue(), + OptionalInt.empty(), new NoOpMetricsSystem()); } diff --git a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java index 6fc2fc55743..73894ad2fd7 100644 --- a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java +++ b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.function.Consumer; import org.apache.tuweni.bytes.Bytes32; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; @@ -198,6 +199,7 @@ public static SyncingNodeManager create( BlockBlobSidecarsTrackersPool.NOOP, new NoOpMetricsSystem(), SyncConfig.DEFAULT_FORWARD_SYNC_BATCH_SIZE, + OptionalInt.empty(), spec); final ForwardSyncService syncService = new SinglePeerSyncService(syncManager, recentChainData); diff --git a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java index 40fd299d68a..eff175de407 100644 --- a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java +++ b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java @@ -264,6 +264,16 @@ The network interface(s) on which the node listens for P2P communication. private Integer forwardSyncBlocksRateLimit = SyncConfig.DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE; + @Option( + names = {"--Xp2p-sync-max-distance-from-head"}, + paramLabel = "", + showDefaultValue = Visibility.ALWAYS, + description = + "Maximum number slots to jump back when trying to find a common ancestor with target chain.", + hidden = true, + arity = "1") + private Integer forwardSyncMaxDistanceFromHead; + @Option( names = {"--Xp2p-sync-blob-sidecars-rate-limit"}, paramLabel = "", @@ -514,7 +524,8 @@ public void configure(final TekuConfiguration.Builder builder) { .forwardSyncMaxBlocksPerMinute(forwardSyncBlocksRateLimit) .forwardSyncMaxBlobSidecarsPerMinute(forwardSyncBlobSidecarsRateLimit) .forwardSyncBatchSize(forwardSyncBatchSize) - .forwardSyncMaxPendingBatches(forwardSyncMaxPendingBatches)); + .forwardSyncMaxPendingBatches(forwardSyncMaxPendingBatches) + .forwardSyncMaxDistanceFromHead(forwardSyncMaxDistanceFromHead)); if (subscribeAllSubnetsEnabled) { builder