Skip to content

Commit

Permalink
Blocker for all Micrometer modules (#562)
Browse files Browse the repository at this point in the history
Co-authored-by: Jan Kolena <[email protected]>
  • Loading branch information
jendakol and Jan Kolena authored May 17, 2021
1 parent 9f6a029 commit fce6389
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 14 deletions.
2 changes: 1 addition & 1 deletion example/src/main/scala/com/avast/sst/example/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object Main extends ZioServerApp {
)
meterRegistry <- MicrometerJmxModule.make[Task](configuration.jmx)
_ <- Resource.eval(MicrometerJvmModule.make[Task](meterRegistry))
serverMetricsModule <- Resource.eval(MicrometerHttp4sServerMetricsModule.make[Task](meterRegistry, clock))
serverMetricsModule <- Resource.eval(MicrometerHttp4sServerMetricsModule.make[Task](meterRegistry, executorModule.blocker, clock))
boundedConnectExecutionContext <-
executorModule
.makeThreadPoolExecutor(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.avast.sst.http4s.server.micrometer

import cats.effect.Effect
import cats.effect.concurrent.Ref
import cats.effect.{Blocker, ContextShift, Effect, IO}
import cats.syntax.functor._
import io.micrometer.core.instrument.MeterRegistry
import org.http4s.metrics.{MetricsOps, TerminationType}
Expand All @@ -12,9 +12,11 @@ import java.util.concurrent.TimeUnit
object MicrometerHttp4sMetricsOpsModule {

/** Makes [[org.http4s.metrics.MetricsOps]] to record the usual HTTP server metrics. */
def make[F[_]: Effect](meterRegistry: MeterRegistry): F[MetricsOps[F]] = {
def make[F[_]: Effect](meterRegistry: MeterRegistry, blocker: Blocker): F[MetricsOps[F]] = {
val F = Effect[F]

implicit val iocs: ContextShift[IO] = IO.contextShift(blocker.blockingContext)

for {
activeRequests <- Ref.of[F, Long](0L)
} yield new MetricsOps[F] {
Expand All @@ -25,7 +27,7 @@ object MicrometerHttp4sMetricsOpsModule {
meterRegistry.gauge(
s"$prefix.active-requests",
activeRequests,
(_: Ref[F, Long]) => Effect[F].toIO(activeRequests.get).unsafeRunSync().toDouble
(_: Ref[F, Long]) => blocker.blockOn(Effect[F].toIO(activeRequests.get)).unsafeRunSync().toDouble
)

override def increaseActiveRequests(classifier: Option[String]): F[Unit] = activeRequests.update(_ + 1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.avast.sst.http4s.server.micrometer

import cats.effect.{Clock, Effect, Sync}
import cats.effect.{Blocker, Clock, Effect, Sync}
import cats.syntax.flatMap._
import cats.syntax.functor._
import io.micrometer.core.instrument.MeterRegistry
Expand All @@ -14,11 +14,11 @@ object MicrometerHttp4sServerMetricsModule {
/** Makes [[com.avast.sst.http4s.server.micrometer.MicrometerHttp4sServerMetricsModule]] that can be used to setup monitoring
* of the whole HTTP server and individual routes.
*/
def make[F[_]: Effect](meterRegistry: MeterRegistry, clock: Clock[F]): F[MicrometerHttp4sServerMetricsModule[F]] = {
def make[F[_]: Effect](meterRegistry: MeterRegistry, blocker: Blocker, clock: Clock[F]): F[MicrometerHttp4sServerMetricsModule[F]] = {
implicit val c: Clock[F] = clock

for {
metricsOps <- MicrometerHttp4sMetricsOpsModule.make[F](meterRegistry)
metricsOps <- MicrometerHttp4sMetricsOpsModule.make[F](meterRegistry, blocker)
routeMetrics <- Sync[F].delay(new RouteMetrics[F](meterRegistry))
} yield new MicrometerHttp4sServerMetricsModule[F](Metrics(metricsOps), routeMetrics)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package com.avast.sst.http4s.server.micrometer

import cats.effect.IO
import cats.effect.{Blocker, IO}
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.http4s.{Method, Status}
import org.scalatest.funsuite.AnyFunSuite

import java.util.concurrent.TimeUnit
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.ExecutionContext

class MicrometerHttp4sMetricsOpsModuleTest extends AnyFunSuite {

test("http4s MetricsOps for Micrometer") {
val registry = new SimpleMeterRegistry()
val metricsOps = MicrometerHttp4sMetricsOpsModule.make[IO](registry).unsafeRunSync()
val blocker = Blocker.liftExecutionContext(ExecutionContext.fromExecutor(Executors.newCachedThreadPool()))
val metricsOps = MicrometerHttp4sMetricsOpsModule.make[IO](registry, blocker).unsafeRunSync()

metricsOps.increaseActiveRequests(None).unsafeRunSync()
metricsOps.recordTotalTime(Method.GET, Status.Ok, 2500, None).unsafeRunSync()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package com.avast.sst.micrometer.statsd

import cats.effect.{Resource, Sync}
import cats.effect.{Blocker, ContextShift, Resource, Sync}
import com.avast.sst.micrometer.PrefixMeterFilter
import io.micrometer.core.instrument.Clock
import io.micrometer.core.instrument.config.{MeterFilter, NamingConvention}
import io.micrometer.core.instrument.util.HierarchicalNameMapper
import io.micrometer.statsd.{StatsdConfig, StatsdFlavor, StatsdMeterRegistry, StatsdProtocol}
import io.micrometer.statsd._

import java.time.Duration

object MicrometerStatsDModule {

/** Makes configured [[io.micrometer.statsd.StatsdMeterRegistry]]. */
def make[F[_]: Sync](
def make[F[_]: Sync: ContextShift](
config: MicrometerStatsDConfig,
blocker: Blocker,
clock: Clock = Clock.SYSTEM,
nameMapper: HierarchicalNameMapper = HierarchicalNameMapper.DEFAULT,
namingConvention: Option[NamingConvention] = None,
Expand Down Expand Up @@ -43,7 +44,7 @@ object MicrometerStatsDModule {

registry
}
}(registry => Sync[F].delay(registry.close()))
}(registry => blocker.delay(registry.close()))
}

private class CustomStatsdConfig(c: MicrometerStatsDConfig) extends StatsdConfig {
Expand Down

0 comments on commit fce6389

Please sign in to comment.