Skip to content

Commit

Permalink
Merge pull request #330 from scalecube/create-websocket-trasport
Browse files Browse the repository at this point in the history
Create websocket trasport
  • Loading branch information
artem-v authored Jun 11, 2020
2 parents 3a7760e + f6b7425 commit 56040e0
Show file tree
Hide file tree
Showing 35 changed files with 1,299 additions and 216 deletions.
2 changes: 1 addition & 1 deletion cluster-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-cluster-parent</artifactId>
<version>2.5.1-SNAPSHOT</version>
<version>2.6.0-SNAPSHOT</version>
</parent>

<artifactId>scalecube-cluster-api</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion cluster-testlib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>scalecube-cluster-parent</artifactId>
<groupId>io.scalecube</groupId>
<version>2.5.1-SNAPSHOT</version>
<version>2.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public Address address() {
return transport.address();
}

@Override
public Mono<Transport> start() {
return transport.start();
}

@Override
public Mono<Void> stop() {
return transport.stop();
Expand Down
2 changes: 1 addition & 1 deletion cluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-cluster-parent</artifactId>
<version>2.5.1-SNAPSHOT</version>
<version>2.6.0-SNAPSHOT</version>
</parent>

<artifactId>scalecube-cluster</artifactId>
Expand Down
20 changes: 20 additions & 0 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.transport.api.TransportFactory;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.TransportImpl;
import io.scalecube.utils.ServiceLoaderUtil;
Expand All @@ -30,6 +31,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -158,6 +160,19 @@ public ClusterImpl transport(UnaryOperator<TransportConfig> options) {
return cluster;
}

/**
* Returns a new cluster's instance which will apply the given options.
*
* @param supplier transport factory supplier
* @return new {@code ClusterImpl} instance
*/
public ClusterImpl transportFactory(Supplier<TransportFactory> supplier) {
Objects.requireNonNull(supplier);
ClusterImpl cluster = new ClusterImpl(this);
cluster.config = config.transport(opts -> opts.transportFactory(supplier.get()));
return cluster;
}

/**
* Returns a new cluster's instance which will apply the given options.
*
Expand Down Expand Up @@ -566,6 +581,11 @@ public Address address() {
return transport.address();
}

@Override
public Mono<Transport> start() {
return transport.start();
}

@Override
public Mono<Void> stop() {
return transport.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.of;

import io.scalecube.transport.netty.websocket.WebsocketTransportFactory;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -22,7 +23,11 @@ public void testInvalidNamespaceFormat(String namespace) {
Exception actualException =
assertThrows(
IllegalArgumentException.class,
() -> new ClusterImpl().membership(opts -> opts.namespace(namespace)).startAwait());
() ->
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace(namespace))
.startAwait());
Assertions.assertAll(
() ->
assertEquals(
Expand Down Expand Up @@ -50,16 +55,22 @@ public static Stream<Arguments> testInvalidNamespaceFormat() {

@Test
public void testSeparateEmptyNamespaces() {
Cluster root = new ClusterImpl().membership(opts -> opts.namespace("root")).startAwait();
Cluster root =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root"))
.startAwait();

Cluster root1 =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root1"))
.membership(opts -> opts.seedMembers(root.address()))
.startAwait();

Cluster root2 =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root2"))
.membership(opts -> opts.seedMembers(root.address()))
.startAwait();
Expand All @@ -71,28 +82,36 @@ public void testSeparateEmptyNamespaces() {

@Test
public void testSeparateNonEmptyNamespaces() {
Cluster root = new ClusterImpl().membership(opts -> opts.namespace("root")).startAwait();
Cluster root =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root"))
.startAwait();

Cluster bob =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root"))
.membership(opts -> opts.seedMembers(root.address()))
.startAwait();

Cluster carol =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root"))
.membership(opts -> opts.seedMembers(root.address(), bob.address()))
.startAwait();

Cluster root2 =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root2"))
.membership(opts -> opts.seedMembers(root.address()))
.startAwait();

Cluster dan =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root2"))
.membership(
opts ->
Expand All @@ -102,6 +121,7 @@ public void testSeparateNonEmptyNamespaces() {

Cluster eve =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root2"))
.membership(
opts ->
Expand All @@ -125,29 +145,36 @@ public void testSeparateNonEmptyNamespaces() {
@Test
public void testSimpleNamespacesHierarchy() {
Cluster rootDevelop =
new ClusterImpl().membership(opts -> opts.namespace("develop")).startAwait();
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("develop"))
.startAwait();

Cluster bob =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("develop/develop"))
.membership(opts -> opts.seedMembers(rootDevelop.address()))
.startAwait();

Cluster carol =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("develop/develop"))
.membership(opts -> opts.seedMembers(rootDevelop.address(), bob.address()))
.startAwait();

Cluster dan =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("develop/develop-2"))
.membership(
opts -> opts.seedMembers(rootDevelop.address(), bob.address(), carol.address()))
.startAwait();

Cluster eve =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("develop/develop-2"))
.membership(
opts ->
Expand All @@ -168,24 +195,35 @@ public void testSimpleNamespacesHierarchy() {

@Test
public void testIsolatedParentNamespaces() {
Cluster parent1 = new ClusterImpl().membership(opts -> opts.namespace("a/1")).startAwait();
Cluster parent1 =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("a/1"))
.startAwait();

Cluster bob =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("a/1/c"))
.membership(opts -> opts.seedMembers(parent1.address()))
.startAwait();

Cluster carol =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("a/1/c"))
.membership(opts -> opts.seedMembers(parent1.address(), bob.address()))
.startAwait();

Cluster parent2 = new ClusterImpl().membership(opts -> opts.namespace("a/111")).startAwait();
Cluster parent2 =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("a/111"))
.startAwait();

Cluster dan =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("a/111/c"))
.membership(
opts ->
Expand All @@ -195,6 +233,7 @@ public void testIsolatedParentNamespaces() {

Cluster eve =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("a/111/c"))
.membership(
opts ->
Expand Down
2 changes: 1 addition & 1 deletion codec-parent/codec-jackson-smile/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-codec-parent</artifactId>
<version>2.5.1-SNAPSHOT</version>
<version>2.6.0-SNAPSHOT</version>
</parent>

<artifactId>scalecube-codec-jackson-smile</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion codec-parent/codec-jackson/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>scalecube-codec-parent</artifactId>
<groupId>io.scalecube</groupId>
<version>2.5.1-SNAPSHOT</version>
<version>2.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion codec-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>scalecube-cluster-parent</artifactId>
<groupId>io.scalecube</groupId>
<version>2.5.1-SNAPSHOT</version>
<version>2.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
Expand Down
7 changes: 6 additions & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-cluster-parent</artifactId>
<version>2.5.1-SNAPSHOT</version>
<version>2.6.0-SNAPSHOT</version>
</parent>

<artifactId>scalecube-cluster-examples</artifactId>
Expand All @@ -16,6 +16,11 @@
<artifactId>scalecube-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>scalecube-transport-netty</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-cluster-testlib</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,26 @@
import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.metadata.MetadataCodec;
import io.scalecube.transport.netty.websocket.WebsocketTransportFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

public class ClusterCustomMetadataEncodingExample {
public class CustomMetadataEncodingExample {

/** Main method. */
public static void main(String[] args) throws Exception {
// Start seed cluster member Alice
Cluster alice =
new ClusterImpl().config(opts -> opts.metadataCodec(new LongMetadataCodec())).startAwait();
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.config(opts -> opts.metadataCodec(new LongMetadataCodec()))
.startAwait();
System.out.println(
"[" + alice.member().id() + "] Alice's metadata: " + alice.metadata().orElse(null));

Cluster joe =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.config(opts -> opts.metadataCodec(new LongMetadataCodec()).metadata(123L))
.membership(opts -> opts.seedMembers(alice.address()))
.startAwait();
Expand All @@ -26,6 +31,7 @@ public static void main(String[] args) throws Exception {

Cluster bob =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.config(opts -> opts.metadataCodec(new LongMetadataCodec()).metadata(456L))
.membership(opts -> opts.seedMembers(alice.address()))
.startAwait();
Expand Down
Loading

0 comments on commit 56040e0

Please sign in to comment.