diff --git a/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/BoltConnectionImpl.java b/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/BoltConnectionImpl.java index a6438b911..7c59f74ec 100644 --- a/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/BoltConnectionImpl.java +++ b/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/BoltConnectionImpl.java @@ -487,6 +487,11 @@ public CompletionStage close() { return close.exceptionally(ignored -> null); } + @Override + public CompletionStage setReadTimeout(Duration duration) { + return executeInEventLoop(() -> connection.setReadTimeout(duration)); + } + @Override public BoltConnectionState state() { var state = stateRef.get(); @@ -528,6 +533,11 @@ public boolean serverSideRoutingEnabled() { return serverSideRouting; } + @Override + public Optional defaultReadTimeout() { + return connection.defaultReadTimeoutMillis(); + } + private CompletionStage executeInEventLoop(Runnable runnable) { var executeFuture = new CompletableFuture(); Runnable stageCompletingRunnable = () -> { diff --git a/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/async/NetworkConnection.java b/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/async/NetworkConnection.java index 5d3876ac7..9907a54c1 100644 --- a/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/async/NetworkConnection.java +++ b/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/async/NetworkConnection.java @@ -23,7 +23,9 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.EventLoop; +import java.time.Duration; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -56,7 +58,8 @@ public class NetworkConnection implements Connection { private final boolean ssrEnabled; private final BoltProtocol protocol; - private final Long connectionReadTimeout; + private final Duration defaultReadTimeout; + private Duration readTimeout; private ChannelHandler connectionReadTimeoutHandler; @@ -70,8 +73,10 @@ public NetworkConnection(Channel channel, LoggingProvider logging) { this.telemetryEnabled = ChannelAttributes.telemetryEnabled(channel); this.ssrEnabled = ChannelAttributes.ssrEnabled(channel); this.protocol = BoltProtocol.forChannel(channel); - this.connectionReadTimeout = - ChannelAttributes.connectionReadTimeout(channel).orElse(null); + this.defaultReadTimeout = ChannelAttributes.connectionReadTimeout(channel) + .map(Duration::ofSeconds) + .orElse(null); + this.readTimeout = defaultReadTimeout; } @Override @@ -179,6 +184,25 @@ public EventLoop eventLoop() { return channel.eventLoop(); } + @Override + public Optional defaultReadTimeoutMillis() { + return Optional.ofNullable(defaultReadTimeout); + } + + @Override + public void setReadTimeout(Duration duration) { + if (!channel.eventLoop().inEventLoop()) { + throw new IllegalStateException("This method may only be called in the EventLoop"); + } + + if (duration != null && duration.toMillis() > 0) { + // only values greater than zero milliseconds are supported + this.readTimeout = duration; + } else { + this.readTimeout = this.defaultReadTimeout; + } + } + private CompletionStage writeMessageInEventLoop(Message message, ResponseHandler handler) { var future = new CompletableFuture(); Runnable runnable = () -> { @@ -215,8 +239,9 @@ private void registerConnectionReadTimeout(Channel channel) { throw new IllegalStateException("This method may only be called in the EventLoop"); } - if (connectionReadTimeout != null && connectionReadTimeoutHandler == null) { - connectionReadTimeoutHandler = new ConnectionReadTimeoutHandler(connectionReadTimeout, TimeUnit.SECONDS); + if (this.readTimeout != null && connectionReadTimeoutHandler == null) { + connectionReadTimeoutHandler = + new ConnectionReadTimeoutHandler(readTimeout.toMillis(), TimeUnit.MILLISECONDS); channel.pipeline().addFirst(connectionReadTimeoutHandler); log.log(System.Logger.Level.DEBUG, "Added ConnectionReadTimeoutHandler"); messageDispatcher.setBeforeLastHandlerHook(() -> { diff --git a/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/spi/Connection.java b/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/spi/Connection.java index bd9dbb045..8310cab43 100644 --- a/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/spi/Connection.java +++ b/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/spi/Connection.java @@ -17,6 +17,8 @@ package org.neo4j.driver.internal.bolt.basicimpl.impl.spi; import io.netty.channel.EventLoop; +import java.time.Duration; +import java.util.Optional; import java.util.concurrent.CompletionStage; import org.neo4j.driver.internal.bolt.api.BoltServerAddress; import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol; @@ -48,4 +50,8 @@ public interface Connection { CompletionStage close(); EventLoop eventLoop(); + + Optional defaultReadTimeoutMillis(); + + void setReadTimeout(Duration duration); } diff --git a/bolt-api/src/main/java/org/neo4j/driver/internal/bolt/api/BoltConnection.java b/bolt-api/src/main/java/org/neo4j/driver/internal/bolt/api/BoltConnection.java index bef1f5112..474a1ab39 100644 --- a/bolt-api/src/main/java/org/neo4j/driver/internal/bolt/api/BoltConnection.java +++ b/bolt-api/src/main/java/org/neo4j/driver/internal/bolt/api/BoltConnection.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionStage; import org.neo4j.driver.internal.bolt.api.values.Value; @@ -75,6 +76,10 @@ CompletionStage runInAutoCommitTransaction( CompletionStage close(); + // ----- STATE UPDATES ----- + + CompletionStage setReadTimeout(Duration duration); + // ----- MUTABLE DATA ----- BoltConnectionState state(); @@ -92,4 +97,6 @@ CompletionStage runInAutoCommitTransaction( boolean telemetrySupported(); boolean serverSideRoutingEnabled(); + + Optional defaultReadTimeout(); }