Skip to content

Commit

Permalink
Adds support for clusters w/o shared file system
Browse files Browse the repository at this point in the history
Normally Peel expects the files of a system to be stored in a folder
shared among the nodes in the cluster. This commit allows distibution
via rsync in clusters without a shared folder.

To enable this option, one has to set the flag
`system.<system>.path.isShared = true` in the <system>'s configuration file.
  • Loading branch information
noproblem666 authored and akunft committed Jun 23, 2016
1 parent 6a97c51 commit 0ac37a9
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 0 deletions.
3 changes: 3 additions & 0 deletions peel-core/src/main/resources/reference.peel.conf
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ system {
parallelism.per-node = ${runtime.cpu.cores}
parallelism.total = ${system.default.config.parallelism.per-node}
}
path {
isShared = false
}
# system startup options
startup {
max.attempts = 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package org.peelframework.core.beans.system

import java.nio.file.{Files, Paths}
import java.io.File
import java.util.concurrent.TimeUnit

import com.samskivert.mustache.Mustache
import com.typesafe.config.ConfigFactory
Expand All @@ -27,6 +29,12 @@ import org.peelframework.core.util.{Version, shell}
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.BeanNameAware

import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.Try
import scala.concurrent.ExecutionContext.Implicits.global

/** This class represents a System in the Peel framework.
*
* Most nodes in the Peel dependency-graph are systems. A [[System]] can specify it's dependencies which are then set
Expand Down Expand Up @@ -73,12 +81,44 @@ abstract class System(
}

configuration().update()
if (config.getBoolean(s"system.$configKey.path.isShared"))
copyHomeToSlaves()
start()

logger.info(s"System '$toString' is now up and running")
}
}

def copyHomeToSlaves(): Unit = {
val homePath = config.getString(s"system.$configKey.path.home")
val destinationPath = new File(homePath).getParent
val slaves = config.getStringList(s"system.$configKey.config.slaves").asScala
val user = config.getString(s"system.$configKey.user")
val logPath = Paths.get(config.getString(s"system.$configKey.path.log"))
val relativeLogPath = Paths.get(destinationPath).relativize(logPath).normalize
val futureSyncedDirs = Future.traverse(slaves) { host =>
for {
_ <- Future(createRemoteDirectory(destinationPath, user, host))
f <- Future(copyDirectorytoRemote(homePath, destinationPath, user, host, relativeLogPath.toString + "/*"))
} yield f
}

// await for all future log file counts and convert the result to a map
Await.result(futureSyncedDirs, Duration(Math.max(30, 5 * slaves.size), TimeUnit.SECONDS))
}

def copyDirectorytoRemote(localSource: String, remoteDestination: String, user: String, host: String, exclude: String): Int = {
val fullDestination: String = s"$user@$host:$remoteDestination"
val command = s"rsync -a $localSource $fullDestination --exclude $exclude"
logger.info(command)
shell ! (command, s"failed to copy $localSource to $fullDestination", fatal = true)
}

def createRemoteDirectory(path: String, user: String, host: String): Int = {
logger.info(s"creating directory $path on remote host $host")
shell ! (s"ssh $user@$host mkdir -p $path", s"failed to create directory $path on remote host $host", fatal = true)
}

/** Cleans up and shuts down the system. */
def tearDown(): Unit = {
if (!isRunning) {
Expand Down
1 change: 1 addition & 0 deletions peel-extensions/src/main/resources/reference.dstat.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ system {
group = ${system.default.group}

path {
isShared = ${system.default.path.isShared}
archive.dst = ${app.path.systems}
pids = ${java.io.tmpdir}"/dstat/dstat.pids"
conf = ${system.dstat.path.home}
Expand Down
1 change: 1 addition & 0 deletions peel-extensions/src/main/resources/reference.flink.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ system {
user = ${system.default.user}
group = ${system.default.group}
path {
isShared = ${system.default.path.isShared}
archive.dst = ${app.path.systems}
config = ${system.flink.path.home}"/conf"
log = ${system.flink.path.home}"/log"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ system {
user = ${system.default.user}
group = ${system.default.group}
path {
isShared = ${system.default.path.isShared}
archive.dst = ${app.path.systems}
config = ${system.hadoop-1.path.home}"/conf"
log = ${system.hadoop-1.path.home}"/logs"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ system {
user = ${system.default.user}
group = ${system.default.group}
path {
isShared = ${system.default.path.isShared}
archive.dst = ${app.path.systems}
config = ${system.hadoop-2.path.home}"/etc/hadoop"
log = ${system.hadoop-2.path.home}"/logs"
Expand Down
1 change: 1 addition & 0 deletions peel-extensions/src/main/resources/reference.spark.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ system {
user = ${system.default.user}
group = ${system.default.group}
path {
isShared = ${system.default.path.isShared}
archive.dst = ${app.path.systems}
home = ${app.path.systems}"/spark"
config = ${system.spark.path.home}"/conf"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ system {
user = ${system.default.user}
group = ${system.default.group}
path {
isShared = ${system.default.path.isShared}
archive.dst = ${app.path.systems}
config = ${system.zookeeper.path.home}"/conf"
data = ${system.zookeeper.path.home}"/data"
Expand Down

0 comments on commit 0ac37a9

Please sign in to comment.