
Yandex storage plugin
Karasiq committed Jun 14, 2020
1 parent 3bca8b2 commit 2f54c17
Showing 79 changed files with 127,862 additions and 996 deletions.
25 changes: 15 additions & 10 deletions build.sbt
@@ -1,5 +1,5 @@
import com.typesafe.sbt.packager.docker.Cmd
import sbtcrossproject.CrossPlugin.autoImport.{CrossType, crossProject}
import sbtcrossproject.CrossPlugin.autoImport.{CrossType, crossProject}

val commonSettings = Seq(
organization := "com.github.karasiq",
@@ -26,8 +26,8 @@ val commonSettings = Seq(
"-Ywarn-unused:-implicits",
"-Xlint",
"-Ypartial-unification",
"-opt:l:inline",
"-opt-inline-from:**"
//"-opt:l:inline",
//"-opt-inline-from:**"
)
)

@@ -93,7 +93,7 @@ lazy val dockerSettings = Seq(
// -----------------------------------------------------------------------
// Shared
// -----------------------------------------------------------------------
lazy val model = crossProject
lazy val model = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Pure)
.settings(commonSettings, name := "shadowcloud-model")
.settings(
@@ -111,7 +111,7 @@ lazy val modelJVM = model.jvm

lazy val modelJS = model.js

lazy val utils = crossProject
lazy val utils = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Pure)
.settings(commonSettings, name := "shadowcloud-utils")
.jvmSettings(
@@ -126,7 +126,7 @@ lazy val utilsJVM = utils.jvm

lazy val utilsJS = utils.js

lazy val testUtils = (crossProject.crossType(CrossType.Pure) in file("utils") / "test")
lazy val testUtils = (crossProject(JSPlatform, JVMPlatform).crossType(CrossType.Pure) in file("utils") / "test")
.settings(commonSettings, name := "shadowcloud-test-utils")
.jvmSettings(
libraryDependencies ++=
@@ -139,7 +139,7 @@ lazy val testUtilsJVM = testUtils.jvm

lazy val testUtilsJS = testUtils.js

lazy val serialization = crossProject
lazy val serialization = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Pure)
.settings(commonSettings, name := "shadowcloud-serialization")
.jvmSettings(libraryDependencies ++= ProjectDeps.playJson ++ ProjectDeps.boopickle ++ ProjectDeps.kryo)
@@ -183,7 +183,8 @@ lazy val coreAssembly = (project in file("core/assembly"))
mailruCloudStorage,
dropboxStorage,
webdavStorage,
telegramStorage
telegramStorage,
yandexStorage
)
.dependsOn(
Seq[ClasspathDep[ProjectReference]](javacvMetadata).filter(_ ⇒ ProjectDeps.javacv.isEnabled) ++
@@ -201,7 +202,9 @@ lazy val coreAssembly = (project in file("core/assembly"))
googleDriveStorage,
mailruCloudStorage,
dropboxStorage,
webdavStorage
webdavStorage,
telegramStorage,
yandexStorage
)

// -----------------------------------------------------------------------
@@ -279,6 +282,8 @@ lazy val webdavStorage = storagePlugin("webdav")
lazy val telegramStorage = storagePlugin("telegram")
.settings(libraryDependencies ++= ProjectDeps.guava)

lazy val yandexStorage = storagePlugin("yandex")

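The new plugin is wired in through the build's storagePlugin helper. By analogy with the metadataPlugin definition below, a minimal sketch of what such a helper looks like (the dependsOn targets are assumptions for illustration, not the repository's actual wiring):

def storagePlugin(id: String): Project = {
  val prefixedId = s"storage-$id"
  Project(prefixedId, file("storage") / id)
    .settings(commonSettings, name := s"shadowcloud-$prefixedId")
    .dependsOn(modelJVM, testUtilsJVM % "test") // assumed dependencies
}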
// Metadata plugins
def metadataPlugin(id: String): Project = {
val prefixedId = s"metadata-$id"
@@ -310,7 +315,7 @@ lazy val markdownMetadata = metadataPlugin("markdown")
// -----------------------------------------------------------------------
// HTTP
// -----------------------------------------------------------------------
lazy val autowireApi = (crossProject.crossType(CrossType.Pure) in (file("server") / "autowire-api"))
lazy val autowireApi = (crossProject(JSPlatform, JVMPlatform).crossType(CrossType.Pure) in (file("server") / "autowire-api"))
.settings(commonSettings, name := "shadowcloud-autowire-api")
.jvmSettings(libraryDependencies ++= ProjectDeps.autowire ++ ProjectDeps.scalaTest.map(_ % "test"))
.jsSettings(ScalaJSDeps.autowire, ScalaJSDeps.browserDom, ScalaJSDeps.scalaTest)
8 changes: 5 additions & 3 deletions core/src/main/resources/reference.conf
@@ -2,6 +2,7 @@ include "sc-akka-serialization.conf"

shadowcloud {
default-storage {
immutable = false // Prohibits deletion of data
// chunk-key = hash
health-check-interval = 5m

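A usage sketch for the new flag, assuming per-storage configs fall back to this default-storage section: an application.conf can make every storage append-only with a single override (hypothetical, shown for illustration).

// application.conf
shadowcloud.default-storage.immutable = true // chunk deletion and index compaction are rejected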
@@ -137,7 +138,7 @@ shadowcloud {
query = 10s
chunk-write = 15s
chunk-read = 15s
chunks-list = 1m
chunks-list = 5m
chunks-delete = 2m
region-chunk-write = 2m
region-chunk-read = ${shadowcloud.timeouts.chunk-read}
@@ -149,8 +150,9 @@
}

buffers {
read-chunks = 16M
repair = 16M
default = 16M
read-chunks = ${shadowcloud.buffers.default}
repair = ${shadowcloud.buffers.default}
}

serialization {
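A note on the substitution change above: Typesafe Config resolves reference.conf against itself, so an application.conf that needs larger buffers should override the concrete keys rather than buffers.default (sizes use HOCON notation, 16M = 16 mebibytes). For example:

// application.conf
shadowcloud.buffers {
  read-chunks = 32M
  repair = 32M
}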
@@ -70,10 +70,10 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag
// -----------------------------------------------------------------------
// Context
// -----------------------------------------------------------------------
private[this] val sc = ShadowCloud()
private[this] val config = sc.configs.storageConfig(storageId, storageProps)
private[this] val chunksWrite = PendingOperations.withRegionChunk
private[this] val chunksRead = PendingOperations.withRegionChunk
private[this] val sc = ShadowCloud()
private[this] val config = sc.configs.storageConfig(storageId, storageProps)
private[this] val chunksWrite = PendingOperations.withRegionChunk
private[this] val chunksRead = PendingOperations.withRegionChunk

import sc.implicits.materializer

@@ -92,24 +92,28 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag

writeSource.extractMatFuture
.flatMapConcat {
case StorageIOResult.Failure(_, StorageException.AlreadyExists(sp, _)) =>
case res @ StorageIOResult.Failure(_, StorageException.AlreadyExists(sp, _)) ⇒
log.warning("Chunk already exists: {}", sp)
Source
.single(path.chunkId)
.mapAsync(1)(repository.delete(_))
.flatMapConcatMatRight(_ => writeSource)
.extractMatFuture
.map(_.last)

case otherResult =>

if (config.immutable)
Source.single(res)
else
Source
.single(path.chunkId)
.mapAsync(1)(repository.delete(_))
.flatMapConcatMatRight(_ ⇒ writeSource)
.extractMatFuture
.map(_.last)

case otherResult ⇒
Source.single(otherResult)
}
.completionTimeout(config.chunkIO.writeTimeout)
.alsoTo(AkkaStreamUtils.successPromiseOnFirst(promise))
.viaMat(AkkaStreamUtils.ignoreUpstream(Source.future(promise.future)))(Keep.right)
.log("storage-chunkio-write")
.map(_ => NotUsed)
.recover { case _ ⇒ NotUsed }
.map(_ ⇒ NotUsed)
.recoverWithRetries(1, { case _ ⇒ Source.empty })
.named("storageWrite")
}
)
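The switch from recover to recoverWithRetries(1, { case _ ⇒ Source.empty }) changes how a failed write stream terminates: recover emits one substitute element before completing, while switching to Source.empty completes without emitting anything. A standalone sketch of the difference (demo code, not part of the project):

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object RecoverDemo extends App {
  implicit val system: ActorSystem = ActorSystem("demo")

  // recover: the failure is replaced by one final element, so -1 is printed
  Source.failed[Int](new RuntimeException("boom"))
    .recover { case _ ⇒ -1 }
    .runForeach(println)

  // recoverWithRetries + Source.empty: the stream completes silently
  Source.failed[Int](new RuntimeException("boom"))
    .recoverWithRetries(1, { case _ ⇒ Source.empty })
    .runForeach(println)
}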
@@ -146,13 +150,13 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag
.fromGraph(readGraph)
// .log("storage-read-results")
.alsoToMat(Sink.head)(Keep.right)
.mapMaterializedValue { f =>
.mapMaterializedValue { f ⇒
promise.completeWith(f)
NotUsed
}
.completionTimeout(config.chunkIO.readTimeout)
.alsoTo(AkkaStreamUtils.failPromiseOnFailure(promise))
.recover { case _ ⇒ NotUsed }
.recoverWithRetries(1, { case _ ⇒ Source.empty })
.named("storageReadGraph")
}
)
@@ -180,6 +184,7 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag
chunksRead.finish(msg.key, msg)

case DeleteChunks(chunks) ⇒
if (config.immutable) {
sender() ! DeleteChunks.Failure(chunks, new IllegalStateException("Storage is immutable"))
} else {
log.warning("Deleting chunks from storage: [{}]", Utils.printValues(chunks))
deleteChunks(chunks).pipeTo(sender())
}

@@ -205,10 +210,10 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag

override def postStop(): Unit = {
chunksWrite.finishAll(
key => WriteChunk.Failure(key, StorageException.IOFailure(key._1.toStoragePath, new RuntimeException("Chunk IO dispatcher stopped")))
key ⇒ WriteChunk.Failure(key, StorageException.IOFailure(key._1.toStoragePath, new RuntimeException("Chunk IO dispatcher stopped")))
)
chunksRead.finishAll(
key => ReadChunk.Failure(key, StorageException.IOFailure(key._1.toStoragePath, new RuntimeException("Chunk IO dispatcher stopped")))
key ⇒ ReadChunk.Failure(key, StorageException.IOFailure(key._1.toStoragePath, new RuntimeException("Chunk IO dispatcher stopped")))
)
writeQueue.complete()
readQueue.complete()
@@ -307,9 +312,4 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag

DeleteChunks.wrapFuture(paths, deleted.zip(ioResult))
}

override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
reason.printStackTrace()
super.preRestart(reason, message)
}
}
@@ -102,7 +102,7 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon
import sc.implicits.{defaultTimeout, materializer}

val storageTracker = StorageTracker()
val chunksTracker = ChunksTracker(regionId, regionConfig, storageTracker)
val chunksTracker = ChunksTracker(regionId, regionConfig, storageTracker, () => schedules.scheduleRetry())
val indexTracker = RegionIndexTracker(regionId, chunksTracker)

private[this] implicit val regionContext =
@@ -274,13 +276,15 @@
log.info("Attaching storage {}: {}", storageId, dispatcher)
storageTracker.register(storageId, props, dispatcher, health)
self ! PullStorageIndex(storageId)
chunksTracker.chunkIO.retryPendingChunks()
}

case DetachStorage(storageId) if storageTracker.contains(storageId) ⇒
log.warning("Detaching storage: {}", storageId)
chunksTracker.storages.state.unregister(storageId)
indexTracker.storages.state.dropStorageDiffs(storageId)
storageTracker.unregister(storageId)
chunksTracker.chunkIO.retryPendingChunks()

case GetStorages ⇒
sender() ! GetStorages.Success(regionId, storageTracker.storages)
@@ -353,7 +355,7 @@
indexTracker.storages.state.dropStorageDiffs(storageId, sequenceNrs)

case StorageEvents.ChunkWritten(ChunkPath(`regionId`, _), chunk) ⇒
log.info("Chunk written: {}", chunk)
// log.info("Chunk written: {}", chunk)
// chunks.onWriteSuccess(chunk, storageId)
indexTracker.indexes.registerChunk(chunk)
sc.eventStreams.publishRegionEvent(regionId, RegionEvents.ChunkWritten(storageId, chunk))
@@ -419,7 +421,7 @@ }
}

private[RegionDispatcher] def initSchedules(): Unit = {
scheduler.schedule(30 seconds, 3 seconds, self, RetryPendingChunks)
scheduler.scheduleAtFixedRate(30 seconds, 3 seconds, self, RetryPendingChunks)
}
}

@@ -1,7 +1,9 @@
package com.karasiq.shadowcloud.actors

import akka.actor.{Actor, ActorLogging, DeadLetterSuppression, PossiblyHarmful, Props, ReceiveTimeout, Status}
import akka.event.Logging
import akka.persistence._
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.{Done, NotUsed}
import com.karasiq.shadowcloud.ShadowCloud
@@ -65,8 +67,8 @@ private[actors] final class RegionIndex(storageId: StorageId, regionId: RegionId
require(storageId.nonEmpty && regionId.nonEmpty, "Invalid storage identifier")

import context.dispatcher
private[this] implicit lazy val sc = ShadowCloud()
private[this] lazy val config = sc.configs.storageConfig(storageId, storageProps)
private[this] implicit lazy val sc = ShadowCloud()
private[this] lazy val config = sc.configs.storageConfig(storageId, storageProps)

import sc.implicits.materializer

@@ -110,7 +112,9 @@ }
}

case Compact ⇒
if (!state.compactRequested) {
if (config.immutable) {
log.warning("Storage is immutable, unable to compact indexes")
} else if (!state.compactRequested) {
log.debug("Index compaction requested")
state.compactRequested = true
}
@@ -416,6 +420,7 @@ private[actors] final class RegionIndex(storageId: StorageId, regionId: RegionId
Source(diffsToDelete)
.via(state.streams.delete(repository))
.log("compact-delete")
.withAttributes(ActorAttributes.logLevels(onElement = Logging.InfoLevel))
.filter(_.ioResult.isSuccess)
.map(_.key)
.fold(Set.empty[SequenceNr])(_ + _)
@@ -497,7 +502,8 @@ }
}

private[this] def isCompactRequired(): Boolean = {
state.compactRequested || (config.index.compactThreshold > 0 && state.diffStats.deletes > config.index.compactThreshold)
!config.immutable &&
(state.compactRequested || (config.index.compactThreshold > 0 && state.diffStats.deletes > config.index.compactThreshold))
}

private[this] def loadRepositoryKeys(): Unit = {
@@ -42,7 +42,7 @@ private[actors] final class StorageContainer(instantiator: StorageInstantiator,

//noinspection ScalaUnusedSymbol
private[this] val healthSupervisor: ActorRef =
context.watch(context.actorOf(StorageHealthSupervisor.props(storage, 30 seconds, 5), "health-sv"))
context.watch(context.actorOf(StorageHealthSupervisor.props(storage, 1 minute, 3), "health-sv"))

override def preStart(): Unit = {
super.preStart()
@@ -145,8 +145,7 @@ private final class StorageDispatcher(
log.debug("{} bytes written, updating storage health", written)
updateHealth(_ - written)

case _
// Ignore
case _ ⇒ // Ignore
}
}

@@ -2,6 +2,7 @@ package com.karasiq.shadowcloud.actors

import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, PossiblyHarmful, Props, Terminated}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import com.karasiq.shadowcloud.ShadowCloud
import com.karasiq.shadowcloud.model.utils.StorageHealth

@@ -29,29 +30,26 @@ object StorageHealthSupervisor {
class StorageHealthSupervisor(actor: ActorRef, interval: FiniteDuration, maxFailures: Int) extends Actor with ActorLogging {
import StorageHealthSupervisor._
import context.dispatcher
private[this] implicit val timeout = ShadowCloud().implicits.defaultTimeout
private[this] implicit val timeout: Timeout = ShadowCloud().config.timeouts.chunksList

private[this] var schedule: Cancellable = _
private[this] var failures = 0

override def receive: Receive = {
case Check ⇒
(actor ? StorageDispatcher.GetHealth())
.mapTo[StorageDispatcher.GetHealth.Success]
.filter(_.result.online)
.map(hs ⇒ Success(hs.result))
.recover { case _ ⇒ Failure }
StorageDispatcher.GetHealth
.unwrapFuture(actor ? StorageDispatcher.GetHealth())
.pipeTo(self)

case Failure ⇒
case result @ (StorageDispatcher.GetHealth.Failure(_, _) | StorageDispatcher.GetHealth.Success(_, StorageHealth(_, _, _, false))) ⇒
failures += 1
log.debug("Health check failure #{}", failures)
log.debug("Health check failure #{}: {}", failures, result)
if (failures >= maxFailures) {
log.warning("Health checks failed ({}), restarting storage: {}", failures, actor)
context.stop(actor)
}

case Success(health) ⇒
case StorageDispatcher.GetHealth.Success(_, health) if health.online ⇒
log.debug("Health check passed: {}", health)
failures = 0

