Skip to content
This repository has been archived by the owner on Feb 13, 2019. It is now read-only.

Add query bus supporting more than one handler #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ Configuration.settings
/** ********* PROD DEPENDENCIES *****************/
libraryDependencies ++= Seq(
"com.github.nscala-time" %% "nscala-time" % "2.16.0",
"com.lihaoyi" %% "pprint" % "0.5.2",
"org.typelevel" %% "cats-core" % "1.0.0-MF"
"com.lihaoyi" %% "pprint" % "0.5.2",
"org.typelevel" %% "cats-core" % "1.0.0-MF"
)

/** ********* TEST DEPENDENCIES *****************/
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.1" % Test,
"org.scalatest" %% "scalatest" % "3.0.1" % Test,
"org.scalamock" %% "scalamock-scalatest-support" % "3.6.0" % Test
)

Expand All @@ -27,7 +27,7 @@ addCommandAlias("tsf", "testShowFailed")
addCommandAlias("c", "compile")
addCommandAlias("tc", "test:compile")

addCommandAlias("f", "scalafmt") // Format all files according to ScalaFmt
addCommandAlias("f", "scalafmt") // Format all files according to ScalaFmt
addCommandAlias("ft", "scalafmtTest") // Test if all files are formatted according to ScalaFmt

addCommandAlias("prep", ";c;tc;ft") // All the needed tasks before running the test
2 changes: 1 addition & 1 deletion project/Configuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object Configuration {
"-Ywarn-unused-import",
"-Xcheckinit", // Check against early initialization
"-language:higherKinds"
),
),
scalacOptions in run in Compile -= "-Xcheckinit", // Remove it in production because it's expensive
javaOptions += "-Duser.timezone=UTC",
// Test options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ import java.util.UUID

import org.joda.time.DateTime

