From c8561cd536bbabe69f81609588a6edc1777ef0fa Mon Sep 17 00:00:00 2001 From: GMadorell Date: Thu, 2 Nov 2017 01:19:50 +0100 Subject: [PATCH] Add query bus supporting more than one handler --- build.sbt | 8 +-- project/Configuration.scala | 2 +- .../bus/domain/Query.scala | 4 +- .../bus/domain/QueryBus.scala | 29 +++++--- .../bus/domain/QueryHandler.scala | 4 +- .../types/ThrowableTypeClasses.scala | 13 ++++ .../generate/FindUserGreetQuery.scala | 4 +- .../generate/FindUserGreetQueryHandler.scala | 9 ++- ...cala => DelayedMemoryUserRepository.scala} | 9 +-- .../InAsyncDelayedMemoryUserRepository.scala | 3 +- .../acceptance/AsyncUserGreetFinderTest.scala | 16 ++--- .../acceptance/SyncUserGreetFinderTest.scala | 27 ++++---- .../integration/QueryBusTest.scala | 66 +++++++++++++++++++ 13 files changed, 145 insertions(+), 49 deletions(-) create mode 100644 src/main/scala/tv/codely/cqrs_ddd_scala_example/types/ThrowableTypeClasses.scala rename src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/{InSyncDelayedMemoryUserRepository.scala => DelayedMemoryUserRepository.scala} (75%) create mode 100644 src/test/scala/tv/codely/cqrs_ddd_scala_example/integration/QueryBusTest.scala diff --git a/build.sbt b/build.sbt index e38638f..19bc8d5 100644 --- a/build.sbt +++ b/build.sbt @@ -8,13 +8,13 @@ Configuration.settings /** ********* PROD DEPENDENCIES *****************/ libraryDependencies ++= Seq( "com.github.nscala-time" %% "nscala-time" % "2.16.0", - "com.lihaoyi" %% "pprint" % "0.5.2", - "org.typelevel" %% "cats-core" % "1.0.0-MF" + "com.lihaoyi" %% "pprint" % "0.5.2", + "org.typelevel" %% "cats-core" % "1.0.0-MF" ) /** ********* TEST DEPENDENCIES *****************/ libraryDependencies ++= Seq( - "org.scalatest" %% "scalatest" % "3.0.1" % Test, + "org.scalatest" %% "scalatest" % "3.0.1" % Test, "org.scalamock" %% "scalamock-scalatest-support" % "3.6.0" % Test ) @@ -27,7 +27,7 @@ addCommandAlias("tsf", "testShowFailed") addCommandAlias("c", "compile") addCommandAlias("tc", "test:compile") -addCommandAlias("f", "scalafmt") // Format all files according to ScalaFmt +addCommandAlias("f", "scalafmt") // Format all files according to ScalaFmt addCommandAlias("ft", "scalafmtTest") // Test if all files are formatted according to ScalaFmt addCommandAlias("prep", ";c;tc;ft") // All the needed tasks before running the test diff --git a/project/Configuration.scala b/project/Configuration.scala index dd9256b..a7cdfa3 100644 --- a/project/Configuration.scala +++ b/project/Configuration.scala @@ -16,7 +16,7 @@ object Configuration { "-Ywarn-unused-import", "-Xcheckinit", // Check against early initialization "-language:higherKinds" - ), + ), scalacOptions in run in Compile -= "-Xcheckinit", // Remove it in production because it's expensive javaOptions += "-Duser.timezone=UTC", // Test options diff --git a/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/Query.scala b/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/Query.scala index a85e546..e0d1b31 100644 --- a/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/Query.scala +++ b/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/Query.scala @@ -4,4 +4,6 @@ import java.util.UUID import org.joda.time.DateTime -abstract class Query(val queryId: UUID, val askedAt: DateTime) extends Request(queryId, askedAt) +abstract class Query(val queryId: UUID, val askedAt: DateTime) extends Request(queryId, askedAt) { + type QueryResponse <: Response +} diff --git a/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/QueryBus.scala b/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/QueryBus.scala index bf73d7a..aafd7ae 100644 --- a/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/QueryBus.scala +++ b/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/QueryBus.scala @@ -2,12 +2,25 @@ package tv.codely.cqrs_ddd_scala_example.bus.domain import scala.reflect.ClassTag -final class QueryBus[P[_], QueryType <: Query, ResponseType <: Response]( - private val handlers: Map[ - ClassTag[QueryType], - QueryHandler[P, QueryType,ResponseType] - ] -) { - def ask(query: QueryType): P[ResponseType] = - handlers(ClassTag[QueryType](query.getClass)).handle(query) +import cats.implicits._ +import tv.codely.cqrs_ddd_scala_example.types.ThrowableTypeClasses.MonadErrorThrowable + +final class QueryBus[P[_]: MonadErrorThrowable] { + private var handlers: Map[Class[_], Query => P[_ <: Response]] = Map.empty + + def ask[Q <: Query: ClassTag](query: Q): P[Q#QueryResponse] = { + val classTag = implicitly[ClassTag[Q]] + handlers + .get(classTag.runtimeClass) + .fold[P[Q#QueryResponse]](MonadErrorThrowable[P].raiseError(QueryHandlerNotFoundForQuery))(handler => + handler(query).map(_.asInstanceOf[Q#QueryResponse])) + } + + def subscribe[Q <: Query: ClassTag](handler: QueryHandler[P, Q]): Unit = { + val classTag = implicitly[ClassTag[Q]] + handlers = handlers + + (classTag.runtimeClass -> ((query: Query) => handler.handle(query.asInstanceOf[Q]))) + } } + +object QueryHandlerNotFoundForQuery extends Exception diff --git a/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/QueryHandler.scala b/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/QueryHandler.scala index c8d03a1..05dbf8b 100644 --- a/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/QueryHandler.scala +++ b/src/main/scala/tv/codely/cqrs_ddd_scala_example/bus/domain/QueryHandler.scala @@ -1,5 +1,5 @@ package tv.codely.cqrs_ddd_scala_example.bus.domain -abstract class QueryHandler[P[_], QueryType <: Query, ResponseType <: Response] { - def handle(query: QueryType): P[ResponseType] +abstract class QueryHandler[P[_], QueryType <: Query] { + def handle(query: QueryType): P[QueryType#QueryResponse] } diff --git a/src/main/scala/tv/codely/cqrs_ddd_scala_example/types/ThrowableTypeClasses.scala b/src/main/scala/tv/codely/cqrs_ddd_scala_example/types/ThrowableTypeClasses.scala new file mode 100644 index 0000000..ddb0b6f --- /dev/null +++ b/src/main/scala/tv/codely/cqrs_ddd_scala_example/types/ThrowableTypeClasses.scala @@ -0,0 +1,13 @@ +package tv.codely.cqrs_ddd_scala_example.types + +import cats.MonadError + +object ThrowableTypeClasses { + type MonadErrorThrowable[P[_]] = MonadError[P, Throwable] + + object MonadErrorThrowable { + def apply[P[_]](implicit E: MonadError[P, Throwable]): MonadErrorThrowable[P] = E + } + + type EitherThrowable[A] = Either[Throwable, A] +} diff --git a/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/application/generate/FindUserGreetQuery.scala b/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/application/generate/FindUserGreetQuery.scala index e381f42..4704863 100644 --- a/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/application/generate/FindUserGreetQuery.scala +++ b/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/application/generate/FindUserGreetQuery.scala @@ -6,4 +6,6 @@ import org.joda.time.DateTime import tv.codely.cqrs_ddd_scala_example.bus.domain.Query case class FindUserGreetQuery(override val queryId: UUID, override val askedAt: DateTime, userId: UUID) - extends Query(queryId, askedAt) + extends Query(queryId, askedAt) { + override type QueryResponse = UserGreetResponse +} diff --git a/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/application/generate/FindUserGreetQueryHandler.scala b/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/application/generate/FindUserGreetQueryHandler.scala index 610a822..5834df9 100644 --- a/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/application/generate/FindUserGreetQueryHandler.scala +++ b/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/application/generate/FindUserGreetQueryHandler.scala @@ -7,13 +7,12 @@ import tv.codely.cqrs_ddd_scala_example.bus.domain.QueryHandler import tv.codely.cqrs_ddd_scala_example.user_greet.domain.UserId final class FindUserGreetQueryHandler[P[_]: Functor]( - private val generator: UserGreetFinder[P] -) extends QueryHandler[P, FindUserGreetQuery, UserGreetResponse] { + private val generator: UserGreetFinder[P] +) extends QueryHandler[P, FindUserGreetQuery] { + override def handle(query: FindUserGreetQuery): P[UserGreetResponse] = { val userId = UserId(query.userId) - generator.generate(userId).map(greet => - UserGreetResponse(query.requestId, greet.greet) - ) + generator.generate(userId).map(greet => UserGreetResponse(query.requestId, greet.greet)) } } diff --git a/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/InSyncDelayedMemoryUserRepository.scala b/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/DelayedMemoryUserRepository.scala similarity index 75% rename from src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/InSyncDelayedMemoryUserRepository.scala rename to src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/DelayedMemoryUserRepository.scala index 8777e74..b2d6ac1 100644 --- a/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/InSyncDelayedMemoryUserRepository.scala +++ b/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/DelayedMemoryUserRepository.scala @@ -4,10 +4,11 @@ import java.util.UUID import scala.concurrent.duration.FiniteDuration -import cats.Id +import cats.Applicative +import cats.syntax.applicative._ import tv.codely.cqrs_ddd_scala_example.user_greet.domain.{User, UserId, UserRepository} -final class InSyncDelayedMemoryUserRepository(delay: FiniteDuration) extends UserRepository[Id] { +final class DelayedMemoryUserRepository[P[_]: Applicative](delay: FiniteDuration) extends UserRepository[P] { private val users: Map[UserId, User] = Map( UserId(UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe")) -> User( UserId(UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe")), @@ -19,11 +20,11 @@ final class InSyncDelayedMemoryUserRepository(delay: FiniteDuration) extends Use ) ) - override def search(userId: UserId): Id[Option[User]] = { + override def search(userId: UserId): P[Option[User]] = { println("Executing sync search (before delay)") Thread.sleep(delay.toMillis) println("Executing sync search (after delay)") - users.get(userId) + users.get(userId).pure } } diff --git a/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/InAsyncDelayedMemoryUserRepository.scala b/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/InAsyncDelayedMemoryUserRepository.scala index 2dc2055..56fb46a 100644 --- a/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/InAsyncDelayedMemoryUserRepository.scala +++ b/src/main/scala/tv/codely/cqrs_ddd_scala_example/user_greet/infrastructure/InAsyncDelayedMemoryUserRepository.scala @@ -7,7 +7,8 @@ import scala.concurrent.duration.FiniteDuration import tv.codely.cqrs_ddd_scala_example.user_greet.domain.{User, UserId, UserRepository} -final class InAsyncDelayedMemoryUserRepository(delay: FiniteDuration)(implicit val ec: ExecutionContext) extends UserRepository[Future] { +final class InAsyncDelayedMemoryUserRepository(delay: FiniteDuration)(implicit val ec: ExecutionContext) + extends UserRepository[Future] { private val users: Map[UserId, User] = Map( UserId(UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe")) -> User( UserId(UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe")), diff --git a/src/test/scala/tv/codely/cqrs_ddd_scala_example/acceptance/AsyncUserGreetFinderTest.scala b/src/test/scala/tv/codely/cqrs_ddd_scala_example/acceptance/AsyncUserGreetFinderTest.scala index c256ffa..4a5b036 100644 --- a/src/test/scala/tv/codely/cqrs_ddd_scala_example/acceptance/AsyncUserGreetFinderTest.scala +++ b/src/test/scala/tv/codely/cqrs_ddd_scala_example/acceptance/AsyncUserGreetFinderTest.scala @@ -2,7 +2,7 @@ package tv.codely.cqrs_ddd_scala_example.acceptance import java.util.UUID -import scala.reflect.classTag +import scala.concurrent.Future import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global @@ -28,24 +28,20 @@ final class AsyncUserGreetFinderTest extends WordSpec with ScalaFutures with Giv Given("a UserGreetGenerator with a user repository with a notable delay") - val notableDelay = 10.seconds + val notableDelay = 3.seconds val userRepository = new InAsyncDelayedMemoryUserRepository(notableDelay) val userGreetGeneratorWithDelay = new UserGreetFinder(userRepository) val generateUserGreetQueryHandler = new FindUserGreetQueryHandler(userGreetGeneratorWithDelay) And("an AsyncQueryBus which doesn't block the execution flow until getting a response") - val queryBus = new QueryBus( - Map( - classTag[FindUserGreetQuery] -> generateUserGreetQueryHandler - )) + val queryBus = new QueryBus[Future]() + queryBus.subscribe(generateUserGreetQueryHandler) When("we ask the GenerateUserGreetQuery to the AsyncQueryBus") - val query = FindUserGreetQuery( - UUID.randomUUID(), - DateTime.now(), - UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe")) + val query = + FindUserGreetQuery(UUID.randomUUID(), DateTime.now(), UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe")) val futureGreeting = queryBus.ask(query) diff --git a/src/test/scala/tv/codely/cqrs_ddd_scala_example/acceptance/SyncUserGreetFinderTest.scala b/src/test/scala/tv/codely/cqrs_ddd_scala_example/acceptance/SyncUserGreetFinderTest.scala index 63f70af..80a65c0 100644 --- a/src/test/scala/tv/codely/cqrs_ddd_scala_example/acceptance/SyncUserGreetFinderTest.scala +++ b/src/test/scala/tv/codely/cqrs_ddd_scala_example/acceptance/SyncUserGreetFinderTest.scala @@ -2,15 +2,20 @@ package tv.codely.cqrs_ddd_scala_example.acceptance import java.util.UUID -import scala.reflect.classTag +import cats.implicits._ import scala.concurrent.duration._ import org.joda.time.DateTime import org.scalatest._ import org.scalatest.Matchers._ import tv.codely.cqrs_ddd_scala_example.bus.domain.QueryBus -import tv.codely.cqrs_ddd_scala_example.user_greet.application.generate.{FindUserGreetQuery, FindUserGreetQueryHandler, UserGreetFinder} -import tv.codely.cqrs_ddd_scala_example.user_greet.infrastructure.InSyncDelayedMemoryUserRepository +import tv.codely.cqrs_ddd_scala_example.types.ThrowableTypeClasses.EitherThrowable +import tv.codely.cqrs_ddd_scala_example.user_greet.application.generate.{ + FindUserGreetQuery, + FindUserGreetQueryHandler, + UserGreetFinder +} +import tv.codely.cqrs_ddd_scala_example.user_greet.infrastructure.DelayedMemoryUserRepository final class SyncUserGreetFinderTest extends WordSpec with GivenWhenThen { @@ -19,18 +24,15 @@ final class SyncUserGreetFinderTest extends WordSpec with GivenWhenThen { Given("a UserGreetGenerator with a user repository") - val notableDelay = 10.seconds - val userRepository = new InSyncDelayedMemoryUserRepository(notableDelay) + val notableDelay = 3.seconds + val userRepository = new DelayedMemoryUserRepository[EitherThrowable](notableDelay) val userGreetGeneratorWithDelay = new UserGreetFinder(userRepository) val generateUserGreetQueryHandler = new FindUserGreetQueryHandler(userGreetGeneratorWithDelay) And("an SyncQueryBus which block the execution flow until getting a response") - val queryBus = new QueryBus( - Map( - classTag[FindUserGreetQuery] -> generateUserGreetQueryHandler - ) - ) + val queryBus = new QueryBus[EitherThrowable]() + queryBus.subscribe(generateUserGreetQueryHandler) When("we ask the GenerateUserGreetQuery to the SyncQueryBus") @@ -42,13 +44,14 @@ final class SyncUserGreetFinderTest extends WordSpec with GivenWhenThen { ) ) - val greeting = queryBus.ask(query) + val queryResult = queryBus.ask(query) println("Query asked to the sync query bus") Then("it should say hello to someone") - greeting.greet shouldBe "Hello Rafa" + queryResult.isRight shouldBe true + queryResult.map(_.greet shouldBe "Hello Rafa") } } } diff --git a/src/test/scala/tv/codely/cqrs_ddd_scala_example/integration/QueryBusTest.scala b/src/test/scala/tv/codely/cqrs_ddd_scala_example/integration/QueryBusTest.scala new file mode 100644 index 0000000..09d1d02 --- /dev/null +++ b/src/test/scala/tv/codely/cqrs_ddd_scala_example/integration/QueryBusTest.scala @@ -0,0 +1,66 @@ +package tv.codely.cqrs_ddd_scala_example.integration + +import java.util.UUID + +import cats.Applicative +import cats.syntax.applicative._ +import cats.instances.either._ +import org.joda.time.DateTime +import org.scalatest.{Matchers, WordSpec} +import tv.codely.cqrs_ddd_scala_example.bus.domain._ +import tv.codely.cqrs_ddd_scala_example.types.ThrowableTypeClasses.EitherThrowable + +final class QueryBusTest extends WordSpec with Matchers { + + val queryBus = new QueryBus[EitherThrowable] + queryBus.subscribe(new PingQueryHandler) + queryBus.subscribe(new EchoQueryHandler) + + "A QueryBus" should { + "return the correct ping response" in { + val pingQuery = PingQuery(UUID.randomUUID(), DateTime.now()) + + val result = queryBus.ask(pingQuery) + result.isRight shouldBe true + result.map(response => response shouldBe PingResponse(pingQuery.queryId)) + } + + "return the correct echo response" in { + val echoQuery = EchoQuery("Coming soon in CodelyTV...", UUID.randomUUID(), DateTime.now()) + + val result = queryBus.ask(echoQuery) + result.isRight shouldBe true + result.map(response => response shouldBe EchoResponse(echoQuery.message, echoQuery.queryId)) + } + + "fail when no handler exists for the asked query" in { + val responseWithoutHandlers = new Response(UUID.randomUUID()) {} + val queryWithoutHandlers = new Query(UUID.randomUUID(), DateTime.now()) { + override type QueryResponse = responseWithoutHandlers.type + } + + val result = queryBus.ask(queryWithoutHandlers) + result.isLeft shouldBe true + result.left.map(_ shouldBe QueryHandlerNotFoundForQuery) + } + } +} + +private case class PingQuery(override val queryId: UUID, override val createdAt: DateTime) + extends Query(queryId, createdAt) { + override type QueryResponse = PingResponse +} +private case class PingResponse(override val requestId: UUID) extends Response(requestId) +private class PingQueryHandler[P[_]: Applicative] extends QueryHandler[P, PingQuery] { + override def handle(query: PingQuery): P[PingResponse] = Applicative[P].pure(PingResponse(query.queryId)) +} + +private case class EchoQuery(message: String, override val queryId: UUID, override val createdAt: DateTime) + extends Query(queryId, createdAt) { + override type QueryResponse = EchoResponse +} +private case class EchoResponse(message: String, override val requestId: UUID) extends Response(requestId) +private class EchoQueryHandler[P[_]: Applicative] extends QueryHandler[P, EchoQuery] { + override def handle(query: EchoQuery): P[EchoResponse] = + EchoResponse(query.message, query.queryId).pure +}