Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add actor testcontainer tests #1192

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.dapr.spring.boot.autoconfigure.client;

import io.dapr.actors.client.ActorClient;
import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.config.Properties;
Expand Down Expand Up @@ -70,6 +72,20 @@ DaprWorkflowClient daprWorkflowClient(DaprConnectionDetails daprConnectionDetail
return new DaprWorkflowClient(properties);
}

@Bean
@ConditionalOnMissingBean
ActorClient daprActorClient(DaprConnectionDetails daprConnectionDetails) {
Properties properties = createPropertiesFromConnectionDetails(daprConnectionDetails);
return new ActorClient(properties);
}

@Bean
@ConditionalOnMissingBean
ActorRuntime daprActorRuntime(DaprConnectionDetails daprConnectionDetails) {
Properties properties = createPropertiesFromConnectionDetails(daprConnectionDetails);
return ActorRuntime.getInstance(properties);
}

@Bean
@ConditionalOnMissingBean
WorkflowRuntimeBuilder daprWorkflowRuntimeBuilder(DaprConnectionDetails daprConnectionDetails) {
Expand Down
22 changes: 2 additions & 20 deletions sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.Version;
import io.dapr.v1.DaprGrpc;
import io.grpc.Channel;
Expand Down Expand Up @@ -83,7 +84,7 @@ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOp
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(Properties overrideProperties, Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
this(buildManagedChannel(overrideProperties),
this(NetworkUtils.buildGrpcManagedChannel(overrideProperties),
metadata,
resiliencyOptions,
overrideProperties.getValue(Properties.API_TOKEN));
Expand Down Expand Up @@ -129,25 +130,6 @@ public void close() {
}
}

/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @param overrideProperties Overrides
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel(Properties overrideProperties) {
int port = overrideProperties.getValue(Properties.GRPC_PORT);
if (port <= 0) {
throw new IllegalArgumentException("Invalid port.");
}

var sidecarHost = overrideProperties.getValue(Properties.SIDECAR_IP);

return ManagedChannelBuilder.forAddress(sidecarHost, port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}

/**
* Build an instance of the Client based on the provided setup.
Expand Down
103 changes: 55 additions & 48 deletions sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.Version;
import io.dapr.utils.NetworkUtils;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import reactor.core.publisher.Mono;

import java.io.Closeable;
Expand Down Expand Up @@ -80,23 +79,32 @@ public class ActorRuntime implements Closeable {
* @throws IllegalStateException If cannot instantiate Runtime.
*/
private ActorRuntime() throws IllegalStateException {
this(buildManagedChannel());
this(new Properties());
}

/**
* The default constructor. This should not be called directly.
*
* @throws IllegalStateException If cannot instantiate Runtime.
*/
private ActorRuntime(Properties properties) throws IllegalStateException {
this(NetworkUtils.buildGrpcManagedChannel(properties));
}

/**
* Constructor once channel is available. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @throws IllegalStateException If cannot instantiate Runtime.
* @throws IllegalStateException If you cannot instantiate Runtime.
*/
private ActorRuntime(ManagedChannel channel) throws IllegalStateException {
this(channel, buildDaprClient(channel));
this(channel, new DaprClientImpl(channel));
}

/**
* Constructor with dependency injection, useful for testing. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @param channel GRPC managed channel to be closed (or null).
* @param daprClient Client to communicate with Dapr.
* @throws IllegalStateException If class has one instance already.
*/
Expand Down Expand Up @@ -128,6 +136,24 @@ public static ActorRuntime getInstance() {
return instance;
}

/**
* Returns an ActorRuntime object.
*
* @param properties Properties to be used for the runtime.
* @return An ActorRuntime object.
*/
public static ActorRuntime getInstance(Properties properties) {
if (instance == null) {
synchronized (ActorRuntime.class) {
if (instance == null) {
instance = new ActorRuntime(properties);
}
}
}

return instance;
}

/**
* Gets the Actor configuration for this runtime.
*
Expand All @@ -149,24 +175,22 @@ public byte[] serializeConfig() throws IOException {

/**
* Registers an actor with the runtime, using {@link DefaultObjectSerializer} and {@link DefaultActorFactory}.
*
* {@link DefaultObjectSerializer} is not recommended for production scenarios.
*
* @param clazz The type of actor.
* @param <T> Actor class type.
* @param clazz The type of actor.
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(Class<T> clazz) {
registerActor(clazz, new DefaultObjectSerializer(), new DefaultObjectSerializer());
}

/**
* Registers an actor with the runtime, using {@link DefaultObjectSerializer}.
*
* {@link DefaultObjectSerializer} is not recommended for production scenarios.
*
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors. This can be used for dependency injection.
* @param <T> Actor class type.
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors. This can be used for dependency injection.
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(Class<T> clazz, ActorFactory<T> actorFactory) {
registerActor(clazz, actorFactory, new DefaultObjectSerializer(), new DefaultObjectSerializer());
Expand All @@ -181,8 +205,8 @@ public <T extends AbstractActor> void registerActor(Class<T> clazz, ActorFactory
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(
Class<T> clazz, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
registerActor(clazz, new DefaultActorFactory<T>(), objectSerializer, stateSerializer);
Class<T> clazz, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
registerActor(clazz, new DefaultActorFactory<T>(), objectSerializer, stateSerializer);
}

/**
Expand All @@ -195,9 +219,9 @@ public <T extends AbstractActor> void registerActor(
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(
Class<T> clazz, ActorFactory<T> actorFactory,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
Class<T> clazz, ActorFactory<T> actorFactory,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
if (clazz == null) {
throw new IllegalArgumentException("Class is required.");
}
Expand All @@ -216,12 +240,12 @@ public <T extends AbstractActor> void registerActor(
// Create ActorManager, if not yet registered.
this.actorManagers.computeIfAbsent(actorTypeInfo.getName(), (k) -> {
ActorRuntimeContext<T> context = new ActorRuntimeContext<>(
this,
objectSerializer,
actorFactory,
actorTypeInfo,
this.daprClient,
new DaprStateAsyncProvider(this.daprClient, stateSerializer));
this,
objectSerializer,
actorFactory,
actorTypeInfo,
this.daprClient,
new DaprStateAsyncProvider(this.daprClient, stateSerializer));
this.config.addRegisteredActorType(actorTypeInfo.getName());
return new ActorManager<T>(context);
});
Expand All @@ -236,7 +260,7 @@ public <T extends AbstractActor> void registerActor(
*/
public Mono<Void> deactivate(String actorTypeName, String actorId) {
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.deactivateActor(new ActorId(actorId)));
.flatMap(m -> m.deactivateActor(new ActorId(actorId)));
}

/**
Expand All @@ -252,8 +276,8 @@ public Mono<Void> deactivate(String actorTypeName, String actorId) {
public Mono<byte[]> invoke(String actorTypeName, String actorId, String actorMethodName, byte[] payload) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeMethod(id, actorMethodName, payload));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeMethod(id, actorMethodName, payload));
}

/**
Expand All @@ -268,8 +292,8 @@ public Mono<byte[]> invoke(String actorTypeName, String actorId, String actorMet
public Mono<Void> invokeReminder(String actorTypeName, String actorId, String reminderName, byte[] params) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeReminder(new ActorId(actorId), reminderName, params));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeReminder(new ActorId(actorId), reminderName, params));
}

/**
Expand All @@ -284,8 +308,8 @@ public Mono<Void> invokeReminder(String actorTypeName, String actorId, String re
public Mono<Void> invokeTimer(String actorTypeName, String actorId, String timerName, byte[] params) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName, params));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeTimer(new ActorId(actorId), timerName, params));
}

/**
Expand Down Expand Up @@ -318,23 +342,6 @@ private static DaprClient buildDaprClient(ManagedChannel channel) {
return new DaprClientImpl(channel);
}

/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel() {
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}

return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void writeReadState() throws Exception {
proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class, deferClose(run2.newActorClient()));
ActorProxy newProxy = proxyBuilder.build(actorId);

// wating for actor to be activated
// waiting for actor to be activated
Thread.sleep(2000);

callWithRetry(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.dapr.testcontainers.Component;
import io.dapr.testcontainers.DaprContainer;
import io.dapr.testcontainers.DaprLogLevel;
import io.dapr.testcontainers.Subscription;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
Expand All @@ -45,7 +46,7 @@
webEnvironment = WebEnvironment.DEFINED_PORT,
classes = {
DaprClientAutoConfiguration.class,
TestApplication.class
TestApplication.class, TestRestController.class
},
properties = {"dapr.pubsub.name=pubsub"}
)
Expand All @@ -61,10 +62,11 @@ public class DaprSpringMessagingIT {

@Container
@ServiceConnection
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.14.4")
.withAppName("messaging-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component("pubsub", "pubsub.in-memory", "v1", Collections.emptyMap()))
.withSubscription(new Subscription("my-app-subscription", "pubsub", "mockTopic", "subscribe"))
.withAppPort(8080)
.withDaprLogLevel(DaprLogLevel.DEBUG)
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
Expand All @@ -81,14 +83,10 @@ public static void beforeAll(){
org.testcontainers.Testcontainers.exposeHostPorts(8080);
}

@BeforeEach
public void beforeEach() throws InterruptedException {
Thread.sleep(1000);
}

@Test
@Disabled("Test is flaky due to global state in the spring test application.")
public void testDaprMessagingTemplate() throws InterruptedException {
Thread.sleep(10000);
for (int i = 0; i < 10; i++) {
var msg = "ProduceAndReadWithPrimitiveMessageType:" + i;

Expand All @@ -98,7 +96,7 @@ public void testDaprMessagingTemplate() throws InterruptedException {
}

// Wait for the messages to arrive
Thread.sleep(1000);
Thread.sleep(10000);

List<CloudEvent<String>> events = testRestController.getEvents();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ public class TestRestController {
private static final Logger LOG = LoggerFactory.getLogger(TestRestController.class);
private final List<CloudEvent<String>> events = new ArrayList<>();

@GetMapping("/")
public TestRestController() {
System.out.println("TestRestController started...");
}

@GetMapping("/actuator/health")
public String ok() {
return "OK";
}

@Topic(name = topicName, pubsubName = pubSubName)
@PostMapping("/subscribe")
@PostMapping("subscribe")
public void handleMessages(@RequestBody CloudEvent<String> event) {
LOG.info("++++++CONSUME {}------", event);
events.add(event);
Expand Down
Loading
Loading