From e8ff437413ce62e99c255616e3da0f0e096ac93d Mon Sep 17 00:00:00 2001
From: Karasiq
Date: Tue, 23 Jun 2020 20:19:28 +0300
Subject: [PATCH] More resilient read

---
 build.sbt                               |  2 +-
 core/src/main/resources/reference.conf  |  2 +-
 .../actors/internal/ChunksTracker.scala | 23 +++---
 .../streams/file/FileStreams.scala      | 72 ++++++++++++-------
 .../streams/region/RegionStreams.scala  |  5 +-
 5 files changed, 63 insertions(+), 41 deletions(-)

diff --git a/build.sbt b/build.sbt
index 2a79e9eb..1eebdb18 100644
--- a/build.sbt
+++ b/build.sbt
@@ -65,7 +65,7 @@ val packageSettings = Seq(
 )
 
 lazy val dockerSettings = Seq(
-  dockerBaseImage := "openjdk:8-jre-slim-stretch",
+  dockerBaseImage := "openjdk:8-jre-slim",
   dockerExposedPorts := Seq(1911),
   dockerExposedVolumes := Seq("/opt/docker/sc"),
   dockerUsername := Some("karasiq"),
diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 943a8301..9a31e174 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -141,7 +141,7 @@ shadowcloud {
     chunks-list = 5m
     chunks-delete = 2m
     region-chunk-write = 2m
-    region-chunk-read = ${shadowcloud.timeouts.chunk-read}
+    region-chunk-read = 2m
     index-write = 30s
     index-read = 15s
     index-list = 15s
diff --git a/core/src/main/scala/com/karasiq/shadowcloud/actors/internal/ChunksTracker.scala b/core/src/main/scala/com/karasiq/shadowcloud/actors/internal/ChunksTracker.scala
index 6e8c1d1c..7583f5ba 100644
--- a/core/src/main/scala/com/karasiq/shadowcloud/actors/internal/ChunksTracker.scala
+++ b/core/src/main/scala/com/karasiq/shadowcloud/actors/internal/ChunksTracker.scala
@@ -56,7 +56,7 @@ private[actors] final class ChunksTracker(regionId: RegionId, config: RegionConf
   // Read/write
   // -----------------------------------------------------------------------
   object chunkIO {
-    def readChunk(chunk: Chunk, receiver: ActorRef)(implicit storageSelector: StorageSelector): Option[ChunkStatus] = {
+    def readChunk(chunk: Chunk, receiver: ActorRef = ActorRef.noSender)(implicit storageSelector: StorageSelector): Option[ChunkStatus] = {
       def getReadFuture(status: ChunkStatus): (Option[RegionStorage], Future[Chunk]) = {
         implicit val readTimeout: Timeout = sc.config.timeouts.chunkRead
 
@@ -164,7 +164,10 @@ private[actors] final class ChunksTracker(regionId: RegionId, config: RegionConf
       }
 
       if (filteredAffinity.isEmpty) {
-        receiver ! WriteChunk.Failure(chunk, RegionException.ChunkRepairFailed(chunk, new IllegalStateException("No storages available for replication")))
+        receiver ! WriteChunk.Failure(
+          chunk,
+          RegionException.ChunkRepairFailed(chunk, new IllegalStateException("No storages available for replication"))
+        )
         return chunks.getChunkStatus(chunk)
       }
 
@@ -225,6 +228,12 @@ private[actors] final class ChunksTracker(regionId: RegionId, config: RegionConf
         case _ ⇒ // Skip
       }
+
+      readingChunks.collect {
+        case (chunk, rs) if rs.reading.isEmpty ⇒
+          log.debug("Retrying chunk read: {}", chunk.hashString)
+          chunkIO.readChunk(chunk)
+      }
     }
 
     private[ChunksTracker] def tryFinishChunk(status: ChunkStatus): ChunkStatus = {
@@ -566,15 +575,7 @@ private[actors] final class ChunksTracker(regionId: RegionId, config: RegionConf
       } else {
         val newReading = reading -- storageId
         readingChunks += chunk → ChunkReadStatus(newReading, waiting)
-        if (newReading.isEmpty) {
-          if (storageId.nonEmpty) {
-            // Retry
-            chunkIO.readChunk(chunk, ActorRef.noSender)
-          } else {
-            // No storages left
-            cancelChunkRead()
-          }
-        }
+        if (newReading.isEmpty) scheduleRetry()
       }
 
     case None ⇒
diff --git a/core/src/main/scala/com/karasiq/shadowcloud/streams/file/FileStreams.scala b/core/src/main/scala/com/karasiq/shadowcloud/streams/file/FileStreams.scala
index 871c0c56..e4c87e72 100644
--- a/core/src/main/scala/com/karasiq/shadowcloud/streams/file/FileStreams.scala
+++ b/core/src/main/scala/com/karasiq/shadowcloud/streams/file/FileStreams.scala
@@ -1,7 +1,7 @@
 package com.karasiq.shadowcloud.streams.file
 
 import akka.NotUsed
-import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Source}
+import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Source, Zip}
 import akka.stream.{FlowShape, Materializer}
 import akka.util.ByteString
 import com.karasiq.shadowcloud.index.files.FileVersions
@@ -38,16 +38,33 @@ final class FileStreams(regionStreams: RegionStreams, chunkProcessing: ChunkProc
   }
 
   def readChunkStreamRanged(regionId: RegionId): Flow[(Chunk, RangeList), ByteString, NotUsed] = {
-    Flow[(Chunk, RangeList)]
-      .log("chunk-ranges")
-      .flatMapConcat {
-        case (chunk, ranges) ⇒
-          Source
-            .single(chunk)
-            .via(readChunkStream(regionId))
-            .map(ranges.slice)
-      }
-      .named("readChunkStreamRanged")
+    val graph = GraphDSL.create() { implicit b ⇒
+      import GraphDSL.Implicits._
+      val upstream = b.add(Broadcast[(Chunk, RangeList)](2))
+
+      val read = b.add(
+        Flow[Chunk]
+          .map((regionId, _))
+          .via(regionStreams.readChunksNonBuffered)
+          .map(_.data.plain)
+      )
+
+      val sliceData = b.add(
+        Flow[(ByteString, RangeList)]
+          .map { case (bytes, ranges) ⇒ ranges.slice(bytes) }
+      )
+
+      val zip = b.add(Zip[ByteString, RangeList])
+
+      upstream.map(_._2) ~> zip.in1
+      upstream.map(_._1) ~> read.in
+      read.out ~> zip.in0
+      zip.out ~> sliceData.in
+
+      FlowShape(upstream.in, sliceData.out)
+    }
+
+    Flow.fromGraph(graph).named("readChunkStreamRanged")
   }
 
   def readChunkStreamRanged(regionId: RegionId, chunks: Seq[Chunk], ranges: RangeList): Source[ByteString, NotUsed] = {
@@ -76,25 +93,26 @@ final class FileStreams(regionStreams: RegionStreams, chunkProcessing: ChunkProc
     Flow[ByteString]
      .via(AkkaStreamUtils.extractUpstream)
       .zip(Source.future(supervisorOps.getRegionConfig(regionId)))
-      .flatMapConcat { case (stream, config) =>
-        val matSink = Flow
-          .fromGraph(chunkProcessing.split(config.chunkSize.getOrElse(chunkProcessing.defaultChunkSize)))
-          .via(chunkProcessing.beforeWrite())
-          // .map(c ⇒ c.copy(data = c.data.copy(plain = ByteString.empty))) // Memory optimization
-          .map((regionId, _))
-          .via(regionStreams.writeChunks)
-          // .log("chunk-stream-write")
-          .toMat(chunkProcessing.index())(Keep.right)
+      .flatMapConcat {
+        case (stream, config) ⇒
+          val matSink = Flow
+            .fromGraph(chunkProcessing.split(config.chunkSize.getOrElse(chunkProcessing.defaultChunkSize)))
+            .via(chunkProcessing.beforeWrite())
+            // .map(c ⇒ c.copy(data = c.data.copy(plain = ByteString.empty))) // Memory optimization
+            .map((regionId, _))
+            .via(regionStreams.writeChunks)
+            // .log("chunk-stream-write")
+            .toMat(chunkProcessing.index())(Keep.right)
 
-        val graph = GraphDSL.create(matSink) { implicit builder ⇒ matSink ⇒
-          import GraphDSL.Implicits._
-          val extractResult = builder.add(Flow[Future[FileIndexer.Result]].flatMapConcat(Source.future))
+          val graph = GraphDSL.create(matSink) { implicit builder ⇒ matSink ⇒
+            import GraphDSL.Implicits._
+            val extractResult = builder.add(Flow[Future[FileIndexer.Result]].flatMapConcat(Source.future))
 
-          builder.materializedValue ~> extractResult
-          FlowShape(matSink.in, extractResult.out)
-        }
+            builder.materializedValue ~> extractResult
+            FlowShape(matSink.in, extractResult.out)
+          }
 
-        stream.via(graph)
+          stream.via(graph)
       }
       .named("writeChunkStream")
   }
diff --git a/core/src/main/scala/com/karasiq/shadowcloud/streams/region/RegionStreams.scala b/core/src/main/scala/com/karasiq/shadowcloud/streams/region/RegionStreams.scala
index 694c187d..7095a144 100644
--- a/core/src/main/scala/com/karasiq/shadowcloud/streams/region/RegionStreams.scala
+++ b/core/src/main/scala/com/karasiq/shadowcloud/streams/region/RegionStreams.scala
@@ -32,10 +32,13 @@ final class RegionStreams(regionOps: RegionOps, parallelism: ParallelismConfig,
   val readChunks = Flow[(RegionId, Chunk)]
     .mapAsync(parallelism.read) { case (regionId, chunk) ⇒ regionOps.readChunk(regionId, chunk) }
-    .async
     .via(ByteStreams.bufferT(_.data.encrypted.length, buffers.readChunks))
     .named("readChunks")
 
+  val readChunksNonBuffered = Flow[(RegionId, Chunk)]
+    .mapAsync(1) { case (regionId, chunk) ⇒ regionOps.readChunk(regionId, chunk) }
+    .named("readChunksNonBuffered")
+
   val findFiles = Flow[(RegionId, Path)]
     .mapAsync(parallelism.query) {
       case (regionId, path) ⇒
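
Note: the readChunkStreamRanged rewrite in this patch is an instance of the common Akka Streams "broadcast, process one branch, re-zip" pattern. Each (Chunk, RangeList) pair is fanned out, the chunk is read through readChunksNonBuffered (mapAsync(1) preserves element order and adds no buffer), and Zip re-pairs each chunk's plain bytes with its own ranges before slicing. Below is a minimal, self-contained sketch of the same wiring, assuming Akka 2.6 (where an implicit ActorSystem provides the materializer) and using Int/String stand-ins for Chunk and RangeList; the names BroadcastZipSketch, process and pairThrough are illustrative, not part of this patch.

import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

object BroadcastZipSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  // Stand-in for regionStreams.readChunksNonBuffered: a strictly sequential,
  // unbuffered async stage. mapAsync preserves element order, which keeps this
  // branch aligned one-to-one with the metadata branch of the Zip below.
  val process: Flow[Int, String, akka.NotUsed] =
    Flow[Int].mapAsync(1)(i ⇒ Future(s"chunk-$i"))

  // Fan each (payload, metadata) pair out, process only the payload,
  // then zip it back together with its untouched metadata.
  val pairThrough = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
    import GraphDSL.Implicits._
    val bcast = b.add(Broadcast[(Int, String)](2))
    val zip   = b.add(Zip[String, String])

    bcast.map(_._2) ~> zip.in1            // metadata bypasses processing
    bcast.map(_._1) ~> process ~> zip.in0 // payload goes through the async stage
    FlowShape(bcast.in, zip.out)
  })

  Source(List((1, "range-a"), (2, "range-b")))
    .via(pairThrough)
    .runWith(Sink.foreach(println)) // prints (chunk-1,range-a), then (chunk-2,range-b)
    .onComplete(_ ⇒ system.terminate())
}

The ordering guarantee of mapAsync is what makes the Zip safe here: if the read branch could reorder or drop elements, bytes and ranges would pair up incorrectly.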