abstract class Query(val queryId: UUID, val askedAt: DateTime) extends Request(queryId, askedAt)
abstract class Query(val queryId: UUID, val askedAt: DateTime) extends Request(queryId, askedAt) {
type QueryResponse <: Response
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't have used the types as you've done in this PR yet. Thanks for sharing!

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,25 @@ package tv.codely.cqrs_ddd_scala_example.bus.domain

import scala.reflect.ClassTag

final class QueryBus[P[_], QueryType <: Query, ResponseType <: Response](
private val handlers: Map[
ClassTag[QueryType],
QueryHandler[P, QueryType,ResponseType]
]
) {
def ask(query: QueryType): P[ResponseType] =
handlers(ClassTag[QueryType](query.getClass)).handle(query)
import cats.implicits._
import tv.codely.cqrs_ddd_scala_example.types.ThrowableTypeClasses.MonadErrorThrowable

final class QueryBus[P[_]: MonadErrorThrowable] {
private var handlers: Map[Class[_], Query => P[_ <: Response]] = Map.empty

def ask[Q <: Query: ClassTag](query: Q): P[Q#QueryResponse] = {
val classTag = implicitly[ClassTag[Q]]
handlers
.get(classTag.runtimeClass)
.fold[P[Q#QueryResponse]](MonadErrorThrowable[P].raiseError(QueryHandlerNotFoundForQuery))(handler =>
handler(query).map(_.asInstanceOf[Q#QueryResponse]))
}

def subscribe[Q <: Query: ClassTag](handler: QueryHandler[P, Q]): Unit = {
val classTag = implicitly[ClassTag[Q]]
handlers = handlers +
(classTag.runtimeClass -> ((query: Query) => handler.handle(query.asInstanceOf[Q])))
}
}

object QueryHandlerNotFoundForQuery extends Exception
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package tv.codely.cqrs_ddd_scala_example.bus.domain

abstract class QueryHandler[P[_], QueryType <: Query, ResponseType <: Response] {
def handle(query: QueryType): P[ResponseType]
abstract class QueryHandler[P[_], QueryType <: Query] {
def handle(query: QueryType): P[QueryType#QueryResponse]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package tv.codely.cqrs_ddd_scala_example.types

import cats.MonadError

object ThrowableTypeClasses {
type MonadErrorThrowable[P[_]] = MonadError[P, Throwable]

object MonadErrorThrowable {
def apply[P[_]](implicit E: MonadError[P, Throwable]): MonadErrorThrowable[P] = E
}

type EitherThrowable[A] = Either[Throwable, A]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ import org.joda.time.DateTime
import tv.codely.cqrs_ddd_scala_example.bus.domain.Query

case class FindUserGreetQuery(override val queryId: UUID, override val askedAt: DateTime, userId: UUID)
extends Query(queryId, askedAt)
extends Query(queryId, askedAt) {
override type QueryResponse = UserGreetResponse
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import tv.codely.cqrs_ddd_scala_example.bus.domain.QueryHandler
import tv.codely.cqrs_ddd_scala_example.user_greet.domain.UserId

final class FindUserGreetQueryHandler[P[_]: Functor](
private val generator: UserGreetFinder[P]
) extends QueryHandler[P, FindUserGreetQuery, UserGreetResponse] {
private val generator: UserGreetFinder[P]
) extends QueryHandler[P, FindUserGreetQuery] {

override def handle(query: FindUserGreetQuery): P[UserGreetResponse] = {
val userId = UserId(query.userId)

generator.generate(userId).map(greet =>
UserGreetResponse(query.requestId, greet.greet)
)
generator.generate(userId).map(greet => UserGreetResponse(query.requestId, greet.greet))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import java.util.UUID

import scala.concurrent.duration.FiniteDuration

import cats.Id
import cats.Applicative
import cats.syntax.applicative._
import tv.codely.cqrs_ddd_scala_example.user_greet.domain.{User, UserId, UserRepository}

final class InSyncDelayedMemoryUserRepository(delay: FiniteDuration) extends UserRepository[Id] {
final class DelayedMemoryUserRepository[P[_]: Applicative](delay: FiniteDuration) extends UserRepository[P] {
private val users: Map[UserId, User] = Map(
UserId(UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe")) -> User(
UserId(UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe")),
Expand All @@ -19,11 +20,11 @@ final class InSyncDelayedMemoryUserRepository(delay: FiniteDuration) extends Use
)
)

override def search(userId: UserId): Id[Option[User]] = {
override def search(userId: UserId): P[Option[User]] = {
println("Executing sync search (before delay)")
Thread.sleep(delay.toMillis)
println("Executing sync search (after delay)")

users.get(userId)
users.get(userId).pure
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import scala.concurrent.duration.FiniteDuration

import tv.codely.cqrs_ddd_scala_example.user_greet.domain.{User, UserId, UserRepository}

final class InAsyncDelayedMemoryUserRepository(delay: FiniteDuration)(implicit val ec: ExecutionContext) extends UserRepository[Future] {
final class InAsyncDelayedMemoryUserRepository(delay: FiniteDuration)(implicit val ec: ExecutionContext)
extends UserRepository[Future] {
private val users: Map[UserId, User] = Map(
UserId(UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe")) -> User(
UserId(UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package tv.codely.cqrs_ddd_scala_example.acceptance

import java.util.UUID

import scala.reflect.classTag
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

Expand All @@ -28,24 +28,20 @@ final class AsyncUserGreetFinderTest extends WordSpec with ScalaFutures with Giv

Given("a UserGreetGenerator with a user repository with a notable delay")

val notableDelay = 10.seconds
val notableDelay = 3.seconds
val userRepository = new InAsyncDelayedMemoryUserRepository(notableDelay)
val userGreetGeneratorWithDelay = new UserGreetFinder(userRepository)
val generateUserGreetQueryHandler = new FindUserGreetQueryHandler(userGreetGeneratorWithDelay)

And("an AsyncQueryBus which doesn't block the execution flow until getting a response")

val queryBus = new QueryBus(
Map(
classTag[FindUserGreetQuery] -> generateUserGreetQueryHandler
))
val queryBus = new QueryBus[Future]()
queryBus.subscribe(generateUserGreetQueryHandler)

When("we ask the GenerateUserGreetQuery to the AsyncQueryBus")

val query = FindUserGreetQuery(
UUID.randomUUID(),
DateTime.now(),
UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe"))
val query =
FindUserGreetQuery(UUID.randomUUID(), DateTime.now(), UUID.fromString("1646fd5c-de2b-435f-b20f-ad1f50924dfe"))

val futureGreeting = queryBus.ask(query)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package tv.codely.cqrs_ddd_scala_example.acceptance

import java.util.UUID

import scala.reflect.classTag
import cats.implicits._
import scala.concurrent.duration._

import org.joda.time.DateTime
import org.scalatest._
import org.scalatest.Matchers._
import tv.codely.cqrs_ddd_scala_example.bus.domain.QueryBus
import tv.codely.cqrs_ddd_scala_example.user_greet.application.generate.{FindUserGreetQuery, FindUserGreetQueryHandler, UserGreetFinder}
import tv.codely.cqrs_ddd_scala_example.user_greet.infrastructure.InSyncDelayedMemoryUserRepository
import tv.codely.cqrs_ddd_scala_example.types.ThrowableTypeClasses.EitherThrowable
import tv.codely.cqrs_ddd_scala_example.user_greet.application.generate.{
FindUserGreetQuery,
FindUserGreetQueryHandler,
UserGreetFinder
}
import tv.codely.cqrs_ddd_scala_example.user_greet.infrastructure.DelayedMemoryUserRepository

final class SyncUserGreetFinderTest extends WordSpec with GivenWhenThen {

Expand All @@ -19,18 +24,15 @@ final class SyncUserGreetFinderTest extends WordSpec with GivenWhenThen {

Given("a UserGreetGenerator with a user repository")

val notableDelay = 10.seconds
val userRepository = new InSyncDelayedMemoryUserRepository(notableDelay)
val notableDelay = 3.seconds
val userRepository = new DelayedMemoryUserRepository[EitherThrowable](notableDelay)
val userGreetGeneratorWithDelay = new UserGreetFinder(userRepository)
val generateUserGreetQueryHandler = new FindUserGreetQueryHandler(userGreetGeneratorWithDelay)

And("an SyncQueryBus which block the execution flow until getting a response")

val queryBus = new QueryBus(
Map(
classTag[FindUserGreetQuery] -> generateUserGreetQueryHandler
)
)
val queryBus = new QueryBus[EitherThrowable]()
queryBus.subscribe(generateUserGreetQueryHandler)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you try to refactor this requiring to inject the subscribers while creating the QueryBus instance as we where doing?

This way we would be avoiding mutable state inside the QueryBus 🙂

If you don't have time in order to add this improvement, we could merge this PR and do that later 🙂


When("we ask the GenerateUserGreetQuery to the SyncQueryBus")

Expand All @@ -42,13 +44,14 @@ final class SyncUserGreetFinderTest extends WordSpec with GivenWhenThen {
)
)

val greeting = queryBus.ask(query)
val queryResult = queryBus.ask(query)

println("Query asked to the sync query bus")

Then("it should say hello to someone")

greeting.greet shouldBe "Hello Rafa"
queryResult.isRight shouldBe true
queryResult.map(_.greet shouldBe "Hello Rafa")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package tv.codely.cqrs_ddd_scala_example.integration

import java.util.UUID

import cats.Applicative
import cats.syntax.applicative._
import cats.instances.either._
import org.joda.time.DateTime
import org.scalatest.{Matchers, WordSpec}
import tv.codely.cqrs_ddd_scala_example.bus.domain._
import tv.codely.cqrs_ddd_scala_example.types.ThrowableTypeClasses.EitherThrowable

final class QueryBusTest extends WordSpec with Matchers {

val queryBus = new QueryBus[EitherThrowable]
queryBus.subscribe(new PingQueryHandler)
queryBus.subscribe(new EchoQueryHandler)

"A QueryBus" should {
"return the correct ping response" in {
val pingQuery = PingQuery(UUID.randomUUID(), DateTime.now())

val result = queryBus.ask(pingQuery)
result.isRight shouldBe true
result.map(response => response shouldBe PingResponse(pingQuery.queryId))
}

"return the correct echo response" in {
val echoQuery = EchoQuery("Coming soon in CodelyTV...", UUID.randomUUID(), DateTime.now())

val result = queryBus.ask(echoQuery)
result.isRight shouldBe true
result.map(response => response shouldBe EchoResponse(echoQuery.message, echoQuery.queryId))
}

"fail when no handler exists for the asked query" in {
val responseWithoutHandlers = new Response(UUID.randomUUID()) {}
val queryWithoutHandlers = new Query(UUID.randomUUID(), DateTime.now()) {
override type QueryResponse = responseWithoutHandlers.type
}

val result = queryBus.ask(queryWithoutHandlers)
result.isLeft shouldBe true
result.left.map(_ shouldBe QueryHandlerNotFoundForQuery)
}
}
}

private case class PingQuery(override val queryId: UUID, override val createdAt: DateTime)
extends Query(queryId, createdAt) {
override type QueryResponse = PingResponse
}
private case class PingResponse(override val requestId: UUID) extends Response(requestId)
private class PingQueryHandler[P[_]: Applicative] extends QueryHandler[P, PingQuery] {
override def handle(query: PingQuery): P[PingResponse] = Applicative[P].pure(PingResponse(query.queryId))
}

private case class EchoQuery(message: String, override val queryId: UUID, override val createdAt: DateTime)
extends Query(queryId, createdAt) {
override type QueryResponse = EchoResponse
}
private case class EchoResponse(message: String, override val requestId: UUID) extends Response(requestId)
private class EchoQueryHandler[P[_]: Applicative] extends QueryHandler[P, EchoQuery] {
override def handle(query: EchoQuery): P[EchoResponse] =
EchoResponse(query.message, query.queryId).pure
}