From 30c0ca09fda81cf09de8e758d222d5a601517592 Mon Sep 17 00:00:00 2001 From: mingliang Date: Wed, 10 Jun 2015 23:53:28 +0200 Subject: [PATCH 1/2] Added h2o extension --- .../stratosphere/peel/core/config/Model.scala | 4 + .../src/main/resources/peel-extensions.xml | 6 + .../src/main/resources/reference.h2o.conf | 31 ++++ .../templates/h2o/conf/h2o-conf.mustache | 5 + .../templates/h2o/conf/hosts.mustache | 3 + .../h2o/beans/experiment/H2OExperiment.scala | 150 ++++++++++++++++++ .../extensions/h2o/beans/system/H2O.scala | 107 +++++++++++++ 7 files changed, 306 insertions(+) create mode 100644 peel-extensions/src/main/resources/reference.h2o.conf create mode 100644 peel-extensions/src/main/resources/templates/h2o/conf/h2o-conf.mustache create mode 100644 peel-extensions/src/main/resources/templates/h2o/conf/hosts.mustache create mode 100644 peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala create mode 100644 peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala diff --git a/peel-core/src/main/scala/eu/stratosphere/peel/core/config/Model.scala b/peel-core/src/main/scala/eu/stratosphere/peel/core/config/Model.scala index bbcbf39f..ccaf5c0c 100644 --- a/peel-core/src/main/scala/eu/stratosphere/peel/core/config/Model.scala +++ b/peel-core/src/main/scala/eu/stratosphere/peel/core/config/Model.scala @@ -92,6 +92,10 @@ object Model { val hosts = c.getStringList(key) } + class HostsWithPort(val c: Config, val prefix: String) extends Model { + val hosts = (for (host <- c.getStringList(s"$prefix.slaves").asScala) yield s"$host:${c.getInt(s"$prefix.cli.dataport")}").asJava + } + def factory[T <: Model](config: Config, prefix: String)(implicit m: Manifest[T]) = { val constructor = m.runtimeClass.getConstructor(classOf[Config], classOf[String]) constructor.newInstance(config, prefix) diff --git a/peel-extensions/src/main/resources/peel-extensions.xml b/peel-extensions/src/main/resources/peel-extensions.xml index bd1b09ef..fd59db68 100644 --- a/peel-extensions/src/main/resources/peel-extensions.xml +++ b/peel-extensions/src/main/resources/peel-extensions.xml @@ -66,4 +66,10 @@ + + + + + + \ No newline at end of file diff --git a/peel-extensions/src/main/resources/reference.h2o.conf b/peel-extensions/src/main/resources/reference.h2o.conf new file mode 100644 index 00000000..c305321d --- /dev/null +++ b/peel-extensions/src/main/resources/reference.h2o.conf @@ -0,0 +1,31 @@ +system { + h2o { + user = ${system.default.user} + group = ${system.default.group} + path { + # uncomment the following section if you want to extract an archive on every run + # archive = { + # src = ${app.path.downloads}"/flink-bin-0.6-incubating-SNAPSHOT.tgz" + # dst = ${app.path.systems} + # } + home = ${app.path.systems}"/h2o" + } + startup { + max.attempts = ${system.default.startup.max.attempts} + polling { + counter = ${system.default.startup.polling.counter} + interval = ${system.default.startup.polling.interval} + } + } + config { + # put list of slaves + slaves = ${system.default.config.slaves} + cli { + tmp.dir = "/tmp/h2o-"${system.default.user} + dataport = 55555 + memory.per-node = 2g + parallelism.per-node = ${system.default.config.parallelism.per-node} + } + } + } +} \ No newline at end of file diff --git a/peel-extensions/src/main/resources/templates/h2o/conf/h2o-conf.mustache b/peel-extensions/src/main/resources/templates/h2o/conf/h2o-conf.mustache new file mode 100644 index 00000000..0069487b --- /dev/null +++ 
b/peel-extensions/src/main/resources/templates/h2o/conf/h2o-conf.mustache @@ -0,0 +1,5 @@ +{{#tmp.dir}}tmp.dir {{tmp.dir}}{{/tmp.dir}} +{{#dataport}}dataport {{dataport}}{{/dataport}} +{{#parallelism.per-node}}parallelism.per-node {{parallelism.per-node}}{{/parallelism.per-node}} +{{#memory.per-node}}memory.per-node {{memory.per-node}}{{/memory.per-node}} +{{#single-precision}}single-precision {{single-precision}}{{/single-precision}} \ No newline at end of file diff --git a/peel-extensions/src/main/resources/templates/h2o/conf/hosts.mustache b/peel-extensions/src/main/resources/templates/h2o/conf/hosts.mustache new file mode 100644 index 00000000..f70d5453 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/h2o/conf/hosts.mustache @@ -0,0 +1,3 @@ +{{#hosts}} +{{{.}}} +{{/hosts}} diff --git a/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala new file mode 100644 index 00000000..976f3b05 --- /dev/null +++ b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala @@ -0,0 +1,150 @@ +package eu.stratosphere.peel.extensions.h2o.beans.experiment + +import java.lang.{System => Sys} +import java.io.FileWriter +import java.nio.file.{Paths, Files} + +import com.typesafe.config.Config +import eu.stratosphere.peel.core.beans.data.{ExperimentOutput, DataSet} +import eu.stratosphere.peel.core.beans.experiment.Experiment +import eu.stratosphere.peel.core.util.shell +import eu.stratosphere.peel.extensions.h2o.beans.system.H2O +import spray.json._ + +/** + * Created by qml_moon on 23/05/15. + */ +class H2OExperiment(command: String, + runner: H2O, + runs: Int, + inputs: Set[DataSet], + outputs: Set[ExperimentOutput], + name: String, + config: Config) extends Experiment(command, runner, runs, inputs, outputs, name, config) { + + def this(runs: Int, runner: H2O, input: DataSet, output: ExperimentOutput, command: String, name: String, config: Config) = this(command, runner, runs, Set(input), Set(output), name, config) + + def this(runs: Int, runner: H2O, inputs: Set[DataSet], output: ExperimentOutput, command: String, name: String, config: Config) = this(command, runner, runs, inputs, Set(output), name, config) + + override def run(id: Int, force: Boolean): Experiment.Run[H2O] = new H2OExperiment.SingleJobRun(id, this, force) +} + +object H2OExperiment { + + case class State(name: String, + command: String, + var runExitCode: Option[Int] = None, + var runTime: Long = 0) extends Experiment.RunState {} + + object StateProtocol extends DefaultJsonProtocol with NullOptions { + implicit val stateFormat = jsonFormat4(State) + } + + /** A private inner class encapsulating the logic of single run. 
*/ + class SingleJobRun(val id: Int, val exp: H2OExperiment, val force: Boolean) extends Experiment.SingleJobRun[H2O, State] { + + import eu.stratosphere.peel.extensions.h2o.beans.experiment.H2OExperiment.StateProtocol._ + + val runnerLogPath = s"${exp.config.getString("system.h2o.config.cli.tmp.dir")}/h2ologs" + + override def isSuccessful = state.runExitCode.getOrElse(-1) == 0 + + override protected def logFilePatterns = List(s"$runnerLogPath/h2o_*-3-info.log") + + val pollingNode = exp.config.getStringList("system.h2o.config.slaves").get(0) + val user = exp.config.getString("system.h2o.user") + val dataPort = exp.config.getInt("system.h2o.config.cli.dataport") + + override protected def loadState(): State = { + if (Files.isRegularFile(Paths.get(s"$home/state.json"))) { + try { + io.Source.fromFile(s"$home/state.json").mkString.parseJson.convertTo[State] + } catch { + case e: Throwable => State(name, command) + } + } else { + State(name, command) + } + } + + override protected def writeState() = { + val fw = new FileWriter(s"$home/state.json") + fw.write(state.toJson.prettyPrint) + fw.close() + } + + override protected def runJob() = { + // try to execute the experiment + val (runExit, t) = Experiment.time(this ! (command, s"$home/run.out", s"$home/run.err")) + state.runTime = t + state.runExitCode = Some(runExit) +// jobids = (this !!(command)).split("\n") + +// var isDone = false +// while (!isDone) { +// Thread.sleep(exp.config.getInt("system.h2o.startup.polling.interval") * 5) +// val status = (this getStatus(jobids(1))) +// if (status == "DONE" || status == "FAILED") { +// isDone = true +// state.runTime = Integer.parseInt(this getRuntime(exp.config, jobids(1))) +// state.runExitCode = status match { +// case "DONE" => Some(0) +// case "FAILED" => Some(-1) +// } +// } +// } + + } + + /** Before the run, collect runner log files and their current line counts */ + override protected def beforeRun() = { + val logFiles = for (pattern <- logFilePatterns; f <- (shell !! s""" ssh $user@$pollingNode "ls $pattern" """).split(Sys.lineSeparator).map(_.trim)) yield f + logFileCounts = Map((for (f <- logFiles) yield f -> (shell !! s""" ssh $user@$pollingNode "wc -l $f | cut -d' ' -f1" """).trim.toLong): _*) + } + + /** After the run, copy logs */ + override protected def afterRun(): Unit = { + shell ! s"rm -Rf $home/logs/*" + for ((file, count) <- logFileCounts) shell ! s""" ssh $user@$pollingNode "tail -n +${count + 1} $file" > $home/logs/${Paths.get(file).getFileName}""" + } + + override def cancelJob() = { + //firstly, retrieve the jobid of running job + val s = shell !! s"wget -qO- $user@$pollingNode:$dataPort/3/Jobs" + var jobid = "" + for (job <- s.parseJson.asJsObject.fields.get("jobs").get.asInstanceOf[JsArray].elements) { + val fields = job.asJsObject.fields + if (fields.getOrElse("status", "").toString == "\"RUNNING\"" || fields.getOrElse("status", "").toString == "\"CREATED\"") { + val sid = fields.get("key").get.asInstanceOf[JsObject].fields.get("name").get.toString() + jobid = sid.substring(1, jobid.length - 1) + } + } + + //send cancel request to REST API + shell !! 
s""" wget -qO- $user@$pollingNode:$dataPort/3/Jobs/${jobid.replace("$", "\\$")}/cancel --post-data="job_id=${jobid}" """ + + //check if the job has been successfully cancelled + var isCancelled = false + while (!isCancelled) { + Thread.sleep(exp.config.getInt("system.h2o.startup.polling.interval") * 2) + val status = this getStatus(jobid) + isCancelled = status == "CANCELLED" + } + state.runTime = exp.config.getLong("experiment.timeout") * 1000 + state.runExitCode = Some(-1) + } + + private def !(command: String, outFile: String, errFile: String) = { + shell ! s"$command $pollingNode:$dataPort > $outFile 2> $errFile" + } + + private def getStatus(jobid: String): String = { + (shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Jobs/${jobid.replace("$", "\\$")} | grep -Eo '"status":"[A-Z]+"' | grep -Eo [A-Z]+ """).stripLineEnd + } + +// private def getRuntime(config: Config, jobid: String): String = { +// (shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Jobs/${jobid.replace("$", "\\$")} | grep -Eo '"msec":[0-9]+' | grep -Eo [0-9]+ """).stripLineEnd +// } + } + +} \ No newline at end of file diff --git a/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala new file mode 100644 index 00000000..c1c76df6 --- /dev/null +++ b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala @@ -0,0 +1,107 @@ +package eu.stratosphere.peel.extensions.h2o.beans.system + +import com.samskivert.mustache.Mustache +import com.typesafe.config.ConfigException +import eu.stratosphere.peel.core.beans.system.Lifespan._ +import eu.stratosphere.peel.core.beans.system.{SetUpTimeoutException, System} +import eu.stratosphere.peel.core.config.{Model, SystemConfig} +import eu.stratosphere.peel.core.util.shell + +import scala.collection.JavaConverters._ + +/** Wrapper for H2O + * + * Implements H2O as a [[eu.stratosphere.peel.core.beans.system.System System]] class and provides setup and teardown methods. + * + * @param version Version of the system (e.g. 
"2.8.6") + * @param lifespan [[eu.stratosphere.peel.core.beans.system.Lifespan Lifespan]] of the system + * @param dependencies Set of dependencies that this system needs + * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + */ +class H2O(version: String, lifespan: Lifespan, dependencies: Set[System] = Set(), mc: Mustache.Compiler) extends System("h2o", version, lifespan, dependencies, mc) { + + override def configuration() = SystemConfig(config, { + val home = config.getString("system.h2o.path.home") + List( + SystemConfig.Entry[Model.HostsWithPort]("system.h2o.config", s"$home/flatfile", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Yaml]("system.h2o.config.cli", s"$home/conf", templatePath("conf/h2o-conf"), mc) + ) + }) + + override protected def start(): Unit = { + val user = config.getString("system.h2o.user") + val tmpDir =config.getString("system.h2o.config.cli.tmp.dir") + val dataPort = config.getInt("system.h2o.config.cli.dataport") + val memory = config.getString("system.h2o.config.cli.memory.per-node") + val nthreads = config.getInt("system.h2o.config.cli.parallelism.per-node") + val home = config.getString("system.h2o.path.home") + + // check if tmp dir exists and create if not + try { + + for (dataNode <- config.getStringList("system.h2o.config.slaves").asScala) { + logger.info(s"Initializing tmp directory $tmpDir at taskmanager node $dataNode") + shell ! s""" ssh $user@$dataNode "rm -Rf $tmpDir" """ + shell ! s""" ssh $user@$dataNode "mkdir -p $tmpDir" """ + } + } catch { + case _: ConfigException => // ignore not set explicitly, java default is taken + } + + var failedStartUpAttempts = 0 + + while (!isUp) { + try { + val totl = config.getStringList("system.h2o.config.slaves").size() + val init = 0 // H2O doesn't reset the log on startup + + for (dataNode <- config.getStringList("system.h2o.config.slaves").asScala) { + shell ! s""" ssh $user@$dataNode "java -Xmx$memory -cp $home/h2odriver.jar water.H2OApp -flatfile $home/flatfile -nthreads $nthreads -ice_root $tmpDir -port $dataPort > /dev/null &" """ + } + logger.info("Waiting for nodes to connect") + + var curr = init + var cntr = config.getInt("system.h2o.startup.polling.counter") + val pollingNode = config.getStringList("system.h2o.config.slaves").get(0) + while (curr - init < totl) { + logger.info(s"Connected ${curr - init} from $totl nodes") + // wait a bit + Thread.sleep(config.getInt("system.h2o.startup.polling.interval")) + // get new values + try { + curr = Integer.parseInt((shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Cloud.json | grep -Eo 'cloud_size":[0-9]+,' | grep -Eo '[0-9]+' """).stripLineEnd) + } catch { + case _ : Throwable => ; + } + // timeout if counter goes below zero + cntr = cntr - 1 + if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") + } + isUp = true + } catch { + case e: SetUpTimeoutException => + failedStartUpAttempts = failedStartUpAttempts + 1 + if (failedStartUpAttempts < config.getInt("system.h2o.startup.max.attempts")) { + stop() + logger.info(s"Could not bring system '$toString' up in time, trying again...") + } else { + throw e + } + } + } + } + + override protected def stop() = { + val user = config.getString("system.h2o.user") + for (dataNode <- config.getStringList("system.h2o.config.slaves").asScala) { + shell ! 
s""" ssh $user@$dataNode "ps -ef | grep h2odriver.jar | grep -v grep | awk '{print \\$$2}' | xargs kill" """ + } + isUp = false + } + + def isRunning = { + val pollingNode = config.getStringList("system.h2o.config.slaves").get(0) + val user = config.getString("system.h2o.user") + (shell ! s""" ssh $user@$pollingNode "ps -ef | grep h2odriver.jar | grep -v grep " """) == 0 + } +} \ No newline at end of file From bb90e2f5799a37a48734721e73f630ab19690640 Mon Sep 17 00:00:00 2001 From: mingliang Date: Tue, 4 Aug 2015 15:59:50 +0200 Subject: [PATCH 2/2] bring peel-h2o up-to-date --- .../src/main/resources/peel-extensions.xml | 2 +- .../resources/reference.h2o-3.0.0.12.conf | 16 ++++ .../src/main/resources/reference.h2o.conf | 51 +++++----- .../h2o/beans/experiment/H2OExperiment.scala | 93 +++++++++++-------- .../extensions/h2o/beans/system/H2O.scala | 2 + 5 files changed, 95 insertions(+), 69 deletions(-) create mode 100644 peel-extensions/src/main/resources/reference.h2o-3.0.0.12.conf diff --git a/peel-extensions/src/main/resources/peel-extensions.xml b/peel-extensions/src/main/resources/peel-extensions.xml index fd59db68..c9b6f4eb 100644 --- a/peel-extensions/src/main/resources/peel-extensions.xml +++ b/peel-extensions/src/main/resources/peel-extensions.xml @@ -67,7 +67,7 @@ - + diff --git a/peel-extensions/src/main/resources/reference.h2o-3.0.0.12.conf b/peel-extensions/src/main/resources/reference.h2o-3.0.0.12.conf new file mode 100644 index 00000000..e073785f --- /dev/null +++ b/peel-extensions/src/main/resources/reference.h2o-3.0.0.12.conf @@ -0,0 +1,16 @@ +# include common h2o configuration +include "reference.h2o.conf" + +system { + h2o { + path { + #IMPORTANT: it has to be a pre-built version with HADOOP + #TODO: zip file is not supported + archive.url = "http://h2o-release.s3.amazonaws.com/h2o/rel-shannon/12/h2o-3.0.0.12-hdp2.2.zip" + archive.md5 = "297811A13E849D264F6223536D736B19" + archive.src = ${app.path.downloads}"/h2o-3.0.0.12-hdp2.2.zip" + home = ${system.h2o.path.archive.dst}"/h2o-3.0.0.12-hdp2.2" + } + + } +} \ No newline at end of file diff --git a/peel-extensions/src/main/resources/reference.h2o.conf b/peel-extensions/src/main/resources/reference.h2o.conf index c305321d..c4bdef36 100644 --- a/peel-extensions/src/main/resources/reference.h2o.conf +++ b/peel-extensions/src/main/resources/reference.h2o.conf @@ -1,31 +1,26 @@ system { - h2o { - user = ${system.default.user} - group = ${system.default.group} - path { - # uncomment the following section if you want to extract an archive on every run - # archive = { - # src = ${app.path.downloads}"/flink-bin-0.6-incubating-SNAPSHOT.tgz" - # dst = ${app.path.systems} - # } - home = ${app.path.systems}"/h2o" - } - startup { - max.attempts = ${system.default.startup.max.attempts} - polling { - counter = ${system.default.startup.polling.counter} - interval = ${system.default.startup.polling.interval} - } - } - config { - # put list of slaves - slaves = ${system.default.config.slaves} - cli { - tmp.dir = "/tmp/h2o-"${system.default.user} - dataport = 55555 - memory.per-node = 2g - parallelism.per-node = ${system.default.config.parallelism.per-node} - } - } + h2o { + user = ${system.default.user} + group = ${system.default.group} + path { + archive.dst = ${app.path.systems} + } + startup { + max.attempts = ${system.default.startup.max.attempts} + polling { + counter = ${system.default.startup.polling.counter} + interval = ${system.default.startup.polling.interval} + } + } + config { + # put list of slaves + slaves = 
${system.default.config.slaves} + cli { + tmp.dir = "/tmp/h2o-"${system.default.user} + dataport = 55555 + memory.per-node = 2g + parallelism.per-node = ${system.default.config.parallelism.per-node} + } + } } } \ No newline at end of file diff --git a/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala index 976f3b05..1dbabf57 100644 --- a/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala +++ b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala @@ -11,33 +11,65 @@ import eu.stratosphere.peel.core.util.shell import eu.stratosphere.peel.extensions.h2o.beans.system.H2O import spray.json._ -/** - * Created by qml_moon on 23/05/15. - */ -class H2OExperiment(command: String, - runner: H2O, - runs: Int, - inputs: Set[DataSet], - outputs: Set[ExperimentOutput], - name: String, - config: Config) extends Experiment(command, runner, runs, inputs, outputs, name, config) { +/** An [[eu.stratosphere.peel.core.beans.experiment.Experiment Experiment]] implementation which handles the execution + * of a single H2O job. + * + * Note: + * H2O currently doesn't support submitting a java/scala job in jar file. + * There are 2 ways to submit a h2o job: + * 1. through interactive web interface, only the pre-implemented algorithms are supported. + * 2. execute a python/R script. precondition: install the h2o python/R package + * + * So if you don't want to install the h2o python/R package, you have to write a java program which simulate all + * the actions on the web interface. + */ +class H2OExperiment( + command: String, + runner : H2O, + runs : Int, + inputs : Set[DataSet], + outputs: Set[ExperimentOutput], + name : String, + config : Config) extends Experiment(command, runner, runs, inputs, outputs, name, config) { + + def this( + runs : Int, + runner : H2O, + input : DataSet, + output : ExperimentOutput, + command: String, + name : String, + config : Config) = this(command, runner, runs, Set(input), Set(output), name, config) + + def this( + runs : Int, + runner : H2O, + inputs : Set[DataSet], + output : ExperimentOutput, + command: String, + name : String, + config : Config) = this(command, runner, runs, inputs, Set(output), name, config) - def this(runs: Int, runner: H2O, input: DataSet, output: ExperimentOutput, command: String, name: String, config: Config) = this(command, runner, runs, Set(input), Set(output), name, config) + override def run(id: Int, force: Boolean): Experiment.Run[H2O] = new H2OExperiment.SingleJobRun(id, this, force) - def this(runs: Int, runner: H2O, inputs: Set[DataSet], output: ExperimentOutput, command: String, name: String, config: Config) = this(command, runner, runs, inputs, Set(output), name, config) + def copy(name: String = name, config: Config = config) = new H2OExperiment(command, runner, runs, inputs, outputs, name, config) - override def run(id: Int, force: Boolean): Experiment.Run[H2O] = new H2OExperiment.SingleJobRun(id, this, force) } object H2OExperiment { - case class State(name: String, - command: String, - var runExitCode: Option[Int] = None, - var runTime: Long = 0) extends Experiment.RunState {} + case class State( + name: String, + suiteName: String, + command: String, + runnerID: String, + runnerName: String, + runnerVersion: String, + var runExitCode: Option[Int] = None, + var runTime: Long = 0) 
extends Experiment.RunState {} object StateProtocol extends DefaultJsonProtocol with NullOptions { - implicit val stateFormat = jsonFormat4(State) + implicit val stateFormat = jsonFormat8(State) } /** A private inner class encapsulating the logic of single run. */ @@ -60,10 +92,10 @@ object H2OExperiment { try { io.Source.fromFile(s"$home/state.json").mkString.parseJson.convertTo[State] } catch { - case e: Throwable => State(name, command) + case e: Throwable => State(name, Sys.getProperty("app.suite.name"), command, exp.runner.beanName, exp.runner.name, exp.runner.version) } } else { - State(name, command) + State(name, Sys.getProperty("app.suite.name"), command, exp.runner.beanName, exp.runner.name, exp.runner.version) } } @@ -78,22 +110,6 @@ object H2OExperiment { val (runExit, t) = Experiment.time(this ! (command, s"$home/run.out", s"$home/run.err")) state.runTime = t state.runExitCode = Some(runExit) -// jobids = (this !!(command)).split("\n") - -// var isDone = false -// while (!isDone) { -// Thread.sleep(exp.config.getInt("system.h2o.startup.polling.interval") * 5) -// val status = (this getStatus(jobids(1))) -// if (status == "DONE" || status == "FAILED") { -// isDone = true -// state.runTime = Integer.parseInt(this getRuntime(exp.config, jobids(1))) -// state.runExitCode = status match { -// case "DONE" => Some(0) -// case "FAILED" => Some(-1) -// } -// } -// } - } /** Before the run, collect runner log files and their current line counts */ @@ -135,16 +151,13 @@ object H2OExperiment { } private def !(command: String, outFile: String, errFile: String) = { - shell ! s"$command $pollingNode:$dataPort > $outFile 2> $errFile" + shell ! s"$command > $outFile 2> $errFile" } private def getStatus(jobid: String): String = { (shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Jobs/${jobid.replace("$", "\\$")} | grep -Eo '"status":"[A-Z]+"' | grep -Eo [A-Z]+ """).stripLineEnd } -// private def getRuntime(config: Config, jobid: String): String = { -// (shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Jobs/${jobid.replace("$", "\\$")} | grep -Eo '"msec":[0-9]+' | grep -Eo [0-9]+ """).stripLineEnd -// } } } \ No newline at end of file diff --git a/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala index c1c76df6..fad30159 100644 --- a/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala +++ b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala @@ -17,6 +17,8 @@ import scala.collection.JavaConverters._ * @param lifespan [[eu.stratosphere.peel.core.beans.system.Lifespan Lifespan]] of the system * @param dependencies Set of dependencies that this system needs * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + * + * */ class H2O(version: String, lifespan: Lifespan, dependencies: Set[System] = Set(), mc: Mustache.Compiler) extends System("h2o", version, lifespan, dependencies, mc) {
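
For reference, a minimal sketch of what the new `Model.HostsWithPort` bean produces for the `hosts.mustache` template: with the defaults from `reference.h2o.conf`, every entry of `system.h2o.config.slaves` is joined with `system.h2o.config.cli.dataport`, and the template writes one `host:port` entry per line into the flatfile that `H2O.start` hands to `h2odriver.jar`. This sketch is not part of the patch; the two hostnames are made up and the wrapper object name is arbitrary.

```scala
import com.typesafe.config.ConfigFactory
import eu.stratosphere.peel.core.config.Model
import scala.collection.JavaConverters._

object FlatfileSketch extends App {
  // Mimics the relevant slice of reference.h2o.conf with two fictitious slaves.
  val config = ConfigFactory.parseString(
    """
      |system.h2o.config {
      |  slaves       = ["node01", "node02"]
      |  cli.dataport = 55555
      |}
    """.stripMargin)

  // HostsWithPort reads "<prefix>.slaves" and "<prefix>.cli.dataport"
  // and yields one "host:port" string per slave.
  val hosts = new Model.HostsWithPort(config, "system.h2o.config").hosts.asScala

  // hosts.mustache renders each entry on its own line, so the flatfile becomes:
  //   node01:55555
  //   node02:55555
  hosts.foreach(println)
}
```

Appending the port in the model rather than in the template keeps `hosts.mustache` a plain list renderer that other systems can reuse for bare hostnames.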
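
The experiment and system beans above drive H2O's REST API (`/3/Cloud.json`, `/3/Jobs`) through `wget` piped into `grep` over ssh. Since `H2OExperiment` already imports spray-json to parse the `/3/Jobs` response in `cancelJob`, the same lookups can also be written in plain Scala. The following is a sketch under the assumption that the node answers unauthenticated HTTP on the configured data port; the object and method names are invented for illustration and do not exist in Peel.

```scala
import spray.json._

object H2ORestSketch {

  // Fetch a URL body as a string (plain HTTP, no auth assumed).
  private def get(url: String): String = {
    val src = scala.io.Source.fromURL(url)
    try src.mkString finally src.close()
  }

  // Number of nodes currently in the cloud, as reported by /3/Cloud.json;
  // this is the value H2O.start polls while waiting for slaves to connect.
  def cloudSize(node: String, port: Int): Int =
    get(s"http://$node:$port/3/Cloud.json").parseJson.asJsObject.fields("cloud_size") match {
      case JsNumber(n) => n.toInt
      case other       => sys.error(s"unexpected cloud_size value: $other")
    }

  // (job key, status) pairs from /3/Jobs; cancelJob scans these for a job in
  // RUNNING or CREATED state, and getStatus reads the status of a single job.
  def jobStatuses(node: String, port: Int): Seq[(String, String)] = {
    val jobs = get(s"http://$node:$port/3/Jobs").parseJson
      .asJsObject.fields("jobs").asInstanceOf[JsArray]
    for (job <- jobs.elements) yield {
      val fields = job.asJsObject.fields
      val key = fields("key").asJsObject.fields("name") match {
        case JsString(s) => s
        case other       => other.toString
      }
      val status = fields("status") match {
        case JsString(s) => s
        case other       => other.toString
      }
      key -> status
    }
  }
}
```

Either route polls the same endpoints; parsing the JSON directly simply sidesteps the quoting needed to nest grep patterns inside ssh-wrapped shell strings.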