Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Bolt Protocol Handshake Manifest v1 #1605

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;

import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;

record BoltProtocolMinorVersionRange(int majorVersion, int minorVersion, int minorVersionNum) {
public boolean contains(BoltProtocolVersion version) {
if (majorVersion != version.getMajorVersion()) {
return false;
}

return version.getMinorVersion() <= minorVersion && version.getMinorVersion() >= minorVersion - minorVersionNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,25 @@
import static java.lang.Integer.toHexString;

import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v3.BoltProtocolV3;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v41.BoltProtocolV41;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v42.BoltProtocolV42;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v43.BoltProtocolV43;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v44.BoltProtocolV44;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v5.BoltProtocolV5;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v51.BoltProtocolV51;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v52.BoltProtocolV52;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v53.BoltProtocolV53;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v54.BoltProtocolV54;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v55.BoltProtocolV55;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v56.BoltProtocolV56;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v57.BoltProtocolV57;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v58.BoltProtocolV58;

public final class BoltProtocolUtil {
Expand All @@ -37,16 +50,37 @@ public final class BoltProtocolUtil {

public static final int DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES = Short.MAX_VALUE / 2;

public static final SortedMap<BoltProtocolVersion, BoltProtocol> versionToProtocol;

private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer(copyInt(
BOLT_MAGIC_PREAMBLE,
0x000001FF,
BoltProtocolV58.VERSION.toIntRange(BoltProtocolV5.VERSION),
BoltProtocolV44.VERSION.toIntRange(BoltProtocolV42.VERSION),
BoltProtocolV41.VERSION.toInt(),
BoltProtocolV3.VERSION.toInt()))
.asReadOnly();

private static final String HANDSHAKE_STRING = createHandshakeString();

static {
var map = new TreeMap<BoltProtocolVersion, BoltProtocol>(Comparator.reverseOrder());
map.putAll(Map.ofEntries(
Map.entry(BoltProtocolV58.VERSION, BoltProtocolV58.INSTANCE),
Map.entry(BoltProtocolV57.VERSION, BoltProtocolV57.INSTANCE),
Map.entry(BoltProtocolV56.VERSION, BoltProtocolV56.INSTANCE),
Map.entry(BoltProtocolV55.VERSION, BoltProtocolV55.INSTANCE),
Map.entry(BoltProtocolV54.VERSION, BoltProtocolV54.INSTANCE),
Map.entry(BoltProtocolV53.VERSION, BoltProtocolV53.INSTANCE),
Map.entry(BoltProtocolV52.VERSION, BoltProtocolV52.INSTANCE),
Map.entry(BoltProtocolV51.VERSION, BoltProtocolV51.INSTANCE),
Map.entry(BoltProtocolV5.VERSION, BoltProtocolV5.INSTANCE),
Map.entry(BoltProtocolV44.VERSION, BoltProtocolV44.INSTANCE),
Map.entry(BoltProtocolV43.VERSION, BoltProtocolV43.INSTANCE),
Map.entry(BoltProtocolV42.VERSION, BoltProtocolV42.INSTANCE),
Map.entry(BoltProtocolV3.VERSION, BoltProtocolV3.INSTANCE)));
versionToProtocol = Collections.unmodifiableSortedMap(map);
}

private BoltProtocolUtil() {}

public static ByteBuf handshakeBuf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class HandshakeHandler extends ReplayingDecoder<Void> {
private boolean failed;
private ChannelActivityLogger log;
private ChannelErrorLogger errorLog;
private ManifestHandler manifestHandler;

public HandshakeHandler(
ChannelPipelineBuilder pipelineBuilder,
Expand Down Expand Up @@ -100,18 +101,47 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable error) {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
var serverSuggestedVersion = BoltProtocolVersion.fromRawBytes(in.readInt());
log.log(System.Logger.Level.DEBUG, "S: [Bolt Handshake] %s", serverSuggestedVersion);

// this is a one-time handler, remove it when protocol version has been read
ctx.pipeline().remove(this);

var protocol = protocolForVersion(serverSuggestedVersion);
if (protocol != null) {
protocolSelected(serverSuggestedVersion, protocol.createMessageFormat(), ctx);
if (manifestHandler != null) {
try {
manifestHandler.decode(in);
} catch (Throwable e) {
fail(ctx, e);
}
} else {
handleUnknownSuggestedProtocolVersion(serverSuggestedVersion, ctx);
var serverSuggestedVersion = BoltProtocolVersion.fromRawBytes(in.readInt());

if (new BoltProtocolVersion(255, 1).equals(serverSuggestedVersion)) {
log.log(System.Logger.Level.DEBUG, "S: [Bolt Handshake Manifest] v1", serverSuggestedVersion);
manifestHandler = new ManifestHandlerV1(ctx.channel(), logging);
} else {
log.log(System.Logger.Level.DEBUG, "S: [Bolt Handshake] %s", serverSuggestedVersion);

// this is a one-time handler, remove it when protocol version has been read
ctx.pipeline().remove(this);

var protocol = protocolForVersion(serverSuggestedVersion);
if (protocol != null) {
protocolSelected(serverSuggestedVersion, protocol.createMessageFormat(), ctx);
} else {
handleUnknownSuggestedProtocolVersion(serverSuggestedVersion, ctx);
}
}
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (manifestHandler != null) {
// this is a one-time handler, remove it when protocol version has been read
ctx.pipeline().remove(this);
try {
var protocol = manifestHandler.complete();
protocolSelected(protocol.version(), protocol.createMessageFormat(), ctx);
} catch (Throwable e) {
fail(ctx, e);
}
}
super.channelReadComplete(ctx);
}

private BoltProtocol protocolForVersion(BoltProtocolVersion version) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;

import io.netty.buffer.ByteBuf;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;

interface ManifestHandler {
void decode(ByteBuf in);

BoltProtocol complete();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;

import static io.netty.buffer.Unpooled.unreleasableBuffer;
import static java.lang.Integer.toHexString;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.neo4j.driver.internal.bolt.api.LoggingProvider;
import org.neo4j.driver.internal.bolt.api.exception.BoltClientException;
import org.neo4j.driver.internal.bolt.basicimpl.impl.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;

final class ManifestHandlerV1 implements ManifestHandler {
private final ChannelActivityLogger log;
private final Channel channel;
private final VarLongBuilder expectedVersionRangesBuilder = new VarLongBuilder();

private long expectedVersionRanges = -1L;
private Set<BoltProtocolMinorVersionRange> serverSupportedVersionRanges;

public ManifestHandlerV1(Channel channel, LoggingProvider logging) {
this.channel = Objects.requireNonNull(channel);
log = new ChannelActivityLogger(channel, logging, getClass());
}

@Override
public void decode(ByteBuf byteBuf) {
if (expectedVersionRanges < 0) {
decodeExpectedVersionsSegment(byteBuf);
} else if (expectedVersionRanges > 0) {
decodeServerSupportedBoltVersionRange(byteBuf);
} else {
byteBuf.readByte();
}
}

@Override
public BoltProtocol complete() {
return findSupportedBoltProtocol();
}

private void decodeExpectedVersionsSegment(ByteBuf byteBuf) {
var segment = byteBuf.readByte();
var value = (byte) (0b01111111 & segment);

try {
expectedVersionRangesBuilder.add(value);
} catch (IllegalStateException e) {
throw new BoltClientException(
"The driver does not support the number of Bolt Protocol version ranges that the server wants to send",
e);
}

var finished = (segment >> 7) == 0;
if (finished) {
expectedVersionRanges = expectedVersionRangesBuilder.build();
var size = (int) expectedVersionRanges;
if (expectedVersionRanges != size) {
throw new BoltClientException(
"The driver does not support the number of Bolt Protocol version ranges that the server wants to send");
} else {
log.log(
System.Logger.Level.DEBUG,
"S: [Bolt Handshake Manifest] [expected version ranges %d]",
expectedVersionRanges);
serverSupportedVersionRanges = new HashSet<>(size);
}
}
}

private void decodeServerSupportedBoltVersionRange(ByteBuf byteBuf) {
var value = byteBuf.readInt();
var major = value & 0x000000FF;
var minor = (value >> 8) & 0x000000FF;
var minorNum = (value >> 16) & 0x000000FF;
serverSupportedVersionRanges.add(new BoltProtocolMinorVersionRange(major, minor, minorNum));
expectedVersionRanges--;

if (expectedVersionRanges == 0) {
log.log(
System.Logger.Level.DEBUG,
"S: [Bolt Handshake Manifest] [server supported version ranges %s]",
serverSupportedVersionRanges);
}
}

private BoltProtocol findSupportedBoltProtocol() {
for (var entry : BoltProtocolUtil.versionToProtocol.entrySet()) {
var version = entry.getKey();
for (var range : serverSupportedVersionRanges) {
if (range.contains(version)) {
var protocol = entry.getValue();
write(protocol.version().toInt());
write((byte) 0);
return protocol;
}
}
}
write(0);
write((byte) 0);
channel.flush();
throw new BoltClientException("No supported Bolt Protocol version was found");
}

private void write(int value) {
log.log(
System.Logger.Level.DEBUG,
"C: [Bolt Handshake Manifest] %s",
String.format("[%s]", toHexString(value)));
channel.write(Unpooled.copyInt(value).asReadOnly());
}

@SuppressWarnings("SameParameterValue")
private void write(byte value) {
log.log(
System.Logger.Level.DEBUG,
"C: [Bolt Handshake Manifest] %s",
String.format("[%s]", toHexString(value)));
channel.write(
unreleasableBuffer(Unpooled.copiedBuffer(new byte[] {value})).asReadOnly());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;

final class VarLongBuilder {
private long value;
private byte position;

public void add(long segment) {
if (position > 8) {
throw new IllegalStateException("Segment overflow");
}
segment = segment << (position * 7);
value |= segment;
position++;
}

public long build() {
return value;
}
}
Loading