Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Karasiq committed Feb 13, 2018
1 parent 8a81ce4 commit 8434560
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon

case GetHealth
val regionHealth = {
val storages = storageTracker.storages.map(s (s.id, s.health))
val storages = storageTracker.storages.map(storage (storage.id, storage.health))
RegionHealth(storages.toMap)
}
sender() ! GetHealth.Success(regionId, regionHealth)
Expand All @@ -295,7 +295,7 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon
val storage = storageTracker.getStorage(storageId)

storage.dispatcher ! StorageIndex.OpenIndex(regionId)
storage.dispatcher ! StorageDispatcher.CheckHealth
storage.dispatcher ! StorageDispatcher.GetHealth(true)

val indexFuture = RegionIndex.GetIndex.unwrapFuture(storage.dispatcher ?
StorageIndex.Envelope(regionId, RegionIndex.GetIndex))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import com.karasiq.shadowcloud.utils.AkkaStreamUtils
object StorageDispatcher {
// Messages
sealed trait Message
case object CheckHealth extends Message with NotInfluenceReceiveTimeout with MessageStatus[StorageId, StorageHealth]
case object GetHealth extends Message with NotInfluenceReceiveTimeout with MessageStatus[StorageId, StorageHealth]
final case class GetHealth(checkNow: Boolean = false) extends Message with NotInfluenceReceiveTimeout
object GetHealth extends MessageStatus[StorageId, StorageHealth]

// Internal messages
private sealed trait InternalMessage extends PossiblyHarmful
Expand All @@ -49,7 +49,7 @@ private final class StorageDispatcher(storageId: StorageId, storageProps: Storag
private[this] implicit val materializer: Materializer = ActorMaterializer()
private[this] val sc = ShadowCloud()
private[this] val config = sc.configs.storageConfig(storageId, storageProps)
private[this] val healthCheckSchedule = context.system.scheduler.schedule(1 second, config.healthCheckInterval, self, CheckHealth)
private[this] val healthCheckSchedule = context.system.scheduler.schedule(1 second, config.healthCheckInterval, self, GetHealth(true))

// -----------------------------------------------------------------------
// State
Expand Down Expand Up @@ -100,19 +100,18 @@ private final class StorageDispatcher(storageId: StorageId, storageProps: Storag
// -----------------------------------------------------------------------
// Storage health
// -----------------------------------------------------------------------
case GetHealth
sender() ! GetHealth.Success(storageId, health)

case CheckHealth
healthProvider.health
.map(CheckHealth.Success(storageId, _))
.recover(PartialFunction(CheckHealth.Failure(storageId, _)))
.pipeTo(self)
case GetHealth(check)
if (check) {
val future = GetHealth.wrapFuture(storageId, healthProvider.health)
future.pipeTo(self).pipeTo(sender())
} else {
sender() ! GetHealth.Success(storageId, health)
}

case CheckHealth.Success(`storageId`, health)
case GetHealth.Success(`storageId`, health)
updateHealth(_ health)

case CheckHealth.Failure(`storageId`, error)
case GetHealth.Failure(`storageId`, error)
updateHealth(_.copy(online = false))
log.error(error, "Health update failure: {}", storageId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ final class StorageOps(regionSupervisor: ActorRef, timeouts: TimeoutsConfig)(imp
// Utils
// -----------------------------------------------------------------------
def getHealth(storageId: StorageId, checkNow: Boolean = false): Future[StorageHealth] = {
if (checkNow)
askStorage(storageId, StorageDispatcher.CheckHealth, StorageDispatcher.CheckHealth)
else
askStorage(storageId, StorageDispatcher.GetHealth, StorageDispatcher.GetHealth)
askStorage(storageId, StorageDispatcher.GetHealth, StorageDispatcher.GetHealth(checkNow))
}

private[this] def askStorage[V](storageId: StorageId, status: MessageStatus[_, V], message: Any)
Expand Down
2 changes: 1 addition & 1 deletion project/ProjectDeps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ object ProjectDeps {
type Deps = Seq[ModuleID]

object akka {
val version = "2.5.6"
val version = "2.5.9"
val httpVersion = "10.0.11"

def actors: Deps = Seq(
Expand Down

0 comments on commit 8434560

Please sign in to comment.