Skip to content

Commit

Permalink
Performance and stability fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Karasiq committed Jun 3, 2020
1 parent 07f547a commit fa4adc2
Show file tree
Hide file tree
Showing 28 changed files with 240 additions and 207 deletions.
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import com.typesafe.sbt.packager.docker.Cmd
import sbtcrossproject.CrossPlugin.autoImport.{CrossType, crossProject}

val commonSettings = Seq(
organization := "com.github.karasiq",
Expand All @@ -25,8 +26,8 @@ val commonSettings = Seq(
"-Ywarn-unused:-implicits",
"-Xlint",
"-Ypartial-unification",
// "-opt:l:inline",
// "-opt-inline-from:**"
"-opt:l:inline",
"-opt-inline-from:**"
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class RegionGCTest extends SCExtensionSpec with FlatSpecLike with SequentialNest
sc.ops.region.synchronize(testRegionId).futureValue
sc.ops.storage.deleteChunks(testStorageId, Set(ChunkPath(testRegionId, chunk.checksum.hash))).futureValue._2.isSuccess shouldBe true
sc.actors.regionSupervisor ! RegionEnvelope(testRegionId, RegionGC.UnReserve(Set(chunk)))
expectNoMsg(1 second)
expectNoMessage(1 second)
whenReady(sc.ops.region.collectGarbage(testRegionId, GCStrategy.Delete)) { gcReport
gcReport.regionId shouldBe testRegionId
gcReport.regionState.oldFiles shouldBe empty
Expand All @@ -42,7 +42,7 @@ class RegionGCTest extends SCExtensionSpec with FlatSpecLike with SequentialNest
sc.ops.region.writeChunk(testRegionId, chunk).futureValue shouldBe chunk
sc.actors.regionSupervisor ! RegionEnvelope(testRegionId, RegionGC.UnReserve(Set(chunk)))
sc.ops.region.synchronize(testRegionId)
expectNoMsg(1 seconds)
expectNoMessage(1 seconds)

sc.ops.storage.writeIndex(testStorageId, testRegionId, IndexDiff.deleteChunks(chunk)).futureValue
sc.ops.storage.synchronize(testStorageId, testRegionId).futureValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class RegionSupervisorTest extends SCExtensionSpec with FlatSpecLike with Matche
regions.keySet shouldBe Set(testRegion)
storages shouldBe empty
regions(testRegion).storages shouldBe empty
expectNoMsg(500 millis) // Wait for storage actor stop
expectNoMessage(500 millis) // Wait for storage actor stop
}

it should "suspend storage" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag
// -----------------------------------------------------------------------
// Context
// -----------------------------------------------------------------------
private[this] implicit val materializer = ActorMaterializer()
private[this] val sc = ShadowCloud()
private[this] val config = sc.configs.storageConfig(storageId, storageProps)
private[this] val chunksWrite = PendingOperations.withRegionChunk
private[this] val chunksRead = PendingOperations.withRegionChunk

import sc.implicits.materializer

// -----------------------------------------------------------------------
// Queues
// -----------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package com.karasiq.shadowcloud.actors

import akka.actor.{Actor, ActorLogging, ActorRef, Props, Stash}

import com.karasiq.shadowcloud.ShadowCloud
import com.karasiq.shadowcloud.actors.utils.ContainerActor
import com.karasiq.shadowcloud.actors.RegionContainer.SetConfig
import com.karasiq.shadowcloud.actors.RegionSupervisor.RenewRegionSubscriptions
import com.karasiq.shadowcloud.actors.utils.ContainerActor
import com.karasiq.shadowcloud.config.RegionConfig
import com.karasiq.shadowcloud.model.RegionId
import com.karasiq.shadowcloud.utils.Utils

private[actors] object RegionContainer {
sealed trait Message
Expand All @@ -20,18 +18,19 @@ private[actors] object RegionContainer {
}

private[actors] final class RegionContainer(regionId: RegionId) extends Actor with Stash with ActorLogging with ContainerActor {
private[this] val sc = ShadowCloud()
private[this] val sc = ShadowCloud()
var regionConfig: RegionConfig = sc.configs.regionConfig(regionId)

def receive: Receive = {
case SetConfig(rc)
log.warning("Region config changed: {}", rc)
log.info("Region config changed: {}", rc)
this.regionConfig = sc.configs.regionConfig(regionId, rc)
restartActor()
}

def startActor(): Unit = {
val dispatcher = context.actorOf(RegionDispatcher.props(regionId, this.regionConfig), Utils.uniqueActorName(regionId))
val props = RegionDispatcher.props(regionId, this.regionConfig)
val dispatcher = context.actorOf(props, regionId)
afterStart(dispatcher)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon
// -----------------------------------------------------------------------
// Context
// -----------------------------------------------------------------------
private[this] implicit val sc = ShadowCloud()
private[this] implicit val materializer = ActorMaterializer()
private[this] implicit val sc = ShadowCloud()

import context.dispatcher
import sc.implicits.defaultTimeout
import sc.implicits.{defaultTimeout, materializer}

val storageTracker = StorageTracker()
val chunksTracker = ChunksTracker(regionId, regionConfig, storageTracker)
Expand All @@ -113,7 +113,7 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon
// -----------------------------------------------------------------------
// Actors
// -----------------------------------------------------------------------
private[this] val gcActor = context.actorOf(RegionGC.props(regionId, regionConfig), "region-gc")
private[this] val gcActor = context.watch(context.actorOf(RegionGC.props(regionId, regionConfig), "region-gc"))

// -----------------------------------------------------------------------
// Streams
Expand All @@ -126,7 +126,7 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon
.log("region-grouped-diff")
.map(WriteIndexDiff)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.to(Sink.actorRef(self, Kill))
.to(Sink.actorRef(self, Kill, _ => Kill))
.named("regionPendingQueue")
.run()

Expand Down Expand Up @@ -298,7 +298,7 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon

val indexFuture = indexTracker.storages.io
.synchronize(storage)
.flatMap(_ => indexTracker.storages.io.getIndex(storage))
.flatMap(_ indexTracker.storages.io.getIndex(storage))

indexFuture.onComplete {
case Success(IndexMerger.State(Nil, IndexDiff.empty)) | Failure(StorageException.NotFound(_, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.karasiq.shadowcloud.actors

import akka.actor.{Actor, ActorLogging, DeadLetterSuppression, PossiblyHarmful, Props, ReceiveTimeout, Status}
import akka.persistence._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.{Done, NotUsed}
import com.karasiq.shadowcloud.ShadowCloud
Expand Down Expand Up @@ -66,10 +65,11 @@ private[actors] final class RegionIndex(storageId: StorageId, regionId: RegionId
require(storageId.nonEmpty && regionId.nonEmpty, "Invalid storage identifier")

import context.dispatcher
private[this] implicit val materializer = ActorMaterializer()
private[this] implicit lazy val sc = ShadowCloud()
private[this] lazy val config = sc.configs.storageConfig(storageId, storageProps)

import sc.implicits.materializer

private[this] object state {
val indexId = RegionIndexId(storageId, regionId)
val persistenceId = indexId.toPersistenceId
Expand Down Expand Up @@ -115,8 +115,9 @@ private[actors] final class RegionIndex(storageId: StorageId, regionId: RegionId
state.compactRequested = true
}

case Synchronize if sender() != self && sender() != Actor.noSender
state.pendingSync.addWaiter(state.indexId, sender())
case Synchronize
if (sender() != self && sender() != Actor.noSender)
state.pendingSync.addWaiter(state.indexId, sender())

case DeleteHistory
val sender = context.sender()
Expand Down Expand Up @@ -223,7 +224,7 @@ private[actors] final class RegionIndex(storageId: StorageId, regionId: RegionId
becomeOrDefault(receivePreRead(loadedKeys ++ keys))

case Status.Failure(error)
log.error(error, "Diffs load failed")
log.debug("Diffs load failed: {}", error)
// throw error
synchronization.scheduleNext()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.karasiq.shadowcloud.actors

import akka.NotUsed
import akka.actor.{ActorLogging, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.actor.{ActorInitializationException, ActorLogging, DeathPactException, OneForOneStrategy, Props, Status, SupervisorStrategy, Terminated}
import akka.persistence.{PersistentActor, RecoveryCompleted, SnapshotOffer}
import akka.util.Timeout
import com.karasiq.shadowcloud.ShadowCloud
Expand All @@ -21,18 +21,18 @@ object RegionSupervisor {
// Messages
sealed trait Message
final case class CreateRegion(regionId: RegionId, regionConfig: RegionConfig = RegionConfig.empty) extends Message
final case class DeleteRegion(regionId: RegionId) extends Message
final case class CreateStorage(storageId: StorageId, props: StorageProps) extends Message
final case class DeleteStorage(storageId: StorageId) extends Message
final case class RegisterStorage(regionId: RegionId, storageId: StorageId) extends Message
final case class UnregisterStorage(regionId: RegionId, storageId: StorageId) extends Message
final case class SuspendStorage(storageId: StorageId) extends Message
final case class SuspendRegion(regionId: RegionId) extends Message
final case class ResumeStorage(storageId: StorageId) extends Message
final case class ResumeRegion(regionId: RegionId) extends Message
final case object GetSnapshot extends Message with MessageStatus[NotUsed, RegionTracker.Snapshot]

private[actors] final case class RenewRegionSubscriptions(regionId: RegionId) extends Message
final case class DeleteRegion(regionId: RegionId) extends Message
final case class CreateStorage(storageId: StorageId, props: StorageProps) extends Message
final case class DeleteStorage(storageId: StorageId) extends Message
final case class RegisterStorage(regionId: RegionId, storageId: StorageId) extends Message
final case class UnregisterStorage(regionId: RegionId, storageId: StorageId) extends Message
final case class SuspendStorage(storageId: StorageId) extends Message
final case class SuspendRegion(regionId: RegionId) extends Message
final case class ResumeStorage(storageId: StorageId) extends Message
final case class ResumeRegion(regionId: RegionId) extends Message
final case object GetSnapshot extends Message with MessageStatus[NotUsed, RegionTracker.Snapshot]

private[actors] final case class RenewRegionSubscriptions(regionId: RegionId) extends Message
private[actors] final case class RenewStorageSubscriptions(storageId: StorageId) extends Message

// Snapshot
Expand All @@ -54,18 +54,18 @@ private final class RegionSupervisor extends PersistentActor with ActorLogging w
// Settings
// -----------------------------------------------------------------------
private[this] implicit val timeout: Timeout = Timeout(10 seconds)
private[this] implicit lazy val sc = ShadowCloud()
private[this] implicit lazy val sc = ShadowCloud()

override def persistenceId: String = "regions"
override def journalPluginId: String = sc.config.persistence.journalPlugin
override def persistenceId: String = "regions"
override def journalPluginId: String = sc.config.persistence.journalPlugin
override def snapshotPluginId: String = sc.config.persistence.snapshotPlugin

// -----------------------------------------------------------------------
// Recover
// -----------------------------------------------------------------------
def receiveRecover: Receive = { // TODO: Create snapshots
val storages = mutable.AnyRefMap.empty[String, StorageSnapshot]
val regions = mutable.AnyRefMap.empty[String, RegionSnapshot]
val regions = mutable.AnyRefMap.empty[String, RegionSnapshot]
val recoverFunc: Receive = {
case SnapshotOffer(_, snapshot: Snapshot)
regions.clear()
Expand Down Expand Up @@ -177,8 +177,7 @@ private final class RegionSupervisor extends PersistentActor with ActorLogging w
dispatcher.forward(message)

case ActorState.Suspended
sender() ! Status.Failure(SupervisorException.IllegalRegionState(regionId,
new IllegalStateException("Region is suspended")))
sender() ! Status.Failure(SupervisorException.IllegalRegionState(regionId, new IllegalStateException("Region is suspended")))
}
} else {
sender() ! Status.Failure(SupervisorException.RegionNotFound(regionId))
Expand All @@ -191,12 +190,17 @@ private final class RegionSupervisor extends PersistentActor with ActorLogging w
dispatcher.forward(message)

case ActorState.Suspended
sender() ! Status.Failure(SupervisorException.IllegalStorageState(storageId,
new IllegalStateException("Storage is suspended")))
sender() ! Status.Failure(SupervisorException.IllegalStorageState(storageId, new IllegalStateException("Storage is suspended")))
}
} else {
sender() ! Status.Failure(SupervisorException.StorageNotFound(storageId))
}

// -----------------------------------------------------------------------
// Misc
// -----------------------------------------------------------------------
case Terminated(ref)
log.error("Supervised actor terminated: {}", ref)
}

// -----------------------------------------------------------------------
Expand All @@ -206,11 +210,15 @@ private final class RegionSupervisor extends PersistentActor with ActorLogging w
case error: IllegalArgumentException
log.error(error, "Unexpected error")
SupervisorStrategy.Resume


case error @ (_: ActorInitializationException | _: DeathPactException)
log.error(error, "Stopping actor due to error")
SupervisorStrategy.Stop

case error: Throwable
log.error(error, "Actor failure")
SupervisorStrategy.Escalate
}
}
}

private sealed trait RegionSupervisorState { self: RegionSupervisor
Expand Down Expand Up @@ -271,12 +279,14 @@ private sealed trait RegionSupervisorState { self: RegionSupervisor ⇒

def loadState(storages: collection.Map[String, StorageSnapshot], regions: collection.Map[String, RegionSnapshot]): Unit = {
state.clear()
storages.foreach { case (storageId, StorageSnapshot(props, active))
updateState(StorageAdded(storageId, props, active))
storages.foreach {
case (storageId, StorageSnapshot(props, active))
updateState(StorageAdded(storageId, props, active))
}
regions.foreach { case (regionId, RegionSnapshot(regionConfig, storages, active))
updateState(RegionAdded(regionId, regionConfig, active))
storages.foreach(storageId updateState(StorageRegistered(regionId, storageId)))
regions.foreach {
case (regionId, RegionSnapshot(regionConfig, storages, active))
updateState(RegionAdded(regionId, regionConfig, active))
storages.foreach(storageId updateState(StorageRegistered(regionId, storageId)))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import com.karasiq.shadowcloud.actors.internal.StorageInstantiator
import com.karasiq.shadowcloud.actors.utils.ContainerActor
import com.karasiq.shadowcloud.model.StorageId
import com.karasiq.shadowcloud.storage.props.StorageProps
import com.karasiq.shadowcloud.utils.Utils

import scala.concurrent.duration._

Expand All @@ -23,21 +22,27 @@ private[actors] object StorageContainer {

//noinspection ActorMutableStateInspection
private[actors] final class StorageContainer(instantiator: StorageInstantiator, storageId: StorageId)
extends Actor with ActorLogging with Stash with ContainerActor {
extends Actor
with ActorLogging
with Stash
with ContainerActor {

private[this] val sc = ShadowCloud()
private[this] var storageProps: StorageProps = StorageProps.inMemory

def receive: Receive = {
case SetProps(props)
log.warning("Storage props changed: {}", props)
log.info("Storage props changed: {}", props)
this.storageProps = props
restartActor()
}

def startActor(): Unit = {
val props = Props(new Actor {
private[this] val storage = instantiator.createStorage(storageId, storageProps)
private[this] val storage: ActorRef = context.watch(instantiator.createStorage(storageId, storageProps))

//noinspection ScalaUnusedSymbol
private[this] val healthSupervisor: ActorRef =
context.watch(context.actorOf(StorageHealthSupervisor.props(storage, 30 seconds, 5), "health-sv"))

override def preStart(): Unit = {
super.preStart()
Expand All @@ -46,7 +51,7 @@ private[actors] final class StorageContainer(instantiator: StorageInstantiator,

def receive: Receive = {
case Terminated(ref)
log.error("Watched actor terminated: {}", ref)
log.warning("Terminated: {}", ref)
context.stop(self)

case msg if sender() == storage
Expand All @@ -57,14 +62,12 @@ private[actors] final class StorageContainer(instantiator: StorageInstantiator,
}
})

val id = Utils.uniqueActorName(storageId)
val actor = context.actorOf(props, id)
val healthSupervisor = context.actorOf(StorageHealthSupervisor.props(actor, 30 seconds, 5), s"$id-health-sv")
afterStart(healthSupervisor)
val actor = context.actorOf(props, storageId)
afterStart(actor)
}

override def afterStart(actor: ActorRef): Unit = {
sc.actors.regionSupervisor ! RenewStorageSubscriptions(storageId)
ShadowCloud().actors.regionSupervisor ! RenewStorageSubscriptions(storageId)
super.afterStart(actor)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ private final class StorageDispatcher(
// -----------------------------------------------------------------------
import context.dispatcher

private[this] implicit val materializer: Materializer = ActorMaterializer()
private[this] val sc = ShadowCloud()
private[this] val config = sc.configs.storageConfig(storageId, storageProps)
private[this] val healthCheckSchedule = context.system.scheduler.schedule(1 second, config.healthCheckInterval, self, GetHealth(true))
private[this] val sc = ShadowCloud()
private[this] val config = sc.configs.storageConfig(storageId, storageProps)
private[this] val healthCheckSchedule =
context.system.scheduler.scheduleWithFixedDelay(Duration.Zero, config.healthCheckInterval, self, GetHealth(true))

import sc.implicits.materializer

// -----------------------------------------------------------------------
// State
Expand Down
Loading

0 comments on commit fa4adc2

Please sign in to comment.