File system fixes
Karasiq committed Jul 6, 2020
1 parent e8ff437 commit 0a8c15b
Showing 14 changed files with 177 additions and 171 deletions.
@@ -19,7 +19,7 @@ import com.karasiq.shadowcloud.utils.{ChunkUtils, Utils}

import scala.collection.mutable
import scala.concurrent.Future
import scala.language.{implicitConversions, postfixOps}


private[actors] object ChunksTracker {
def apply(regionId: RegionId, config: RegionConfig, storageTracker: StorageTracker, scheduleRetry: () ⇒ Unit)(
@@ -1,17 +1,16 @@
package com.karasiq.shadowcloud.config

import scala.collection.JavaConverters._
import scala.language.{implicitConversions, postfixOps}

import com.typesafe.config.{Config, ConfigObject, ConfigValueType}

import com.karasiq.common.configs.ConfigImplicits
import com.karasiq.shadowcloud.utils.ProviderInstantiator
import com.typesafe.config.{Config, ConfigObject, ConfigValueType}

import scala.collection.JavaConverters._

private[shadowcloud] case class ProvidersConfig[T](rootConfig: Config, classes: Seq[(String, Class[T])]) extends WrappedConfig {
def instances(implicit inst: ProviderInstantiator): Seq[(String, T)] = {
classes.map { case (name, pClass) ⇒
name → inst.getInstance(pClass)
classes.map {
case (name, pClass) ⇒
name → inst.getInstance(pClass)
}
}
}
@@ -26,9 +25,10 @@ private[shadowcloud] object ProvidersConfig extends WrappedConfigFactory[Provide
}

private[this] def readProviders[T](obj: ConfigObject): Seq[(String, Class[T])] = {
obj.asScala.toVector.map { case (key, value) ⇒
require(value.valueType() == ConfigValueType.STRING, s"Invalid provider name: $value")
key → Class.forName(value.unwrapped().asInstanceOf[String]).asInstanceOf[Class[T]]
obj.asScala.toVector.map {
case (key, value) ⇒
require(value.valueType() == ConfigValueType.STRING, s"Invalid provider name: $value")
key → Class.forName(value.unwrapped().asInstanceOf[String]).asInstanceOf[Class[T]]
}
}
}
@@ -92,7 +92,7 @@ class FileIOScheduler(config: SCDriveConfig, regionId: RegionId, file: File) ext
// -----------------------------------------------------------------------
object actorState {
val pendingFlush = PendingOperation[NotUsed]
var lastFlush = 0L
var lastFlush = System.nanoTime()

def finishFlush(result: Flush.Status): Unit = {
pendingFlush.finish(NotUsed, result)
@@ -173,19 +173,17 @@ class FileIOScheduler(config: SCDriveConfig, regionId: RegionId, file: File) ext

def appendsStream = {
val chunksEnd = currentRanges.lastOption.fold(0L)(_.end)
if (currentWrites.forall(_.range.end <= chunksEnd)) {
if (range.end <= chunksEnd || currentWrites.forall(_.range.end <= chunksEnd)) {
Source.empty[ByteString]
} else {
val fileEnd = math.max(chunksEnd, math.min(range.end, currentWrites.map(_.range.end).max))
val appendsRange = ChunkRanges.Range(chunksEnd, fileEnd)
val appendsRange = ChunkRanges.Range(chunksEnd, range.end)
Source
.fromIterator(() ⇒ dataUtils.pendingAppends(currentWrites, chunksEnd))
.map {
case (range, chunk) ⇒
val selectedRange = range.relativeTo(appendsRange)
val selectedRange = range.relativeTo(appendsRange).fitToSize(appendsRange.size)
selectedRange.slice(chunk.data.plain)
}
// .via(ByteStreams.limit(appendsRange.size))
.named("scDriveReadAppends")
}
}
@@ -10,16 +10,16 @@ import scala.language.implicitConversions
private[drive] final case class ChunkPatch(offset: Long, data: ByteString) {
val range = ChunkRanges.Range(offset, offset + data.length)

override def toString: String = s"ChunkPatch($offset, ${MemorySize(data.length)})"
override def toString: String = s"ChunkPatch($offset-${offset + data.length}, ${MemorySize(data.length)})"
}

private[drive] final case class ChunkPatchList(patches: Seq[ChunkPatch]) {
def canReplace(chunkSize: Long): Boolean = {
@tailrec
def canReplaceRec(position: Long, patches: Seq[ChunkPatch]): Boolean = patches match {
case Nil ⇒ position >= chunkSize
case Nil ⇒ position >= chunkSize
case p +: rest if p.offset == position ⇒ canReplaceRec(position + p.data.length, rest)
case _ ⇒ false
case _ ⇒ false
}

canReplaceRec(0, this.patches.sortBy(_.offset))
@@ -31,15 +31,16 @@ private[drive] final case class ChunkPatchList(patches: Seq[ChunkPatch]) {
}

def patchChunk(dataRange: ChunkRanges.Range, data: ByteString) = {
patches.foldLeft(data) { case (data, write) ⇒
val relRange = write.range.relativeTo(dataRange)
val offset = dataRange.relativeTo(write.range)
ChunkRanges.Range.patch(data, relRange, offset.slice(write.data))
patches.foldLeft(data) {
case (data, write) ⇒
val relRange = write.range.relativeTo(dataRange)
val offset = dataRange.relativeTo(write.range)
ChunkRanges.Range.patch(data, relRange, offset.slice(write.data))
}
}
}

private[drive] object ChunkPatchList {
implicit def fromPatchesSeq(patches: Seq[ChunkPatch]): ChunkPatchList = ChunkPatchList(patches)
implicit def toPatchesSeq(pl: ChunkPatchList): Seq[ChunkPatch] = pl.patches
implicit def toPatchesSeq(pl: ChunkPatchList): Seq[ChunkPatch] = pl.patches
}
@@ -16,14 +16,16 @@ import com.karasiq.shadowcloud.streams.utils.ByteStreams
import com.karasiq.shadowcloud.test.utils.SCExtensionSpec
import org.scalatest.FlatSpecLike

import scala.concurrent.duration._

final class FileIOSchedulerTest extends SCExtensionSpec with FlatSpecLike {
// -----------------------------------------------------------------------
// Context
// -----------------------------------------------------------------------
val config = SCDriveConfig(sc.config.rootConfig.getConfig("drive"))
val config = SCDriveConfig(sc.config.rootConfig.getConfig("drive"))
val scheduler = TestActorRef[FileIOScheduler](FileIOScheduler.props(config, "testRegion", File("/123.txt")))
val zeroes = ByteString(0, 0, 0, 0, 0, 0, 0, 0, 0, 0).ensuring(_.length == 10)
val testData = ByteString("1234567890").ensuring(_.length == 10)
val zeroes = ByteString(0, 0, 0, 0, 0, 0, 0, 0, 0, 0).ensuring(_.length == 10)
val testData = ByteString("1234567890").ensuring(_.length == 10)

// -----------------------------------------------------------------------
// Tests
@@ -72,16 +74,16 @@ final class FileIOSchedulerTest extends SCExtensionSpec with FlatSpecLike {
flushResult.writes shouldBe Seq(write: ChunkPatch)
flushResult.ops.sortBy(_.range.start) match {
case ChunkIOOperation.ChunkRewritten(ChunkRanges.Range(0, 20), oldChunk, chunk) +:
ChunkIOOperation.ChunkAppended(ChunkRanges.Range(20, 30), newChunk) +: Nil ⇒

ChunkIOOperation.ChunkAppended(ChunkRanges.Range(20, 30), newChunk) +: Nil ⇒
oldChunk.checksum.size shouldBe 20
chunk.checksum.size shouldBe 20
newChunk.checksum.size shouldBe 10
}

testChunks { case chunk +: newChunk +: Nil ⇒
chunk.checksum.size shouldBe 20
newChunk.checksum.size shouldBe 10
testChunks {
case chunk +: newChunk +: Nil ⇒
chunk.checksum.size shouldBe 20
newChunk.checksum.size shouldBe 10
}

testRead(testData ++ zeroes ++ testData)
@@ -100,15 +102,16 @@ final class FileIOSchedulerTest extends SCExtensionSpec with FlatSpecLike {
newChunk.checksum.size shouldBe 40
}

testChunks { case newChunk +: Nil ⇒
newChunk.checksum.size shouldBe 40
testChunks {
case newChunk +: Nil ⇒
newChunk.checksum.size shouldBe 40
}

testRead(testData ++ zeroes ++ testData ++ testData)
}

it should "cut file" in {
(scheduler ? CutFile(25)).futureValue
(scheduler ? CutFile(25)).futureValue
testChunks(_ shouldBe empty)
testRead(testData ++ zeroes ++ testData.take(5))

@@ -119,8 +122,9 @@
newChunk.checksum.size shouldBe 25
}

testChunks { case newChunk +: Nil ⇒
newChunk.checksum.size shouldBe 25
testChunks {
case newChunk +: Nil ⇒
newChunk.checksum.size shouldBe 25
}
testRead(testData ++ zeroes ++ testData.take(5))
}
@@ -129,13 +133,23 @@
// Utils
// -----------------------------------------------------------------------
def testRead(data: ByteString) = {
val testSink = scheduler.underlyingActor.dataIO.readStream(0 to 100)
val testSink = scheduler.underlyingActor.dataIO
.readStream(0 until 100)
.via(ByteStreams.concat)
.runWith(TestSink.probe)

testSink.request(2)
testSink.expectNext(data)
testSink.expectComplete()

val testSink1 = scheduler.underlyingActor.dataIO
.readStream(0 until 5)
.via(ByteStreams.concat)
.runWith(TestSink.probe)

testSink1.request(2)
testSink1.expectNext(data.take(5))
testSink1.expectComplete()
}

def testChunks(f: Seq[Chunk] ⇒ Unit) = {
@@ -154,5 +168,6 @@
sc.actors.regionSupervisor ! CreateRegion("testRegion", sc.configs.regionConfig("testRegion"))
sc.actors.regionSupervisor ! CreateStorage("testStorage", StorageProps.inMemory)
sc.actors.regionSupervisor ! RegisterStorage("testRegion", "testStorage")
awaitAssert(sc.ops.region.getHealth("testRegion").futureValue shouldBe 'fullyOnline, 10 seconds)
}
}
@@ -229,27 +229,18 @@ class SCFileSystem(config: SCDriveConfig, fsDispatcher: ActorRef, log: LoggingAd
}

override def read(path: String, buf: Pointer, size: Long, offset: Long, fi: FuseFileInfo): Int = {
def tryRead() = {
Try(
dispatch(
DispatchIOOperation(path, FileIOScheduler.ReadData(ChunkRanges.Range(offset, offset + size))),
DispatchIOOperation,
critical = true,
handle = fi.fh.longValue()
)
val result = Try(
dispatch(
DispatchIOOperation(path, FileIOScheduler.ReadData(ChunkRanges.Range(offset, offset + size))),
DispatchIOOperation,
critical = true,
handle = fi.fh.longValue()
)
}

var result: Try[Any] = tryRead()
var tries = 3
while (result.isFailure && tries > 0) {
// Thread.sleep(5000)
result = tryRead()
tries -= 1
}
)

result match {
case Success(FileIOScheduler.ReadData.Success(_, data)) ⇒
require(data.length <= size, "Read too much")
for (i ← data.indices) buf.putByte(i, data(i))
data.length

@@ -1,7 +1,5 @@
package com.karasiq.shadowcloud.model

import scala.language.{implicitConversions, postfixOps}

@SerialVersionUID(0L)
final case class Path(nodes: Seq[String]) extends SCEntity {
@transient
@@ -81,4 +79,4 @@ object Path {
}

implicit val ordering: Ordering[Path] = Ordering.by(path ⇒ (path.nodes.length, path.toString))
}
}
25 changes: 12 additions & 13 deletions model/src/main/scala/com/karasiq/shadowcloud/utils/Utils.scala
@@ -1,21 +1,19 @@
package com.karasiq.shadowcloud.utils

import scala.collection.TraversableLike
import scala.concurrent.duration.FiniteDuration
import scala.language.{higherKinds, postfixOps}

import akka.util.ByteString
import com.typesafe.config.ConfigFactory

import com.karasiq.common.encoding.HexString
import com.karasiq.shadowcloud.model.{Chunk, Path}
import com.typesafe.config.ConfigFactory

import scala.collection.TraversableLike
import scala.concurrent.duration.FiniteDuration

private[shadowcloud] object Utils {
// -----------------------------------------------------------------------
// Paths
// -----------------------------------------------------------------------
val InternalFolder = Path.root / ".$sc-internal"

// -----------------------------------------------------------------------
// Time
// -----------------------------------------------------------------------
@@ -35,7 +33,7 @@ private[shadowcloud] object Utils {
private[this] val printValueDelimiter = ", "

def printValues[T](values: Traversable[T], limit: Int = 10): String = {
val sb = new StringBuilder(100)
val sb = new StringBuilder(100)
val size = values.size
values.take(limit).foreach { v ⇒
if (sb.nonEmpty) sb.append(printValueDelimiter)
@@ -48,7 +46,7 @@ private[shadowcloud] object Utils {
def printHashes(hashes: Traversable[ByteString], limit: Int = 10): String = {
if (hashes.isEmpty) return ""
val size = hashes.size
val sb = new StringBuilder(math.min(limit, size) * (hashes.head.length * 2) + 10)
val sb = new StringBuilder(math.min(limit, size) * (hashes.head.length * 2) + 10)
hashes.filter(_.nonEmpty).take(limit).foreach { hash ⇒
if (sb.nonEmpty) sb.append(printValueDelimiter)
sb.append(HexString.encode(hash))
@@ -64,7 +62,7 @@ private[shadowcloud] object Utils {
// -----------------------------------------------------------------------
// Misc
// -----------------------------------------------------------------------
@inline
@inline
def isSameChunk(chunk: Chunk, chunk1: Chunk): Boolean = {
// chunk.withoutData == chunk1.withoutData
chunk == chunk1
@@ -78,8 +76,8 @@ private[shadowcloud] object Utils {
def getFileExtension(path: String): String = {
def indexOfExtension(path: String): Option[Int] = {
def indexOfLastSeparator(filename: String): Int = math.max(filename.lastIndexOf('/'), filename.lastIndexOf('\\'))
val lastDot = path.lastIndexOf('.')
val lastSeparator = indexOfLastSeparator(path)
val lastDot = path.lastIndexOf('.')
val lastSeparator = indexOfLastSeparator(path)
Some(lastDot).filterNot(lastSeparator > _)
}
indexOfExtension(path).fold("")(index ⇒ path.substring(index + 1))
@@ -100,7 +98,8 @@ private[shadowcloud] object Utils {
str
} else {
//cutAt("\n").orElse(cutAt(". "))
cutAt(". ").orElse(cutAt(" "))
cutAt(". ")
.orElse(cutAt(" "))
.getOrElse(str.take(maxLength))
}
}