Skip to content

Commit

Permalink
Added support for extra systems in Experiment beans.
Browse files Browse the repository at this point in the history
  • Loading branch information
aalexandrov committed Aug 11, 2015
1 parent 99093ac commit e5f7b4e
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import scala.language.postfixOps
* You do not have to state the command that is used to 'run' the command (e.g. in Flink
* <code> ./bin/flink run </code>
*
* @param systems Systems that are required for the experiment (excluding the runner).
* @param runner The system that is used to run the experiment (e.g. Flink, Spark, ...)
* @param runs The number of runs/repetitions of this experiment
* @param inputs Input Datasets for the experiment
Expand All @@ -40,13 +41,14 @@ import scala.language.postfixOps
* @tparam R The system that is used to execute the experiment
*/
abstract class Experiment[+R <: System](
val command: String,
val runner: R,
val runs: Int,
val inputs: Set[DataSet],
val outputs: Set[ExperimentOutput],
val name: String,
var config: Config) extends Node with Configurable {
val command : String,
val systems : Set[System],
val runner : R,
val runs : Int,
val inputs : Set[DataSet],
val outputs : Set[ExperimentOutput],
val name : String,
var config : Config) extends Node with Configurable {

/** Experiment run factory method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ package object graph {

for (e <- suite.experiments) {
g.addEdge(suite, e)
// add the experiment runner
g.addEdge(e, e.runner)
processDependencies(e.runner)
// add the experiment runner and the additional experiment systems
for (s <- Set(e.runner) ++ e.systems) {
g.addEdge(e, s)
processDependencies(s)
}

// add the experiment inputs and their dependencies
for (i <- e.inputs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,53 +7,58 @@ import java.lang.{System => Sys}
import com.typesafe.config.Config
import org.peelframework.core.beans.data.{DataSet, ExperimentOutput}
import org.peelframework.core.beans.experiment.Experiment
import org.peelframework.core.beans.system.System
import org.peelframework.core.util.{Version, shell}
import org.peelframework.flink.beans.system.Flink
import spray.json._

/** An `Experiment` implementation which handles the execution of a single Flink job. */
class FlinkExperiment(
command: String,
systems: Set[System],
runner : Flink,
runs : Int,
inputs : Set[DataSet],
outputs: Set[ExperimentOutput],
name : String,
config : Config) extends Experiment(command, runner, runs, inputs, outputs, name, config) {
config : Config) extends Experiment(command, systems, runner, runs, inputs, outputs, name, config) {

def this(
command: String,
systems: Set[System],
runner : Flink,
runs : Int,
input : DataSet,
output : ExperimentOutput,
name : String,
config : Config) = this(command, runner, runs, Set(input), Set(output), name, config)
config : Config) = this(command, systems, runner, runs, Set(input), Set(output), name, config)

def this(
command: String,
systems: Set[System],
runner : Flink,
runs : Int,
inputs : Set[DataSet],
output : ExperimentOutput,
name : String,
config : Config) = this(command, runner, runs, inputs, Set(output), name, config)
config : Config) = this(command, systems, runner, runs, inputs, Set(output), name, config)

def this(
command: String,
systems: Set[System],
runner : Flink,
runs : Int,
input : DataSet,
outputs: Set[ExperimentOutput],
name : String,
config : Config) = this(command, runner, runs, Set(input), outputs, name, config)
config : Config) = this(command, systems, runner, runs, Set(input), outputs, name, config)

override def run(id: Int, force: Boolean): Experiment.Run[Flink] = {
new FlinkExperiment.SingleJobRun(id, this, force)
}

def copy(name: String = name, config: Config = config) = {
new FlinkExperiment(command, runner, runs, inputs, outputs, name, config)
new FlinkExperiment(command, systems, runner, runs, inputs, outputs, name, config)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,58 @@ import java.nio.file._
import com.typesafe.config.Config
import org.peelframework.core.beans.data.{DataSet, ExperimentOutput}
import org.peelframework.core.beans.experiment.Experiment
import org.peelframework.core.beans.system.System
import org.peelframework.core.util.shell
import org.peelframework.spark.beans.system.Spark
import org.peelframework.spark.beans.system.Spark
import spray.json._

/** An `Expriment` implementation which handles the execution of a single Spark job. */
class SparkExperiment(
command: String,
systems: Set[System],
runner : Spark,
runs : Int,
inputs : Set[DataSet],
outputs: Set[ExperimentOutput],
name : String,
config : Config) extends Experiment(command, runner, runs, inputs, outputs, name, config) {
config : Config) extends Experiment(command, systems, runner, runs, inputs, outputs, name, config) {

def this(
command: String,
systems: Set[System],
runner : Spark,
runs : Int,
input : DataSet,
output : ExperimentOutput,
name : String,
config : Config) = this(command, runner, runs, Set(input), Set(output), name, config)
config : Config) = this(command, systems, runner, runs, Set(input), Set(output), name, config)

def this(
command: String,
systems: Set[System],
runner : Spark,
runs : Int,
inputs : Set[DataSet],
output : ExperimentOutput,
name : String,
config : Config) = this(command, runner, runs, inputs, Set(output), name, config)
config : Config) = this(command, systems, runner, runs, inputs, Set(output), name, config)

def this(
command: String,
systems: Set[System],
runner : Spark,
runs : Int,
input : DataSet,
outputs: Set[ExperimentOutput],
name : String,
config : Config) = this(command, runner, runs, Set(input), outputs, name, config)
config : Config) = this(command, systems, runner, runs, Set(input), outputs, name, config)

override def run(id: Int, force: Boolean): Experiment.Run[Spark] = {
new SparkExperiment.SingleJobRun(id, this, force)
}

def copy(name: String = name, config: Config = config) = {
new SparkExperiment(command, runner, runs, inputs, outputs, name, config)
new SparkExperiment(command, systems, runner, runs, inputs, outputs, name, config)
}
}

Expand Down

0 comments on commit e5f7b4e

Please sign in to comment.