diff --git a/peel-extensions/src/main/resources/reference.spark.conf b/peel-extensions/src/main/resources/reference.spark.conf index 208fda29..bbc078df 100644 --- a/peel-extensions/src/main/resources/reference.spark.conf +++ b/peel-extensions/src/main/resources/reference.spark.conf @@ -8,6 +8,7 @@ system { home = ${app.path.systems}"/spark" config = ${system.spark.path.home}"/conf" log = ${system.spark.path.home}"/logs" + work = ${system.spark.path.home}"/work" } startup { max.attempts = ${system.default.startup.max.attempts} @@ -28,6 +29,8 @@ system { SPARK_WORKER_CORES = ${system.default.config.parallelism.per-node} SPARK_EXECUTOR_CORES = ${system.default.config.parallelism.per-node} SPARK_EXECUTOR_MEMORY = "512m" + # Enables periodic cleanup of worker / application dirs every 5 min. for data older than 1 hour. + SPARK_WORKER_OPTS = """"-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=300 -Dspark.worker.cleanup.appDataTtl=3600"""" } # spark-defaults.conf entries defaults { diff --git a/peel-extensions/src/main/resources/templates/spark/conf/spark-env.sh.mustache b/peel-extensions/src/main/resources/templates/spark/conf/spark-env.sh.mustache index d10ab56a..fd9ef567 100644 --- a/peel-extensions/src/main/resources/templates/spark/conf/spark-env.sh.mustache +++ b/peel-extensions/src/main/resources/templates/spark/conf/spark-env.sh.mustache @@ -42,7 +42,7 @@ {{#SPARK_WORKER_WEBUI_PORT}}SPARK_WORKER_WEBUI_PORT={{SPARK_WORKER_WEBUI_PORT}}{{/SPARK_WORKER_WEBUI_PORT}}{{^SPARK_WORKER_WEBUI_PORT}}# - SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker{{/SPARK_WORKER_WEBUI_PORT}} {{#SPARK_WORKER_INSTANCES}}SPARK_WORKER_INSTANCES={{SPARK_WORKER_INSTANCES}}{{/SPARK_WORKER_INSTANCES}}{{^SPARK_WORKER_INSTANCES}}# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node{{/SPARK_WORKER_INSTANCES}} {{#SPARK_WORKER_DIR}}SPARK_WORKER_DIR={{SPARK_WORKER_DIR}}{{/SPARK_WORKER_DIR}}{{^SPARK_WORKER_DIR}}# - SPARK_WORKER_DIR, to set the working directory of worker processes{{/SPARK_WORKER_DIR}} -{{#SPARK_WORKER_OPTS}}SPARK_WORKER_OPTS={{SPARK_WORKER_OPTS}}{{/SPARK_WORKER_OPTS}}{{^SPARK_WORKER_OPTS}}# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y"){{/SPARK_WORKER_OPTS}} +{{#SPARK_WORKER_OPTS}}SPARK_WORKER_OPTS={{{SPARK_WORKER_OPTS}}}{{/SPARK_WORKER_OPTS}}{{^SPARK_WORKER_OPTS}}# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y"){{/SPARK_WORKER_OPTS}} {{#SPARK_HISTORY_OPTS}}SPARK_HISTORY_OPTS={{SPARK_HISTORY_OPTS}}{{/SPARK_HISTORY_OPTS}}{{^SPARK_HISTORY_OPTS}}# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y"){{/SPARK_HISTORY_OPTS}} {{#SPARK_DAEMON_JAVA_OPTS}}SPARK_DAEMON_JAVA_OPTS={{SPARK_DAEMON_JAVA_OPTS}}{{/SPARK_DAEMON_JAVA_OPTS}}{{^SPARK_DAEMON_JAVA_OPTS}}# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y"){{/SPARK_DAEMON_JAVA_OPTS}} {{#SPARK_PUBLIC_DNS}}SPARK_PUBLIC_DNS={{SPARK_PUBLIC_DNS}}{{/SPARK_PUBLIC_DNS}}{{^SPARK_PUBLIC_DNS}}# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers{{/SPARK_PUBLIC_DNS}} diff --git a/peel-extensions/src/main/scala/org/peelframework/spark/beans/system/Spark.scala b/peel-extensions/src/main/scala/org/peelframework/spark/beans/system/Spark.scala index db58aeda..00a1dc2e 100644 --- a/peel-extensions/src/main/scala/org/peelframework/spark/beans/system/Spark.scala +++ b/peel-extensions/src/main/scala/org/peelframework/spark/beans/system/Spark.scala @@ -140,13 +140,27 @@ class Spark( s""" ssh $user@$host "$cmd" """ } + val rmWorkDir = (host: String, workDir: String) => { + val cmd = s""" rm -Rf $workDir/* """ + s""" ssh $user@$host "$cmd" """ + } + val hosts = config.getStringList(s"system.$configKey.config.slaves").asScala val paths = config.getString(s"system.$configKey.config.defaults.spark.local.dir").split(',') + val workDir = config.getString(s"system.$configKey.path.work") - val futureInitOps = Future.traverse(hosts)(host => Future { - logger.info(s"Initializing Spark tmp directories '${paths.mkString(",")}' at $host") - shell ! (init(host, paths), s"Unable to initialize Spark tmp directories '${paths.mkString(",")}' at $host.") - }) + val futureInitOps = Future.traverse(hosts){ host => + for { + _ <- Future { + logger.info(s"Initializing Spark tmp directories '${paths.mkString(",")}' at $host") + shell ! (init(host, paths), s"Unable to initialize Spark tmp directories '${paths.mkString(",")}' at $host.") + } + f <- Future { + logger.debug(s"Removing Spark work directory content '$workDir' at $host") + shell ! (rmWorkDir(host, workDir), s"Unable to remove Spark work directory content '$workDir' at $host.", fatal = false) + } + } yield f + } // await for all futureInitOps to finish Await.result(futureInitOps, Math.max(30, 5 * hosts.size).seconds)