diff --git a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
index 43cc6c76..e9dfa4f8 100644
--- a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
+++ b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
@@ -12,11 +12,31 @@
-[1 line removed: XML content stripped in extraction]
+[21 lines added: XML content stripped in extraction]
diff --git a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
index be301fc8..5490a13a 100644
--- a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
+++ b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
@@ -42,6 +42,10 @@
            <groupId>org.peelframework</groupId>
            <artifactId>peel-core</artifactId>
        </dependency>
+        <dependency>
+            <groupId>org.peelframework</groupId>
+            <artifactId>peel-extensions</artifactId>
+        </dependency>
diff --git a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
new file mode 100644
index 00000000..49a6c0eb
--- /dev/null
+++ b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
@@ -0,0 +1,214 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}
+
+import com.samskivert.mustache.Mustache
+import com.typesafe.config.ConfigFactory
+import org.peelframework.core.beans.data.{CopiedDataSet, DataSet, ExperimentOutput, GeneratedDataSet}
+import org.peelframework.core.beans.experiment.ExperimentSequence.SimpleParameters
+import org.peelframework.core.beans.experiment.{ExperimentSequence, ExperimentSuite}
+import org.peelframework.core.beans.system.Lifespan
+import org.peelframework.flink.beans.experiment.FlinkExperiment
+import org.peelframework.flink.beans.job.FlinkJob
+import org.peelframework.flink.beans.system.Flink
+import org.peelframework.hadoop.beans.system.HDFS2
+import org.peelframework.spark.beans.experiment.SparkExperiment
+import org.peelframework.spark.beans.system.Spark
+import org.springframework.context.annotation.{Bean, Configuration}
+import org.springframework.context.{ApplicationContext, ApplicationContextAware}
+
+/** Experiments definitions for the '${parentArtifactId}' bundle. */
+@Configuration
+class ExperimentsDefinitions extends ApplicationContextAware {
+
+ /* The enclosing application context. */
+ var ctx: ApplicationContext = null
+
+ def setApplicationContext(ctx: ApplicationContext): Unit = {
+ this.ctx = ctx
+ }
+
+ // ---------------------------------------------------
+ // Systems
+ // ---------------------------------------------------
+
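+  // Lifespan.SUITE keeps a system running for the duration of the whole suite,
+  // while Lifespan.EXPERIMENT tears it down and restarts it for every experiment
+  // (Peel's lifespan semantics, as assumed here).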
+ @Bean(name = Array("hdfs-2.7.1"))
+ def `hdfs-2.7.1`: HDFS2 = new HDFS2(
+    version = "2.7.1",
+ configKey = "hadoop-2",
+ lifespan = Lifespan.SUITE,
+ mc = ctx.getBean(classOf[Mustache.Compiler])
+ )
+
+ @Bean(name = Array("flink-0.9.0"))
+ def `flink-0.9.0`: Flink = new Flink(
+ version = "0.9.0",
+ configKey = "flink",
+ lifespan = Lifespan.EXPERIMENT,
+ dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+ mc = ctx.getBean(classOf[Mustache.Compiler])
+ )
+
+ @Bean(name = Array("spark-1.3.1"))
+ def `spark-1.3.1`: Spark = new Spark(
+ version = "1.3.1",
+ configKey = "spark",
+ lifespan = Lifespan.EXPERIMENT,
+ dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+ mc = ctx.getBean(classOf[Mustache.Compiler])
+ )
+
+ // ---------------------------------------------------
+ // Data Generators
+ // ---------------------------------------------------
+
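+  // The generator job takes its arguments positionally: dictionary size,
+  // total parallelism, tuples per task, and the HDFS output path.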
+ @Bean(name = Array("datagen.words"))
+ def `datagen.words`: FlinkJob = new FlinkJob(
+ runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+ command =
+ """
+ |-v -c ${package}.datagen.flink.WordGenerator ${symbol_escape}
+ |${symbol_dollar}{app.path.datagens}/${parentArtifactId}-datagens-${version}.jar ${symbol_escape}
+      |${symbol_dollar}{datagen.dictionary.size} ${symbol_escape}
+ |${symbol_dollar}{system.default.config.parallelism.total} ${symbol_escape}
+ |${symbol_dollar}{datagen.tuples.per.task} ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt
+ """.stripMargin.trim
+ )
+
+ // ---------------------------------------------------
+ // Data Sets
+ // ---------------------------------------------------
+
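+  // A CopiedDataSet ships a static file from the bundle into HDFS; a
+  // GeneratedDataSet materializes its destination by running the Flink job above.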
+ @Bean(name = Array("dataset.words.static"))
+ def `dataset.words.static`: DataSet = new CopiedDataSet(
+ src = "${symbol_dollar}{app.path.datasets}/rubbish.txt",
+ dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+ fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+ )
+
+ @Bean(name = Array("dataset.words.generated"))
+ def `dataset.words.generated`: DataSet = new GeneratedDataSet(
+ src = ctx.getBean("datagen.words", classOf[FlinkJob]),
+ dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+ fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+ )
+
+ @Bean(name = Array("wordcount.output"))
+ def `wordcount.output`: ExperimentOutput = new ExperimentOutput(
+ path = "${symbol_dollar}{system.hadoop-2.path.output}/wordcount",
+ fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+ )
+
+ // ---------------------------------------------------
+ // Experiments
+ // ---------------------------------------------------
+
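+  // Each suite pairs a Flink and a Spark run of the same WordCount over identical
+  // input and output paths, so the runtimes of the two systems can be compared.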
+ @Bean(name = Array("wordcount.default"))
+ def `wordcount.default`: ExperimentSuite = {
+ val `wordcount.flink.default` = new FlinkExperiment(
+ name = "wordcount.flink.default",
+ command =
+ """
+ |-v -c ${package}.flink.FlinkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(""),
+ runs = 3,
+ runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+ inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
+ val `wordcount.spark.default` = new SparkExperiment(
+ name = "wordcount.spark.default",
+ command =
+ """
+ |--class ${package}.spark.SparkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(""),
+ runs = 3,
+ runner = ctx.getBean("spark-1.3.1", classOf[Spark]),
+ inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
+ new ExperimentSuite(Seq(
+ `wordcount.flink.default`,
+ `wordcount.spark.default`))
+ }
+
+ @Bean(name = Array("wordcount.scale-out"))
+ def `wordcount.scale-out`: ExperimentSuite = {
+ val `wordcount.flink.prototype` = new FlinkExperiment(
+ name = "wordcount.flink.__topXXX__",
+ command =
+ """
+ |-v -c ${package}.flink.FlinkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(
+ """
+ |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts}
+ |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism}
+          |datagen.dictionary.size = 10000
+ |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB
+ """.stripMargin.trim),
+ runs = 3,
+ runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+ inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
+ val `wordcount.spark.prototype` = new SparkExperiment(
+ name = "wordcount.spark.__topXXX__",
+ command =
+ """
+ |--class ${package}.spark.SparkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(
+ """
+ |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts}
+ |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism}
+          |datagen.dictionary.size = 10000
+ |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB
+ """.stripMargin.trim),
+ runs = 3,
+ runner = ctx.getBean("spark-1.3.1", classOf[Spark]),
+ inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
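+    // The ExperimentSequence below expands each prototype once per parameter
+    // value, substituting the `__topXXX__` placeholder in experiment names and
+    // config values with "top005", "top010", and "top020" in turn.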
+ new ExperimentSuite(
+ new ExperimentSequence(
+ parameters = new SimpleParameters(
+ paramName = "topXXX",
+ paramVals = Seq("top005", "top010", "top020")),
+ prototypes = Seq(
+ `wordcount.flink.prototype`,
+ `wordcount.spark.prototype`)))
+ }
+}
diff --git a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
index 5c3054e7..aa7820b2 100644
--- a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
+++ b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
@@ -94,7 +94,7 @@ class QueryRuntimes extends Command {
logger.info(s"------------------------------------------------------------------------------------------------")
logger.info(s"| suite                     | name                      |        min |        max |     median |")
logger.info(s"------------------------------------------------------------------------------------------------")
- for ((suite, name, median, min, max) <- runtimes) {
+ for ((suite, name, min, max, median) <- runtimes) {
logger.info(f"| ${symbol_dollar}suite%-25s | ${symbol_dollar}name%-25s | ${symbol_dollar}min%10d | ${symbol_dollar}max%10d | ${symbol_dollar}median%10d | ")
}
logger.info(s"------------------------------------------------------------------------------------------------")
diff --git a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
index 43cc6c76..e9dfa4f8 100644
--- a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
+++ b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
@@ -12,11 +12,31 @@
-[1 line removed: XML content stripped in extraction]
+[21 lines added: XML content stripped in extraction]
diff --git a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
index be301fc8..5490a13a 100644
--- a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
+++ b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
@@ -42,6 +42,10 @@
            <groupId>org.peelframework</groupId>
            <artifactId>peel-core</artifactId>
        </dependency>
+        <dependency>
+            <groupId>org.peelframework</groupId>
+            <artifactId>peel-extensions</artifactId>
+        </dependency>
diff --git a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
new file mode 100644
index 00000000..49a6c0eb
--- /dev/null
+++ b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
@@ -0,0 +1,214 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}
+
+import com.samskivert.mustache.Mustache
+import com.typesafe.config.ConfigFactory
+import org.peelframework.core.beans.data.{CopiedDataSet, DataSet, ExperimentOutput, GeneratedDataSet}
+import org.peelframework.core.beans.experiment.ExperimentSequence.SimpleParameters
+import org.peelframework.core.beans.experiment.{ExperimentSequence, ExperimentSuite}
+import org.peelframework.core.beans.system.Lifespan
+import org.peelframework.flink.beans.experiment.FlinkExperiment
+import org.peelframework.flink.beans.job.FlinkJob
+import org.peelframework.flink.beans.system.Flink
+import org.peelframework.hadoop.beans.system.HDFS2
+import org.peelframework.spark.beans.experiment.SparkExperiment
+import org.peelframework.spark.beans.system.Spark
+import org.springframework.context.annotation.{Bean, Configuration}
+import org.springframework.context.{ApplicationContext, ApplicationContextAware}
+
+/** Experiments definitions for the '${parentArtifactId}' bundle. */
+@Configuration
+class ExperimentsDefinitions extends ApplicationContextAware {
+
+ /* The enclosing application context. */
+ var ctx: ApplicationContext = null
+
+ def setApplicationContext(ctx: ApplicationContext): Unit = {
+ this.ctx = ctx
+ }
+
+ // ---------------------------------------------------
+ // Systems
+ // ---------------------------------------------------
+
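+  // Lifespan.SUITE keeps a system running for the duration of the whole suite,
+  // while Lifespan.EXPERIMENT tears it down and restarts it for every experiment
+  // (Peel's lifespan semantics, as assumed here).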
+ @Bean(name = Array("hdfs-2.7.1"))
+ def `hdfs-2.7.1`: HDFS2 = new HDFS2(
+    version = "2.7.1",
+ configKey = "hadoop-2",
+ lifespan = Lifespan.SUITE,
+ mc = ctx.getBean(classOf[Mustache.Compiler])
+ )
+
+ @Bean(name = Array("flink-0.9.0"))
+ def `flink-0.9.0`: Flink = new Flink(
+ version = "0.9.0",
+ configKey = "flink",
+ lifespan = Lifespan.EXPERIMENT,
+ dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+ mc = ctx.getBean(classOf[Mustache.Compiler])
+ )
+
+ @Bean(name = Array("spark-1.3.1"))
+ def `spark-1.3.1`: Spark = new Spark(
+ version = "1.3.1",
+ configKey = "spark",
+ lifespan = Lifespan.EXPERIMENT,
+ dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+ mc = ctx.getBean(classOf[Mustache.Compiler])
+ )
+
+ // ---------------------------------------------------
+ // Data Generators
+ // ---------------------------------------------------
+
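+  // The generator job takes its arguments positionally: dictionary size,
+  // total parallelism, tuples per task, and the HDFS output path.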
+ @Bean(name = Array("datagen.words"))
+ def `datagen.words`: FlinkJob = new FlinkJob(
+ runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+ command =
+ """
+ |-v -c ${package}.datagen.flink.WordGenerator ${symbol_escape}
+ |${symbol_dollar}{app.path.datagens}/${parentArtifactId}-datagens-${version}.jar ${symbol_escape}
+      |${symbol_dollar}{datagen.dictionary.size} ${symbol_escape}
+ |${symbol_dollar}{system.default.config.parallelism.total} ${symbol_escape}
+ |${symbol_dollar}{datagen.tuples.per.task} ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt
+ """.stripMargin.trim
+ )
+
+ // ---------------------------------------------------
+ // Data Sets
+ // ---------------------------------------------------
+
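+  // A CopiedDataSet ships a static file from the bundle into HDFS; a
+  // GeneratedDataSet materializes its destination by running the Flink job above.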
+ @Bean(name = Array("dataset.words.static"))
+ def `dataset.words.static`: DataSet = new CopiedDataSet(
+ src = "${symbol_dollar}{app.path.datasets}/rubbish.txt",
+ dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+ fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+ )
+
+ @Bean(name = Array("dataset.words.generated"))
+ def `dataset.words.generated`: DataSet = new GeneratedDataSet(
+ src = ctx.getBean("datagen.words", classOf[FlinkJob]),
+ dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+ fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+ )
+
+ @Bean(name = Array("wordcount.output"))
+ def `wordcount.output`: ExperimentOutput = new ExperimentOutput(
+ path = "${symbol_dollar}{system.hadoop-2.path.output}/wordcount",
+ fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+ )
+
+ // ---------------------------------------------------
+ // Experiments
+ // ---------------------------------------------------
+
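+  // Each suite pairs a Flink and a Spark run of the same WordCount over identical
+  // input and output paths, so the runtimes of the two systems can be compared.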
+ @Bean(name = Array("wordcount.default"))
+ def `wordcount.default`: ExperimentSuite = {
+ val `wordcount.flink.default` = new FlinkExperiment(
+ name = "wordcount.flink.default",
+ command =
+ """
+ |-v -c ${package}.flink.FlinkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(""),
+ runs = 3,
+ runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+ inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
+ val `wordcount.spark.default` = new SparkExperiment(
+ name = "wordcount.spark.default",
+ command =
+ """
+ |--class ${package}.spark.SparkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(""),
+ runs = 3,
+ runner = ctx.getBean("spark-1.3.1", classOf[Spark]),
+ inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
+ new ExperimentSuite(Seq(
+ `wordcount.flink.default`,
+ `wordcount.spark.default`))
+ }
+
+ @Bean(name = Array("wordcount.scale-out"))
+ def `wordcount.scale-out`: ExperimentSuite = {
+ val `wordcount.flink.prototype` = new FlinkExperiment(
+ name = "wordcount.flink.__topXXX__",
+ command =
+ """
+ |-v -c ${package}.flink.FlinkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(
+ """
+ |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts}
+ |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism}
+          |datagen.dictionary.size = 10000
+ |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB
+ """.stripMargin.trim),
+ runs = 3,
+ runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+ inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
+ val `wordcount.spark.prototype` = new SparkExperiment(
+ name = "wordcount.spark.__topXXX__",
+ command =
+ """
+ |--class ${package}.spark.SparkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(
+ """
+ |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts}
+ |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism}
+          |datagen.dictionary.size = 10000
+ |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB
+ """.stripMargin.trim),
+ runs = 3,
+ runner = ctx.getBean("spark-1.3.1", classOf[Spark]),
+ inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
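+    // The ExperimentSequence below expands each prototype once per parameter
+    // value, substituting the `__topXXX__` placeholder in experiment names and
+    // config values with "top005", "top010", and "top020" in turn.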
+ new ExperimentSuite(
+ new ExperimentSequence(
+ parameters = new SimpleParameters(
+ paramName = "topXXX",
+ paramVals = Seq("top005", "top010", "top020")),
+ prototypes = Seq(
+ `wordcount.flink.prototype`,
+ `wordcount.spark.prototype`)))
+ }
+}
diff --git a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
index 5c3054e7..aa7820b2 100644
--- a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
+++ b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
@@ -94,7 +94,7 @@ class QueryRuntimes extends Command {
logger.info(s"------------------------------------------------------------------------------------------------")
logger.info(s"| suite                     | name                      |        min |        max |     median |")
logger.info(s"------------------------------------------------------------------------------------------------")
- for ((suite, name, median, min, max) <- runtimes) {
+ for ((suite, name, min, max, median) <- runtimes) {
logger.info(f"| ${symbol_dollar}suite%-25s | ${symbol_dollar}name%-25s | ${symbol_dollar}min%10d | ${symbol_dollar}max%10d | ${symbol_dollar}median%10d | ")
}
logger.info(s"------------------------------------------------------------------------------------------------")
diff --git a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
index 43cc6c76..e9dfa4f8 100644
--- a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
+++ b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
@@ -12,11 +12,31 @@
-[1 line removed: XML content stripped in extraction]
+[21 lines added: XML content stripped in extraction]
diff --git a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
index be301fc8..5490a13a 100644
--- a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
+++ b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
@@ -42,6 +42,10 @@
            <groupId>org.peelframework</groupId>
            <artifactId>peel-core</artifactId>
        </dependency>
+        <dependency>
+            <groupId>org.peelframework</groupId>
+            <artifactId>peel-extensions</artifactId>
+        </dependency>
diff --git a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
new file mode 100644
index 00000000..49a6c0eb
--- /dev/null
+++ b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
@@ -0,0 +1,214 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}
+
+import com.samskivert.mustache.Mustache
+import com.typesafe.config.ConfigFactory
+import org.peelframework.core.beans.data.{CopiedDataSet, DataSet, ExperimentOutput, GeneratedDataSet}
+import org.peelframework.core.beans.experiment.ExperimentSequence.SimpleParameters
+import org.peelframework.core.beans.experiment.{ExperimentSequence, ExperimentSuite}
+import org.peelframework.core.beans.system.Lifespan
+import org.peelframework.flink.beans.experiment.FlinkExperiment
+import org.peelframework.flink.beans.job.FlinkJob
+import org.peelframework.flink.beans.system.Flink
+import org.peelframework.hadoop.beans.system.HDFS2
+import org.peelframework.spark.beans.experiment.SparkExperiment
+import org.peelframework.spark.beans.system.Spark
+import org.springframework.context.annotation.{Bean, Configuration}
+import org.springframework.context.{ApplicationContext, ApplicationContextAware}
+
+/** Experiments definitions for the '${parentArtifactId}' bundle. */
+@Configuration
+class ExperimentsDefinitions extends ApplicationContextAware {
+
+ /* The enclosing application context. */
+ var ctx: ApplicationContext = null
+
+ def setApplicationContext(ctx: ApplicationContext): Unit = {
+ this.ctx = ctx
+ }
+
+ // ---------------------------------------------------
+ // Systems
+ // ---------------------------------------------------
+
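+  // Lifespan.SUITE keeps a system running for the duration of the whole suite,
+  // while Lifespan.EXPERIMENT tears it down and restarts it for every experiment
+  // (Peel's lifespan semantics, as assumed here).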
+ @Bean(name = Array("hdfs-2.7.1"))
+ def `hdfs-2.7.1`: HDFS2 = new HDFS2(
+    version = "2.7.1",
+ configKey = "hadoop-2",
+ lifespan = Lifespan.SUITE,
+ mc = ctx.getBean(classOf[Mustache.Compiler])
+ )
+
+ @Bean(name = Array("flink-0.9.0"))
+ def `flink-0.9.0`: Flink = new Flink(
+ version = "0.9.0",
+ configKey = "flink",
+ lifespan = Lifespan.EXPERIMENT,
+ dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+ mc = ctx.getBean(classOf[Mustache.Compiler])
+ )
+
+ @Bean(name = Array("spark-1.3.1"))
+ def `spark-1.3.1`: Spark = new Spark(
+ version = "1.3.1",
+ configKey = "spark",
+ lifespan = Lifespan.EXPERIMENT,
+ dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+ mc = ctx.getBean(classOf[Mustache.Compiler])
+ )
+
+ // ---------------------------------------------------
+ // Data Generators
+ // ---------------------------------------------------
+
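+  // The generator job takes its arguments positionally: dictionary size,
+  // total parallelism, tuples per task, and the HDFS output path.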
+ @Bean(name = Array("datagen.words"))
+ def `datagen.words`: FlinkJob = new FlinkJob(
+ runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+ command =
+ """
+ |-v -c ${package}.datagen.flink.WordGenerator ${symbol_escape}
+ |${symbol_dollar}{app.path.datagens}/${parentArtifactId}-datagens-${version}.jar ${symbol_escape}
+      |${symbol_dollar}{datagen.dictionary.size} ${symbol_escape}
+ |${symbol_dollar}{system.default.config.parallelism.total} ${symbol_escape}
+ |${symbol_dollar}{datagen.tuples.per.task} ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt
+ """.stripMargin.trim
+ )
+
+ // ---------------------------------------------------
+ // Data Sets
+ // ---------------------------------------------------
+
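+  // A CopiedDataSet ships a static file from the bundle into HDFS; a
+  // GeneratedDataSet materializes its destination by running the Flink job above.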
+ @Bean(name = Array("dataset.words.static"))
+ def `dataset.words.static`: DataSet = new CopiedDataSet(
+ src = "${symbol_dollar}{app.path.datasets}/rubbish.txt",
+ dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+ fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+ )
+
+ @Bean(name = Array("dataset.words.generated"))
+ def `dataset.words.generated`: DataSet = new GeneratedDataSet(
+ src = ctx.getBean("datagen.words", classOf[FlinkJob]),
+ dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+ fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+ )
+
+ @Bean(name = Array("wordcount.output"))
+ def `wordcount.output`: ExperimentOutput = new ExperimentOutput(
+ path = "${symbol_dollar}{system.hadoop-2.path.output}/wordcount",
+ fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+ )
+
+ // ---------------------------------------------------
+ // Experiments
+ // ---------------------------------------------------
+
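+  // Each suite pairs a Flink and a Spark run of the same WordCount over identical
+  // input and output paths, so the runtimes of the two systems can be compared.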
+ @Bean(name = Array("wordcount.default"))
+ def `wordcount.default`: ExperimentSuite = {
+ val `wordcount.flink.default` = new FlinkExperiment(
+ name = "wordcount.flink.default",
+ command =
+ """
+ |-v -c ${package}.flink.FlinkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(""),
+ runs = 3,
+ runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+ inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
+ val `wordcount.spark.default` = new SparkExperiment(
+ name = "wordcount.spark.default",
+ command =
+ """
+ |--class ${package}.spark.SparkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(""),
+ runs = 3,
+ runner = ctx.getBean("spark-1.3.1", classOf[Spark]),
+ inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
+ new ExperimentSuite(Seq(
+ `wordcount.flink.default`,
+ `wordcount.spark.default`))
+ }
+
+ @Bean(name = Array("wordcount.scale-out"))
+ def `wordcount.scale-out`: ExperimentSuite = {
+ val `wordcount.flink.prototype` = new FlinkExperiment(
+ name = "wordcount.flink.__topXXX__",
+ command =
+ """
+ |-v -c ${package}.flink.FlinkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(
+ """
+ |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts}
+ |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism}
+          |datagen.dictionary.size = 10000
+ |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB
+ """.stripMargin.trim),
+ runs = 3,
+ runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+ inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
+ val `wordcount.spark.prototype` = new SparkExperiment(
+ name = "wordcount.spark.__topXXX__",
+ command =
+ """
+ |--class ${package}.spark.SparkWC ${symbol_escape}
+ |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+ |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+ """.stripMargin.trim,
+ config = ConfigFactory.parseString(
+ """
+ |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts}
+ |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism}
+          |datagen.dictionary.size = 10000
+ |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB
+ """.stripMargin.trim),
+ runs = 3,
+ runner = ctx.getBean("spark-1.3.1", classOf[Spark]),
+ inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])),
+ outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+ )
+
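+    // The ExperimentSequence below expands each prototype once per parameter
+    // value, substituting the `__topXXX__` placeholder in experiment names and
+    // config values with "top005", "top010", and "top020" in turn.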
+ new ExperimentSuite(
+ new ExperimentSequence(
+ parameters = new SimpleParameters(
+ paramName = "topXXX",
+ paramVals = Seq("top005", "top010", "top020")),
+ prototypes = Seq(
+ `wordcount.flink.prototype`,
+ `wordcount.spark.prototype`)))
+ }
+}
diff --git a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
index 5c3054e7..aa7820b2 100644
--- a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
+++ b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
@@ -94,7 +94,7 @@ class QueryRuntimes extends Command {
logger.info(s"------------------------------------------------------------------------------------------------")
logger.info(s"| suite                     | name                      |        min |        max |     median |")
logger.info(s"------------------------------------------------------------------------------------------------")
- for ((suite, name, median, min, max) <- runtimes) {
+ for ((suite, name, min, max, median) <- runtimes) {
logger.info(f"| ${symbol_dollar}suite%-25s | ${symbol_dollar}name%-25s | ${symbol_dollar}min%10d | ${symbol_dollar}max%10d | ${symbol_dollar}median%10d | ")
}
logger.info(s"------------------------------------------------------------------------------------------------")