From 4a8e6d06f902cf304074509c55880192f03bb327 Mon Sep 17 00:00:00 2001
From: Alexander Alexandrov
Date: Mon, 7 Sep 2015 16:55:15 +0300
Subject: [PATCH] Updated archetypes.

---
 .../src/main/resources/config/experiments.xml |  22 +-
 .../__rootArtifactId__-peelextensions/pom.xml |   4 +
 .../main/scala/ExperimentsDefinitions.scala   | 202 ++++++++++++++++++
 .../scala/cli/command/QueryRuntimes.scala     |   2 +-
 .../src/main/resources/config/experiments.xml |  22 +-
 .../__rootArtifactId__-peelextensions/pom.xml |   4 +
 .../main/scala/ExperimentsDefinitions.scala   | 202 ++++++++++++++++++
 .../scala/cli/command/QueryRuntimes.scala     |   2 +-
 .../src/main/resources/config/experiments.xml |  22 +-
 .../__rootArtifactId__-peelextensions/pom.xml |   4 +
 .../main/scala/ExperimentsDefinitions.scala   | 202 ++++++++++++++++++
 .../scala/cli/command/QueryRuntimes.scala     |   2 +-
 12 files changed, 684 insertions(+), 6 deletions(-)
 create mode 100644 peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
 create mode 100644 peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
 create mode 100644 peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala

diff --git a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
index 43cc6c76..e9dfa4f8 100644
--- a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
+++ b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
@@ -12,11 +12,31 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
index be301fc8..5490a13a 100644
--- a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
+++ b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
@@ -42,6 +42,10 @@
         <dependency>
             <groupId>org.peelframework</groupId>
            <artifactId>peel-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.peelframework</groupId>
+            <artifactId>peel-extensions</artifactId>
+        </dependency>
     </dependencies>
 </project>
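A note on the wiring pattern used in all three new ExperimentsDefinitions.scala files below: every system, data generator, data set, and experiment is registered as a named Spring `@Bean`, and cross-references between beans are resolved by name via `ctx.getBean(name, classOf[T])`. A minimal, self-contained sketch of that lookup style (the `SketchConfig` class and the `greeting` bean are invented for illustration; only spring-context is required on the classpath):

```scala
import org.springframework.context.annotation.{AnnotationConfigApplicationContext, Bean, Configuration}

// Invented example beans; illustrates the @Bean(name = ...) / getBean(name, class)
// pattern that ExperimentsDefinitions uses for systems, data sets and experiments.
@Configuration
class SketchConfig {
  @Bean(name = Array("greeting"))
  def greeting: String = "hello from a named bean"
}

object BeanLookupSketch extends App {
  // Build a context from the annotated class, then look the bean up by name and type.
  val ctx = new AnnotationConfigApplicationContext(classOf[SketchConfig])
  println(ctx.getBean("greeting", classOf[String]))
}
```

Looking beans up by name keeps the definitions declarative: an experiment can reference the "hdfs-2.7.1" system without constructing it, and Spring hands out one shared instance per name.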
diff --git a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
new file mode 100644
index 00000000..49a6c0eb
--- /dev/null
+++ b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
@@ -0,0 +1,202 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}
+
+import com.samskivert.mustache.Mustache
+import com.typesafe.config.ConfigFactory
+import org.peelframework.core.beans.data.{CopiedDataSet, DataSet, ExperimentOutput, GeneratedDataSet}
+import org.peelframework.core.beans.experiment.ExperimentSequence.SimpleParameters
+import org.peelframework.core.beans.experiment.{ExperimentSequence, ExperimentSuite}
+import org.peelframework.core.beans.system.Lifespan
+import org.peelframework.flink.beans.experiment.FlinkExperiment
+import org.peelframework.flink.beans.job.FlinkJob
+import org.peelframework.flink.beans.system.Flink
+import org.peelframework.hadoop.beans.system.HDFS2
+import org.peelframework.spark.beans.experiment.SparkExperiment
+import org.peelframework.spark.beans.system.Spark
+import org.springframework.context.annotation.{Bean, Configuration}
+import org.springframework.context.{ApplicationContext, ApplicationContextAware}
+
+/** Experiments definitions for the '${parentArtifactId}' bundle. */
+@Configuration
+class ExperimentsDefinitions extends ApplicationContextAware {
+
+  /* The enclosing application context. */
+  var ctx: ApplicationContext = null
+
+  def setApplicationContext(ctx: ApplicationContext): Unit = {
+    this.ctx = ctx
+  }
+
+  // ---------------------------------------------------
+  // Systems
+  // ---------------------------------------------------
+
+  @Bean(name = Array("hdfs-2.7.1"))
+  def `hdfs-2.7.1`: HDFS2 = new HDFS2(
+    version = "2.7.1",
+    configKey = "hadoop-2",
+    lifespan = Lifespan.SUITE,
+    mc = ctx.getBean(classOf[Mustache.Compiler])
+  )
+
+  @Bean(name = Array("flink-0.9.0"))
+  def `flink-0.9.0`: Flink = new Flink(
+    version = "0.9.0",
+    configKey = "flink",
+    lifespan = Lifespan.EXPERIMENT,
+    dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+    mc = ctx.getBean(classOf[Mustache.Compiler])
+  )
+
+  @Bean(name = Array("spark-1.3.1"))
+  def `spark-1.3.1`: Spark = new Spark(
+    version = "1.3.1",
+    configKey = "spark",
+    lifespan = Lifespan.EXPERIMENT,
+    dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+    mc = ctx.getBean(classOf[Mustache.Compiler])
+  )
+
+  // ---------------------------------------------------
+  // Data Generators
+  // ---------------------------------------------------
+
+  @Bean(name = Array("datagen.words"))
+  def `datagen.words`: FlinkJob = new FlinkJob(
+    runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+    command =
+      """
+        |-v -c ${package}.datagen.flink.WordGenerator ${symbol_escape}
+        |${symbol_dollar}{app.path.datagens}/${parentArtifactId}-datagens-${version}.jar ${symbol_escape}
+        |${symbol_dollar}{datagen.dictionary.size} ${symbol_escape}
+        |${symbol_dollar}{system.default.config.parallelism.total} ${symbol_escape}
+        |${symbol_dollar}{datagen.tuples.per.task} ${symbol_escape}
+        |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt
+      """.stripMargin.trim
+  )
+
+  // ---------------------------------------------------
+  // Data Sets
+  // ---------------------------------------------------
+
+  @Bean(name = Array("dataset.words.static"))
+  def `dataset.words.static`: DataSet = new CopiedDataSet(
+    src = "${symbol_dollar}{app.path.datasets}/rubbish.txt",
+    dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+    fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+  )
+
+  @Bean(name = Array("dataset.words.generated"))
+  def `dataset.words.generated`: DataSet = new GeneratedDataSet(
+    src = ctx.getBean("datagen.words", classOf[FlinkJob]),
+    dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+    fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+  )
+
+  @Bean(name = Array("wordcount.output"))
+  def `wordcount.output`: ExperimentOutput = new ExperimentOutput(
+    path = "${symbol_dollar}{system.hadoop-2.path.output}/wordcount",
"${symbol_dollar}{system.hadoop-2.path.output}/wordcount", + fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2]) + ) + + // --------------------------------------------------- + // Experiments + // --------------------------------------------------- + + @Bean(name = Array("wordcount.default")) + def `wordcount.default`: ExperimentSuite = { + val `wordcount.flink.default` = new FlinkExperiment( + name = "wordcount.flink.default", + command = + """ + |-v -c ${package}.flink.FlinkWC ${symbol_escape} + |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.output}/wordcount + """.stripMargin.trim, + config = ConfigFactory.parseString(""), + runs = 3, + runner = ctx.getBean("flink-0.9.0", classOf[Flink]), + inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])), + outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput])) + ) + + val `wordcount.spark.default` = new SparkExperiment( + name = "wordcount.spark.default", + command = + """ + |--class ${package}.spark.SparkWC ${symbol_escape} + |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.output}/wordcount + """.stripMargin.trim, + config = ConfigFactory.parseString(""), + runs = 3, + runner = ctx.getBean("spark-1.3.1", classOf[Spark]), + inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])), + outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput])) + ) + + new ExperimentSuite(Seq( + `wordcount.flink.default`, + `wordcount.spark.default`)) + } + + @Bean(name = Array("wordcount.scale-out")) + def `wordcount.scale-out`: ExperimentSuite = { + val `wordcount.flink.prototype` = new FlinkExperiment( + name = "wordcount.flink.__topXXX__", + command = + """ + |-v -c ${package}.flink.FlinkWC ${symbol_escape} + |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.output}/wordcount + """.stripMargin.trim, + config = ConfigFactory.parseString( + """ + |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts} + |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism} + |datagen.dictionary.dize = 10000 + |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB + """.stripMargin.trim), + runs = 3, + runner = ctx.getBean("flink-0.9.0", classOf[Flink]), + inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])), + outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput])) + ) + + val `wordcount.spark.prototype` = new SparkExperiment( + name = "wordcount.spark.__topXXX__", + command = + """ + |--class ${package}.spark.SparkWC ${symbol_escape} + |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.output}/wordcount + """.stripMargin.trim, + config = ConfigFactory.parseString( + """ + |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts} + |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism} + 
diff --git a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
index 5c3054e7..aa7820b2 100644
--- a/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
+++ b/peel-archetypes/peel-flink-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
@@ -94,7 +94,7 @@ class QueryRuntimes extends Command {
     logger.info(s"------------------------------------------------------------------------------------------------")
     logger.info(s"| name | name | min | max | median |")
     logger.info(s"------------------------------------------------------------------------------------------------")
-    for ((suite, name, median, min, max) <- runtimes) {
+    for ((suite, name, min, max, median) <- runtimes) {
       logger.info(f"| ${symbol_dollar}suite%-25s | ${symbol_dollar}name%-25s | ${symbol_dollar}min%10d | ${symbol_dollar}max%10d | ${symbol_dollar}median%10d | ")
     }
     logger.info(s"------------------------------------------------------------------------------------------------")
diff --git a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
index 43cc6c76..e9dfa4f8 100644
--- a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
+++ b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
@@ -12,11 +12,31 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
index be301fc8..5490a13a 100644
--- a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
+++ b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
@@ -42,6 +42,10 @@
         <dependency>
             <groupId>org.peelframework</groupId>
            <artifactId>peel-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.peelframework</groupId>
+            <artifactId>peel-extensions</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
new file mode 100644
index 00000000..49a6c0eb
--- /dev/null
+++ b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
@@ -0,0 +1,202 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}
+
+import com.samskivert.mustache.Mustache
+import com.typesafe.config.ConfigFactory
+import org.peelframework.core.beans.data.{CopiedDataSet, DataSet, ExperimentOutput, GeneratedDataSet}
+import org.peelframework.core.beans.experiment.ExperimentSequence.SimpleParameters
+import org.peelframework.core.beans.experiment.{ExperimentSequence, ExperimentSuite}
+import org.peelframework.core.beans.system.Lifespan
+import org.peelframework.flink.beans.experiment.FlinkExperiment
+import org.peelframework.flink.beans.job.FlinkJob
+import org.peelframework.flink.beans.system.Flink
+import org.peelframework.hadoop.beans.system.HDFS2
+import org.peelframework.spark.beans.experiment.SparkExperiment
+import org.peelframework.spark.beans.system.Spark
+import org.springframework.context.annotation.{Bean, Configuration}
+import org.springframework.context.{ApplicationContext, ApplicationContextAware}
+
+/** Experiments definitions for the '${parentArtifactId}' bundle. */
+@Configuration
+class ExperimentsDefinitions extends ApplicationContextAware {
+
+  /* The enclosing application context. */
+  var ctx: ApplicationContext = null
+
+  def setApplicationContext(ctx: ApplicationContext): Unit = {
+    this.ctx = ctx
+  }
+
+  // ---------------------------------------------------
+  // Systems
+  // ---------------------------------------------------
+
+  @Bean(name = Array("hdfs-2.7.1"))
+  def `hdfs-2.7.1`: HDFS2 = new HDFS2(
+    version = "2.7.1",
+    configKey = "hadoop-2",
+    lifespan = Lifespan.SUITE,
+    mc = ctx.getBean(classOf[Mustache.Compiler])
+  )
+
+  @Bean(name = Array("flink-0.9.0"))
+  def `flink-0.9.0`: Flink = new Flink(
+    version = "0.9.0",
+    configKey = "flink",
+    lifespan = Lifespan.EXPERIMENT,
+    dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+    mc = ctx.getBean(classOf[Mustache.Compiler])
+  )
+
+  @Bean(name = Array("spark-1.3.1"))
+  def `spark-1.3.1`: Spark = new Spark(
+    version = "1.3.1",
+    configKey = "spark",
+    lifespan = Lifespan.EXPERIMENT,
+    dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+    mc = ctx.getBean(classOf[Mustache.Compiler])
+  )
+
+  // ---------------------------------------------------
+  // Data Generators
+  // ---------------------------------------------------
+
+  @Bean(name = Array("datagen.words"))
+  def `datagen.words`: FlinkJob = new FlinkJob(
+    runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+    command =
+      """
+        |-v -c ${package}.datagen.flink.WordGenerator ${symbol_escape}
+        |${symbol_dollar}{app.path.datagens}/${parentArtifactId}-datagens-${version}.jar ${symbol_escape}
+        |${symbol_dollar}{datagen.dictionary.size} ${symbol_escape}
+        |${symbol_dollar}{system.default.config.parallelism.total} ${symbol_escape}
+        |${symbol_dollar}{datagen.tuples.per.task} ${symbol_escape}
+        |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt
+      """.stripMargin.trim
+  )
+
+  // ---------------------------------------------------
+  // Data Sets
+  // ---------------------------------------------------
+
+  @Bean(name = Array("dataset.words.static"))
+  def `dataset.words.static`: DataSet = new CopiedDataSet(
+    src = "${symbol_dollar}{app.path.datasets}/rubbish.txt",
+    dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+    fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+  )
+
+  @Bean(name = Array("dataset.words.generated"))
+  def `dataset.words.generated`: DataSet = new GeneratedDataSet(
+    src = ctx.getBean("datagen.words", classOf[FlinkJob]),
+    dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+    fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+  )
+
+  @Bean(name = Array("wordcount.output"))
+  def `wordcount.output`: ExperimentOutput = new ExperimentOutput(
+    path = "${symbol_dollar}{system.hadoop-2.path.output}/wordcount",
+    fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+  )
+
+  // ---------------------------------------------------
+  // Experiments
+  // ---------------------------------------------------
+
+  @Bean(name = Array("wordcount.default"))
+  def `wordcount.default`: ExperimentSuite = {
+    val `wordcount.flink.default` = new FlinkExperiment(
+      name = "wordcount.flink.default",
+      command =
+        """
+          |-v -c ${package}.flink.FlinkWC ${symbol_escape}
+          |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape}
+          |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+          |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+        """.stripMargin.trim,
+      config = ConfigFactory.parseString(""),
+      runs = 3,
+      runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+      inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])),
+      outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+    )
+
+    val `wordcount.spark.default` = new SparkExperiment(
+      name = "wordcount.spark.default",
+      command =
+        """
+          |--class ${package}.spark.SparkWC ${symbol_escape}
+          |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape}
+          |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+          |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+        """.stripMargin.trim,
+      config = ConfigFactory.parseString(""),
+      runs = 3,
+      runner = ctx.getBean("spark-1.3.1", classOf[Spark]),
+      inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])),
+      outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+    )
+
+    new ExperimentSuite(Seq(
+      `wordcount.flink.default`,
+      `wordcount.spark.default`))
+  }
+
+  @Bean(name = Array("wordcount.scale-out"))
+  def `wordcount.scale-out`: ExperimentSuite = {
+    val `wordcount.flink.prototype` = new FlinkExperiment(
+      name = "wordcount.flink.__topXXX__",
+      command =
+        """
+          |-v -c ${package}.flink.FlinkWC ${symbol_escape}
+          |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape}
+          |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+          |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+        """.stripMargin.trim,
+      config = ConfigFactory.parseString(
+        """
+          |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts}
+          |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism}
+          |datagen.dictionary.size = 10000
+          |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB
+        """.stripMargin.trim),
+      runs = 3,
+      runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+      inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])),
+      outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+    )
+
+    val `wordcount.spark.prototype` = new SparkExperiment(
+      name = "wordcount.spark.__topXXX__",
+      command =
+        """
+          |--class ${package}.spark.SparkWC ${symbol_escape}
+          |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape}
+          |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+          |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+        """.stripMargin.trim,
+      config = ConfigFactory.parseString(
+        """
+          |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts}
+          |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism}
+          |datagen.dictionary.size = 10000
+          |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB
+        """.stripMargin.trim),
+      runs = 3,
+      runner = ctx.getBean("spark-1.3.1", classOf[Spark]),
+      inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])),
+      outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
+    )
+
+    new ExperimentSuite(
+      new ExperimentSequence(
+        parameters = new SimpleParameters(
+          paramName = "topXXX",
+          paramVals = Seq("top005", "top010", "top020")),
+        prototypes = Seq(
+          `wordcount.flink.prototype`,
+          `wordcount.spark.prototype`)))
+  }
+}
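The `ConfigFactory.parseString` overrides in the scale-out experiments are plain HOCON, so after template expansion a reference such as `${env.slaves.top005.total.parallelism}` is resolved by Typesafe Config substitution before the value is consumed. A small, self-contained sketch of that resolution step (the keys and values here are invented; only com.typesafe:config is required):

```scala
import com.typesafe.config.ConfigFactory

object ConfigResolveSketch extends App {
  // Invented keys mirroring the scale-out overrides above.
  val conf = ConfigFactory.parseString(
    """
      |env.slaves.top005.total.parallelism = 40
      |system.default.config.parallelism.total = ${env.slaves.top005.total.parallelism}
      |datagen.dictionary.size = 10000
    """.stripMargin).resolve() // resolve() substitutes the ${...} reference

  println(conf.getInt("system.default.config.parallelism.total")) // 40
  println(conf.getInt("datagen.dictionary.size"))                 // 10000
}
```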
diff --git a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
index 5c3054e7..aa7820b2 100644
--- a/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
+++ b/peel-archetypes/peel-flinkspark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala
@@ -94,7 +94,7 @@ class QueryRuntimes extends Command {
     logger.info(s"------------------------------------------------------------------------------------------------")
     logger.info(s"| name | name | min | max | median |")
     logger.info(s"------------------------------------------------------------------------------------------------")
-    for ((suite, name, median, min, max) <- runtimes) {
+    for ((suite, name, min, max, median) <- runtimes) {
       logger.info(f"| ${symbol_dollar}suite%-25s | ${symbol_dollar}name%-25s | ${symbol_dollar}min%10d | ${symbol_dollar}max%10d | ${symbol_dollar}median%10d | ")
     }
     logger.info(s"------------------------------------------------------------------------------------------------")
diff --git a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
index 43cc6c76..e9dfa4f8 100644
--- a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
+++ b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-bundle/src/main/resources/config/experiments.xml
@@ -12,11 +12,31 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
index be301fc8..5490a13a 100644
--- a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
+++ b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/pom.xml
@@ -42,6 +42,10 @@
         <dependency>
             <groupId>org.peelframework</groupId>
            <artifactId>peel-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.peelframework</groupId>
+            <artifactId>peel-extensions</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
new file mode 100644
index 00000000..49a6c0eb
--- /dev/null
+++ b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/ExperimentsDefinitions.scala
@@ -0,0 +1,202 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}
+
+import com.samskivert.mustache.Mustache
+import com.typesafe.config.ConfigFactory
+import org.peelframework.core.beans.data.{CopiedDataSet, DataSet, ExperimentOutput, GeneratedDataSet}
+import org.peelframework.core.beans.experiment.ExperimentSequence.SimpleParameters
+import org.peelframework.core.beans.experiment.{ExperimentSequence, ExperimentSuite}
+import org.peelframework.core.beans.system.Lifespan
+import org.peelframework.flink.beans.experiment.FlinkExperiment
+import org.peelframework.flink.beans.job.FlinkJob
+import org.peelframework.flink.beans.system.Flink
+import org.peelframework.hadoop.beans.system.HDFS2
+import org.peelframework.spark.beans.experiment.SparkExperiment
+import org.peelframework.spark.beans.system.Spark
+import org.springframework.context.annotation.{Bean, Configuration}
+import org.springframework.context.{ApplicationContext, ApplicationContextAware}
+
+/** Experiments definitions for the '${parentArtifactId}' bundle. */
+@Configuration
+class ExperimentsDefinitions extends ApplicationContextAware {
+
+  /* The enclosing application context. */
+  var ctx: ApplicationContext = null
+
+  def setApplicationContext(ctx: ApplicationContext): Unit = {
+    this.ctx = ctx
+  }
+
+  // ---------------------------------------------------
+  // Systems
+  // ---------------------------------------------------
+
+  @Bean(name = Array("hdfs-2.7.1"))
+  def `hdfs-2.7.1`: HDFS2 = new HDFS2(
+    version = "2.7.1",
+    configKey = "hadoop-2",
+    lifespan = Lifespan.SUITE,
+    mc = ctx.getBean(classOf[Mustache.Compiler])
+  )
+
+  @Bean(name = Array("flink-0.9.0"))
+  def `flink-0.9.0`: Flink = new Flink(
+    version = "0.9.0",
+    configKey = "flink",
+    lifespan = Lifespan.EXPERIMENT,
+    dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+    mc = ctx.getBean(classOf[Mustache.Compiler])
+  )
+
+  @Bean(name = Array("spark-1.3.1"))
+  def `spark-1.3.1`: Spark = new Spark(
+    version = "1.3.1",
+    configKey = "spark",
+    lifespan = Lifespan.EXPERIMENT,
+    dependencies = Set(ctx.getBean("hdfs-2.7.1", classOf[HDFS2])),
+    mc = ctx.getBean(classOf[Mustache.Compiler])
+  )
+
+  // ---------------------------------------------------
+  // Data Generators
+  // ---------------------------------------------------
+
+  @Bean(name = Array("datagen.words"))
+  def `datagen.words`: FlinkJob = new FlinkJob(
+    runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+    command =
+      """
+        |-v -c ${package}.datagen.flink.WordGenerator ${symbol_escape}
+        |${symbol_dollar}{app.path.datagens}/${parentArtifactId}-datagens-${version}.jar ${symbol_escape}
+        |${symbol_dollar}{datagen.dictionary.size} ${symbol_escape}
+        |${symbol_dollar}{system.default.config.parallelism.total} ${symbol_escape}
+        |${symbol_dollar}{datagen.tuples.per.task} ${symbol_escape}
+        |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt
+      """.stripMargin.trim
+  )
+
+  // ---------------------------------------------------
+  // Data Sets
+  // ---------------------------------------------------
+
+  @Bean(name = Array("dataset.words.static"))
+  def `dataset.words.static`: DataSet = new CopiedDataSet(
+    src = "${symbol_dollar}{app.path.datasets}/rubbish.txt",
+    dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+    fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+  )
+
+  @Bean(name = Array("dataset.words.generated"))
+  def `dataset.words.generated`: DataSet = new GeneratedDataSet(
+    src = ctx.getBean("datagen.words", classOf[FlinkJob]),
+    dst = "${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt",
+    fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+  )
+
+  @Bean(name = Array("wordcount.output"))
+  def `wordcount.output`: ExperimentOutput = new ExperimentOutput(
+    path = "${symbol_dollar}{system.hadoop-2.path.output}/wordcount",
+    fs = ctx.getBean("hdfs-2.7.1", classOf[HDFS2])
+  )
+
+  // ---------------------------------------------------
+  // Experiments
+  // ---------------------------------------------------
+
+  @Bean(name = Array("wordcount.default"))
+  def `wordcount.default`: ExperimentSuite = {
+    val `wordcount.flink.default` = new FlinkExperiment(
+      name = "wordcount.flink.default",
+      command =
+        """
+          |-v -c ${package}.flink.FlinkWC ${symbol_escape}
+          |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape}
+          |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape}
+          |${symbol_dollar}{system.hadoop-2.path.output}/wordcount
+        """.stripMargin.trim,
+      config = ConfigFactory.parseString(""),
+      runs = 3,
+      runner = ctx.getBean("flink-0.9.0", classOf[Flink]),
+      inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])),
+      outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput]))
Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput])) + ) + + val `wordcount.spark.default` = new SparkExperiment( + name = "wordcount.spark.default", + command = + """ + |--class ${package}.spark.SparkWC ${symbol_escape} + |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.output}/wordcount + """.stripMargin.trim, + config = ConfigFactory.parseString(""), + runs = 3, + runner = ctx.getBean("spark-1.3.1", classOf[Spark]), + inputs = Set(ctx.getBean("dataset.words.static", classOf[DataSet])), + outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput])) + ) + + new ExperimentSuite(Seq( + `wordcount.flink.default`, + `wordcount.spark.default`)) + } + + @Bean(name = Array("wordcount.scale-out")) + def `wordcount.scale-out`: ExperimentSuite = { + val `wordcount.flink.prototype` = new FlinkExperiment( + name = "wordcount.flink.__topXXX__", + command = + """ + |-v -c ${package}.flink.FlinkWC ${symbol_escape} + |${symbol_dollar}{app.path.apps}/${parentArtifactId}-flink-jobs-${version}.jar ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.output}/wordcount + """.stripMargin.trim, + config = ConfigFactory.parseString( + """ + |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts} + |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism} + |datagen.dictionary.dize = 10000 + |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB + """.stripMargin.trim), + runs = 3, + runner = ctx.getBean("flink-0.9.0", classOf[Flink]), + inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])), + outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput])) + ) + + val `wordcount.spark.prototype` = new SparkExperiment( + name = "wordcount.spark.__topXXX__", + command = + """ + |--class ${package}.spark.SparkWC ${symbol_escape} + |${symbol_dollar}{app.path.apps}/${parentArtifactId}-spark-jobs-${version}.jar ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.input}/rubbish.txt ${symbol_escape} + |${symbol_dollar}{system.hadoop-2.path.output}/wordcount + """.stripMargin.trim, + config = ConfigFactory.parseString( + """ + |system.default.config.slaves = ${symbol_dollar}{env.slaves.__topXXX__.hosts} + |system.default.config.parallelism.total = ${symbol_dollar}{env.slaves.__topXXX__.total.parallelism} + |datagen.dictionary.dize = 10000 + |datagen.tuples.per.task = 10000000 ${symbol_pound} ~ 100 MB + """.stripMargin.trim), + runs = 3, + runner = ctx.getBean("spark-1.3.1", classOf[Spark]), + inputs = Set(ctx.getBean("dataset.words.generated", classOf[DataSet])), + outputs = Set(ctx.getBean("wordcount.output", classOf[ExperimentOutput])) + ) + + new ExperimentSuite( + new ExperimentSequence( + parameters = new SimpleParameters( + paramName = "topXXX", + paramVals = Seq("top005", "top010", "top020")), + prototypes = Seq( + `wordcount.flink.prototype`, + `wordcount.spark.prototype`))) + } +} diff --git a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala index 5c3054e7..aa7820b2 100644 --- 
a/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala +++ b/peel-archetypes/peel-spark-bundle/src/main/resources/archetype-resources/__rootArtifactId__-peelextensions/src/main/scala/cli/command/QueryRuntimes.scala @@ -94,7 +94,7 @@ class QueryRuntimes extends Command { logger.info(s"------------------------------------------------------------------------------------------------") logger.info(s"| name | name | min | max | median |") logger.info(s"------------------------------------------------------------------------------------------------") - for ((suite, name, median, min, max) <- runtimes) { + for ((suite, name, min, max, median) <- runtimes) { logger.info(f"| ${symbol_dollar}suite%-25s | ${symbol_dollar}name%-25s | ${symbol_dollar}min%10d | ${symbol_dollar}max%10d | ${symbol_dollar}median%10d | ") } logger.info(s"------------------------------------------------------------------------------------------------")
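For reference, the one-line fix repeated in all three QueryRuntimes.scala copies aligns the tuple destructuring with the printed columns: the row is formatted as min, max, median, and the underlying query presumably yields the statistics in that order, so the old `(suite, name, median, min, max)` destructuring shuffled the three statistics across the wrong columns. A stdlib-only sketch of the aggregation and the corrected row formatting (the sample data is invented; the real values come from the results database):

```scala
object RuntimesTableSketch extends App {
  // Invented sample rows: (suite, experiment name, runtimes of the individual runs).
  val samples = Seq(
    ("wordcount.default", "wordcount.flink.default", Seq(4210L, 3980L, 4105L)),
    ("wordcount.default", "wordcount.spark.default", Seq(5310L, 5122L, 5270L)))

  // Aggregate to (suite, name, min, max, median); the median here is the middle
  // element of the sorted runs (upper middle for even run counts).
  val runtimes = for ((suite, name, runs) <- samples) yield {
    val sorted = runs.sorted
    (suite, name, sorted.head, sorted.last, sorted(sorted.size / 2))
  }

  // Destructure in the same (min, max, median) order that the row prints.
  for ((suite, name, min, max, median) <- runtimes)
    println(f"| $suite%-25s | $name%-25s | $min%10d | $max%10d | $median%10d |")
}
```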