More resilient read
Karasiq committed Jun 23, 2020
1 parent 69095a6 commit e8ff437
Showing 5 changed files with 63 additions and 41 deletions.
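
In short: the region-chunk-read timeout in reference.conf becomes an independent value instead of an alias for the storage-level chunk-read timeout; ChunksTracker gains a retry pass that re-issues reads for chunks whose reading set has drained; and ranged file reads in FileStreams move to a new non-buffered, strictly sequential read flow added to RegionStreams.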
2 changes: 1 addition & 1 deletion build.sbt
@@ -65,7 +65,7 @@ val packageSettings = Seq(
)

lazy val dockerSettings = Seq(
-  dockerBaseImage := "openjdk:8-jre-slim-stretch",
+  dockerBaseImage := "openjdk:8-jre-slim",
dockerExposedPorts := Seq(1911),
dockerExposedVolumes := Seq("/opt/docker/sc"),
dockerUsername := Some("karasiq"),
2 changes: 1 addition & 1 deletion core/src/main/resources/reference.conf
@@ -141,7 +141,7 @@ shadowcloud {
chunks-list = 5m
chunks-delete = 2m
region-chunk-write = 2m
-    region-chunk-read = ${shadowcloud.timeouts.chunk-read}
+    region-chunk-read = 2m
index-write = 30s
index-read = 15s
index-list = 15s
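Previously the HOCON substitution tied region-chunk-read to the storage-level timeout, so tuning one silently changed the other; the commit pins it to an explicit 2m. A minimal sketch of the difference using Typesafe Config (standalone keys for illustration, not the full shadowcloud config tree):

import com.typesafe.config.ConfigFactory

object TimeoutAliasDemo extends App {
  // Before: region-chunk-read aliases chunk-read via substitution,
  // so raising chunk-read also raises the region read timeout.
  val before = ConfigFactory.parseString(
    """shadowcloud.timeouts.chunk-read = 5m
      |region-chunk-read = ${shadowcloud.timeouts.chunk-read}
      |""".stripMargin).resolve()

  // After: the two timeouts vary independently.
  val after = ConfigFactory.parseString(
    """shadowcloud.timeouts.chunk-read = 5m
      |region-chunk-read = 2m
      |""".stripMargin)

  println(before.getDuration("region-chunk-read")) // PT5M
  println(after.getDuration("region-chunk-read"))  // PT2M
}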
ChunksTracker.scala
@@ -56,7 +56,7 @@ private[actors] final class ChunksTracker(regionId: RegionId, config: RegionConf
// Read/write
// -----------------------------------------------------------------------
object chunkIO {
-    def readChunk(chunk: Chunk, receiver: ActorRef)(implicit storageSelector: StorageSelector): Option[ChunkStatus] = {
+    def readChunk(chunk: Chunk, receiver: ActorRef = ActorRef.noSender)(implicit storageSelector: StorageSelector): Option[ChunkStatus] = {
def getReadFuture(status: ChunkStatus): (Option[RegionStorage], Future[Chunk]) = {
implicit val readTimeout: Timeout = sc.config.timeouts.chunkRead

@@ -164,7 +164,10 @@ private[actors] final class ChunksTracker(regionId: RegionId, config: RegionConf
}

if (filteredAffinity.isEmpty) {
-      receiver ! WriteChunk.Failure(chunk, RegionException.ChunkRepairFailed(chunk, new IllegalStateException("No storages available for replication")))
+      receiver ! WriteChunk.Failure(
+        chunk,
+        RegionException.ChunkRepairFailed(chunk, new IllegalStateException("No storages available for replication"))
+      )
return chunks.getChunkStatus(chunk)
}

@@ -225,6 +228,12 @@ private[actors] final class ChunksTracker(regionId: RegionId, config: RegionConf

      case _ ⇒ // Skip
}

+    readingChunks.collect {
+      case (chunk, rs) if rs.reading.isEmpty ⇒
+        log.debug("Retrying chunk read: {}", chunk.hashString)
+        chunkIO.readChunk(chunk)
+    }
}

private[ChunksTracker] def tryFinishChunk(status: ChunkStatus): ChunkStatus = {
@@ -566,15 +575,7 @@ private[actors] final class ChunksTracker(regionId: RegionId, config: RegionConf
} else {
val newReading = reading -- storageId
        readingChunks += chunk → ChunkReadStatus(newReading, waiting)
-        if (newReading.isEmpty) {
-          if (storageId.nonEmpty) {
-            // Retry
-            chunkIO.readChunk(chunk, ActorRef.noSender)
-          } else {
-            // No storages left
-            cancelChunkRead()
-          }
-        }
+        if (newReading.isEmpty) scheduleRetry()
}

      case None ⇒
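scheduleRetry() itself lies outside this hunk, so its body is not shown. A minimal sketch of what such a deferred retry trigger could look like in an Akka actor; the trait name, retryPendingReads, and the 5-second delay are illustrative assumptions, not the actual ChunksTracker API:

import scala.concurrent.duration._
import akka.actor.{Actor, Timers}

// Sketch only: the real ChunksTracker keeps per-chunk ChunkReadStatus state;
// this shows just the scheduling shape behind a scheduleRetry() call.
trait RetryScheduling extends Actor with Timers {
  private case object RetryPendingReads

  // Re-issues readChunk for chunks with no active readers
  // (the readingChunks.collect pass above)
  protected def retryPendingReads(): Unit

  protected def scheduleRetry(delay: FiniteDuration = 5.seconds): Unit =
    timers.startSingleTimer(RetryPendingReads, RetryPendingReads, delay)

  protected def retryReceive: Receive = {
    case RetryPendingReads ⇒ retryPendingReads()
  }
}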
FileStreams.scala
@@ -1,7 +1,7 @@
package com.karasiq.shadowcloud.streams.file

import akka.NotUsed
- import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Source}
+ import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Source, Zip}
import akka.stream.{FlowShape, Materializer}
import akka.util.ByteString
import com.karasiq.shadowcloud.index.files.FileVersions
@@ -38,16 +38,33 @@ final class FileStreams(regionStreams: RegionStreams, chunkProcessing: ChunkProc
}

def readChunkStreamRanged(regionId: RegionId): Flow[(Chunk, RangeList), ByteString, NotUsed] = {
-    Flow[(Chunk, RangeList)]
-      .log("chunk-ranges")
-      .flatMapConcat {
-        case (chunk, ranges) ⇒
-          Source
-            .single(chunk)
-            .via(readChunkStream(regionId))
-            .map(ranges.slice)
-      }
-      .named("readChunkStreamRanged")
+    val graph = GraphDSL.create() { implicit b ⇒
+      import GraphDSL.Implicits._
+      val upstream = b.add(Broadcast[(Chunk, RangeList)](2))
+
+      val read = b.add(
+        Flow[Chunk]
+          .map((regionId, _))
+          .via(regionStreams.readChunksNonBuffered)
+          .map(_.data.plain)
+      )
+
+      val sliceData = b.add(
+        Flow[(ByteString, RangeList)]
+          .map { case (bytes, ranges) ⇒ ranges.slice(bytes) }
+      )
+
+      val zip = b.add(Zip[ByteString, RangeList])
+
+      upstream.map(_._2) ~> zip.in1
+      upstream.map(_._1) ~> read.in
+      read.out ~> zip.in0
+      zip.out ~> sliceData.in
+
+      FlowShape(upstream.in, sliceData.out)
+    }
+
+    Flow.fromGraph(graph).named("readChunkStreamRanged")
}

def readChunkStreamRanged(regionId: RegionId, chunks: Seq[Chunk], ranges: RangeList): Source[ByteString, NotUsed] = {
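The rewrite replaces the old per-chunk flatMapConcat with a Broadcast/Zip graph: the ranges travel on one branch while the chunk is read on the other, and Zip re-pairs each chunk's plaintext with its ranges. Because the read branch goes through readChunksNonBuffered (mapAsync(1), see RegionStreams below), elements stay in order and the pairing is safe. A stripped-down, self-contained sketch of the same pairing pattern with toy types (not the shadowcloud ones; Akka 2.6-style materialization assumed):

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

object BroadcastZipPairing extends App {
  implicit val system: ActorSystem = ActorSystem("pairing")

  // Pair every input with a transformation of its first element, mirroring
  // how readChunkStreamRanged re-attaches ranges to the data read for a chunk.
  val pairWithUpper: Flow[(String, Int), (String, Int), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b ⇒
      import GraphDSL.Implicits._
      val bcast = b.add(Broadcast[(String, Int)](2))
      val work  = b.add(Flow[(String, Int)].map(_._1.toUpperCase)) // stands in for the chunk read
      val zip   = b.add(Zip[String, Int])

      bcast ~> work ~> zip.in0
      bcast.map(_._2) ~> zip.in1

      FlowShape(bcast.in, zip.out)
    })

  Source(List("a" -> 1, "b" -> 2))
    .via(pairWithUpper)
    .runWith(Sink.foreach(println)) // (A,1), (B,2)
    .onComplete(_ ⇒ system.terminate())(system.dispatcher)
}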
@@ -76,25 +93,26 @@ final class FileStreams(regionStreams: RegionStreams, chunkProcessing: ChunkProc
Flow[ByteString]
.via(AkkaStreamUtils.extractUpstream)
.zip(Source.future(supervisorOps.getRegionConfig(regionId)))
-      .flatMapConcat { case (stream, config) =>
-        val matSink = Flow
-          .fromGraph(chunkProcessing.split(config.chunkSize.getOrElse(chunkProcessing.defaultChunkSize)))
-          .via(chunkProcessing.beforeWrite())
-          // .map(c ⇒ c.copy(data = c.data.copy(plain = ByteString.empty))) // Memory optimization
-          .map((regionId, _))
-          .via(regionStreams.writeChunks)
-          // .log("chunk-stream-write")
-          .toMat(chunkProcessing.index())(Keep.right)
+      .flatMapConcat {
+        case (stream, config) ⇒
+          val matSink = Flow
+            .fromGraph(chunkProcessing.split(config.chunkSize.getOrElse(chunkProcessing.defaultChunkSize)))
+            .via(chunkProcessing.beforeWrite())
+            // .map(c ⇒ c.copy(data = c.data.copy(plain = ByteString.empty))) // Memory optimization
+            .map((regionId, _))
+            .via(regionStreams.writeChunks)
+            // .log("chunk-stream-write")
+            .toMat(chunkProcessing.index())(Keep.right)

-        val graph = GraphDSL.create(matSink) { implicit builder ⇒ matSink ⇒
-          import GraphDSL.Implicits._
-          val extractResult = builder.add(Flow[Future[FileIndexer.Result]].flatMapConcat(Source.future))
+          val graph = GraphDSL.create(matSink) { implicit builder ⇒ matSink ⇒
+            import GraphDSL.Implicits._
+            val extractResult = builder.add(Flow[Future[FileIndexer.Result]].flatMapConcat(Source.future))

-          builder.materializedValue ~> extractResult
-          FlowShape(matSink.in, extractResult.out)
-        }
+            builder.materializedValue ~> extractResult
+            FlowShape(matSink.in, extractResult.out)
+          }

-        stream.via(graph)
+          stream.via(graph)
+      }
.named("writeChunkStream")
}
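The writeChunkStream hunk is mostly a reindent around the new flatMapConcat block, but the pattern it preserves is worth spelling out: builder.materializedValue feeds the sink's materialized Future[FileIndexer.Result] back into the graph as a stream element. A self-contained sketch of that pattern with generic types (not the shadowcloud ones):

import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}

object MatValueAsElement extends App {
  implicit val system: ActorSystem = ActorSystem("mat-value")

  // A fold sink; its materialized Future[Int] is re-emitted as the single
  // output element of the resulting flow, like FileIndexer.Result above.
  val sumSink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

  val sumFlow: Flow[Int, Int, Future[Int]] =
    Flow.fromGraph(GraphDSL.create(sumSink) { implicit b ⇒ sink ⇒
      import GraphDSL.Implicits._
      val extract = b.add(Flow[Future[Int]].flatMapConcat(Source.future))
      b.materializedValue ~> extract
      FlowShape(sink.in, extract.out)
    })

  Source(1 to 10).via(sumFlow).runWith(Sink.foreach(println)) // prints 55
    .onComplete(_ ⇒ system.terminate())(system.dispatcher)
}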
RegionStreams.scala
@@ -32,10 +32,13 @@ final class RegionStreams(regionOps: RegionOps, parallelism: ParallelismConfig,

val readChunks = Flow[(RegionId, Chunk)]
    .mapAsync(parallelism.read) { case (regionId, chunk) ⇒ regionOps.readChunk(regionId, chunk) }
.async
.via(ByteStreams.bufferT(_.data.encrypted.length, buffers.readChunks))
.named("readChunks")

+  val readChunksNonBuffered = Flow[(RegionId, Chunk)]
+    .mapAsync(1) { case (regionId, chunk) ⇒ regionOps.readChunk(regionId, chunk) }
+    .named("readChunksNonBuffered")

val findFiles = Flow[(RegionId, Path)]
.mapAsync(parallelism.query) {
      case (regionId, path) ⇒
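readChunksNonBuffered is the flow the new readChunkStreamRanged relies on: mapAsync(1) keeps at most one chunk read in flight and emits in order, whereas readChunks prefetches parallelism.read reads and buffers decoded chunks by encrypted size. A toy comparison of the two shapes (fetch is a hypothetical stand-in for regionOps.readChunk):

import scala.concurrent.Future
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

object BufferedVsSequential extends App {
  implicit val system: ActorSystem = ActorSystem("read-flows")
  import system.dispatcher

  // Hypothetical stand-in for regionOps.readChunk
  def fetch(id: Int): Future[String] = Future(s"chunk-$id")

  // Prefetching: up to 4 reads in flight; results are still emitted in order
  val buffered: Flow[Int, String, NotUsed] = Flow[Int].mapAsync(4)(fetch)

  // Sequential: at most one read in flight, no prefetch beyond downstream demand
  val sequential: Flow[Int, String, NotUsed] = Flow[Int].mapAsync(1)(fetch)

  Source(1 to 3).via(sequential).runWith(Sink.foreach(println))
    .onComplete(_ ⇒ system.terminate())
}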
