From e5f7b4e7e40469379ba722802d2f65beb25844dc Mon Sep 17 00:00:00 2001 From: Alexander Alexandrov Date: Tue, 11 Aug 2015 18:02:07 +0200 Subject: [PATCH] Added support for extra systems in Experiment beans. --- .../core/beans/experiment/Experiment.scala | 16 +++++++++------- .../org/peelframework/core/graph/package.scala | 8 +++++--- .../flink/beans/experiment/FlinkExperiment.scala | 15 ++++++++++----- .../spark/beans/experiment/SparkExperiment.scala | 16 ++++++++++------ 4 files changed, 34 insertions(+), 21 deletions(-) diff --git a/peel-core/src/main/scala/org/peelframework/core/beans/experiment/Experiment.scala b/peel-core/src/main/scala/org/peelframework/core/beans/experiment/Experiment.scala index 337da088..3307dac6 100644 --- a/peel-core/src/main/scala/org/peelframework/core/beans/experiment/Experiment.scala +++ b/peel-core/src/main/scala/org/peelframework/core/beans/experiment/Experiment.scala @@ -31,6 +31,7 @@ import scala.language.postfixOps * You do not have to state the command that is used to 'run' the command (e.g. in Flink * ./bin/flink run * + * @param systems Systems that are required for the experiment (excluding the runner). * @param runner The system that is used to run the experiment (e.g. Flink, Spark, ...) * @param runs The number of runs/repetitions of this experiment * @param inputs Input Datasets for the experiment @@ -40,13 +41,14 @@ import scala.language.postfixOps * @tparam R The system that is used to execute the experiment */ abstract class Experiment[+R <: System]( - val command: String, - val runner: R, - val runs: Int, - val inputs: Set[DataSet], - val outputs: Set[ExperimentOutput], - val name: String, - var config: Config) extends Node with Configurable { + val command : String, + val systems : Set[System], + val runner : R, + val runs : Int, + val inputs : Set[DataSet], + val outputs : Set[ExperimentOutput], + val name : String, + var config : Config) extends Node with Configurable { /** Experiment run factory method. * diff --git a/peel-core/src/main/scala/org/peelframework/core/graph/package.scala b/peel-core/src/main/scala/org/peelframework/core/graph/package.scala index 0c00078e..75af46ec 100644 --- a/peel-core/src/main/scala/org/peelframework/core/graph/package.scala +++ b/peel-core/src/main/scala/org/peelframework/core/graph/package.scala @@ -41,9 +41,11 @@ package object graph { for (e <- suite.experiments) { g.addEdge(suite, e) - // add the experiment runner - g.addEdge(e, e.runner) - processDependencies(e.runner) + // add the experiment runner and the additional experiment systems + for (s <- Set(e.runner) ++ e.systems) { + g.addEdge(e, s) + processDependencies(s) + } // add the experiment inputs and their dependencies for (i <- e.inputs) { diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/experiment/FlinkExperiment.scala b/peel-extensions/src/main/scala/org/peelframework/flink/beans/experiment/FlinkExperiment.scala index b2ed95de..35cd782a 100644 --- a/peel-extensions/src/main/scala/org/peelframework/flink/beans/experiment/FlinkExperiment.scala +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/experiment/FlinkExperiment.scala @@ -7,6 +7,7 @@ import java.lang.{System => Sys} import com.typesafe.config.Config import org.peelframework.core.beans.data.{DataSet, ExperimentOutput} import org.peelframework.core.beans.experiment.Experiment +import org.peelframework.core.beans.system.System import org.peelframework.core.util.{Version, shell} import org.peelframework.flink.beans.system.Flink import spray.json._ @@ -14,46 +15,50 @@ import spray.json._ /** An `Experiment` implementation which handles the execution of a single Flink job. */ class FlinkExperiment( command: String, + systems: Set[System], runner : Flink, runs : Int, inputs : Set[DataSet], outputs: Set[ExperimentOutput], name : String, - config : Config) extends Experiment(command, runner, runs, inputs, outputs, name, config) { + config : Config) extends Experiment(command, systems, runner, runs, inputs, outputs, name, config) { def this( command: String, + systems: Set[System], runner : Flink, runs : Int, input : DataSet, output : ExperimentOutput, name : String, - config : Config) = this(command, runner, runs, Set(input), Set(output), name, config) + config : Config) = this(command, systems, runner, runs, Set(input), Set(output), name, config) def this( command: String, + systems: Set[System], runner : Flink, runs : Int, inputs : Set[DataSet], output : ExperimentOutput, name : String, - config : Config) = this(command, runner, runs, inputs, Set(output), name, config) + config : Config) = this(command, systems, runner, runs, inputs, Set(output), name, config) def this( command: String, + systems: Set[System], runner : Flink, runs : Int, input : DataSet, outputs: Set[ExperimentOutput], name : String, - config : Config) = this(command, runner, runs, Set(input), outputs, name, config) + config : Config) = this(command, systems, runner, runs, Set(input), outputs, name, config) override def run(id: Int, force: Boolean): Experiment.Run[Flink] = { new FlinkExperiment.SingleJobRun(id, this, force) } def copy(name: String = name, config: Config = config) = { - new FlinkExperiment(command, runner, runs, inputs, outputs, name, config) + new FlinkExperiment(command, systems, runner, runs, inputs, outputs, name, config) } } diff --git a/peel-extensions/src/main/scala/org/peelframework/spark/beans/experiment/SparkExperiment.scala b/peel-extensions/src/main/scala/org/peelframework/spark/beans/experiment/SparkExperiment.scala index 0fddb184..46418db3 100644 --- a/peel-extensions/src/main/scala/org/peelframework/spark/beans/experiment/SparkExperiment.scala +++ b/peel-extensions/src/main/scala/org/peelframework/spark/beans/experiment/SparkExperiment.scala @@ -7,54 +7,58 @@ import java.nio.file._ import com.typesafe.config.Config import org.peelframework.core.beans.data.{DataSet, ExperimentOutput} import org.peelframework.core.beans.experiment.Experiment +import org.peelframework.core.beans.system.System import org.peelframework.core.util.shell import org.peelframework.spark.beans.system.Spark -import org.peelframework.spark.beans.system.Spark import spray.json._ /** An `Expriment` implementation which handles the execution of a single Spark job. */ class SparkExperiment( command: String, + systems: Set[System], runner : Spark, runs : Int, inputs : Set[DataSet], outputs: Set[ExperimentOutput], name : String, - config : Config) extends Experiment(command, runner, runs, inputs, outputs, name, config) { + config : Config) extends Experiment(command, systems, runner, runs, inputs, outputs, name, config) { def this( command: String, + systems: Set[System], runner : Spark, runs : Int, input : DataSet, output : ExperimentOutput, name : String, - config : Config) = this(command, runner, runs, Set(input), Set(output), name, config) + config : Config) = this(command, systems, runner, runs, Set(input), Set(output), name, config) def this( command: String, + systems: Set[System], runner : Spark, runs : Int, inputs : Set[DataSet], output : ExperimentOutput, name : String, - config : Config) = this(command, runner, runs, inputs, Set(output), name, config) + config : Config) = this(command, systems, runner, runs, inputs, Set(output), name, config) def this( command: String, + systems: Set[System], runner : Spark, runs : Int, input : DataSet, outputs: Set[ExperimentOutput], name : String, - config : Config) = this(command, runner, runs, Set(input), outputs, name, config) + config : Config) = this(command, systems, runner, runs, Set(input), outputs, name, config) override def run(id: Int, force: Boolean): Experiment.Run[Spark] = { new SparkExperiment.SingleJobRun(id, this, force) } def copy(name: String = name, config: Config = config) = { - new SparkExperiment(command, runner, runs, inputs, outputs, name, config) + new SparkExperiment(command, systems, runner, runs, inputs, outputs, name, config) } }