Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit retry attempts in SIRI-SX updater #5264

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.opentripplanner.ext.siri.updater;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.UUID;
import org.opentripplanner.ext.siri.SiriAlertsUpdateHandler;
import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher;
import org.opentripplanner.framework.io.OtpHttpClientException;
import org.opentripplanner.framework.lang.OtpRetry;
import org.opentripplanner.framework.lang.OtpRetryBuilder;
import org.opentripplanner.routing.impl.TransitAlertServiceImpl;
import org.opentripplanner.routing.services.TransitAlertService;
import org.opentripplanner.transit.service.DefaultTransitService;
Expand All @@ -20,16 +23,23 @@
public class SiriSXUpdater extends PollingGraphUpdater implements TransitAlertProvider {

private static final Logger LOG = LoggerFactory.getLogger(SiriSXUpdater.class);
private static final long RETRY_INTERVAL_MILLIS = 5000;
private static final int RETRY_MAX_ATTEMPTS = 3;
private static final Duration RETRY_INITIAL_DELAY = Duration.ofMillis(5000);
private static final int RETRY_BACKOFF = 2;

private final String url;
private final String originalRequestorRef;
private final TransitAlertService transitAlertService;
private final SiriAlertsUpdateHandler updateHandler;
private WriteToGraphCallback saveResultOnGraph;
private ZonedDateTime lastTimestamp = ZonedDateTime.now().minusWeeks(1);
private String requestorRef;
/**
* Global retry counter used to create a new unique requestorRef after each retry.
*/
private int retryCount = 0;
private final SiriHttpLoader siriHttpLoader;
private final OtpRetry retry;

public SiriSXUpdater(SiriSXUpdaterParameters config, TransitModel transitModel) {
super(config);
Expand All @@ -55,6 +65,16 @@ public SiriSXUpdater(SiriSXUpdaterParameters config, TransitModel transitModel)
);
siriHttpLoader = new SiriHttpLoader(url, config.timeout(), config.requestHeaders());

retry =
new OtpRetryBuilder()
.withName("SIRI-SX Update")
.withMaxAttempts(RETRY_MAX_ATTEMPTS)
.withInitialRetryInterval(RETRY_INITIAL_DELAY)
.withBackoffMultiplier(RETRY_BACKOFF)
.withRetryableException(OtpHttpClientException.class::isInstance)
.withOnRetry(this::updateRequestorRef)
.build();

LOG.info(
"Creating real-time alert updater (SIRI SX) running every {} seconds : {}",
pollingPeriod(),
Expand All @@ -76,38 +96,28 @@ public String toString() {
}

@Override
protected void runPolling() throws InterruptedException {
try {
boolean moreData = false;
do {
Siri updates = getUpdates();
if (updates != null) {
ServiceDelivery serviceDelivery = updates.getServiceDelivery();
moreData = Boolean.TRUE.equals(serviceDelivery.isMoreData());
// Mark this updater as primed after last page of updates. Copy moreData into a final
// primitive, because the object moreData persists across iterations.
final boolean markPrimed = !moreData;
if (serviceDelivery.getSituationExchangeDeliveries() != null) {
saveResultOnGraph.execute((graph, transitModel) -> {
updateHandler.update(serviceDelivery);
if (markPrimed) primed = true;
});
}
}
} while (moreData);
} catch (OtpHttpClientException e) {
final long sleepTime = RETRY_INTERVAL_MILLIS + RETRY_INTERVAL_MILLIS * retryCount;

retryCount++;

LOG.info("Caught timeout - retry no. {} after {} millis", retryCount, sleepTime);

Thread.sleep(sleepTime);
protected void runPolling() {
retry.execute(this::updateSiri);
}

// Creating new requestorRef so all data is refreshed
requestorRef = originalRequestorRef + "-retry-" + retryCount;
runPolling();
}
private void updateSiri() {
boolean moreData = false;
do {
Siri updates = getUpdates();
if (updates != null) {
ServiceDelivery serviceDelivery = updates.getServiceDelivery();
moreData = Boolean.TRUE.equals(serviceDelivery.isMoreData());
// Mark this updater as primed after last page of updates. Copy moreData into a final
// primitive, because the object moreData persists across iterations.
final boolean markPrimed = !moreData;
if (serviceDelivery.getSituationExchangeDeliveries() != null) {
saveResultOnGraph.execute((graph, transitModel) -> {
updateHandler.update(serviceDelivery);
if (markPrimed) primed = true;
});
}
}
} while (moreData);
}

private Siri getUpdates() {
Expand All @@ -129,13 +139,30 @@ private Siri getUpdates() {
lastTimestamp = responseTimestamp;
return siri;
} catch (OtpHttpClientException e) {
LOG.info("Failed after {} ms", (System.currentTimeMillis() - t1));
LOG.error("Error reading SIRI feed from " + url, e);
LOG.info(
"Retryable exception while reading SIRI feed from {} after {} ms",
url,
(System.currentTimeMillis() - t1)
);
throw e;
} catch (Exception e) {
LOG.info("Failed after {} ms", (System.currentTimeMillis() - t1));
LOG.error("Error reading SIRI feed from " + url, e);
LOG.error(
"Non-retryable exception while reading SIRI feed from {} after {} ms",
url,
(System.currentTimeMillis() - t1)
);
}
return null;
}

/**
* Reset the session with the SIRI-SX server by creating a new unique requestorRef. This is
* required if a network error causes a request to fail and let the session in an undetermined
* state. Using a new requestorRef will force the SIRI-SX server to send again all available
* messages.
*/
private void updateRequestorRef() {
retryCount++;
requestorRef = originalRequestorRef + "-retry-" + retryCount;
}
}
72 changes: 72 additions & 0 deletions src/main/java/org/opentripplanner/framework/lang/OtpRetry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.opentripplanner.framework.lang;

import java.time.Duration;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Retry an operation with a configurable number of attempts.
*/
public class OtpRetry {

private static final Logger LOG = LoggerFactory.getLogger(OtpRetry.class);

private final String name;
private final int maxAttempts;
private final Duration initialRetryInterval;
private final int backoffMultiplier;
private final Runnable onRetry;
private final Predicate<Exception> retryableException;

OtpRetry(
String name,
int maxAttempts,
Duration initialRetryInterval,
int backoffMultiplier,
Predicate<Exception> retryableException,
Runnable onRetry
) {
this.name = name;
this.maxAttempts = maxAttempts;
this.initialRetryInterval = initialRetryInterval;
this.backoffMultiplier = backoffMultiplier;
this.retryableException = retryableException;
this.onRetry = onRetry;
}

public void execute(Runnable retryable) {
int attempts = 0;
long sleepTime = initialRetryInterval.toMillis();
while (true) {
try {
retryable.run();
return;
} catch (Exception e) {
if (!retryableException.test(e)) {
throw new OtpRetryException("Operation failed with non-retryable exception", e);
}
attempts++;
if (attempts > maxAttempts) {
throw new OtpRetryException("Operation failed after " + attempts + " attempts", e);
}
LOG.info(
"Operation {} failed with retryable exception: {}. Retrying {}/{} in {} millis",
name,
e.getMessage(),
attempts,
maxAttempts,
sleepTime
);
onRetry.run();
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new OtpRetryException(ex);
}
sleepTime = sleepTime * backoffMultiplier;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.opentripplanner.framework.lang;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.function.Predicate;

public class OtpRetryBuilder {

public static final int DEFAULT_MAX_ATTEMPTS = 3;
public static final Duration DEFAULT_INITIAL_RETRYABLE_INTERVAL = Duration.of(
1,
ChronoUnit.SECONDS
);
/**
* Retry all exceptions by default.
*/
public static final Predicate<Exception> DEFAULT_RETRYABLE_EXCEPTION = e -> true;
public static final Runnable DEFAULT_ON_RETRY = () -> {};
private String name;
private int maxAttempts = DEFAULT_MAX_ATTEMPTS;
private Duration initialRetryInterval = DEFAULT_INITIAL_RETRYABLE_INTERVAL;
private int backoffMultiplier;
private Predicate<Exception> retryableException = DEFAULT_RETRYABLE_EXCEPTION;
private Runnable onRetry = DEFAULT_ON_RETRY;

/**
* Name used in log messages to identify the retried operation.
*/
public OtpRetryBuilder withName(String name) {
this.name = name;
return this;
}

/**
* Maximum number of additional attempts after the initial failure.
* With maxAttempts=0 no retry is performed after the initial failure.
*/
public OtpRetryBuilder withMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}

/**
* Initial delay before the first retry.
*/
public OtpRetryBuilder withInitialRetryInterval(Duration initialRetryInterval) {
this.initialRetryInterval = initialRetryInterval;
return this;
}

/**
* Backoff multiplier applied to the initial delay.
*/
public OtpRetryBuilder withBackoffMultiplier(int backoffMultiplier) {
this.backoffMultiplier = backoffMultiplier;
return this;
}

/**
* Predicate identifying the exceptions that should be retried.
* Other exceptions are re-thrown.
*/
public OtpRetryBuilder withRetryableException(Predicate<Exception> retryableException) {
this.retryableException = retryableException;
return this;
}

/**
* Callback invoked before executing each retry.
*/
public OtpRetryBuilder withOnRetry(Runnable onRetry) {
this.onRetry = onRetry;
return this;
}

public OtpRetry build() {
return new OtpRetry(
name,
maxAttempts,
initialRetryInterval,
backoffMultiplier,
retryableException,
onRetry
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opentripplanner.framework.lang;

public class OtpRetryException extends RuntimeException {

public OtpRetryException(String message, Throwable cause) {
super(message, cause);
}

public OtpRetryException(Throwable cause) {
super(cause);
}
}
Loading