Skip to content

Commit

Permalink
Add read timeout support to BoltConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
injectives committed Feb 1, 2025
1 parent aa30d32 commit 53bcf3b
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ public CompletionStage<Void> close() {
return close.exceptionally(ignored -> null);
}

@Override
public CompletionStage<Void> setReadTimeout(Duration duration) {
return executeInEventLoop(() -> connection.setReadTimeout(duration));
}

@Override
public BoltConnectionState state() {
var state = stateRef.get();
Expand Down Expand Up @@ -528,6 +533,11 @@ public boolean serverSideRoutingEnabled() {
return serverSideRouting;
}

@Override
public Optional<Duration> defaultReadTimeout() {
return connection.defaultReadTimeoutMillis();
}

private CompletionStage<Void> executeInEventLoop(Runnable runnable) {
var executeFuture = new CompletableFuture<Void>();
Runnable stageCompletingRunnable = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoop;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -56,7 +58,8 @@ public class NetworkConnection implements Connection {
private final boolean ssrEnabled;
private final BoltProtocol protocol;

private final Long connectionReadTimeout;
private final Duration defaultReadTimeout;
private Duration readTimeout;

private ChannelHandler connectionReadTimeoutHandler;

Expand All @@ -70,8 +73,10 @@ public NetworkConnection(Channel channel, LoggingProvider logging) {
this.telemetryEnabled = ChannelAttributes.telemetryEnabled(channel);
this.ssrEnabled = ChannelAttributes.ssrEnabled(channel);
this.protocol = BoltProtocol.forChannel(channel);
this.connectionReadTimeout =
ChannelAttributes.connectionReadTimeout(channel).orElse(null);
this.defaultReadTimeout = ChannelAttributes.connectionReadTimeout(channel)
.map(Duration::ofSeconds)
.orElse(null);
this.readTimeout = defaultReadTimeout;
}

@Override
Expand Down Expand Up @@ -179,6 +184,25 @@ public EventLoop eventLoop() {
return channel.eventLoop();
}

@Override
public Optional<Duration> defaultReadTimeoutMillis() {
return Optional.ofNullable(defaultReadTimeout);
}

@Override
public void setReadTimeout(Duration duration) {
if (!channel.eventLoop().inEventLoop()) {
throw new IllegalStateException("This method may only be called in the EventLoop");
}

if (duration != null && duration.toMillis() > 0) {
// only values greater than zero milliseconds are supported
this.readTimeout = duration;
} else {
this.readTimeout = this.defaultReadTimeout;
}
}

private CompletionStage<Void> writeMessageInEventLoop(Message message, ResponseHandler handler) {
var future = new CompletableFuture<Void>();
Runnable runnable = () -> {
Expand Down Expand Up @@ -215,8 +239,9 @@ private void registerConnectionReadTimeout(Channel channel) {
throw new IllegalStateException("This method may only be called in the EventLoop");
}

if (connectionReadTimeout != null && connectionReadTimeoutHandler == null) {
connectionReadTimeoutHandler = new ConnectionReadTimeoutHandler(connectionReadTimeout, TimeUnit.SECONDS);
if (this.readTimeout != null && connectionReadTimeoutHandler == null) {
connectionReadTimeoutHandler =
new ConnectionReadTimeoutHandler(readTimeout.toMillis(), TimeUnit.MILLISECONDS);
channel.pipeline().addFirst(connectionReadTimeoutHandler);
log.log(System.Logger.Level.DEBUG, "Added ConnectionReadTimeoutHandler");
messageDispatcher.setBeforeLastHandlerHook(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.neo4j.driver.internal.bolt.basicimpl.impl.spi;

import io.netty.channel.EventLoop;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.internal.bolt.api.BoltServerAddress;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;
Expand Down Expand Up @@ -48,4 +50,8 @@ public interface Connection {
CompletionStage<Void> close();

EventLoop eventLoop();

Optional<Duration> defaultReadTimeoutMillis();

void setReadTimeout(Duration duration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.internal.bolt.api.values.Value;
Expand Down Expand Up @@ -75,6 +76,10 @@ CompletionStage<BoltConnection> runInAutoCommitTransaction(

CompletionStage<Void> close();

// ----- STATE UPDATES -----

CompletionStage<Void> setReadTimeout(Duration duration);

// ----- MUTABLE DATA -----

BoltConnectionState state();
Expand All @@ -92,4 +97,6 @@ CompletionStage<BoltConnection> runInAutoCommitTransaction(
boolean telemetrySupported();

boolean serverSideRoutingEnabled();

Optional<Duration> defaultReadTimeout();
}

0 comments on commit 53bcf3b

Please sign in to comment.