diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 87f446f3..23c6515f 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -6,7 +6,7 @@ shadowcloud {
     health-check-interval = 5m

     index {
-      sync-interval = 1m
+      sync-interval = 3m
       snapshot-threshold = 1000
       snapshot-clear-history = true
       compact-threshold = 0 // Disabled by default
@@ -150,7 +150,7 @@ shadowcloud {

   buffers {
     read-chunks = 16M
-    repair = 32M
+    repair = 16M
   }

   serialization {
diff --git a/core/src/main/scala/com/karasiq/shadowcloud/actors/ChunkIODispatcher.scala b/core/src/main/scala/com/karasiq/shadowcloud/actors/ChunkIODispatcher.scala
index 8f9505a6..1e88f2c4 100644
--- a/core/src/main/scala/com/karasiq/shadowcloud/actors/ChunkIODispatcher.scala
+++ b/core/src/main/scala/com/karasiq/shadowcloud/actors/ChunkIODispatcher.scala
@@ -192,6 +192,7 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag
   // -----------------------------------------------------------------------
   override def preStart(): Unit = {
     def stopOnComplete(f: Future[Done]) = {
+      val log = this.log
       f.onComplete(result ⇒ log.error("Queue stopped: {}", result))
       f.map(_ ⇒ Kill).pipeTo(self)
     }
@@ -306,4 +307,9 @@ private final class ChunkIODispatcher(storageId: StorageId, storageProps: Storag
     DeleteChunks.wrapFuture(paths, deleted.zip(ioResult))
   }
+
+  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
+    reason.printStackTrace()
+    super.preRestart(reason, message)
+  }

 }
diff --git a/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionDispatcher.scala b/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionDispatcher.scala
index 776bd12c..4944f5d5 100644
--- a/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionDispatcher.scala
+++ b/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionDispatcher.scala
@@ -436,4 +436,9 @@ private final class RegionDispatcher(regionId: RegionId, regionConfig: RegionCon
     pendingIndexQueue.complete()
     super.postStop()
   }
+
+  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
+    reason.printStackTrace()
+    super.preRestart(reason, message)
+  }
 }
diff --git a/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionIndex.scala b/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionIndex.scala
index ea8b019e..bd85121b 100644
--- a/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionIndex.scala
+++ b/core/src/main/scala/com/karasiq/shadowcloud/actors/RegionIndex.scala
@@ -296,7 +296,7 @@ private[actors] final class RegionIndex(storageId: StorageId, regionId: RegionId
         becomeOrDefault(receivePreWrite(loadedKeys ++ keys))

       case Status.Failure(error) ⇒
-        log.error(error, "Keys load error")
+        log.debug("Keys load error: {}", error)
         synchronization.scheduleNext()

       case StreamCompleted ⇒
@@ -516,4 +516,9 @@ private[actors] final class RegionIndex(storageId: StorageId, regionId: RegionId
   private[this] def becomeOrDefault(receive: Receive): Unit = {
     context.become(receive.orElse(receiveDefault))
   }
+
+  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
+    reason.printStackTrace()
+    super.preRestart(reason, message)
+  }
 }
diff --git a/core/src/main/scala/com/karasiq/shadowcloud/streams/region/RegionRepairStream.scala b/core/src/main/scala/com/karasiq/shadowcloud/streams/region/RegionRepairStream.scala
index af0c7f7d..0adda89a 100644
--- a/core/src/main/scala/com/karasiq/shadowcloud/streams/region/RegionRepairStream.scala
+++ b/core/src/main/scala/com/karasiq/shadowcloud/streams/region/RegionRepairStream.scala
@@ -51,31 +51,36 @@ object RegionRepairStream {
         }

         chunksSource
-          .mapAsyncUnordered(parallelism.query)(regionOps.getChunkStatus(request.regionId, _))
-          .via(RestartFlow.onFailuresWithBackoff(3 seconds, 30 seconds, 0.2, 20) { () ⇒
+          .via(RestartFlow.onFailuresWithBackoff(500 millis, 10 seconds, 0.2, 20) { () ⇒
+            Flow[Chunk].mapAsyncUnordered(parallelism.query)(regionOps.getChunkStatus(request.regionId, _))
+          })
+          .via(
             Flow[ChunkStatus]
              .map(status ⇒ status → createNewAffinity(status, request.strategy))
              .filterNot { case (status, newAffinity) ⇒ newAffinity.exists(_.isFinished(status)) }
-             .mapAsyncUnordered(parallelism.read) {
-               case (status, newAffinity) ⇒
-                 regionOps.readChunkEncrypted(request.regionId, status.chunk).map(_ → newAffinity)
-             }
+             .via(RestartFlow.onFailuresWithBackoff(500 millis, 10 seconds, 0.2, 20) { () ⇒
+               Flow[(ChunkStatus, Option[ChunkWriteAffinity])].mapAsyncUnordered(parallelism.read) {
+                 case (status, newAffinity) ⇒
+                   regionOps.readChunkEncrypted(request.regionId, status.chunk).map(_ → newAffinity)
+               }
+             })
              .log("region-repair-read", chunk ⇒ s"${chunk._1.hashString} at ${request.regionId}")
-             .async
             .via(ByteStreams.bufferT(_._1.data.encrypted.length, bufferSize))
-             .mapAsyncUnordered(parallelism.write) {
-               case (chunk, newAffinity) ⇒
-                 regionOps.rewriteChunk(request.regionId, chunk, newAffinity)
-             }
+             .via(RestartFlow.onFailuresWithBackoff(500 millis, 10 seconds, 0.2, 20) { () ⇒
+               Flow[(Chunk, Option[ChunkWriteAffinity])].mapAsyncUnordered(parallelism.write) {
+                 case (chunk, newAffinity) ⇒
+                   regionOps.rewriteChunk(request.regionId, chunk, newAffinity)
+               }
+             })
              .log("region-repair-write", chunk ⇒ s"${chunk.hashString} at ${request.regionId}")
              .withAttributes(ActorAttributes.logLevels(onElement = Logging.InfoLevel))
              .map(_.withoutData)
-          })
+          )
          .fold(Nil: Seq[Chunk])(_ :+ _)
          .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
          .alsoTo(AkkaStreamUtils.successPromiseOnFirst(request.result))
      }
