From 0a8c15bf725387924326fe8f03905fbf95e0bc3f Mon Sep 17 00:00:00 2001 From: Karasiq Date: Mon, 6 Jul 2020 10:32:08 +0300 Subject: [PATCH] File system fixes --- .../actors/internal/ChunksTracker.scala | 2 +- .../shadowcloud/config/ProvidersConfig.scala | 20 +++--- .../shadowcloud/drive/FileIOScheduler.scala | 10 ++- .../shadowcloud/drive/utils/ChunkPatch.scala | 17 ++--- .../drive/test/FileIOSchedulerTest.scala | 43 ++++++++---- .../shadowcloud/drive/fuse/SCFileSystem.scala | 25 +++---- .../com/karasiq/shadowcloud/model/Path.scala | 4 +- .../com/karasiq/shadowcloud/utils/Utils.scala | 25 ++++--- .../components/common/AppComponents.scala | 25 ++++--- .../webapp/components/keys/KeysView.scala | 22 +++--- .../components/region/RegionConfigView.scala | 26 +++---- .../components/region/StorageConfigView.scala | 56 ++++++++------- .../shell/ImplicitConversions.scala | 4 +- .../resources/tgcloud/download_service.py | 69 +++++++++---------- 14 files changed, 177 insertions(+), 171 deletions(-) diff --git a/core/src/main/scala/com/karasiq/shadowcloud/actors/internal/ChunksTracker.scala b/core/src/main/scala/com/karasiq/shadowcloud/actors/internal/ChunksTracker.scala index 7583f5ba..5aa1ee06 100644 --- a/core/src/main/scala/com/karasiq/shadowcloud/actors/internal/ChunksTracker.scala +++ b/core/src/main/scala/com/karasiq/shadowcloud/actors/internal/ChunksTracker.scala @@ -19,7 +19,7 @@ import com.karasiq.shadowcloud.utils.{ChunkUtils, Utils} import scala.collection.mutable import scala.concurrent.Future -import scala.language.{implicitConversions, postfixOps} + private[actors] object ChunksTracker { def apply(regionId: RegionId, config: RegionConfig, storageTracker: StorageTracker, scheduleRetry: () ⇒ Unit)( diff --git a/core/src/main/scala/com/karasiq/shadowcloud/config/ProvidersConfig.scala b/core/src/main/scala/com/karasiq/shadowcloud/config/ProvidersConfig.scala index 70471bad..9cdd16d1 100644 --- a/core/src/main/scala/com/karasiq/shadowcloud/config/ProvidersConfig.scala +++ b/core/src/main/scala/com/karasiq/shadowcloud/config/ProvidersConfig.scala @@ -1,17 +1,16 @@ package com.karasiq.shadowcloud.config -import scala.collection.JavaConverters._ -import scala.language.{implicitConversions, postfixOps} - -import com.typesafe.config.{Config, ConfigObject, ConfigValueType} - import com.karasiq.common.configs.ConfigImplicits import com.karasiq.shadowcloud.utils.ProviderInstantiator +import com.typesafe.config.{Config, ConfigObject, ConfigValueType} + +import scala.collection.JavaConverters._ private[shadowcloud] case class ProvidersConfig[T](rootConfig: Config, classes: Seq[(String, Class[T])]) extends WrappedConfig { def instances(implicit inst: ProviderInstantiator): Seq[(String, T)] = { - classes.map { case (name, pClass) ⇒ - name → inst.getInstance(pClass) + classes.map { + case (name, pClass) ⇒ + name → inst.getInstance(pClass) } } } @@ -26,9 +25,10 @@ private[shadowcloud] object ProvidersConfig extends WrappedConfigFactory[Provide } private[this] def readProviders[T](obj: ConfigObject): Seq[(String, Class[T])] = { - obj.asScala.toVector.map { case (key, value) ⇒ - require(value.valueType() == ConfigValueType.STRING, s"Invalid provider name: $value") - key → Class.forName(value.unwrapped().asInstanceOf[String]).asInstanceOf[Class[T]] + obj.asScala.toVector.map { + case (key, value) ⇒ + require(value.valueType() == ConfigValueType.STRING, s"Invalid provider name: $value") + key → Class.forName(value.unwrapped().asInstanceOf[String]).asInstanceOf[Class[T]] } } } diff --git a/drive/core/src/main/scala/com/karasiq/shadowcloud/drive/FileIOScheduler.scala b/drive/core/src/main/scala/com/karasiq/shadowcloud/drive/FileIOScheduler.scala index f2c16530..0d509cea 100644 --- a/drive/core/src/main/scala/com/karasiq/shadowcloud/drive/FileIOScheduler.scala +++ b/drive/core/src/main/scala/com/karasiq/shadowcloud/drive/FileIOScheduler.scala @@ -92,7 +92,7 @@ class FileIOScheduler(config: SCDriveConfig, regionId: RegionId, file: File) ext // ----------------------------------------------------------------------- object actorState { val pendingFlush = PendingOperation[NotUsed] - var lastFlush = 0L + var lastFlush = System.nanoTime() def finishFlush(result: Flush.Status): Unit = { pendingFlush.finish(NotUsed, result) @@ -173,19 +173,17 @@ class FileIOScheduler(config: SCDriveConfig, regionId: RegionId, file: File) ext def appendsStream = { val chunksEnd = currentRanges.lastOption.fold(0L)(_.end) - if (currentWrites.forall(_.range.end <= chunksEnd)) { + if (range.end <= chunksEnd || currentWrites.forall(_.range.end <= chunksEnd)) { Source.empty[ByteString] } else { - val fileEnd = math.max(chunksEnd, math.min(range.end, currentWrites.map(_.range.end).max)) - val appendsRange = ChunkRanges.Range(chunksEnd, fileEnd) + val appendsRange = ChunkRanges.Range(chunksEnd, range.end) Source .fromIterator(() ⇒ dataUtils.pendingAppends(currentWrites, chunksEnd)) .map { case (range, chunk) ⇒ - val selectedRange = range.relativeTo(appendsRange) + val selectedRange = range.relativeTo(appendsRange).fitToSize(appendsRange.size) selectedRange.slice(chunk.data.plain) } - // .via(ByteStreams.limit(appendsRange.size)) .named("scDriveReadAppends") } } diff --git a/drive/core/src/main/scala/com/karasiq/shadowcloud/drive/utils/ChunkPatch.scala b/drive/core/src/main/scala/com/karasiq/shadowcloud/drive/utils/ChunkPatch.scala index b7f82adb..ffa0ea8a 100644 --- a/drive/core/src/main/scala/com/karasiq/shadowcloud/drive/utils/ChunkPatch.scala +++ b/drive/core/src/main/scala/com/karasiq/shadowcloud/drive/utils/ChunkPatch.scala @@ -10,16 +10,16 @@ import scala.language.implicitConversions private[drive] final case class ChunkPatch(offset: Long, data: ByteString) { val range = ChunkRanges.Range(offset, offset + data.length) - override def toString: String = s"ChunkPatch($offset, ${MemorySize(data.length)})" + override def toString: String = s"ChunkPatch($offset-${offset + data.length}, ${MemorySize(data.length)})" } private[drive] final case class ChunkPatchList(patches: Seq[ChunkPatch]) { def canReplace(chunkSize: Long): Boolean = { @tailrec def canReplaceRec(position: Long, patches: Seq[ChunkPatch]): Boolean = patches match { - case Nil ⇒ position >= chunkSize + case Nil ⇒ position >= chunkSize case p +: rest if p.offset == position ⇒ canReplaceRec(position + p.data.length, rest) - case _ ⇒ false + case _ ⇒ false } canReplaceRec(0, this.patches.sortBy(_.offset)) @@ -31,15 +31,16 @@ private[drive] final case class ChunkPatchList(patches: Seq[ChunkPatch]) { } def patchChunk(dataRange: ChunkRanges.Range, data: ByteString) = { - patches.foldLeft(data) { case (data, write) ⇒ - val relRange = write.range.relativeTo(dataRange) - val offset = dataRange.relativeTo(write.range) - ChunkRanges.Range.patch(data, relRange, offset.slice(write.data)) + patches.foldLeft(data) { + case (data, write) ⇒ + val relRange = write.range.relativeTo(dataRange) + val offset = dataRange.relativeTo(write.range) + ChunkRanges.Range.patch(data, relRange, offset.slice(write.data)) } } } private[drive] object ChunkPatchList { implicit def fromPatchesSeq(patches: Seq[ChunkPatch]): ChunkPatchList = ChunkPatchList(patches) - implicit def toPatchesSeq(pl: ChunkPatchList): Seq[ChunkPatch] = pl.patches + implicit def toPatchesSeq(pl: ChunkPatchList): Seq[ChunkPatch] = pl.patches } diff --git a/drive/core/src/test/scala/com/karasiq/shadowcloud/drive/test/FileIOSchedulerTest.scala b/drive/core/src/test/scala/com/karasiq/shadowcloud/drive/test/FileIOSchedulerTest.scala index 1e4705a3..d4255ebe 100644 --- a/drive/core/src/test/scala/com/karasiq/shadowcloud/drive/test/FileIOSchedulerTest.scala +++ b/drive/core/src/test/scala/com/karasiq/shadowcloud/drive/test/FileIOSchedulerTest.scala @@ -16,14 +16,16 @@ import com.karasiq.shadowcloud.streams.utils.ByteStreams import com.karasiq.shadowcloud.test.utils.SCExtensionSpec import org.scalatest.FlatSpecLike +import scala.concurrent.duration._ + final class FileIOSchedulerTest extends SCExtensionSpec with FlatSpecLike { // ----------------------------------------------------------------------- // Context // ----------------------------------------------------------------------- - val config = SCDriveConfig(sc.config.rootConfig.getConfig("drive")) + val config = SCDriveConfig(sc.config.rootConfig.getConfig("drive")) val scheduler = TestActorRef[FileIOScheduler](FileIOScheduler.props(config, "testRegion", File("/123.txt"))) - val zeroes = ByteString(0, 0, 0, 0, 0, 0, 0, 0, 0, 0).ensuring(_.length == 10) - val testData = ByteString("1234567890").ensuring(_.length == 10) + val zeroes = ByteString(0, 0, 0, 0, 0, 0, 0, 0, 0, 0).ensuring(_.length == 10) + val testData = ByteString("1234567890").ensuring(_.length == 10) // ----------------------------------------------------------------------- // Tests @@ -72,16 +74,16 @@ final class FileIOSchedulerTest extends SCExtensionSpec with FlatSpecLike { flushResult.writes shouldBe Seq(write: ChunkPatch) flushResult.ops.sortBy(_.range.start) match { case ChunkIOOperation.ChunkRewritten(ChunkRanges.Range(0, 20), oldChunk, chunk) +: - ChunkIOOperation.ChunkAppended(ChunkRanges.Range(20, 30), newChunk) +: Nil ⇒ - + ChunkIOOperation.ChunkAppended(ChunkRanges.Range(20, 30), newChunk) +: Nil ⇒ oldChunk.checksum.size shouldBe 20 chunk.checksum.size shouldBe 20 newChunk.checksum.size shouldBe 10 } - testChunks { case chunk +: newChunk +: Nil ⇒ - chunk.checksum.size shouldBe 20 - newChunk.checksum.size shouldBe 10 + testChunks { + case chunk +: newChunk +: Nil ⇒ + chunk.checksum.size shouldBe 20 + newChunk.checksum.size shouldBe 10 } testRead(testData ++ zeroes ++ testData) @@ -100,15 +102,16 @@ final class FileIOSchedulerTest extends SCExtensionSpec with FlatSpecLike { newChunk.checksum.size shouldBe 40 } - testChunks { case newChunk +: Nil ⇒ - newChunk.checksum.size shouldBe 40 + testChunks { + case newChunk +: Nil ⇒ + newChunk.checksum.size shouldBe 40 } testRead(testData ++ zeroes ++ testData ++ testData) } it should "cut file" in { - (scheduler ? CutFile(25)).futureValue + (scheduler ? CutFile(25)).futureValue testChunks(_ shouldBe empty) testRead(testData ++ zeroes ++ testData.take(5)) @@ -119,8 +122,9 @@ final class FileIOSchedulerTest extends SCExtensionSpec with FlatSpecLike { newChunk.checksum.size shouldBe 25 } - testChunks { case newChunk +: Nil ⇒ - newChunk.checksum.size shouldBe 25 + testChunks { + case newChunk +: Nil ⇒ + newChunk.checksum.size shouldBe 25 } testRead(testData ++ zeroes ++ testData.take(5)) } @@ -129,13 +133,23 @@ final class FileIOSchedulerTest extends SCExtensionSpec with FlatSpecLike { // Utils // ----------------------------------------------------------------------- def testRead(data: ByteString) = { - val testSink = scheduler.underlyingActor.dataIO.readStream(0 to 100) + val testSink = scheduler.underlyingActor.dataIO + .readStream(0 until 100) .via(ByteStreams.concat) .runWith(TestSink.probe) testSink.request(2) testSink.expectNext(data) testSink.expectComplete() + + val testSink1 = scheduler.underlyingActor.dataIO + .readStream(0 until 5) + .via(ByteStreams.concat) + .runWith(TestSink.probe) + + testSink1.request(2) + testSink1.expectNext(data.take(5)) + testSink1.expectComplete() } def testChunks(f: Seq[Chunk] ⇒ Unit) = { @@ -154,5 +168,6 @@ final class FileIOSchedulerTest extends SCExtensionSpec with FlatSpecLike { sc.actors.regionSupervisor ! CreateRegion("testRegion", sc.configs.regionConfig("testRegion")) sc.actors.regionSupervisor ! CreateStorage("testStorage", StorageProps.inMemory) sc.actors.regionSupervisor ! RegisterStorage("testRegion", "testStorage") + awaitAssert(sc.ops.region.getHealth("testRegion").futureValue shouldBe 'fullyOnline, 10 seconds) } } diff --git a/drive/fuse/src/main/scala/com/karasiq/shadowcloud/drive/fuse/SCFileSystem.scala b/drive/fuse/src/main/scala/com/karasiq/shadowcloud/drive/fuse/SCFileSystem.scala index be71e131..3b44683f 100644 --- a/drive/fuse/src/main/scala/com/karasiq/shadowcloud/drive/fuse/SCFileSystem.scala +++ b/drive/fuse/src/main/scala/com/karasiq/shadowcloud/drive/fuse/SCFileSystem.scala @@ -229,27 +229,18 @@ class SCFileSystem(config: SCDriveConfig, fsDispatcher: ActorRef, log: LoggingAd } override def read(path: String, buf: Pointer, size: Long, offset: Long, fi: FuseFileInfo): Int = { - def tryRead() = { - Try( - dispatch( - DispatchIOOperation(path, FileIOScheduler.ReadData(ChunkRanges.Range(offset, offset + size))), - DispatchIOOperation, - critical = true, - handle = fi.fh.longValue() - ) + val result = Try( + dispatch( + DispatchIOOperation(path, FileIOScheduler.ReadData(ChunkRanges.Range(offset, offset + size))), + DispatchIOOperation, + critical = true, + handle = fi.fh.longValue() ) - } - - var result: Try[Any] = tryRead() - var tries = 3 - while (result.isFailure && tries > 0) { - // Thread.sleep(5000) - result = tryRead() - tries -= 1 - } + ) result match { case Success(FileIOScheduler.ReadData.Success(_, data)) ⇒ + require(data.length <= size, "Read too much") for (i ← data.indices) buf.putByte(i, data(i)) data.length diff --git a/model/src/main/scala/com/karasiq/shadowcloud/model/Path.scala b/model/src/main/scala/com/karasiq/shadowcloud/model/Path.scala index 061408b3..a79baf1a 100644 --- a/model/src/main/scala/com/karasiq/shadowcloud/model/Path.scala +++ b/model/src/main/scala/com/karasiq/shadowcloud/model/Path.scala @@ -1,7 +1,5 @@ package com.karasiq.shadowcloud.model -import scala.language.{implicitConversions, postfixOps} - @SerialVersionUID(0L) final case class Path(nodes: Seq[String]) extends SCEntity { @transient @@ -81,4 +79,4 @@ object Path { } implicit val ordering: Ordering[Path] = Ordering.by(path ⇒ (path.nodes.length, path.toString)) -} \ No newline at end of file +} diff --git a/model/src/main/scala/com/karasiq/shadowcloud/utils/Utils.scala b/model/src/main/scala/com/karasiq/shadowcloud/utils/Utils.scala index 18bb8119..f5307685 100644 --- a/model/src/main/scala/com/karasiq/shadowcloud/utils/Utils.scala +++ b/model/src/main/scala/com/karasiq/shadowcloud/utils/Utils.scala @@ -1,21 +1,19 @@ package com.karasiq.shadowcloud.utils -import scala.collection.TraversableLike -import scala.concurrent.duration.FiniteDuration -import scala.language.{higherKinds, postfixOps} - import akka.util.ByteString -import com.typesafe.config.ConfigFactory - import com.karasiq.common.encoding.HexString import com.karasiq.shadowcloud.model.{Chunk, Path} +import com.typesafe.config.ConfigFactory + +import scala.collection.TraversableLike +import scala.concurrent.duration.FiniteDuration private[shadowcloud] object Utils { // ----------------------------------------------------------------------- // Paths // ----------------------------------------------------------------------- val InternalFolder = Path.root / ".$sc-internal" - + // ----------------------------------------------------------------------- // Time // ----------------------------------------------------------------------- @@ -35,7 +33,7 @@ private[shadowcloud] object Utils { private[this] val printValueDelimiter = ", " def printValues[T](values: Traversable[T], limit: Int = 10): String = { - val sb = new StringBuilder(100) + val sb = new StringBuilder(100) val size = values.size values.take(limit).foreach { v ⇒ if (sb.nonEmpty) sb.append(printValueDelimiter) @@ -48,7 +46,7 @@ private[shadowcloud] object Utils { def printHashes(hashes: Traversable[ByteString], limit: Int = 10): String = { if (hashes.isEmpty) return "" val size = hashes.size - val sb = new StringBuilder(math.min(limit, size) * (hashes.head.length * 2) + 10) + val sb = new StringBuilder(math.min(limit, size) * (hashes.head.length * 2) + 10) hashes.filter(_.nonEmpty).take(limit).foreach { hash ⇒ if (sb.nonEmpty) sb.append(printValueDelimiter) sb.append(HexString.encode(hash)) @@ -64,7 +62,7 @@ private[shadowcloud] object Utils { // ----------------------------------------------------------------------- // Misc // ----------------------------------------------------------------------- - @inline + @inline def isSameChunk(chunk: Chunk, chunk1: Chunk): Boolean = { // chunk.withoutData == chunk1.withoutData chunk == chunk1 @@ -78,8 +76,8 @@ private[shadowcloud] object Utils { def getFileExtension(path: String): String = { def indexOfExtension(path: String): Option[Int] = { def indexOfLastSeparator(filename: String): Int = math.max(filename.lastIndexOf('/'), filename.lastIndexOf('\\')) - val lastDot = path.lastIndexOf('.') - val lastSeparator = indexOfLastSeparator(path) + val lastDot = path.lastIndexOf('.') + val lastSeparator = indexOfLastSeparator(path) Some(lastDot).filterNot(lastSeparator > _) } indexOfExtension(path).fold("")(index ⇒ path.substring(index + 1)) @@ -100,7 +98,8 @@ private[shadowcloud] object Utils { str } else { //cutAt("\n").orElse(cutAt(". ")) - cutAt(". ").orElse(cutAt(" ")) + cutAt(". ") + .orElse(cutAt(" ")) .getOrElse(str.take(maxLength)) } } diff --git a/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/common/AppComponents.scala b/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/common/AppComponents.scala index c2f5dec3..c7b3d87a 100644 --- a/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/common/AppComponents.scala +++ b/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/common/AppComponents.scala @@ -44,13 +44,15 @@ object AppComponents { TabOverride.set(elem.asInstanceOf[dom.html.TextArea]) } - def closeableAlert(style: AlertStyle, onClose: () => Unit, md: Modifier*): Tag = + def closeableAlert(style: AlertStyle, onClose: () ⇒ Unit, md: Modifier*): Tag = div(new UniversalAlert(style) { override def closeButton: JsDom.all.Tag = - super.closeButton(onclick := Callback.onClick(_ => onClose())) + super.closeButton(onclick := Callback.onClick(_ ⇒ onClose())) }.renderTag(md: _*)) - def exportDialog(title: String, fileName: String, content: String, contentType: String = "application/json")(implicit context: AppContext): Modal = { + def exportDialog(title: String, fileName: String, content: String, contentType: String = "application/json")( + implicit context: AppContext + ): Modal = { def download(): Unit = Blobs.saveBlob(Blobs.fromString(content, contentType), fileName) @@ -67,7 +69,7 @@ object AppComponents { ) } - def importDialog(title: String)(submit: String => Unit)(implicit context: AppContext): Modal = { + def importDialog(title: String)(submit: String ⇒ Unit)(implicit context: AppContext): Modal = { val result = Var("") Modal() .withTitle(title) @@ -81,10 +83,17 @@ object AppComponents { ) } - def disabledIf(rx: Rx[Boolean]): Modifier = (t: Element) => { - rx.foreach(value => - if (value) t.setAttribute("disabled", "") - else t.removeAttribute("disabled") + def disabledIf(rx: Rx[Boolean]): Modifier = (t: Element) ⇒ { + rx.foreach( + value ⇒ + if (value) t.setAttribute("disabled", "") + else t.removeAttribute("disabled") ) } + + def idSelect(title: String, ids: Seq[String], selected: Seq[String]): (FormSelect, Tag) = { + val select = FormInput.multipleSelect(title, ids.map(id ⇒ FormSelectOption(id, id))) + select.selected() = selected + select → select.renderTag(height := 500.px) + } } diff --git a/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/keys/KeysView.scala b/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/keys/KeysView.scala index bfd7be9e..ddd87fa3 100644 --- a/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/keys/KeysView.scala +++ b/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/keys/KeysView.scala @@ -1,21 +1,17 @@ package com.karasiq.shadowcloud.webapp.components.keys import akka.util.ByteString -import org.scalajs.dom.MouseEvent -import org.scalajs.dom.html.TextArea -import rx.{Rx, Var} - import com.karasiq.bootstrap.Bootstrap.default._ -import scalaTags.all._ - import com.karasiq.shadowcloud.config.SerializedProps -import com.karasiq.shadowcloud.model.keys.KeySet import com.karasiq.shadowcloud.model.keys.KeyProps.RegionSet +import com.karasiq.shadowcloud.model.keys.KeySet import com.karasiq.shadowcloud.webapp.components.common.AppComponents import com.karasiq.shadowcloud.webapp.components.region.RegionContext import com.karasiq.shadowcloud.webapp.context.AppContext import com.karasiq.shadowcloud.webapp.context.AppContext.JsExecutionContext -import com.karasiq.shadowcloud.webapp.utils.{Blobs, ExportUtils} +import com.karasiq.shadowcloud.webapp.utils.ExportUtils +import rx.{Rx, Var} +import scalaTags.all._ object KeysView { def apply()(implicit context: AppContext, kc: KeysContext, rc: RegionContext): KeysView = { @@ -121,16 +117,16 @@ class KeysView()(implicit context: AppContext, kc: KeysContext, rc: RegionContex def renderPermissions() = { val forEncryptionRx = Var(forEncryption) val forDecryptionRx = Var(forDecryption) - val regionSelector = FormInput.multipleSelect(context.locale.regions, rc.regions.map(_.regions.keys.toVector.map(id ⇒ FormSelectOption(id, id)))) - regionSelector.selected() = regionSet.toVector + val allIds = rc.regions.now.regions.keys.toVector + val (idSelect, idSelectRendered) = AppComponents.idSelect(context.locale.regions, allIds, regionSet.toVector) - Rx((forEncryptionRx(), forDecryptionRx(), regionSelector.selected())) - .triggerLater(updatePermissions(regionSelector.selected.now.toSet, forEncryptionRx.now, forDecryptionRx.now)) + Rx((forEncryptionRx(), forDecryptionRx(), idSelect.selected())) + .triggerLater(updatePermissions(idSelect.selected.now.toSet, forEncryptionRx.now, forDecryptionRx.now)) Form( FormInput.checkbox(context.locale.keyForEncryption, forEncryptionRx.reactiveInput), FormInput.checkbox(context.locale.keyForDecryption, forDecryptionRx.reactiveInput), - regionSelector + idSelectRendered ) } diff --git a/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/region/RegionConfigView.scala b/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/region/RegionConfigView.scala index 67c2edf7..15f72241 100644 --- a/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/region/RegionConfigView.scala +++ b/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/region/RegionConfigView.scala @@ -49,10 +49,10 @@ class RegionConfigView(regionId: RegionId)(implicit context: AppContext, regionC // Hack to bypass an initialization lag def updateHealth(): Unit = { healthRx.update() - org.scalajs.dom.window.setTimeout(() => healthRx.update(), 100) - org.scalajs.dom.window.setTimeout(() => healthRx.update(), 500) - org.scalajs.dom.window.setTimeout(() => healthRx.update(), 1500) - org.scalajs.dom.window.setTimeout(() => healthRx.update(), 5000) + org.scalajs.dom.window.setTimeout(() ⇒ healthRx.update(), 100) + org.scalajs.dom.window.setTimeout(() ⇒ healthRx.update(), 500) + org.scalajs.dom.window.setTimeout(() ⇒ healthRx.update(), 1500) + org.scalajs.dom.window.setTimeout(() ⇒ healthRx.update(), 5000) } compactReport.triggerLater(updateHealth()) @@ -66,7 +66,7 @@ class RegionConfigView(regionId: RegionId)(implicit context: AppContext, regionC div( if (!regionStatus.suspended) { Seq( - div(HealthView(healthRx.toRx), onclick := Callback.onClick(_ => updateHealth())), + div(HealthView(healthRx.toRx), onclick := Callback.onClick(_ ⇒ updateHealth())), renderCompactButton(), renderGCButton(), renderRepairButton(), @@ -108,19 +108,19 @@ class RegionConfigView(regionId: RegionId)(implicit context: AppContext, regionC ) }, div(gcReport.map[Frag] { - case Some(report) ⇒ AppComponents.closeableAlert(AlertStyle.warning, () => gcReport() = None, report.toString) + case Some(report) ⇒ AppComponents.closeableAlert(AlertStyle.warning, () ⇒ gcReport() = None, report.toString) case None ⇒ Bootstrap.noContent }) ) } private[this] def renderSyncReports(reportsVar: Var[Map[StorageId, SyncReport]]) = - div(reportsVar.map { reports => + div(reportsVar.map { reports ⇒ if (reports.nonEmpty) div( AppComponents.closeableAlert( AlertStyle.success, - () => reportsVar() = Map.empty, + () ⇒ reportsVar() = Map.empty, for ((storageId, report) ← reports.toSeq) yield div(b(storageId), ": ", report.toString) ) ) @@ -156,7 +156,7 @@ class RegionConfigView(regionId: RegionId)(implicit context: AppContext, regionC if (!repairStarted.now) { repairStarted() = true val future = context.api.repairRegion(regionId, storages) - future.onComplete { _ => + future.onComplete { _ ⇒ Toastr.success(s"Region replication finished: $regionId") repairStarted() = false } @@ -204,7 +204,7 @@ class RegionConfigView(regionId: RegionId)(implicit context: AppContext, regionC private[this] def renderStateButtons(regionStatus: RegionStatus) = { def doSuspend() = context.api.suspendRegion(regionId).foreach(_ ⇒ regionContext.updateRegion(regionId)) - def doResume() = context.api.resumeRegion(regionId).foreach { _ => + def doResume() = context.api.resumeRegion(regionId).foreach { _ ⇒ regionContext.updateRegion(regionId) updateHealth() } @@ -269,12 +269,12 @@ class RegionConfigView(regionId: RegionId)(implicit context: AppContext, regionC def renderAddButton() = { def showAddDialog(): Unit = { - val allIds = regionContext.regions.now.storages.keys.toSeq.sorted - val idSelect = FormInput.multipleSelect(context.locale.storages, allIds.map(id ⇒ FormSelectOption(id, id))) + val allIds = regionContext.regions.now.storages.keys.toSeq.sorted + val (idSelect, idSelectRendered) = AppComponents.idSelect(context.locale.storages, allIds, regionStatus.storages.toSeq) idSelect.selected() = regionStatus.storages.toSeq Modal() .withTitle(context.locale.registerStorage) - .withBody(Form(idSelect)) + .withBody(Form(idSelectRendered)) .withButtons( AppComponents.modalSubmit(onclick := Callback.onClick(_ ⇒ updateStorageList(idSelect.selected.now.toSet))), AppComponents.modalClose() diff --git a/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/region/StorageConfigView.scala b/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/region/StorageConfigView.scala index f0e5f128..4a5fabf4 100644 --- a/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/region/StorageConfigView.scala +++ b/server/webapp/src/main/scala/com/karasiq/shadowcloud/webapp/components/region/StorageConfigView.scala @@ -1,21 +1,19 @@ package com.karasiq.shadowcloud.webapp.components.region -import scala.concurrent.Future - import akka.util.ByteString -import rx.Var -import rx.async._ - import com.karasiq.bootstrap.Bootstrap.default._ -import scalaTags.all._ - import com.karasiq.shadowcloud.config.SerializedProps import com.karasiq.shadowcloud.model.StorageId -import com.karasiq.shadowcloud.model.utils.StorageHealth import com.karasiq.shadowcloud.model.utils.RegionStateReport.StorageStatus +import com.karasiq.shadowcloud.model.utils.StorageHealth import com.karasiq.shadowcloud.webapp.components.common.{AppComponents, AppIcons} import com.karasiq.shadowcloud.webapp.context.AppContext import com.karasiq.shadowcloud.webapp.context.AppContext.JsExecutionContext +import rx.Var +import rx.async._ +import scalaTags.all._ + +import scala.concurrent.Future object StorageConfigView { def apply(storageId: StorageId)(implicit context: AppContext, regionContext: RegionContext): StorageConfigView = { @@ -45,33 +43,38 @@ class StorageConfigView(storageId: StorageId)(implicit context: AppContext, regi private[this] def renderStateButtons(storageStatus: StorageStatus) = { def doSuspend() = { - context.api.suspendStorage(storageId) + context.api + .suspendStorage(storageId) .foreach(_ ⇒ regionContext.updateStorage(storageId)) } def doResume() = { - context.api.resumeStorage(storageId) + context.api + .resumeStorage(storageId) .foreach(_ ⇒ regionContext.updateStorage(storageId)) } def doDelete() = { - context.api.deleteStorage(storageId) + context.api + .deleteStorage(storageId) .foreach(_ ⇒ regionContext.updateAll()) } - val suspendButton = if (storageStatus.suspended) - Button(ButtonStyle.success, ButtonSize.extraSmall)(AppIcons.resume, context.locale.resume, onclick := Callback.onClick(_ ⇒ doResume())) - else - Button(ButtonStyle.warning, ButtonSize.extraSmall)(AppIcons.suspend, context.locale.suspend, onclick := Callback.onClick(_ ⇒ doSuspend())) + val suspendButton = + if (storageStatus.suspended) + Button(ButtonStyle.success, ButtonSize.extraSmall)(AppIcons.resume, context.locale.resume, onclick := Callback.onClick(_ ⇒ doResume())) + else + Button(ButtonStyle.warning, ButtonSize.extraSmall)(AppIcons.suspend, context.locale.suspend, onclick := Callback.onClick(_ ⇒ doSuspend())) - val deleteButton = Button(ButtonStyle.danger, ButtonSize.extraSmall)(AppIcons.delete, context.locale.delete, onclick := Callback.onClick(_ ⇒ doDelete())) + val deleteButton = + Button(ButtonStyle.danger, ButtonSize.extraSmall)(AppIcons.delete, context.locale.delete, onclick := Callback.onClick(_ ⇒ doDelete())) ButtonGroup(ButtonGroupSize.extraSmall, suspendButton, deleteButton) } private[this] def renderConfigField(storageStatus: StorageStatus) = { def renderConfigForm() = { - val changed = Var(false) + val changed = Var(false) val newConfigRx = Var(storageStatus.storageProps.data.utf8String) newConfigRx.triggerLater(changed() = true) @@ -80,7 +83,8 @@ class StorageConfigView(storageId: StorageId)(implicit context: AppContext, regi Form.submit(context.locale.submit, changed.reactiveShow), onsubmit := Callback.onSubmit { _ ⇒ val newConfig = SerializedProps(storageStatus.storageProps.format, ByteString(newConfigRx.now)) - context.api.createStorage(storageId, newConfig) + context.api + .createStorage(storageId, newConfig) .foreach(_ ⇒ regionContext.updateStorage(storageId)) } ) @@ -92,7 +96,7 @@ class StorageConfigView(storageId: StorageId)(implicit context: AppContext, regi private[this] def renderRegionsRegistration(storageStatus: StorageStatus) = { def updateRegionList(newIdSet: Set[StorageId]) = { val currentIdSet = storageStatus.regions - val toRegister = newIdSet -- currentIdSet + val toRegister = newIdSet -- currentIdSet val toUnregister = currentIdSet -- newIdSet for { _ ← Future.sequence(toUnregister.map(context.api.unregisterStorage(_, storageId))) @@ -102,12 +106,11 @@ class StorageConfigView(storageId: StorageId)(implicit context: AppContext, regi def renderAddButton() = { def showAddDialog(): Unit = { - val allIds = regionContext.regions.now.regions.keys.toSeq.sorted - val idSelect = FormInput.multipleSelect(context.locale.regions, allIds.map(id ⇒ FormSelectOption(id, id))) - idSelect.selected() = storageStatus.regions.toSeq + val allIds = regionContext.regions.now.regions.keys.toSeq.sorted + val (idSelect, idSelectRendered) = AppComponents.idSelect(context.locale.regions, allIds, storageStatus.regions.toSeq) Modal() .withTitle(context.locale.registerRegion) - .withBody(Form(idSelect)) + .withBody(Form(idSelectRendered)) .withButtons( AppComponents.modalSubmit(onclick := Callback.onClick(_ ⇒ updateRegionList(idSelect.selected.now.toSet))), AppComponents.modalClose() @@ -115,7 +118,11 @@ class StorageConfigView(storageId: StorageId)(implicit context: AppContext, regi .show() } - Button(ButtonStyle.primary, ButtonSize.extraSmall)(AppIcons.register, context.locale.registerRegion, onclick := Callback.onClick(_ ⇒ showAddDialog())) + Button(ButtonStyle.primary, ButtonSize.extraSmall)( + AppIcons.register, + context.locale.registerRegion, + onclick := Callback.onClick(_ ⇒ showAddDialog()) + ) } def renderRegion(storageId: StorageId) = { @@ -132,4 +139,3 @@ class StorageConfigView(storageId: StorageId)(implicit context: AppContext, regi ) } } - diff --git a/shell/src/main/scala/com/karasiq/shadowcloud/shell/ImplicitConversions.scala b/shell/src/main/scala/com/karasiq/shadowcloud/shell/ImplicitConversions.scala index a2d77f39..1c5d7a0b 100644 --- a/shell/src/main/scala/com/karasiq/shadowcloud/shell/ImplicitConversions.scala +++ b/shell/src/main/scala/com/karasiq/shadowcloud/shell/ImplicitConversions.scala @@ -1,8 +1,6 @@ package com.karasiq.shadowcloud.shell -import java.nio.file.{Paths, Path ⇒ FSPath} - -import scala.language.{implicitConversions, postfixOps} +import java.nio.file.{Paths, Path => FSPath} trait ImplicitConversions { implicit def toFSPath(path: String): FSPath = { diff --git a/storage/telegram/src/main/resources/tgcloud/download_service.py b/storage/telegram/src/main/resources/tgcloud/download_service.py index 0b40fad5..f01dd2f8 100644 --- a/storage/telegram/src/main/resources/tgcloud/download_service.py +++ b/storage/telegram/src/main/resources/tgcloud/download_service.py @@ -10,13 +10,12 @@ import errno import lz4.frame import pytz +import secret from quart import Quart, request, Response from telethon import TelegramClient from telethon.errors.rpcbaseerrors import RPCError from telethon.tl.types import DocumentAttributeFilename -import secret - app = Quart(__name__) app.config.from_object(__name__) @@ -93,13 +92,14 @@ async def download_block(uid, file_out): async def delete_data(uid): messages = client.iter_messages(entity, search=uid) - to_delete = [] + ids_to_delete = [] async for m in messages: if starts_with(m.message, uid): - to_delete.append(m.id) + ids_to_delete.append(m.id) - if len(await client.delete_messages(entity, to_delete)) > 0: - await delete_outdated_meta(uid) + if len(await client.delete_messages(entity, ids_to_delete)) > 0: + max_id = min(ids_to_delete) + await delete_outdated_meta(uid, max_id) return 0 else: return errno.EEXIST @@ -145,11 +145,12 @@ def get_meta_lock(path) -> asyncio.Lock: return lock -async def delete_outdated_meta(path): +async def delete_outdated_meta(path, min_id): to_delete = [] async with meta_lock: - async for m in client.iter_messages(entity, search='$tgcloud_meta'): + async for m in client.iter_messages(entity, search='$tgcloud_meta', min_id=min_id): if starts_with(f'$tgcloud_meta/{path}', m.message): + app.logger.warning(f"Removing meta file: {m}") to_delete.append(m.id) return await client.delete_messages(entity, to_delete) @@ -161,32 +162,29 @@ def __init__(self, obj): import json - uid = f'$tgcloud_meta/{path}' + meta_file_path = f'$tgcloud_meta/{path}' - async def list_files() -> list: - meta_messages = [] - async for m in client.iter_messages(entity, search=uid): - if m.message == uid: - meta_messages.append(m) - return meta_messages + async def get_meta_file(): + async for m in client.iter_messages(entity, search=meta_file_path): + if m.message == meta_file_path: + return m + return None - size_meta = await list_files() - messages = client.iter_messages(entity, search=path) + meta_file = await get_meta_file() data = { 'messages': [], 'last_id': -1 } - if len(size_meta) > 0: - msg = size_meta[0] + if meta_file: + msg = meta_file compressed_data = BytesIO() await client.download_media(msg, compressed_data) - try: - compressed_data.seek(0) - raw_data = lz4.frame.decompress(compressed_data.read()) - data = json.loads(raw_data) - messages = client.iter_messages(entity, search=path, min_id=data['last_id'] + 1) - except Exception: - pass + compressed_data.seek(0) + raw_data = lz4.frame.decompress(compressed_data.read()) + data = json.loads(raw_data) + messages = client.iter_messages(entity, search=path, min_id=data['last_id'] + 1) + else: + messages = client.iter_messages(entity, search=path) added = 0 async for m in messages: @@ -200,21 +198,18 @@ async def list_files() -> list: added += 1 now = pytz.utc.localize(datetime.utcnow()) - is_outdated = added > 0 and (len(size_meta) == 0 or now > (size_meta[0].date + timedelta(minutes=10))) + is_outdated = added > 0 and (meta_file is None or now > (meta_file.date + timedelta(minutes=10))) if is_outdated and len(data['messages']) > 10: try: raw_json = bytes(json.dumps(data), "utf-8") compressed_json = lz4.frame.compress(raw_json) - result = await client.send_file(entity, - file=compressed_json, - caption=f'{uid}', - attributes=[DocumentAttributeFilename(f'{uid}')], - allow_cache=False, - part_size_kb=512, - force_document=True) - if result: - if len(size_meta) > 0: - await client.delete_messages(entity, list(map(lambda m: m.id, size_meta))) + await client.send_file(entity, + file=compressed_json, + caption=f'{meta_file_path}', + attributes=[DocumentAttributeFilename(f'{meta_file_path}')], + allow_cache=False, + part_size_kb=512, + force_document=True) except RPCError: pass messages_objs = list(map(lambda m: AsObject(m), data['messages']))