From 0ac37a94214e5b09999475b7df3b67ce3e43d67e Mon Sep 17 00:00:00 2001 From: noproblem666 Date: Tue, 14 Jun 2016 13:08:51 +0200 Subject: [PATCH] Adds support for clusters w/o shared file system Normally Peel expects the files of a system to be stored in a folder shared among the nodes in the cluster. This commit allows distibution via rsync in clusters without a shared folder. To enable this option, one has to set the flag `system..path.isShared = true` in the 's configuration file. --- .../src/main/resources/reference.peel.conf | 3 ++ .../core/beans/system/System.scala | 40 +++++++++++++++++++ .../src/main/resources/reference.dstat.conf | 1 + .../src/main/resources/reference.flink.conf | 1 + .../main/resources/reference.hadoop-1.x.conf | 1 + .../main/resources/reference.hadoop-2.x.conf | 1 + .../src/main/resources/reference.spark.conf | 1 + .../main/resources/reference.zookeeper.conf | 1 + 8 files changed, 49 insertions(+) diff --git a/peel-core/src/main/resources/reference.peel.conf b/peel-core/src/main/resources/reference.peel.conf index b471e2b5..1e6db954 100644 --- a/peel-core/src/main/resources/reference.peel.conf +++ b/peel-core/src/main/resources/reference.peel.conf @@ -60,6 +60,9 @@ system { parallelism.per-node = ${runtime.cpu.cores} parallelism.total = ${system.default.config.parallelism.per-node} } + path { + isShared = false + } # system startup options startup { max.attempts = 3 diff --git a/peel-core/src/main/scala/org/peelframework/core/beans/system/System.scala b/peel-core/src/main/scala/org/peelframework/core/beans/system/System.scala index dc3c3aee..4dcae35a 100644 --- a/peel-core/src/main/scala/org/peelframework/core/beans/system/System.scala +++ b/peel-core/src/main/scala/org/peelframework/core/beans/system/System.scala @@ -16,6 +16,8 @@ package org.peelframework.core.beans.system import java.nio.file.{Files, Paths} +import java.io.File +import java.util.concurrent.TimeUnit import com.samskivert.mustache.Mustache import com.typesafe.config.ConfigFactory @@ -27,6 +29,12 @@ import org.peelframework.core.util.{Version, shell} import org.slf4j.LoggerFactory import org.springframework.beans.factory.BeanNameAware +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} +import scala.util.Try +import scala.concurrent.ExecutionContext.Implicits.global + /** This class represents a System in the Peel framework. * * Most nodes in the Peel dependency-graph are systems. A [[System]] can specify it's dependencies which are then set @@ -73,12 +81,44 @@ abstract class System( } configuration().update() + if (config.getBoolean(s"system.$configKey.path.isShared")) + copyHomeToSlaves() start() logger.info(s"System '$toString' is now up and running") } } + def copyHomeToSlaves(): Unit = { + val homePath = config.getString(s"system.$configKey.path.home") + val destinationPath = new File(homePath).getParent + val slaves = config.getStringList(s"system.$configKey.config.slaves").asScala + val user = config.getString(s"system.$configKey.user") + val logPath = Paths.get(config.getString(s"system.$configKey.path.log")) + val relativeLogPath = Paths.get(destinationPath).relativize(logPath).normalize + val futureSyncedDirs = Future.traverse(slaves) { host => + for { + _ <- Future(createRemoteDirectory(destinationPath, user, host)) + f <- Future(copyDirectorytoRemote(homePath, destinationPath, user, host, relativeLogPath.toString + "/*")) + } yield f + } + + // await for all future log file counts and convert the result to a map + Await.result(futureSyncedDirs, Duration(Math.max(30, 5 * slaves.size), TimeUnit.SECONDS)) + } + + def copyDirectorytoRemote(localSource: String, remoteDestination: String, user: String, host: String, exclude: String): Int = { + val fullDestination: String = s"$user@$host:$remoteDestination" + val command = s"rsync -a $localSource $fullDestination --exclude $exclude" + logger.info(command) + shell ! (command, s"failed to copy $localSource to $fullDestination", fatal = true) + } + + def createRemoteDirectory(path: String, user: String, host: String): Int = { + logger.info(s"creating directory $path on remote host $host") + shell ! (s"ssh $user@$host mkdir -p $path", s"failed to create directory $path on remote host $host", fatal = true) + } + /** Cleans up and shuts down the system. */ def tearDown(): Unit = { if (!isRunning) { diff --git a/peel-extensions/src/main/resources/reference.dstat.conf b/peel-extensions/src/main/resources/reference.dstat.conf index 672bb731..05c40670 100644 --- a/peel-extensions/src/main/resources/reference.dstat.conf +++ b/peel-extensions/src/main/resources/reference.dstat.conf @@ -4,6 +4,7 @@ system { group = ${system.default.group} path { + isShared = ${system.default.path.isShared} archive.dst = ${app.path.systems} pids = ${java.io.tmpdir}"/dstat/dstat.pids" conf = ${system.dstat.path.home} diff --git a/peel-extensions/src/main/resources/reference.flink.conf b/peel-extensions/src/main/resources/reference.flink.conf index 92038205..49bef6c4 100644 --- a/peel-extensions/src/main/resources/reference.flink.conf +++ b/peel-extensions/src/main/resources/reference.flink.conf @@ -3,6 +3,7 @@ system { user = ${system.default.user} group = ${system.default.group} path { + isShared = ${system.default.path.isShared} archive.dst = ${app.path.systems} config = ${system.flink.path.home}"/conf" log = ${system.flink.path.home}"/log" diff --git a/peel-extensions/src/main/resources/reference.hadoop-1.x.conf b/peel-extensions/src/main/resources/reference.hadoop-1.x.conf index f591ba11..61509851 100644 --- a/peel-extensions/src/main/resources/reference.hadoop-1.x.conf +++ b/peel-extensions/src/main/resources/reference.hadoop-1.x.conf @@ -3,6 +3,7 @@ system { user = ${system.default.user} group = ${system.default.group} path { + isShared = ${system.default.path.isShared} archive.dst = ${app.path.systems} config = ${system.hadoop-1.path.home}"/conf" log = ${system.hadoop-1.path.home}"/logs" diff --git a/peel-extensions/src/main/resources/reference.hadoop-2.x.conf b/peel-extensions/src/main/resources/reference.hadoop-2.x.conf index 27889092..cdbcba14 100644 --- a/peel-extensions/src/main/resources/reference.hadoop-2.x.conf +++ b/peel-extensions/src/main/resources/reference.hadoop-2.x.conf @@ -3,6 +3,7 @@ system { user = ${system.default.user} group = ${system.default.group} path { + isShared = ${system.default.path.isShared} archive.dst = ${app.path.systems} config = ${system.hadoop-2.path.home}"/etc/hadoop" log = ${system.hadoop-2.path.home}"/logs" diff --git a/peel-extensions/src/main/resources/reference.spark.conf b/peel-extensions/src/main/resources/reference.spark.conf index 1a4258c4..208fda29 100644 --- a/peel-extensions/src/main/resources/reference.spark.conf +++ b/peel-extensions/src/main/resources/reference.spark.conf @@ -3,6 +3,7 @@ system { user = ${system.default.user} group = ${system.default.group} path { + isShared = ${system.default.path.isShared} archive.dst = ${app.path.systems} home = ${app.path.systems}"/spark" config = ${system.spark.path.home}"/conf" diff --git a/peel-extensions/src/main/resources/reference.zookeeper.conf b/peel-extensions/src/main/resources/reference.zookeeper.conf index 63b0d7fa..64b91a06 100644 --- a/peel-extensions/src/main/resources/reference.zookeeper.conf +++ b/peel-extensions/src/main/resources/reference.zookeeper.conf @@ -3,6 +3,7 @@ system { user = ${system.default.user} group = ${system.default.group} path { + isShared = ${system.default.path.isShared} archive.dst = ${app.path.systems} config = ${system.zookeeper.path.home}"/conf" data = ${system.zookeeper.path.home}"/data"