-      .recover { case _ => Nil }
+      .recover { case _ ⇒ Nil }
      .to(Sink.ignore)
      .named("regionRepairStream")
  }
diff --git a/storage/gdrive/src/main/scala/com/karasiq/shadowcloud/storage/gdrive/GDriveRepository.scala b/storage/gdrive/src/main/scala/com/karasiq/shadowcloud/storage/gdrive/GDriveRepository.scala
index f51ec14f..007f318e 100644
--- a/storage/gdrive/src/main/scala/com/karasiq/shadowcloud/storage/gdrive/GDriveRepository.scala
+++ b/storage/gdrive/src/main/scala/com/karasiq/shadowcloud/storage/gdrive/GDriveRepository.scala
@@ -89,7 +89,7 @@ private[gdrive] class GDriveRepository(service: GDriveService)(implicit ec: Exec
       .named("gdriveRead")
   }

-  def write(key: Path) = {
+  def write(path: Path) = {
     def getFolderId(path: Path) =
       for {
         folderId ← entityCache.getOrCreateFolderId(path.parent)
@@ -106,24 +106,24 @@ private[gdrive] class GDriveRepository(service: GDriveService)(implicit ec: Exec

     Flow[Data]
       .via(AkkaStreamUtils.extractUpstream)
-      .zip(Source.lazyFuture(() ⇒ getFolderId(key)))
+      .zip(Source.lazyFuture(() ⇒ getFolderId(path)))
       .flatMapConcat {
         case (stream, folderId) ⇒
           stream.via(
             AkkaStreamUtils.writeInputStream(
               { inputStream ⇒
-                val result = Try(blockingUpload(folderId, key.name, inputStream))
-                result.foreach(_ ⇒ entityCache.resetFileCache(key))
+                val result = Try(blockingUpload(folderId, path.name, inputStream))
+                result.foreach(_ ⇒ entityCache.resetFileCache(path))
                 Source.future(Future.fromTry(result))
               },
               Dispatcher(GDriveDispatchers.fileDispatcherId)
             )
           )
       }
-      .map(written ⇒ StorageIOResult.Success(key, written))
+      .map(written ⇒ StorageIOResult.Success(path, written))
       .withAttributes(fileStreamAttributes)
       .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
-      .recover { case err ⇒ StorageIOResult.Failure(key, StorageUtils.wrapException(key, err)) }
+      .via(StorageUtils.wrapStream(path))
       .toMat(Sink.head)(Keep.right)
       .named("gdriveWrite")
   }
diff --git a/storage/telegram/src/main/resources/tgcloud/download_service.py b/storage/telegram/src/main/resources/tgcloud/download_service.py
index 4ffd9fae..985152f6 100644
--- a/storage/telegram/src/main/resources/tgcloud/download_service.py
+++ b/storage/telegram/src/main/resources/tgcloud/download_service.py
@@ -4,55 +4,54 @@ from __future__ import print_function
 from __future__ import unicode_literals

+import asyncio
+# noinspection PyUnresolvedReferences
 import errno
 import sys
-from datetime import timedelta
-from flask import Flask, Response, request
 from io import BytesIO

+from quart import Quart, request
+from quart import Response
 from secret import *
-from telegram_client_x import TelegramClientX
+from telethon import TelegramClient
 from telethon.tl.types import DocumentAttributeFilename

-app = Flask(__name__)
+app = Quart(__name__)
+app.config.from_object(__name__)
 path_home = './'  # os.path.abspath('.')  # os.path.dirname(os.path.realpath(__file__))
-client = TelegramClientX(entity, api_id, api_hash, update_workers=None, spawn_read_thread=False,
-                         timeout=timedelta(seconds=30))
-# client = TelegramClient(entity, api_id, api_hash, update_workers=None, spawn_read_thread=True)
-client.set_upload_threads_count(12)  # 24
-client.set_download_threads_count(12)  # 8
+client = TelegramClient(entity, api_id, api_hash, timeout=30)
 entity_name = "tgcloud"
 entity = None


 @app.route('/download', methods=['GET'])
-def download_route():
+async def download_route():
     path = request.args.get('path')
     assert path is not None
     f = BytesIO()
-    assert download_block(path, f) == 0
+    assert await download_block(path, f) == 0
     f.seek(0)
-    return Response(f.read())
+    return Response(f)


 @app.route('/list', methods=['GET'])
-def list_route():
+async def list_route():
     path = request.args.get('path') or ''
     f = BytesIO()
-    list_files(path, f)
+    await list_files(path, f)
     f.seek(0)
-    return Response(f.read(), mimetype='text/plain')
+    return Response(f, mimetype='text/plain')


 @app.route('/upload', methods=['POST', 'PUT'])
-def upload_route():
+async def upload_route():
     path = request.args.get('path')
     assert path is not None
     app.logger.info(f"Starting upload: {path}")
-    result = upload_block(path, request.stream)
+    result = await upload_block(path, await request.body, request.method == 'PUT')
     if result == 0:
-        return Response(status=200)
+        return Response("")
     elif result == errno.EEXIST:
         return Response("File already exists", 409)
     else:
@@ -60,58 +59,56 @@


 @app.route('/delete', methods=['POST', 'DELETE'])
-def delete_route():
+async def delete_route():
     path = request.args.get("path")
     assert path is not None
-    assert delete_block(path) == 0
-    return Response(status=200)
+    assert await delete_block(path) == 0
+    return Response("")


 @app.route('/size', methods=['GET'])
-def size_route():
+async def size_route():
     path = request.args.get('path') or ''
-    size = size_files(path)
+    size = await size_files(path)
     return Response(str(size))


-def download_block(uid, file_out):
-    try_reconnect()
-    messages = client.get_messages(entity, limit=1, search=uid)
+async def download_block(uid, file_out):
+    messages = await client.get_messages(entity, limit=1, search=uid)
     for i in range(len(messages)):
         msg = messages[i]
         if msg.message == uid:
-            if client.download_media(msg, file_out):
+            if await client.download_media(msg, file_out):
                 return 0
             else:
                 return -1
     return -1


-def delete_block(uid):
-    try_reconnect()
-    messages = client.get_messages(entity, limit=1, search=uid)
+async def delete_block(uid):
+    messages = await client.get_messages(entity, limit=1, search=uid)
     for i in range(len(messages)):
         msg = messages[i]
         if msg.message == uid:
-            client.delete_messages(entity, [msg])
+            await client.delete_messages(entity, [msg])
             return 0


-def upload_block(uid, file_in):
-    try_reconnect()
-    messages = client.get_messages(entity, limit=1, search=uid)
+async def upload_block(uid, file_in, overwrite=False):
+    messages = await client.get_messages(entity, limit=1, search=uid)
     if len(messages):
         app.logger.warning(f"File already exists: {uid}")
-        # client.delete_messages(entity, messages)
-        # app.logger.error(f"File already exists: {uid}")
-        return errno.EEXIST
-    result = client.send_file(entity,
-                              file=file_in.read(),  # TODO remove .read()
-                              caption=f'{uid}',
-                              attributes=[DocumentAttributeFilename(f'{uid}')],
-                              allow_cache=False,
-                              part_size_kb=512,
-                              force_document=True)
+        if overwrite:
+            await client.delete_messages(entity, messages)
+        else:
+            return errno.EEXIST
+    result = await client.send_file(entity,
+                                    file=file_in,
+                                    caption=f'{uid}',
+                                    attributes=[DocumentAttributeFilename(f'{uid}')],
+                                    allow_cache=False,
+                                    part_size_kb=512,
+                                    force_document=True)
     app.logger.info(f"Upload result: {result}")
     if result:
         return 0
@@ -119,24 +116,21 @@ def upload_block(uid, file_in):
         return -1


-def list_files(pattern, file_out):
-    try_reconnect()
-    messages = client.get_messages(entity, limit=100, search=pattern)
+async def list_files(pattern, file_out):
+    messages = await client.get_messages(entity, limit=100, search=pattern)
     last_id = 0
     while len(messages) != 0:
         last_id = messages[-1].id
-        with_doc = filter(lambda m: hasattr(m.original_message, 'media') and m.document is not None and m.message != '',
-                          messages)
+        with_doc = filter(lambda m: m.document is not None and m.message != '', messages)
         texts = map(lambda m: m.message, with_doc)
         for file in texts:
             file_out.write(bytes(file + "\n", "utf-8"))
-        messages = client.get_messages(entity, limit=100, search=pattern, offset_id=last_id)
+        messages = await client.get_messages(entity, limit=100, search=pattern, offset_id=last_id)
     return 0


-def size_files(pattern):
-    try_reconnect()
-    messages = client.get_messages(entity, limit=100, search=pattern)
+async def size_files(pattern):
+    messages = await client.get_messages(entity, limit=100, search=pattern)
     total_size = 0
     last_id = 0
     while len(messages) != 0:
@@ -144,63 +138,63 @@
         with_doc = filter(lambda m: m.document is not None and m.message != '', messages)
         sizes = map(lambda m: m.document.size, with_doc)
         total_size += sum(sizes)
-        messages = client.get_messages(entity, limit=100, search=pattern, offset_id=last_id)
+        messages = await client.get_messages(entity, limit=100, search=pattern, offset_id=last_id)
     return total_size


-def try_reconnect():
+async def try_reconnect():
     if not client.is_connected():
-        client.connect()
-        init_entity()
+        await client.connect()
+        await init_entity()


-def init_entity():
+async def init_entity():
     global entity
     if entity == '':
-        entity = client.get_entity(client.get_me())
+        entity = await client.get_entity(await client.get_me())
     else:
-        entity = next(d for d in client.iter_dialogs() if not d.is_user and d.name == entity_name)
+        entity = await (d async for d in client.iter_dialogs() if not d.is_user and d.name == entity_name).__anext__()
     app.logger.info(f"Writing to entity: {entity}")


-def main(argv):
+async def main(argv):
     global entity_name
+    await client.connect()
+    if not await client.is_user_authorized():
+        raise Exception("Telegram session not found - run from the project folder: python3 telegram_create_session.py")
+    await init_entity()
     try:
         service = str(argv[1])
         if service == 'download':
             uid = str(argv[2])
-            return download_block(uid, sys.stdout.buffer)
+            return await download_block(uid, sys.stdout.buffer)
         elif service == 'upload':
             uid = str(argv[2])
-            return upload_block(uid, sys.stdin.buffer)
+            return await upload_block(uid, sys.stdin.buffer, True)
         elif service == 'list':
             pattern = argv[2]
-            return list_files(pattern, sys.stdout.buffer)
+            return await list_files(pattern, sys.stdout.buffer)
         elif service == 'delete':
             uid = argv[2]
-            return delete_block(uid)
+            return await delete_block(uid)
         elif service == 'size':
             pattern = argv[2]
-            size = size_files(pattern)
+            size = await size_files(pattern)
             sys.stdout.write(str(size))
             return 0
         elif service == 'server':
-            port = argv[2]
+            port = argv[2] or 5000
             entity_name = argv[3]
-            app.run(debug=True, port=port, threaded=True)
+            await app.run_task(port=port, use_reloader=False)
         else:
             return -1
     finally:
-        client.disconnect()
-
+        await client.disconnect()

-client.connect()
-if not client.is_user_authorized():
-    raise Exception("Telegram session not found - run from the project folder: python3 telegram_create_session.py")
-init_entity()

 if __name__ == '__main__':
-    result = main(sys.argv[0:])
+    loop = asyncio.get_event_loop()
+    result = loop.run_until_complete(main(sys.argv[0:]))
     if isinstance(result, int):
         sys.exit(result)
     elif result:
diff --git a/storage/telegram/src/main/resources/tgcloud/requirements.txt b/storage/telegram/src/main/resources/tgcloud/requirements.txt
index 337a718c..f7f3a2bf 100644
--- a/storage/telegram/src/main/resources/tgcloud/requirements.txt
+++ b/storage/telegram/src/main/resources/tgcloud/requirements.txt
@@ -1,3 +1,4 @@
-Telethon==0.19.1
+Telethon==1.14.0
 cryptg==0.2.post1
-Flask==1.1.1
+Quart==0.12.0
+Hypercorn==0.9.5
diff --git a/storage/telegram/src/main/resources/tgcloud/telegram_create_session.py b/storage/telegram/src/main/resources/tgcloud/telegram_create_session.py
index 54f70f52..300544f2 100644
--- a/storage/telegram/src/main/resources/tgcloud/telegram_create_session.py
+++ b/storage/telegram/src/main/resources/tgcloud/telegram_create_session.py
@@ -4,16 +4,21 @@ from __future__ import print_function
 from __future__ import unicode_literals

+import asyncio
 from secret import *
-from telegram_client_x import TelegramClientX
+from telethon import TelegramClient

-path_home = './'  # os.path.abspath('.')
-client = TelegramClientX(entity, api_id, api_hash, update_workers=None, spawn_read_thread=True)
-# client = TelegramClient(entity, api_id, api_hash, update_workers=None, spawn_read_thread=True)
+client = TelegramClient(entity, api_id, api_hash)

-client.connect()
-if not client.is_user_authorized():
-    client.start()
+async def create_session():
+    await client.connect()

-client.disconnect()
+    if not client.is_user_authorized():
+        client.start()
+
+    await client.disconnect()
+
+
+loop = asyncio.get_event_loop()
+result = loop.run_until_complete(create_session())
diff --git a/storage/telegram/src/main/scala/com/karasiq/shadowcloud/storage/telegram/TGCloudRepository.scala b/storage/telegram/src/main/scala/com/karasiq/shadowcloud/storage/telegram/TGCloudRepository.scala
index f41bff41..a9c59da9 100644
--- a/storage/telegram/src/main/scala/com/karasiq/shadowcloud/storage/telegram/TGCloudRepository.scala
+++ b/storage/telegram/src/main/scala/com/karasiq/shadowcloud/storage/telegram/TGCloudRepository.scala
@@ -34,11 +34,11 @@ class TGCloudRepository(port: Int)(implicit as: ActorSystem) extends PathTreeRep

   private[this] val executeHttpRequest = {
     val settings = ConnectionPoolSettings("""
-        |max-connections = 8
-        |max-open-requests = 32
-        |idle-timeout = 1m
+        |max-connections = 32
+        |max-open-requests = 128
+        |idle-timeout = 10m
         |max-retries = 0
-        |idle-timeout = 300s
+        |idle-timeout = 1200s
         |""".stripMargin)

     Flow[HttpRequest]
@@ -76,11 +76,11 @@ class TGCloudRepository(port: Int)(implicit as: ActorSystem) extends PathTreeRep
       .flatMapConcat(_.entity.dataBytes)
       .alsoToMat(StorageUtils.countPassedBytes(key).toMat(Sink.head)(Keep.right))(Keep.right)

-  override def write(key: Path): Sink[Data, Result] = {
+  override def write(path: Path): Sink[Data, Result] = {
     Flow[Data]
       .alsoToMat(
         Flow[Data]
-          .via(StorageUtils.countPassedBytes(key))
+          .via(StorageUtils.countPassedBytes(path))
           .toMat(Sink.head)(Keep.right)
       )(Keep.right)
       .via(ByteStreams.concat)
@@ -89,20 +89,17 @@ class TGCloudRepository(port: Int)(implicit as: ActorSystem) extends PathTreeRep
         upstream ⇒
           HttpRequest(
             HttpMethods.POST,
-            Uri("/upload").withQuery(Uri.Query("path" → encodePath(key))),
+            Uri("/upload").withQuery(Uri.Query("path" → encodePath(path))),
             entity = HttpEntity(ContentTypes.`application/octet-stream`, upstream)
           )
       )
       .via(executeHttpRequest)
-      .alsoTo(Sink.foreach(_.discardEntityBytes()))
       .collect[StorageIOResult] {
-        case r if r.status.isSuccess() ⇒ StorageIOResult.Success(key, 0)
-        case r if r.status == StatusCodes.Conflict ⇒ StorageIOResult.Failure(key, StorageException.AlreadyExists(key))
-        case r ⇒ StorageIOResult.Failure(key, StorageUtils.wrapException(key, new IOException(s"Request error: $r")))
-      }
-      .recover {
-        case err ⇒ StorageIOResult.Failure(key, StorageUtils.wrapException(key, err))
+        case r if r.status.isSuccess() ⇒ StorageIOResult.Success(path, 0)
+        case r if r.status == StatusCodes.Conflict ⇒ StorageIOResult.Failure(path, StorageException.AlreadyExists(path))
+        case r ⇒ StorageIOResult.Failure(path, StorageUtils.wrapException(path, new IOException(s"Request error: $r")))
       }
+      .via(StorageUtils.wrapStream(path))
       .toMat(Sink.head)(StorageUtils.foldIOFutures(_, _))
   }