Skip to content

Commit

Permalink
Add support for Bolt Protocol Handshake Manifest v1
Browse files Browse the repository at this point in the history
This update brings support for a new Bolt handshake and also removes support for Bolt protocol versions 4 and 4.1.
  • Loading branch information
injectives committed Jan 27, 2025
1 parent b70dc41 commit 9505210
Show file tree
Hide file tree
Showing 16 changed files with 513 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;

import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;

record BoltProtocolMinorVersionRange(int majorVersion, int minorVersion, int minorVersionNum) {
public boolean contains(BoltProtocolVersion version) {
if (majorVersion != version.getMajorVersion()) {
return false;
}

return version.getMinorVersion() <= minorVersion && version.getMinorVersion() >= minorVersion - minorVersionNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,25 @@
import static java.lang.Integer.toHexString;

import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v3.BoltProtocolV3;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v41.BoltProtocolV41;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v42.BoltProtocolV42;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v43.BoltProtocolV43;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v44.BoltProtocolV44;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v5.BoltProtocolV5;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v51.BoltProtocolV51;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v52.BoltProtocolV52;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v53.BoltProtocolV53;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v54.BoltProtocolV54;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v55.BoltProtocolV55;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v56.BoltProtocolV56;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v57.BoltProtocolV57;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v58.BoltProtocolV58;

public final class BoltProtocolUtil {
Expand All @@ -37,16 +50,37 @@ public final class BoltProtocolUtil {

public static final int DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES = Short.MAX_VALUE / 2;

public static final SortedMap<BoltProtocolVersion, BoltProtocol> versionToProtocol;

private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer(copyInt(
BOLT_MAGIC_PREAMBLE,
0x000001FF,
BoltProtocolV58.VERSION.toIntRange(BoltProtocolV5.VERSION),
BoltProtocolV44.VERSION.toIntRange(BoltProtocolV42.VERSION),
BoltProtocolV41.VERSION.toInt(),
BoltProtocolV3.VERSION.toInt()))
.asReadOnly();

private static final String HANDSHAKE_STRING = createHandshakeString();

static {
var map = new TreeMap<BoltProtocolVersion, BoltProtocol>(Comparator.reverseOrder());
map.putAll(Map.ofEntries(
Map.entry(BoltProtocolV58.VERSION, BoltProtocolV58.INSTANCE),
Map.entry(BoltProtocolV57.VERSION, BoltProtocolV57.INSTANCE),
Map.entry(BoltProtocolV56.VERSION, BoltProtocolV56.INSTANCE),
Map.entry(BoltProtocolV55.VERSION, BoltProtocolV55.INSTANCE),
Map.entry(BoltProtocolV54.VERSION, BoltProtocolV54.INSTANCE),
Map.entry(BoltProtocolV53.VERSION, BoltProtocolV53.INSTANCE),
Map.entry(BoltProtocolV52.VERSION, BoltProtocolV52.INSTANCE),
Map.entry(BoltProtocolV51.VERSION, BoltProtocolV51.INSTANCE),
Map.entry(BoltProtocolV5.VERSION, BoltProtocolV5.INSTANCE),
Map.entry(BoltProtocolV44.VERSION, BoltProtocolV44.INSTANCE),
Map.entry(BoltProtocolV43.VERSION, BoltProtocolV43.INSTANCE),
Map.entry(BoltProtocolV42.VERSION, BoltProtocolV42.INSTANCE),
Map.entry(BoltProtocolV3.VERSION, BoltProtocolV3.INSTANCE)));
versionToProtocol = Collections.unmodifiableSortedMap(map);
}

private BoltProtocolUtil() {}

public static ByteBuf handshakeBuf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class HandshakeHandler extends ReplayingDecoder<Void> {
private boolean failed;
private ChannelActivityLogger log;
private ChannelErrorLogger errorLog;
private ManifestHandler manifestHandler;

public HandshakeHandler(
ChannelPipelineBuilder pipelineBuilder,
Expand Down Expand Up @@ -100,18 +101,47 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable error) {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
var serverSuggestedVersion = BoltProtocolVersion.fromRawBytes(in.readInt());
log.log(System.Logger.Level.DEBUG, "S: [Bolt Handshake] %s", serverSuggestedVersion);

// this is a one-time handler, remove it when protocol version has been read
ctx.pipeline().remove(this);

var protocol = protocolForVersion(serverSuggestedVersion);
if (protocol != null) {
protocolSelected(serverSuggestedVersion, protocol.createMessageFormat(), ctx);
if (manifestHandler != null) {
try {
manifestHandler.decode(in);
} catch (Throwable e) {
fail(ctx, e);
}
} else {
handleUnknownSuggestedProtocolVersion(serverSuggestedVersion, ctx);
var serverSuggestedVersion = BoltProtocolVersion.fromRawBytes(in.readInt());

if (new BoltProtocolVersion(255, 1).equals(serverSuggestedVersion)) {
log.log(System.Logger.Level.DEBUG, "S: [Bolt Handshake Manifest] v1", serverSuggestedVersion);
manifestHandler = new ManifestHandlerV1(ctx.channel(), logging);
} else {
log.log(System.Logger.Level.DEBUG, "S: [Bolt Handshake] %s", serverSuggestedVersion);

// this is a one-time handler, remove it when protocol version has been read
ctx.pipeline().remove(this);

var protocol = protocolForVersion(serverSuggestedVersion);
if (protocol != null) {
protocolSelected(serverSuggestedVersion, protocol.createMessageFormat(), ctx);
} else {
handleUnknownSuggestedProtocolVersion(serverSuggestedVersion, ctx);
}
}
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (manifestHandler != null) {
// this is a one-time handler, remove it when protocol version has been read
ctx.pipeline().remove(this);
try {
var protocol = manifestHandler.complete();
protocolSelected(protocol.version(), protocol.createMessageFormat(), ctx);
} catch (Throwable e) {
fail(ctx, e);
}
}
super.channelReadComplete(ctx);
}

private BoltProtocol protocolForVersion(BoltProtocolVersion version) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;

import io.netty.buffer.ByteBuf;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;

interface ManifestHandler {
void decode(ByteBuf in);

BoltProtocol complete();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;

import static io.netty.buffer.Unpooled.unreleasableBuffer;
import static java.lang.Integer.toHexString;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.neo4j.driver.internal.bolt.api.LoggingProvider;
import org.neo4j.driver.internal.bolt.api.exception.BoltClientException;
import org.neo4j.driver.internal.bolt.basicimpl.impl.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;

final class ManifestHandlerV1 implements ManifestHandler {
private final ChannelActivityLogger log;
private final Channel channel;
private final VarLongBuilder expectedVersionRangesBuilder = new VarLongBuilder();

private long expectedVersionRanges = -1L;
private Set<BoltProtocolMinorVersionRange> serverSupportedVersionRanges;

public ManifestHandlerV1(Channel channel, LoggingProvider logging) {
this.channel = Objects.requireNonNull(channel);
log = new ChannelActivityLogger(channel, logging, getClass());
}

@Override
public void decode(ByteBuf byteBuf) {
if (expectedVersionRanges < 0) {
decodeExpectedVersionsSegment(byteBuf);
} else if (expectedVersionRanges > 0) {
decodeServerSupportedBoltVersionRange(byteBuf);
} else {
byteBuf.readByte();
}
}

@Override
public BoltProtocol complete() {
return findSupportedBoltProtocol();
}

private void decodeExpectedVersionsSegment(ByteBuf byteBuf) {
var segment = byteBuf.readByte();
var value = (byte) (0b01111111 & segment);

try {
expectedVersionRangesBuilder.add(value);
} catch (IllegalStateException e) {
throw new BoltClientException(
"The driver does not support the number of Bolt Protocol version ranges that the server wants to send",
e);
}

var finished = (segment >> 7) == 0;
if (finished) {
expectedVersionRanges = expectedVersionRangesBuilder.build();
var size = (int) expectedVersionRanges;
if (expectedVersionRanges != size) {
throw new BoltClientException(
"The driver does not support the number of Bolt Protocol version ranges that the server wants to send");
} else {
log.log(
System.Logger.Level.DEBUG,
"S: [Bolt Handshake Manifest] [expected version ranges %d]",
expectedVersionRanges);
serverSupportedVersionRanges = new HashSet<>(size);
}
}
}

private void decodeServerSupportedBoltVersionRange(ByteBuf byteBuf) {
var value = byteBuf.readInt();
var major = value & 0x000000FF;
var minor = (value >> 8) & 0x000000FF;
var minorNum = (value >> 16) & 0x000000FF;
serverSupportedVersionRanges.add(new BoltProtocolMinorVersionRange(major, minor, minorNum));
expectedVersionRanges--;

if (expectedVersionRanges == 0) {
log.log(
System.Logger.Level.DEBUG,
"S: [Bolt Handshake Manifest] [server supported version ranges %s]",
serverSupportedVersionRanges);
}
}

private BoltProtocol findSupportedBoltProtocol() {
for (var entry : BoltProtocolUtil.versionToProtocol.entrySet()) {
var version = entry.getKey();
for (var range : serverSupportedVersionRanges) {
if (range.contains(version)) {
var protocol = entry.getValue();
write(protocol.version().toInt());
write((byte) 0);
return protocol;
}
}
}
write(0);
write((byte) 0);
channel.flush();
throw new BoltClientException("No supported Bolt Protocol version was found");
}

private void write(int value) {
log.log(
System.Logger.Level.DEBUG,
"C: [Bolt Handshake Manifest] %s",
String.format("[%s]", toHexString(value)));
channel.write(Unpooled.copyInt(value).asReadOnly());
}

@SuppressWarnings("SameParameterValue")
private void write(byte value) {
log.log(
System.Logger.Level.DEBUG,
"C: [Bolt Handshake Manifest] %s",
String.format("[%s]", toHexString(value)));
channel.write(
unreleasableBuffer(Unpooled.copiedBuffer(new byte[] {value})).asReadOnly());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;

final class VarLongBuilder {
private long value;
private byte position;

public void add(long segment) {
if (position > 8) {
throw new IllegalStateException("Segment overflow");
}
segment = segment << (position * 7);
value |= segment;
position++;
}

public long build() {
return value;
}
}
Loading

0 comments on commit 9505210

Please sign in to comment.