Refactoring
Karasiq committed Jun 3, 2020
1 parent fa4adc2 commit 3bca8b2
Showing 10 changed files with 139 additions and 121 deletions.
core/src/main/resources/reference.conf (2 additions, 2 deletions)
@@ -6,7 +6,7 @@ shadowcloud {
   health-check-interval = 5m
 
   index {
-    sync-interval = 1m
+    sync-interval = 3m
     snapshot-threshold = 1000
     snapshot-clear-history = true
     compact-threshold = 0 // Disabled by default
@@ -150,7 +150,7 @@ shadowcloud {
 
   buffers {
     read-chunks = 16M
-    repair = 32M
+    repair = 16M
   }
 
   serialization {
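Both changed values are plain HOCON entries, so they resolve through the standard Typesafe Config API. A minimal standalone sketch of reading the new defaults (the object name and the bare ConfigFactory.load() are illustrative assumptions; shadowcloud wires its settings through its own config classes):

import com.typesafe.config.ConfigFactory

object ReferenceConfCheck extends App {
  val sc = ConfigFactory.load().getConfig("shadowcloud")

  // "3m" is HOCON duration syntax; getDuration returns a java.time.Duration
  val syncInterval = sc.getDuration("index.sync-interval")

  // "16M" is HOCON size syntax (power-of-two "M"); getBytes resolves it to 16 * 1024 * 1024
  val repairBuffer = sc.getBytes("buffers.repair")

  println(s"sync-interval=$syncInterval, repair buffer=$repairBuffer bytes")
}

Net effect of the hunks above: indexes synchronize every 3 minutes instead of every minute, and the repair buffer is halved to 16M, the same value as read-chunks.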
ChunkIODispatcher.scala

@@ -192,6 +192,7 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag
   // -----------------------------------------------------------------------
   override def preStart(): Unit = {
     def stopOnComplete(f: Future[Done]) = {
+      val log = this.log
       f.onComplete(result ⇒ log.error("Queue stopped: {}", result))
       f.map(_ ⇒ Kill).pipeTo(self)
     }
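The single added line pins the logger before the asynchronous callbacks run. ActorLogging's log is a lazy member that touches the actor's context on first use, so forcing initialization on the actor's own thread, and closing over the resulting LoggingAdapter rather than the actor, keeps the onComplete callback thread-safe. A self-contained sketch of the same pattern (QueueOwner and the stand-in completion future are hypothetical):

import akka.Done
import akka.actor.{Actor, ActorLogging, Kill}
import akka.pattern.pipe
import scala.concurrent.Future

class QueueOwner extends Actor with ActorLogging {
  import context.dispatcher

  override def preStart(): Unit = {
    // Stand-in for a materialized queue's completion future
    val queueDone: Future[Done] = Future.successful(Done)

    def stopOnComplete(f: Future[Done]): Unit = {
      val log = this.log // initialize the lazy LoggingAdapter on the actor's thread
      f.onComplete(result ⇒ log.error("Queue stopped: {}", result)) // closes over the adapter only
      f.map(_ ⇒ Kill).pipeTo(self) // kill the actor once the queue terminates
    }

    stopOnComplete(queueDone)
  }

  def receive: Receive = Actor.emptyBehavior
}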
@@ -306,4 +307,9 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag
 
     DeleteChunks.wrapFuture(paths, deleted.zip(ioResult))
   }
+
+  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
+    reason.printStackTrace()
+    super.preRestart(reason, message)
+  }
 }
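This commit adds the same preRestart override to ChunkIODispatcher, RegionDispatcher, and RegionIndex. For context, a hedged sketch of when the hook fires (Crashy and Parent are hypothetical): a supervisor's Restart directive invokes preRestart on the failing instance before it is replaced, so the raw stack trace reaches stderr once per restart, while the super call preserves the default behavior of stopping children and running postStop.

import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}

class Crashy extends Actor {
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    reason.printStackTrace() // full cause to stderr, independent of the logging config
    super.preRestart(reason, message)
  }
  def receive: Receive = { case "boom" ⇒ throw new IllegalStateException("boom") }
}

class Parent extends Actor {
  // The default decider restarts a child on ordinary Exceptions
  override val supervisorStrategy: OneForOneStrategy =
    OneForOneStrategy()(SupervisorStrategy.defaultDecider)

  private[this] val child = context.actorOf(Props[Crashy](), "crashy")
  def receive: Receive = { case msg ⇒ child.forward(msg) }
}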
RegionDispatcher.scala

@@ -436,4 +436,9 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon
     pendingIndexQueue.complete()
     super.postStop()
   }
+
+  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
+    reason.printStackTrace()
+    super.preRestart(reason, message)
+  }
 }
RegionIndex.scala

@@ -296,7 +296,7 @@ private[actors] final class RegionIndex(storageId: StorageId, regionId: RegionId
       becomeOrDefault(receivePreWrite(loadedKeys ++ keys))
 
     case Status.Failure(error) ⇒
-      log.error(error, "Keys load error")
+      log.debug("Keys load error: {}", error)
       synchronization.scheduleNext()
 
     case StreamCompleted ⇒
@@ -516,4 +516,9 @@ private[actors] final class RegionIndex(storageId: StorageId, regionId: RegionId
   private[this] def becomeOrDefault(receive: Receive): Unit = {
     context.become(receive.orElse(receiveDefault))
   }
+
+  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
+    reason.printStackTrace()
+    super.preRestart(reason, message)
+  }
 }
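The "Keys load error" demotion pairs with the retry that follows it: the failure is expected and recovered by scheduling the next synchronization, so error-level logging (which also recorded the stack trace via log.error(error, ...)) gives way to a debug line that records only the Throwable's toString. A minimal sketch of the shape (SyncLike, the message literals, and the 3-minute delay mirroring sync-interval are assumptions):

import akka.actor.{Actor, ActorLogging, Status}
import scala.concurrent.duration._

class SyncLike extends Actor with ActorLogging {
  import context.dispatcher

  private[this] def scheduleNext(): Unit =
    context.system.scheduler.scheduleOnce(3.minutes, self, "sync")

  def receive: Receive = {
    case Status.Failure(error) ⇒
      log.debug("Keys load error: {}", error) // expected, recoverable: debug, no stack trace
      scheduleNext()
    case "sync" ⇒ // kick off the next index synchronization here
  }
}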
RegionRepairStream.scala

@@ -51,31 +51,36 @@ object RegionRepairStream {
      }
 
      chunksSource
-        .mapAsyncUnordered(parallelism.query)(regionOps.getChunkStatus(request.regionId, _))
-        .via(RestartFlow.onFailuresWithBackoff(3 seconds, 30 seconds, 0.2, 20) { () ⇒
+        .via(RestartFlow.onFailuresWithBackoff(500 millis, 10 seconds, 0.2, 20) { () ⇒
+          Flow[Chunk].mapAsyncUnordered(parallelism.query)(regionOps.getChunkStatus(request.regionId, _))
+        })
+        .via(
           Flow[ChunkStatus]
             .map(status ⇒ status → createNewAffinity(status, request.strategy))
             .filterNot { case (status, newAffinity) ⇒ newAffinity.exists(_.isFinished(status)) }
-            .mapAsyncUnordered(parallelism.read) {
-              case (status, newAffinity) ⇒
-                regionOps.readChunkEncrypted(request.regionId, status.chunk).map(_ → newAffinity)
-            }
+            .via(RestartFlow.onFailuresWithBackoff(500 millis, 10 seconds, 0.2, 20) { () ⇒
+              Flow[(ChunkStatus, Option[ChunkWriteAffinity])].mapAsyncUnordered(parallelism.read) {
+                case (status, newAffinity) ⇒
+                  regionOps.readChunkEncrypted(request.regionId, status.chunk).map(_ → newAffinity)
+              }
+            })
             .log("region-repair-read", chunk ⇒ s"${chunk._1.hashString} at ${request.regionId}")
             .async
             .via(ByteStreams.bufferT(_._1.data.encrypted.length, bufferSize))
-            .mapAsyncUnordered(parallelism.write) {
-              case (chunk, newAffinity) ⇒
-                regionOps.rewriteChunk(request.regionId, chunk, newAffinity)
-            }
+            .via(RestartFlow.onFailuresWithBackoff(500 millis, 10 seconds, 0.2, 20) { () ⇒
+              Flow[(Chunk, Option[ChunkWriteAffinity])].mapAsyncUnordered(parallelism.write) {
+                case (chunk, newAffinity) ⇒
+                  regionOps.rewriteChunk(request.regionId, chunk, newAffinity)
+              }
+            })
             .log("region-repair-write", chunk ⇒ s"${chunk.hashString} at ${request.regionId}")
             .withAttributes(ActorAttributes.logLevels(onElement = Logging.InfoLevel))
             .map(_.withoutData)
-        })
+        )
         .fold(Nil: Seq[Chunk])(_ :+ _)
         .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
         .alsoTo(AkkaStreamUtils.successPromiseOnFirst(request.result))
     }
-      .recover { case _ => Nil }
+      .recover { case _ ⇒ Nil }
       .to(Sink.ignore)
       .named("regionRepairStream")
GDriveRepository.scala

@@ -89,7 +89,7 @@ private[gdrive] class GDriveRepository(service: GDriveService)(implicit ec: Exec
       .named("gdriveRead")
   }
 
-  def write(key: Path) = {
+  def write(path: Path) = {
     def getFolderId(path: Path) =
       for {
         folderId ← entityCache.getOrCreateFolderId(path.parent)
@@ -106,24 +106,24 @@
 
     Flow[Data]
       .via(AkkaStreamUtils.extractUpstream)
-      .zip(Source.lazyFuture(() ⇒ getFolderId(key)))
+      .zip(Source.lazyFuture(() ⇒ getFolderId(path)))
       .flatMapConcat {
         case (stream, folderId) ⇒
           stream.via(
             AkkaStreamUtils.writeInputStream(
               { inputStream ⇒
-                val result = Try(blockingUpload(folderId, key.name, inputStream))
-                result.foreach(_ ⇒ entityCache.resetFileCache(key))
+                val result = Try(blockingUpload(folderId, path.name, inputStream))
+                result.foreach(_ ⇒ entityCache.resetFileCache(path))
                 Source.future(Future.fromTry(result))
               },
               Dispatcher(GDriveDispatchers.fileDispatcherId)
             )
           )
       }
-      .map(written ⇒ StorageIOResult.Success(key, written))
+      .map(written ⇒ StorageIOResult.Success(path, written))
       .withAttributes(fileStreamAttributes)
       .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
-      .recover { case err ⇒ StorageIOResult.Failure(key, StorageUtils.wrapException(key, err)) }
+      .via(StorageUtils.wrapStream(path))
       .toMat(Sink.head)(Keep.right)
       .named("gdriveWrite")
   }
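Besides finishing the key→path rename, the last change replaces an inline .recover with StorageUtils.wrapStream(path), centralizing error wrapping. wrapStream's body is not part of this diff; a hypothetical equivalent with simplified stand-in types shows the shape such a stage plausibly has:

import akka.NotUsed
import akka.stream.scaladsl.Flow

// Simplified stand-ins for shadowcloud's StorageIOResult hierarchy
sealed trait IOResult
final case class IOSuccess(path: String, written: Long) extends IOResult
final case class IOFailure(path: String, error: Throwable) extends IOResult

// Hypothetical wrapStream: turns a stream failure into a Failure element
// tagged with the path, instead of failing the materialized value
def wrapStream(path: String): Flow[IOResult, IOResult, NotUsed] =
  Flow[IOResult].recover { case err ⇒ IOFailure(path, err) }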
(Remaining 4 changed files not shown.)
