diff --git a/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionDispatcher.scala b/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionDispatcher.scala index 4aa07763..f01fc836 100644 --- a/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionDispatcher.scala +++ b/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionDispatcher.scala @@ -285,7 +285,7 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon case GetHealth ⇒ val regionHealth = { - val storages = storageTracker.storages.map(s ⇒ (s.id, s.health)) + val storages = storageTracker.storages.map(storage ⇒ (storage.id, storage.health)) RegionHealth(storages.toMap) } sender() ! GetHealth.Success(regionId, regionHealth) @@ -295,7 +295,7 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon val storage = storageTracker.getStorage(storageId) storage.dispatcher ! StorageIndex.OpenIndex(regionId) - storage.dispatcher ! StorageDispatcher.CheckHealth + storage.dispatcher ! StorageDispatcher.GetHealth(true) val indexFuture = RegionIndex.GetIndex.unwrapFuture(storage.dispatcher ? StorageIndex.Envelope(regionId, RegionIndex.GetIndex)) diff --git a/core/src/main/scala/com/karasiq/shadowcloud/actors/StorageDispatcher.scala b/core/src/main/scala/com/karasiq/shadowcloud/actors/StorageDispatcher.scala index 345ebecf..25d6b67d 100644 --- a/core/src/main/scala/com/karasiq/shadowcloud/actors/StorageDispatcher.scala +++ b/core/src/main/scala/com/karasiq/shadowcloud/actors/StorageDispatcher.scala @@ -24,8 +24,8 @@ import com.karasiq.shadowcloud.utils.AkkaStreamUtils object StorageDispatcher { // Messages sealed trait Message - case object CheckHealth extends Message with NotInfluenceReceiveTimeout with MessageStatus[StorageId, StorageHealth] - case object GetHealth extends Message with NotInfluenceReceiveTimeout with MessageStatus[StorageId, StorageHealth] + final case class GetHealth(checkNow: Boolean = false) extends Message with NotInfluenceReceiveTimeout + object GetHealth extends MessageStatus[StorageId, StorageHealth] // Internal messages private sealed trait InternalMessage extends PossiblyHarmful @@ -49,7 +49,7 @@ private final class StorageDispatcher(storageId: StorageId, storageProps: Storag private[this] implicit val materializer: Materializer = ActorMaterializer() private[this] val sc = ShadowCloud() private[this] val config = sc.configs.storageConfig(storageId, storageProps) - private[this] val healthCheckSchedule = context.system.scheduler.schedule(1 second, config.healthCheckInterval, self, CheckHealth) + private[this] val healthCheckSchedule = context.system.scheduler.schedule(1 second, config.healthCheckInterval, self, GetHealth(true)) // ----------------------------------------------------------------------- // State @@ -100,19 +100,18 @@ private final class StorageDispatcher(storageId: StorageId, storageProps: Storag // ----------------------------------------------------------------------- // Storage health // ----------------------------------------------------------------------- - case GetHealth ⇒ - sender() ! GetHealth.Success(storageId, health) - - case CheckHealth ⇒ - healthProvider.health - .map(CheckHealth.Success(storageId, _)) - .recover(PartialFunction(CheckHealth.Failure(storageId, _))) - .pipeTo(self) + case GetHealth(check) ⇒ + if (check) { + val future = GetHealth.wrapFuture(storageId, healthProvider.health) + future.pipeTo(self).pipeTo(sender()) + } else { + sender() ! GetHealth.Success(storageId, health) + } - case CheckHealth.Success(`storageId`, health) ⇒ + case GetHealth.Success(`storageId`, health) ⇒ updateHealth(_ ⇒ health) - case CheckHealth.Failure(`storageId`, error) ⇒ + case GetHealth.Failure(`storageId`, error) ⇒ updateHealth(_.copy(online = false)) log.error(error, "Health update failure: {}", storageId) diff --git a/core/src/main/scala/com/karasiq/shadowcloud/ops/storage/StorageOps.scala b/core/src/main/scala/com/karasiq/shadowcloud/ops/storage/StorageOps.scala index 362d4348..80a70286 100644 --- a/core/src/main/scala/com/karasiq/shadowcloud/ops/storage/StorageOps.scala +++ b/core/src/main/scala/com/karasiq/shadowcloud/ops/storage/StorageOps.scala @@ -75,10 +75,7 @@ final class StorageOps(regionSupervisor: ActorRef, timeouts: TimeoutsConfig)(imp // Utils // ----------------------------------------------------------------------- def getHealth(storageId: StorageId, checkNow: Boolean = false): Future[StorageHealth] = { - if (checkNow) - askStorage(storageId, StorageDispatcher.CheckHealth, StorageDispatcher.CheckHealth) - else - askStorage(storageId, StorageDispatcher.GetHealth, StorageDispatcher.GetHealth) + askStorage(storageId, StorageDispatcher.GetHealth, StorageDispatcher.GetHealth(checkNow)) } private[this] def askStorage[V](storageId: StorageId, status: MessageStatus[_, V], message: Any) diff --git a/project/ProjectDeps.scala b/project/ProjectDeps.scala index d30c78ef..a88cdb6e 100644 --- a/project/ProjectDeps.scala +++ b/project/ProjectDeps.scala @@ -4,7 +4,7 @@ object ProjectDeps { type Deps = Seq[ModuleID] object akka { - val version = "2.5.6" + val version = "2.5.9" val httpVersion = "10.0.11" def actors: Deps = Seq(