From 35857d6896486ec21b5322baffdcb2308aa33aa1 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 13 Aug 2019 14:35:29 +0200 Subject: [PATCH 01/25] Fix path of stop-dfs.sh script --- .../scala/org/peelframework/hadoop/beans/system/HDFS2.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS2.scala b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS2.scala index fbdab142..95c007be 100644 --- a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS2.scala +++ b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS2.scala @@ -126,7 +126,7 @@ class HDFS2( case e: SetUpTimeoutException => failedStartUpAttempts = failedStartUpAttempts + 1 if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { - shell ! s"$home/bin/stop-dfs.sh" + shell ! s"$home/sbin/stop-dfs.sh" logger.info(s"Could not bring system '$toString' up in time, trying again...") } else { throw e From 6eeb90a48fd1e5cc889b8660b7e86a3c115ba909 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 13 Aug 2019 15:19:48 +0200 Subject: [PATCH 02/25] Fixed wrong spelling --- .../main/scala/org/peelframework/core/config/SystemConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peel-core/src/main/scala/org/peelframework/core/config/SystemConfig.scala b/peel-core/src/main/scala/org/peelframework/core/config/SystemConfig.scala index bdb34171..d7e48855 100644 --- a/peel-core/src/main/scala/org/peelframework/core/config/SystemConfig.scala +++ b/peel-core/src/main/scala/org/peelframework/core/config/SystemConfig.scala @@ -53,7 +53,7 @@ object SystemConfig { private final val template = { val rs = Option(getClass.getResourceAsStream(templatePath)) mc.compile(new BufferedReader(new InputStreamReader( - rs.getOrElse(throw new RuntimeException("Cannot find template resoure %s".format(templatePath)))))) + 
rs.getOrElse(throw new RuntimeException("Cannot find template resource %s".format(templatePath)))))) } /** Check whether file represented by this entry has changed with resepect to a given `config` instance. From cae26ea81ef262c0ead8e78105d1c0200c57c1e4 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 13 Aug 2019 18:04:36 +0200 Subject: [PATCH 03/25] =?UTF-8?q?Fix:=20Don=E2=80=99t=20start/stop=20Flink?= =?UTF-8?q?=20webclient?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The scripts bin/{start,stop}-webclient.sh no longer exist in Flink release-1.0.0 --- .../org/peelframework/flink/beans/system/Flink.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala index dc5dcae8..c64a1cf4 100644 --- a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala @@ -21,7 +21,7 @@ import com.samskivert.mustache.Mustache import org.peelframework.core.beans.system.Lifespan.Lifespan import org.peelframework.core.beans.system.{DistributedLogCollection, SetUpTimeoutException, System} import org.peelframework.core.config.{Model, SystemConfig} -import org.peelframework.core.util.shell +import org.peelframework.core.util.{Version, shell} import scala.collection.JavaConverters._ import scala.collection.Seq @@ -108,7 +108,9 @@ class Flink( val init = 0 // Flink resets the job manager log on startup shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/start-cluster.sh" - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/start-webclient.sh" + if (Version(version) < Version("1.0")) { + shell ! 
s"${config.getString(s"system.$configKey.path.home")}/bin/start-webclient.sh" + } logger.info(s"Waiting for nodes to connect") var curr = init @@ -143,7 +145,9 @@ class Flink( override protected def stop() = { shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-cluster.sh" - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-webclient.sh" + if (Version(version) < Version("1.0")) { + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-webclient.sh" + } shell ! s"rm -f ${config.getString(s"system.$configKey.config.yaml.env.pid.dir")}/flink-*.pid" isUp = false } From 539256cffa3d9fe2034650b0a4e0e9660d5a6d3f Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Thu, 20 Dec 2018 13:59:26 +0100 Subject: [PATCH 04/25] Update registration message of task managers --- .../scala/org/peelframework/flink/beans/system/Flink.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala index c64a1cf4..60393578 100644 --- a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala @@ -120,10 +120,12 @@ class Flink( // wait a bit Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) // get new values - if (version.startsWith("0.6")) + if (Version(version) <= Version("0.6")) curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Creating instance' | wc -l""").trim()) - else + else if (Version(version) <= Version("1.6")) curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Registered TaskManager' | wc -l""").trim()) + else + curr = Integer.parseInt((shell !! 
s"""cat $logDir/flink-$user-standalonesession-*.log | grep 'Registering TaskManager' | wc -l""").trim()) // timeout if counter goes below zero cntr = cntr - 1 if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") From b37c58eec2283ce0e6247cfcfeda856e3d6d46d4 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 13 Aug 2019 15:19:48 +0200 Subject: [PATCH 05/25] Added support for Hadoop 3.x --- .../main/resources/reference.hadoop-3.x.conf | 54 +++ .../hadoop-3/conf/hadoop-env.sh.mustache | 440 ++++++++++++++++++ .../templates/hadoop-3/conf/hosts.mustache | 3 + .../templates/hadoop-3/conf/site.xml.mustache | 12 + .../hadoop/beans/system/HDFS3.scala | 179 +++++++ 5 files changed, 688 insertions(+) create mode 100644 peel-extensions/src/main/resources/reference.hadoop-3.x.conf create mode 100644 peel-extensions/src/main/resources/templates/hadoop-3/conf/hadoop-env.sh.mustache create mode 100644 peel-extensions/src/main/resources/templates/hadoop-3/conf/hosts.mustache create mode 100644 peel-extensions/src/main/resources/templates/hadoop-3/conf/site.xml.mustache create mode 100644 peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala diff --git a/peel-extensions/src/main/resources/reference.hadoop-3.x.conf b/peel-extensions/src/main/resources/reference.hadoop-3.x.conf new file mode 100644 index 00000000..74e08314 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.hadoop-3.x.conf @@ -0,0 +1,54 @@ +system { + hadoop-3 { + user = ${system.default.user} + group = ${system.default.group} + path { + isShared = ${system.default.path.isShared} + archive.dst = ${app.path.systems} + config = ${system.hadoop-3.path.home}"/etc/hadoop" + log = ${system.hadoop-3.path.home}"/logs" + input = ${system.hadoop-3.config.core.fs.default.name}"/tmp/input" + output = ${system.hadoop-3.config.core.fs.default.name}"/tmp/output" + } + format = true + startup { + max.attempts = 
${system.default.startup.max.attempts} + polling { + counter = ${system.default.startup.polling.counter} + interval = ${system.default.startup.polling.interval} + } + } + config { + # put list of masters + masters = ${system.default.config.masters} + # put list of workers + workers = ${system.default.config.slaves} + # unfortunately, the slaves config key is hard-coded in Java code + slaves = ${system.hadoop-3.config.workers} + # hadoop-env.sh entries + env { + JAVA_HOME = ${system.default.config.java.home} + HADOOP_INSTALL = ${system.hadoop-3.path.home} + # directory where process IDs are stored + HADOOP_PID_DIR = "/tmp/hadoop-3/pid" + # avoids loading wrong native library in the default case + # override with /lib/native if lib exists + HADOOP_COMMON_LIB_NATIVE_DIR = "$HADOOP_INSTALL/lib/native" + } + # core-site.xml entries + core { + fs.default.name = "hdfs://localhost:9000" + } + # hdfs-site.xml entries + hdfs { + dfs.replication = 1 + dfs.namenode.name.dir = "file:///tmp/hdfs-3/name" + dfs.datanode.data.dir = "file:///tmp/hdfs-3/data" + } + # mapred-site.xml entries + mapred { } + # yarn-site.xml entries + yarn { } + } + } +} diff --git a/peel-extensions/src/main/resources/templates/hadoop-3/conf/hadoop-env.sh.mustache b/peel-extensions/src/main/resources/templates/hadoop-3/conf/hadoop-env.sh.mustache new file mode 100644 index 00000000..3c85688a --- /dev/null +++ b/peel-extensions/src/main/resources/templates/hadoop-3/conf/hadoop-env.sh.mustache @@ -0,0 +1,440 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Set Hadoop-specific environment variables here. +export HADOOP_HOME={{HADOOP_INSTALL}} # TODO remove + +## +## THIS FILE ACTS AS THE MASTER FILE FOR ALL HADOOP PROJECTS. +## SETTINGS HERE WILL BE READ BY ALL HADOOP COMMANDS. THEREFORE, +## ONE CAN USE THIS FILE TO SET YARN, HDFS, AND MAPREDUCE +## CONFIGURATION OPTIONS INSTEAD OF xxx-env.sh. +## +## Precedence rules: +## +## {yarn-env.sh|hdfs-env.sh} > hadoop-env.sh > hard-coded defaults +## +## {YARN_xyz|HDFS_xyz} > HADOOP_xyz > hard-coded defaults +## + +# Many of the options here are built from the perspective that users +# may want to provide OVERWRITING values on the command line. +# For example: +# +# JAVA_HOME=/usr/java/testing hdfs dfs -ls +# +# Therefore, the vast majority (BUT NOT ALL!) of these defaults +# are configured for substitution and not append. If append +# is preferable, modify this file accordingly. + +### +# Generic settings for HADOOP +### + +# Technically, the only required environment variable is JAVA_HOME. +# All others are optional. However, the defaults are probably not +# preferred. Many sites configure these options outside of Hadoop, +# such as in /etc/profile.d + +# The java implementation to use. By default, this environment +# variable is REQUIRED on ALL platforms except OS X! +{{#JAVA_HOME}}export JAVA_HOME={{JAVA_HOME}}{{/JAVA_HOME}}{{^JAVA_HOME}}export JAVA_HOME=${JAVA_HOME}{{/JAVA_HOME}} + +#Hadoop paths # TODO Is this really necessary? Can this be generalized? 
+export HADOOP_INSTALL={{HADOOP_INSTALL}} +export PATH=$PATH:$HADOOP_INSTALL/bin +export PATH=$PATH:$HADOOP_INSTALL/sbin +export HADOOP_HOME=$HADOOP_INSTALL +export HADOOP_MAPRED_HOME=$HADOOP_INSTALL +export HADOOP_COMMON_HOME=$HADOOP_INSTALL +export HADOOP_HDFS_HOME=$HADOOP_INSTALL +export YARN_HOME=$HADOOP_INSTALL + +# Location of Hadoop. By default, Hadoop will attempt to determine +# this location based upon its execution path. +# export HADOOP_HOME= + +# Location of Hadoop's configuration information. i.e., where this +# file is living. If this is not defined, Hadoop will attempt to +# locate it based upon its execution path. +# +# NOTE: It is recommend that this variable not be set here but in +# /etc/profile.d or equivalent. Some options (such as +# --config) may react strangely otherwise. +# +# export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop + +# The maximum amount of heap to use (Java -Xmx). If no unit +# is provided, it will be converted to MB. Daemons will +# prefer any Xmx setting in their respective _OPT variable. +# There is no default; the JVM will autoscale based upon machine +# memory size. +# export HADOOP_HEAPSIZE_MAX= + +# The minimum amount of heap to use (Java -Xms). If no unit +# is provided, it will be converted to MB. Daemons will +# prefer any Xms setting in their respective _OPT variable. +# There is no default; the JVM will autoscale based upon machine +# memory size. +# export HADOOP_HEAPSIZE_MIN= + +# Enable extra debugging of Hadoop's JAAS binding, used to set up +# Kerberos security. +# export HADOOP_JAAS_DEBUG=true + +# Extra Java runtime options for all Hadoop commands. We don't support +# IPv6 yet/still, so by default the preference is set to IPv4. 
+# export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true" +{{#HADOOP_OPTS}}export HADOOP_OPTS={{HADOOP_OPTS}}{{/HADOOP_OPTS}}{{^HADOOP_OPTS}}export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true"{{/HADOOP_OPTS}} + +# For Kerberos debugging, an extended option set logs more invormation +# export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug" + +# Some parts of the shell code may do special things dependent upon +# the operating system. We have to set this here. See the next +# section as to why.... +export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)} + + +# Under certain conditions, Java on OS X will throw SCDynamicStore errors +# in the system logs. +# See HADOOP-8719 for more information. If one needs Kerberos +# support on OS X, one will want to change/remove this extra bit. +case ${HADOOP_OS_TYPE} in +Darwin*) +export HADOOP_OPTS="${HADOOP_OPTS} -Djava.security.krb5.realm= " +export HADOOP_OPTS="${HADOOP_OPTS} -Djava.security.krb5.kdc= " +export HADOOP_OPTS="${HADOOP_OPTS} -Djava.security.krb5.conf= " +;; +esac + +# Extra Java runtime options for some Hadoop commands +# and clients (i.e., hdfs dfs -blah). These get appended to HADOOP_OPTS for +# such commands. In most cases, # this should be left empty and +# let users supply it on the command line. +# export HADOOP_CLIENT_OPTS="" + +# +# A note about classpaths. +# +# By default, Apache Hadoop overrides Java's CLASSPATH +# environment variable. It is configured such +# that it sarts out blank with new entries added after passing +# a series of checks (file/dir exists, not already listed aka +# de-deduplication). During de-depulication, wildcards and/or +# directories are *NOT* expanded to keep it simple. Therefore, +# if the computed classpath has two specific mentions of +# awesome-methods-1.0.jar, only the first one added will be seen. 
+# If two directories are in the classpath that both contain +# awesome-methods-1.0.jar, then Java will pick up both versions. + +# An additional, custom CLASSPATH. Site-wide configs should be +# handled via the shellprofile functionality, utilizing the +# hadoop_add_classpath function for greater control and much +# harder for apps/end-users to accidentally override. +# Similarly, end users should utilize ${HOME}/.hadooprc . +# This variable should ideally only be used as a short-cut, +# interactive way for temporary additions on the command line. +# export HADOOP_CLASSPATH="/some/cool/path/on/your/machine" + +# Should HADOOP_CLASSPATH be first in the official CLASSPATH? +# export HADOOP_USER_CLASSPATH_FIRST="yes" + +# If HADOOP_USE_CLIENT_CLASSLOADER is set, the classpath along +# with the main jar are handled by a separate isolated +# client classloader when 'hadoop jar', 'yarn jar', or 'mapred job' +# is utilized. If it is set, HADOOP_CLASSPATH and +# HADOOP_USER_CLASSPATH_FIRST are ignored. +# export HADOOP_USE_CLIENT_CLASSLOADER=true + +# HADOOP_CLIENT_CLASSLOADER_SYSTEM_CLASSES overrides the default definition of +# system classes for the client classloader when HADOOP_USE_CLIENT_CLASSLOADER +# is enabled. Names ending in '.' (period) are treated as package names, and +# names starting with a '-' are treated as negative matches. For example, +# export HADOOP_CLIENT_CLASSLOADER_SYSTEM_CLASSES="-org.apache.hadoop.UserClass,java.,javax.,org.apache.hadoop." + +# Enable optional, bundled Hadoop features +# This is a comma delimited list. It may NOT be overridden via .hadooprc +# Entries may be added/removed as needed. +# export HADOOP_OPTIONAL_TOOLS="hadoop-azure-datalake,hadoop-azure,hadoop-openstack,hadoop-kafka,hadoop-aws,hadoop-aliyun" + +### +# Options for remote shell connectivity +### + +# There are some optional components of hadoop that allow for +# command and control of remote hosts. 
For example, +# start-dfs.sh will attempt to bring up all NNs, DNS, etc. + +# Options to pass to SSH when one of the "log into a host and +# start/stop daemons" scripts is executed +# export HADOOP_SSH_OPTS="-o BatchMode=yes -o StrictHostKeyChecking=no -o ConnectTimeout=10s" + +# The built-in ssh handler will limit itself to 10 simultaneous connections. +# For pdsh users, this sets the fanout size ( -f ) +# Change this to increase/decrease as necessary. +# export HADOOP_SSH_PARALLEL=10 + +# Filename which contains all of the hosts for any remote execution +# helper scripts # such as workers.sh, start-dfs.sh, etc. +# export HADOOP_WORKERS="${HADOOP_CONF_DIR}/workers" + +### +# Options for all daemons +### +# + +# +# Many options may also be specified as Java properties. It is +# very common, and in many cases, desirable, to hard-set these +# in daemon _OPTS variables. Where applicable, the appropriate +# Java property is also identified. Note that many are re-used +# or set differently in certain contexts (e.g., secure vs +# non-secure) +# + +# Where (primarily) daemon log files are stored. +# ${HADOOP_HOME}/logs by default. +# Java property: hadoop.log.dir +# export HADOOP_LOG_DIR=${HADOOP_HOME}/logs + +# A string representing this instance of hadoop. $USER by default. +# This is used in writing log and pid files, so keep that in mind! +# Java property: hadoop.id.str +# export HADOOP_IDENT_STRING=$USER +{{#HADOOP_IDENT_STRING}}export HADOOP_IDENT_STRING={{HADOOP_IDENT_STRING}}{{/HADOOP_IDENT_STRING}}{{^HADOOP_IDENT_STRING}}export HADOOP_IDENT_STRING=$USER{{/HADOOP_IDENT_STRING}} + +# How many seconds to pause after stopping a daemon +# export HADOOP_STOP_TIMEOUT=5 + +# Where pid files are stored. /tmp by default. 
+# export HADOOP_PID_DIR=/tmp +{{#HADOOP_PID_DIR}}export HADOOP_PID_DIR={{HADOOP_PID_DIR}}{{/HADOOP_PID_DIR}}{{^HADOOP_PID_DIR}}export HADOOP_PID_DIR=${HADOOP_PID_DIR}{{/HADOOP_PID_DIR}} + + +# Default log4j setting for interactive commands +# Java property: hadoop.root.logger +# export HADOOP_ROOT_LOGGER=INFO,console +{{#HADOOP_ROOT_LOGGER}}export HADOOP_ROOT_LOGGER={{HADOOP_ROOT_LOGGER}}{{/HADOOP_ROOT_LOGGER}}{{^HADOOP_ROOT_LOGGER}}export HADOOP_ROOT_LOGGER=INFO,console{{/HADOOP_ROOT_LOGGER}} + +# Default log4j setting for daemons spawned explicitly by +# --daemon option of hadoop, hdfs, mapred and yarn command. +# Java property: hadoop.root.logger +# export HADOOP_DAEMON_ROOT_LOGGER=INFO,RFA + +# Default log level and output location for security-related messages. +# You will almost certainly want to change this on a per-daemon basis via +# the Java property (i.e., -Dhadoop.security.logger=foo). (Note that the +# defaults for the NN and 2NN override this by default.) +# Java property: hadoop.security.logger +# export HADOOP_SECURITY_LOGGER=INFO,NullAppender + +# Default process priority level +# Note that sub-processes will also run at this level! +# export HADOOP_NICENESS=0 + +# Default name for the service level authorization file +# Java property: hadoop.policy.file +# export HADOOP_POLICYFILE="hadoop-policy.xml" + +# +# NOTE: this is not used by default! <----- +# You can define variables right here and then re-use them later on. +# For example, it is common to use the same garbage collection settings +# for all the daemons. So one could define: +# +# export HADOOP_GC_SETTINGS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps" +# +# .. and then use it as per the b option under the namenode. + +### +# Secure/privileged execution +### + +# +# Out of the box, Hadoop uses jsvc from Apache Commons to launch daemons +# on privileged ports. This functionality can be replaced by providing +# custom functions. 
See hadoop-functions.sh for more information. +# + +# The jsvc implementation to use. Jsvc is required to run secure datanodes +# that bind to privileged ports to provide authentication of data transfer +# protocol. Jsvc is not required if SASL is configured for authentication of +# data transfer protocol using non-privileged ports. +# export JSVC_HOME=/usr/bin + +# +# This directory contains pids for secure and privileged processes. +#export HADOOP_SECURE_PID_DIR=${HADOOP_PID_DIR} +{{#HADOOP_SECURE_PID_DIR}}export HADOOP_SECURE_PID_DIR={{HADOOP_SECURE_PID_DIR}}{{/HADOOP_SECURE_PID_DIR}}{{^HADOOP_SECURE_PID_DIR}}export HADOOP_SECURE_PID_DIR=${HADOOP_PID_DIR}{{/HADOOP_SECURE_PID_DIR}} + +# +# This directory contains the logs for secure and privileged processes. +# Java property: hadoop.log.dir +# export HADOOP_SECURE_LOG=${HADOOP_LOG_DIR} + +# +# When running a secure daemon, the default value of HADOOP_IDENT_STRING +# ends up being a bit bogus. Therefore, by default, the code will +# replace HADOOP_IDENT_STRING with HADOOP_xx_SECURE_USER. If one wants +# to keep HADOOP_IDENT_STRING untouched, then uncomment this line. +# export HADOOP_SECURE_IDENT_PRESERVE="true" + +### +# NameNode specific parameters +### + +# Default log level and output location for file system related change +# messages. For non-namenode daemons, the Java property must be set in +# the appropriate _OPTS if one wants something other than INFO,NullAppender +# Java property: hdfs.audit.logger +# export HDFS_AUDIT_LOGGER=INFO,NullAppender + +# Specify the JVM options to be used when starting the NameNode. 
+# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# a) Set JMX options +# export HDFS_NAMENODE_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=1026" +# +# b) Set garbage collection logs +# export HDFS_NAMENODE_OPTS="${HADOOP_GC_SETTINGS} -Xloggc:${HADOOP_LOG_DIR}/gc-rm.log-$(date +'%Y%m%d%H%M')" +# +# c) ... or set them directly +# export HDFS_NAMENODE_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:${HADOOP_LOG_DIR}/gc-rm.log-$(date +'%Y%m%d%H%M')" + +# this is the default: +# export HDFS_NAMENODE_OPTS="-Dhadoop.security.logger=INFO,RFAS" + +### +# SecondaryNameNode specific parameters +### +# Specify the JVM options to be used when starting the SecondaryNameNode. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# This is the default: +# export HDFS_SECONDARYNAMENODE_OPTS="-Dhadoop.security.logger=INFO,RFAS" + +### +# DataNode specific parameters +### +# Specify the JVM options to be used when starting the DataNode. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# This is the default: +# export HDFS_DATANODE_OPTS="-Dhadoop.security.logger=ERROR,RFAS" + +# On secure datanodes, user to run the datanode as after dropping privileges. +# This **MUST** be uncommented to enable secure HDFS if using privileged ports +# to provide authentication of data transfer protocol. This **MUST NOT** be +# defined if SASL is configured for authentication of data transfer protocol +# using non-privileged ports. +# This will replace the hadoop.id.str Java property in secure mode. 
+# export HDFS_DATANODE_SECURE_USER=hdfs + +# Supplemental options for secure datanodes +# By default, Hadoop uses jsvc which needs to know to launch a +# server jvm. +# export HDFS_DATANODE_SECURE_EXTRA_OPTS="-jvm server" + +### +# NFS3 Gateway specific parameters +### +# Specify the JVM options to be used when starting the NFS3 Gateway. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_NFS3_OPTS="" + +# Specify the JVM options to be used when starting the Hadoop portmapper. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_PORTMAP_OPTS="-Xmx512m" + +# Supplemental options for priviliged gateways +# By default, Hadoop uses jsvc which needs to know to launch a +# server jvm. +# export HDFS_NFS3_SECURE_EXTRA_OPTS="-jvm server" + +# On privileged gateways, user to run the gateway as after dropping privileges +# This will replace the hadoop.id.str Java property in secure mode. +# export HDFS_NFS3_SECURE_USER=nfsserver + +### +# ZKFailoverController specific parameters +### +# Specify the JVM options to be used when starting the ZKFailoverController. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_ZKFC_OPTS="" + +### +# QuorumJournalNode specific parameters +### +# Specify the JVM options to be used when starting the QuorumJournalNode. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_JOURNALNODE_OPTS="" + +### +# HDFS Balancer specific parameters +### +# Specify the JVM options to be used when starting the HDFS Balancer. 
+# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_BALANCER_OPTS="" + +### +# HDFS Mover specific parameters +### +# Specify the JVM options to be used when starting the HDFS Mover. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_MOVER_OPTS="" + +### +# Router-based HDFS Federation specific parameters +# Specify the JVM options to be used when starting the RBF Routers. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_DFSROUTER_OPTS="" +### + +### +# Advanced Users Only! +### + +# +# When building Hadoop, one can add the class paths to the commands +# via this special env var: +# export HADOOP_ENABLE_BUILD_PATHS="true" + +# +# To prevent accidents, shell commands be (superficially) locked +# to only allow certain users to execute certain subcommands. +# It uses the format of (command)_(subcommand)_USER. 
+# +# For example, to limit who can execute the namenode command, +# export HDFS_NAMENODE_USER=hdfs diff --git a/peel-extensions/src/main/resources/templates/hadoop-3/conf/hosts.mustache b/peel-extensions/src/main/resources/templates/hadoop-3/conf/hosts.mustache new file mode 100644 index 00000000..f70d5453 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/hadoop-3/conf/hosts.mustache @@ -0,0 +1,3 @@ +{{#hosts}} +{{{.}}} +{{/hosts}} diff --git a/peel-extensions/src/main/resources/templates/hadoop-3/conf/site.xml.mustache b/peel-extensions/src/main/resources/templates/hadoop-3/conf/site.xml.mustache new file mode 100644 index 00000000..68076ff3 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/hadoop-3/conf/site.xml.mustache @@ -0,0 +1,12 @@ + + + + + + +{{#properties}} + + {{{name}}} + {{{value}}} + {{/properties}} + diff --git a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala new file mode 100644 index 00000000..183353ac --- /dev/null +++ b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala @@ -0,0 +1,179 @@ +/** + * Copyright (C) 2014 TU Berlin (peel@dima.tu-berlin.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.peelframework.hadoop.beans.system + +import java.net.URI +import java.util.regex.Pattern + +import com.samskivert.mustache.Mustache +import org.peelframework.core.beans.system.Lifespan.Lifespan +import org.peelframework.core.beans.system.{DistributedLogCollection, SetUpTimeoutException, System} +import org.peelframework.core.config.{Model, SystemConfig} +import org.peelframework.core.util.shell + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.matching.Regex + +/** Wrapper class for HDFS3. + * + * Implements HDFS3 as a Peel `System` and provides setup and teardown methods. + * Additionally it offers the Filesysem capabilities to interact with hdfs. + * + * @param version Version of the system (e.g. "7.1") + * @param configKey The system configuration resides under `system.\${configKey}`. + * @param lifespan `Lifespan` of the system + * @param dependencies Set of dependencies that this system needs + * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + */ +class HDFS3( + version : String, + configKey : String, + lifespan : Lifespan, + dependencies : Set[System] = Set(), + mc : Mustache.Compiler) extends System("hdfs-3", version, configKey, lifespan, dependencies, mc) + with HDFSFileSystem + with DistributedLogCollection { + + // --------------------------------------------------- + // LogCollection. + // --------------------------------------------------- + + override def hosts = { + val master = config.getString("runtime.hostname") + val workers = config.getStringList(s"system.$configKey.config.workers").asScala + master +: workers + } + + /** The patterns of the log files to watch. 
*/ + override protected def logFilePatterns(): Seq[Regex] = { + val user = Pattern.quote(config.getString(s"system.$configKey.user")) + hosts.map(Pattern.quote).flatMap(host => Seq( + s"hadoop-$user-namenode-$host\\.log".r, + s"hadoop-$user-namenode-$host\\.out".r, + s"hadoop-$user-datanode-$host\\.log".r, + s"hadoop-$user-datanode-$host\\.out".r)) + } + + // --------------------------------------------------- + // System. + // --------------------------------------------------- + + override def configuration() = SystemConfig(config, { + val conf = config.getString(s"system.$configKey.path.config") + List( + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.workers", s"$conf/workers", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Env](s"system.$configKey.config.env", s"$conf/hadoop-env.sh", templatePath("conf/hadoop-env.sh"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.core", s"$conf/core-site.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.hdfs", s"$conf/hdfs-site.xml", templatePath("conf/site.xml"), mc) + ) + }) + + /** Checks if all datanodes have connected and the system is out of safemode. */ + override protected def start(): Unit = { + if (config.getBoolean(s"system.$configKey.format")) format() + + val user = config.getString(s"system.$configKey.user") + val home = config.getString(s"system.$configKey.path.home") + val logDir = config.getString(s"system.$configKey.path.log") + val hostname = config.getString("app.hostname") + + var failedStartUpAttempts = 0 + while (!isUp) { + try { + val totl = config.getStringList(s"system.$configKey.config.workers").size() + var init = Integer.parseInt((shell !! s"""cat $logDir/hadoop-$user-namenode-$hostname.log | grep 'registerDatanode:' | wc -l""").trim()) + + shell ! s"$home/sbin/start-dfs.sh" + logger.info(s"Waiting for nodes to connect") + + var curr = init + var safe = !(shell !! 
s"$home/bin/hdfs dfsadmin -safemode get").toLowerCase.contains("off") + var cntr = config.getInt(s"system.$configKey.startup.polling.counter") + while (curr - init < totl || safe) { + logger.info(s"Connected ${curr - init} from $totl nodes, safemode is ${if (safe) "ON" else "OFF"}") + // wait a bit + Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) + // get new values + // depending on the log level its either in *.log or *.out (debug) + curr = Integer.parseInt((shell !! s"""cat $logDir/hadoop-$user-namenode-$hostname.out | grep 'registerDatanode:' | wc -l""").trim()) + if (curr == 0) { + curr = Integer.parseInt((shell !! s"""cat $logDir/hadoop-$user-namenode-$hostname.log | grep 'registerDatanode:' | wc -l""").trim()) + } + safe = !(shell !! s"$home/bin/hdfs dfsadmin -safemode get").toLowerCase.contains("off") + // timeout if counter goes below zero + cntr = cntr - 1 + if (curr - init < 0) init = 0 // protect against log reset on startup + if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") + } + isUp = true + } catch { + case e: SetUpTimeoutException => + failedStartUpAttempts = failedStartUpAttempts + 1 + if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { + shell ! s"$home/sbin/stop-dfs.sh" + logger.info(s"Could not bring system '$toString' up in time, trying again...") + } else { + throw e + } + } + } + mkdir(config.getString(s"system.$configKey.path.input")) + } + + override protected def stop() = { + shell ! s"${config.getString(s"system.$configKey.path.home")}/sbin/stop-dfs.sh" + if (config.getBoolean(s"system.$configKey.format")) format() + isUp = false + } + + def isRunning = { + val pidDir = config.getString(s"system.$configKey.config.env.HADOOP_PID_DIR") + (shell ! s""" ps -p `cat $pidDir/hadoop-*-namenode.pid` """) == 0 || + (shell ! s""" ps -p `cat $pidDir/hadoop-*-secondarynamenode.pid` """) == 0 || + (shell ! 
s""" ps -p `cat $pidDir/hadoop-*-datanode.pid` """) == 0 + } + + // --------------------------------------------------- + // Helper methods. + // --------------------------------------------------- + + private def format() = { + val user = config.getString(s"system.$configKey.user") + val home = config.getString(s"system.$configKey.path.home") + + logger.info(s"Formatting namenode") + shell ! (s"$home/bin/hdfs namenode -format -nonInteractive -force", "Unable to format namenode.") + + val init = (host: String, path: String) => + s""" ssh $user@$host "rm -Rf $path && mkdir -p $path/current" """ + + val list = for { + host <- config.getStringList(s"system.$configKey.config.workers").asScala + path <- config.getString(s"system.$configKey.config.hdfs.dfs.datanode.data.dir").split(',') + } yield (host, new URI(path).getPath) + + val futureInitOps = Future.traverse(list)(((host: String, path: String) => Future { + logger.info(s"Initializing HDFS data directory '$path' at $host") + shell ! (init(host, path), s"Unable to initialize HDFS data directory '$path' at $host.") + }).tupled) + + // await for all futureInitOps to finish + Await.result(futureInitOps, Math.max(30, 5 * list.size).seconds) + } +} From b8e2d7b3b5886f7f70b90da8ae6612abd7ab1961 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 13 Aug 2019 16:34:28 +0200 Subject: [PATCH 06/25] Added reference configuration for HDFS-3.1.1 --- .../main/resources/reference.hdfs-3.1.1.conf | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 peel-extensions/src/main/resources/reference.hdfs-3.1.1.conf diff --git a/peel-extensions/src/main/resources/reference.hdfs-3.1.1.conf b/peel-extensions/src/main/resources/reference.hdfs-3.1.1.conf new file mode 100644 index 00000000..588b423b --- /dev/null +++ b/peel-extensions/src/main/resources/reference.hdfs-3.1.1.conf @@ -0,0 +1,20 @@ +# include common hadoop-3.x configuration +include "reference.hadoop-3.x.conf" + +system { + hadoop-3 { + path { + 
archive.url = "http://archive.apache.org/dist/hadoop/core/hadoop-3.1.1/hadoop-3.1.1.tar.gz" + archive.md5 = "0b6ab06b59ae75f433de387783f19011" + archive.src = ${app.path.downloads}"/hadoop-3.1.1.tar.gz" + home = ${system.hadoop-3.path.archive.dst}"/hadoop-3.1.1" + } + config { + # hadoop-env.sh entries + env { + # directory where process IDs are stored + HADOOP_PID_DIR = "/tmp/hadoop-3.1.1-pid" + } + } + } +} \ No newline at end of file From c7235f73779ab62298419da006e21591d6ffdba1 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Wed, 14 Aug 2019 15:03:16 +0200 Subject: [PATCH 07/25] Added support for Hadoop 3.x YARN --- .../main/resources/reference.yarn-3.1.1.conf | 5 + .../main/resources/reference.yarn-3.x.conf | 2 + .../hadoop/beans/system/Yarn3.scala | 124 ++++++++++++++++++ 3 files changed, 131 insertions(+) create mode 100644 peel-extensions/src/main/resources/reference.yarn-3.1.1.conf create mode 100644 peel-extensions/src/main/resources/reference.yarn-3.x.conf create mode 100644 peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala diff --git a/peel-extensions/src/main/resources/reference.yarn-3.1.1.conf b/peel-extensions/src/main/resources/reference.yarn-3.1.1.conf new file mode 100644 index 00000000..56baabe2 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.yarn-3.1.1.conf @@ -0,0 +1,5 @@ +# include common yarn-3.x configuration +include "reference.yarn-3.x.conf" + +# pick up hdfs-3.1.1 configuration containing archive data +include "reference.hdfs-3.1.1.conf" \ No newline at end of file diff --git a/peel-extensions/src/main/resources/reference.yarn-3.x.conf b/peel-extensions/src/main/resources/reference.yarn-3.x.conf new file mode 100644 index 00000000..d752059b --- /dev/null +++ b/peel-extensions/src/main/resources/reference.yarn-3.x.conf @@ -0,0 +1,2 @@ +# include common hadoop-3.x configuration +include "reference.hadoop-3.x.conf" \ No newline at end of file diff --git 
a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala new file mode 100644 index 00000000..f39b0c2a --- /dev/null +++ b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala @@ -0,0 +1,124 @@ +/** + * Copyright (C) 2014 TU Berlin (peel@dima.tu-berlin.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.peelframework.hadoop.beans.system + +import java.util.regex.Pattern + +import com.samskivert.mustache.Mustache +import org.peelframework.core.beans.system.Lifespan.Lifespan +import org.peelframework.core.beans.system.{LogCollection, SetUpTimeoutException, System} +import org.peelframework.core.config.{Model, SystemConfig} +import org.peelframework.core.util.shell + +import scala.collection.JavaConverters._ +import scala.util.matching.Regex + +/** Wrapper class for Yarn. + * + * Implements Yarn as a Peel `System` and provides setup and teardown methods. + * + * @param version Version of the system (e.g. 
"7.1") + * @param configKey The system configuration resides under `system.\${configKey}` + * @param lifespan `Lifespan` of the system + * @param dependencies Set of dependencies that this system needs + * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + */ +class Yarn3( + version : String, + configKey : String, + lifespan : Lifespan, + dependencies : Set[System] = Set(), + mc : Mustache.Compiler) extends System("yarn", version, configKey, lifespan, dependencies, mc) + with LogCollection { + + // --------------------------------------------------- + // LogCollection. + // --------------------------------------------------- + + /** The patterns of the log files to watch. */ + override protected def logFilePatterns(): Seq[Regex] = { + // TODO: rework based on http://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/ + val user = Pattern.quote(config.getString(s"system.$configKey.user")) + config.getStringList(s"system.$configKey.config.workers").asScala.map(Pattern.quote).map(slave => + s"hadoop-$user-resourcemanager-$slave\\.log".r) + } + + // --------------------------------------------------- + // System. 
+ // --------------------------------------------------- + + override def configuration() = SystemConfig(config, { + val conf = config.getString(s"system.$configKey.path.config") + List( + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.workers", s"$conf/workers", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Env](s"system.$configKey.config.env", s"$conf/hadoop-env.sh", templatePath("conf/hadoop-env.sh"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.core", s"$conf/core-site.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.yarn", s"$conf/yarn-site.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.mapred", s"$conf/mapred-site.xml", templatePath("conf/site.xml"), mc) + ) + }) + + override def start(): Unit = { + val user = config.getString(s"system.$configKey.user") + val logDir = config.getString(s"system.$configKey.path.log") + + var failedStartUpAttempts = 0 + while(!isUp) { + try { + val total = config.getStringList(s"system.$configKey.config.workers").size() + // yarn does not reset the resourcemanagers log at startup + val init = Integer.parseInt((shell !! s"""cat $logDir/hadoop-$user-resourcemanager-*.log | grep 'registered with capability:' | wc -l""").trim()) + + shell ! s"${config.getString(s"system.$configKey.path.home")}/sbin/start-yarn.sh" + logger.info(s"Waiting for nodes to connect") + + var curr = init + var cntr = config.getInt(s"system.$configKey.startup.polling.counter") + while (curr - init < total) { + logger.info(s"Connected ${curr - init} from $total nodes") + // wait a bit + Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) + // get new values + curr = Integer.parseInt((shell !! 
s"""cat $logDir/hadoop-$user-resourcemanager-*.log | grep 'registered with capability:' | wc -l""").trim()) + // timeout if counter goes below zero + cntr = cntr - 1 + if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") + } + logger.info(s"Connected ${curr - init} from $total nodes") + isUp = true + } catch { + case e: SetUpTimeoutException => + failedStartUpAttempts = failedStartUpAttempts + 1 + if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { + stop() + logger.info(s"Could not bring system '$toString' up in time, trying again...") + } else { + throw e + } + } + } + } + + override def stop(): Unit = { + shell ! s"${config.getString(s"system.$configKey.path.home")}/sbin/stop-yarn.sh" + isUp = false + } + + def isRunning = { + (shell ! s""" ps -p `cat ${config.getString(s"system.$configKey.config.env.HADOOP_PID_DIR")}/hadoop-*-resourcemanager.pid` """) == 0 || + (shell ! s""" ps -p `cat ${config.getString(s"system.$configKey.config.env.HADOOP_PID_DIR")}/hadoop-*-nodemanager.pid` """) == 0 + } +} From 9871cf43a4cff373b7e9a2730007f2192e1c904d Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Wed, 14 Aug 2019 16:35:43 +0200 Subject: [PATCH 08/25] Added support for Flink 1.7.2 --- .../main/resources/reference.flink-1.7.2.conf | 19 ++ .../flink/1.7/conf/flink-conf.yaml.mustache | 247 ++++++++++++++++++ .../flink/1.7/conf/log4j.properties.mustache | 38 +++ .../flink/beans/system/Flink.scala | 9 +- 4 files changed, 310 insertions(+), 3 deletions(-) create mode 100644 peel-extensions/src/main/resources/reference.flink-1.7.2.conf create mode 100644 peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache create mode 100644 peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j.properties.mustache diff --git a/peel-extensions/src/main/resources/reference.flink-1.7.2.conf 
b/peel-extensions/src/main/resources/reference.flink-1.7.2.conf new file mode 100644 index 00000000..ddf4d250 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.flink-1.7.2.conf @@ -0,0 +1,19 @@ +# include common flink configuration +include "reference.flink.conf" + +system { + flink { + path { + archive.url = "http://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-hadoop28-scala_2.12.tgz" + archive.md5 = "b9469d4d166520ec767bac0d82c165a7" + archive.src = ${app.path.downloads}"/flink-1.7.2-bin-hadoop28-scala_2.12.tgz" + home = ${system.flink.path.archive.dst}"/flink-1.7.2" + } + config { + # flink.yaml entries + yaml { + env.pid.dir = "/tmp/flink-1.7.2-pid" + } + } + } +} diff --git a/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache b/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache new file mode 100644 index 00000000..7f6b2fc0 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache @@ -0,0 +1,247 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + + +#============================================================================== +# Common +#============================================================================== + +# The external address of the host on which the JobManager runs and can be +# reached by the TaskManagers and any clients which want to connect. This setting +# is only used in Standalone mode and may be overwritten on the JobManager side +# by specifying the --host parameter of the bin/jobmanager.sh executable. +# In high availability mode, if you use the bin/start-cluster.sh script and setup +# the conf/masters file, this will be taken care of automatically. Yarn/Mesos +# automatically configure the host name based on the hostname of the node where the +# JobManager runs. + +{{#jobmanager.rpc.address}}jobmanager.rpc.address: {{jobmanager.rpc.address}}{{/jobmanager.rpc.address}}{{^jobmanager.rpc.address}}jobmanager.rpc.address: localhost{{/jobmanager.rpc.address}} + +# The RPC port where the JobManager is reachable. + +{{#jobmanager.rpc.port}}jobmanager.rpc.port: {{jobmanager.rpc.port}}{{/jobmanager.rpc.port}}{{^jobmanager.rpc.port}}jobmanager.rpc.port: 6123{{/jobmanager.rpc.port}} + + +# The heap size for the JobManager JVM + +{{#jobmanager.heap.size}}jobmanager.heap.size: {{jobmanager.heap.size}}{{/jobmanager.heap.size}}{{^jobmanager.heap.size}}jobmanager.heap.size: 1024m{{/jobmanager.heap.size}} + + +# The heap size for the TaskManager JVM + +{{#taskmanager.heap.size}}taskmanager.heap.size: {{taskmanager.heap.size}}{{/taskmanager.heap.size}}{{^taskmanager.heap.size}}taskmanager.heap.size: 1024m{{/taskmanager.heap.size}} + + +# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. 
+ +{{#taskmanager.numberOfTaskSlots}}taskmanager.numberOfTaskSlots: {{taskmanager.numberOfTaskSlots}}{{/taskmanager.numberOfTaskSlots}}{{^taskmanager.numberOfTaskSlots}}taskmanager.numberOfTaskSlots: 1{{/taskmanager.numberOfTaskSlots}} + +# The parallelism used for programs that did not specify and other parallelism. + +{{#parallelism.default}}parallelism.default: {{parallelism.default}}{{/parallelism.default}}{{^parallelism.default}}parallelism.default: 1{{/parallelism.default}} + +# The default file system scheme and authority. +# +# By default file paths without scheme are interpreted relative to the local +# root file system 'file:///'. Use this to override the default and interpret +# relative paths relative to a different file system, +# for example 'hdfs://mynamenode:12345' +# +# fs.default-scheme + +#============================================================================== +# High Availability +#============================================================================== + +# The high-availability mode. Possible options are 'NONE' or 'zookeeper'. +# +{{#high-availability}}high-availability: {{high-availability}}{{/high-availability}}{{^high-availability}}# high-availability: zookeeper{{/high-availability}} + +# The path where metadata for master recovery is persisted. While ZooKeeper stores +# the small ground truth for checkpoint and leader election, this location stores +# the larger objects, like persisted dataflow graphs. +# +# Must be a durable file system that is accessible from all nodes +# (like HDFS, S3, Ceph, nfs, ...) +# +{{#high-availability.storageDir}}high-availability.storageDir: {{high-availability.storageDir}}{{/high-availability.storageDir}}{{^high-availability.storageDir}}# high-availability.storageDir: hdfs:///flink/ha/{{/high-availability.storageDir}} + +# The list of ZooKeeper quorum peers that coordinate the high-availability +# setup. This must be a list of the form: +# "host1:clientPort,host2:clientPort,..." 
(default clientPort: 2181) +# +{{#high-availability.zookeeper.quorum}}high-availability.zookeeper.quorum: {{high-availability.zookeeper.quorum}}{{/high-availability.zookeeper.quorum}}{{^high-availability.zookeeper.quorum}}# high-availability.zookeeper.quorum: localhost:2181{{/high-availability.zookeeper.quorum}} + + +# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes +# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) +# The default value is "open" and it can be changed to "creator" if ZK security is enabled +# +{{#high-availability.zookeeper.client.acl}}high-availability.zookeeper.client.acl: {{high-availability.zookeeper.client.acl}}{{/high-availability.zookeeper.client.acl}}{{^high-availability.zookeeper.client.acl}}# high-availability.zookeeper.client.acl: open{{/high-availability.zookeeper.client.acl}} + +#============================================================================== +# Fault tolerance and checkpointing +#============================================================================== + +# The backend that will be used to store operator state checkpoints if +# checkpointing is enabled. +# +# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the +# . +# +{{#state.backend}}state.backend: {{state.backend}}{{/state.backend}}{{^state.backend}}# state.backend: filesystem{{/state.backend}} + +# Directory for checkpoints filesystem, when using any of the default bundled +# state backends. +# +{{#state.checkpoints.dir}}state.checkpoints.dir: {{state.checkpoints.dir}}{{/state.checkpoints.dir}}{{^state.checkpoints.dir}}# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints{{/state.checkpoints.dir}} + +# Default target directory for savepoints, optional. 
+# +{{#state.savepoints.dir}}state.savepoints.dir: {{state.savepoints.dir}}{{/state.savepoints.dir}}{{^state.savepoints.dir}}# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints{{/state.savepoints.dir}} + +# Flag to enable/disable incremental checkpoints for backends that +# support incremental checkpoints (like the RocksDB state backend). +# +{{#state.backend.incremental}}state.backend.incremental: {{state.backend.incremental}}{{/state.backend.incremental}}{{^state.backend.incremental}}# state.backend.incremental: false{{/state.backend.incremental}} + +#============================================================================== +# Web Frontend +#============================================================================== + +# The address under which the web-based runtime monitor listens. +# +#web.address: 0.0.0.0 + +# The port under which the web-based runtime monitor listens. +# A value of -1 deactivates the web server. + +{{#rest.port}}rest.port: {{rest.port}}{{/rest.port}}{{^rest.port}}rest.port: 8081{{/rest.port}} + +# Flag to specify whether job submission is enabled from the web-based +# runtime monitor. Uncomment to disable. + +#web.submit.enable: false + +#============================================================================== +# Advanced +#============================================================================== + +# Override the directories for temporary files. If not specified, the +# system-specific Java temporary directory (java.io.tmpdir property) is taken. +# +# For framework setups on Yarn or Mesos, Flink will automatically pick up the +# containers' temp directories without any need for configuration. +# +# Add a delimited list for multiple directories, using the system directory +# delimiter (colon ':' on unix) or a comma, e.g.: +# /data1/tmp:/data2/tmp:/data3/tmp +# +{{#Note}}Note: {{Note}}{{/Note}}{{^Note}}# Note: Each directory entry is read from and written to by a different I/O{{/Note}} +# thread. 
You can include the same directory multiple times in order to create +# multiple I/O threads against that directory. This is for example relevant for +# high-throughput RAIDs. +# +{{#io.tmp.dirs}}io.tmp.dirs: {{io.tmp.dirs}}{{/io.tmp.dirs}}{{^io.tmp.dirs}}# io.tmp.dirs: /tmp{{/io.tmp.dirs}} + +# Specify whether TaskManager's managed memory should be allocated when starting +# up (true) or when memory is requested. +# +# We recommend to set this value to 'true' only in setups for pure batch +# processing (DataSet API). Streaming setups currently do not use the TaskManager's +{{#managed memory}}managed memory: {{managed memory}}{{/managed memory}}{{^managed memory}}# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,{{/managed memory}} +# while the 'memory' and 'filesystem' backends explicitly keep data as objects +# to save on serialization cost. +# +{{#taskmanager.memory.preallocate}}taskmanager.memory.preallocate: {{taskmanager.memory.preallocate}}{{/taskmanager.memory.preallocate}}{{^taskmanager.memory.preallocate}}# taskmanager.memory.preallocate: false{{/taskmanager.memory.preallocate}} + +# The classloading resolve order. Possible values are 'child-first' (Flink's default) +# and 'parent-first' (Java's default). +# +# Child first classloading allows users to use different dependency/library +# versions in their application than those in the classpath. Switching back +# to 'parent-first' may help with debugging dependency issues. +# +{{#classloader.resolve-order}}classloader.resolve-order: {{classloader.resolve-order}}{{/classloader.resolve-order}}{{^classloader.resolve-order}}# classloader.resolve-order: child-first{{/classloader.resolve-order}} + +# The amount of memory going to the network stack. These numbers usually need +# no tuning. Adjusting them may be necessary in case of an "Insufficient number +# of network buffers" error. The default min is 64MB, teh default max is 1GB. 
+# +{{#taskmanager.network.memory.fraction}}taskmanager.network.memory.fraction: {{taskmanager.network.memory.fraction}}{{/taskmanager.network.memory.fraction}}{{^taskmanager.network.memory.fraction}}# taskmanager.network.memory.fraction: 0.1{{/taskmanager.network.memory.fraction}} +{{#taskmanager.network.memory.min}}taskmanager.network.memory.min: {{taskmanager.network.memory.min}}{{/taskmanager.network.memory.min}}{{^taskmanager.network.memory.min}}# taskmanager.network.memory.min: 64mb{{/taskmanager.network.memory.min}} +{{#taskmanager.network.memory.max}}taskmanager.network.memory.max: {{taskmanager.network.memory.max}}{{/taskmanager.network.memory.max}}{{^taskmanager.network.memory.max}}# taskmanager.network.memory.max: 1gb{{/taskmanager.network.memory.max}} + +#============================================================================== +# Flink Cluster Security Configuration +#============================================================================== + +# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors - +# may be enabled in four steps: +# 1. configure the local krb5.conf file +# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit) +# 3. make the credentials available to various JAAS login contexts +# 4. configure the connector to use JAAS/SASL + +# The below configure how Kerberos credentials are provided. A keytab will be used instead of +# a ticket cache if the keytab path and principal are set. 
+ +{{#security.kerberos.login.use-ticket-cache}}security.kerberos.login.use-ticket-cache: {{security.kerberos.login.use-ticket-cache}}{{/security.kerberos.login.use-ticket-cache}}{{^security.kerberos.login.use-ticket-cache}}# security.kerberos.login.use-ticket-cache: true{{/security.kerberos.login.use-ticket-cache}} +{{#security.kerberos.login.keytab}}security.kerberos.login.keytab: {{security.kerberos.login.keytab}}{{/security.kerberos.login.keytab}}{{^security.kerberos.login.keytab}}# security.kerberos.login.keytab: /path/to/kerberos/keytab{{/security.kerberos.login.keytab}} +{{#security.kerberos.login.principal}}security.kerberos.login.principal: {{security.kerberos.login.principal}}{{/security.kerberos.login.principal}}{{^security.kerberos.login.principal}}# security.kerberos.login.principal: flink-user{{/security.kerberos.login.principal}} + +# The configuration below defines which JAAS login contexts + +{{#security.kerberos.login.contexts}}security.kerberos.login.contexts: {{security.kerberos.login.contexts}}{{/security.kerberos.login.contexts}}{{^security.kerberos.login.contexts}}# security.kerberos.login.contexts: Client,KafkaClient{{/security.kerberos.login.contexts}} + +#============================================================================== +# ZK Security Configuration +#============================================================================== + +# Below configurations are applicable if ZK ensemble is configured for security + +# Override below configuration to provide custom ZK service name if configured +{{#zookeeper.sasl.service-name}}zookeeper.sasl.service-name: {{zookeeper.sasl.service-name}}{{/zookeeper.sasl.service-name}}{{^zookeeper.sasl.service-name}}# zookeeper.sasl.service-name: zookeeper{{/zookeeper.sasl.service-name}} + +# The configuration below must match one of the values set in "security.kerberos.login.contexts" +{{#zookeeper.sasl.login-context-name}}zookeeper.sasl.login-context-name: 
{{zookeeper.sasl.login-context-name}}{{/zookeeper.sasl.login-context-name}}{{^zookeeper.sasl.login-context-name}}# zookeeper.sasl.login-context-name: Client{{/zookeeper.sasl.login-context-name}} + +#============================================================================== +# HistoryServer +#============================================================================== + +# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop) + +# Directory to upload completed jobs to. Add this directory to the list of +# monitored directories of the HistoryServer as well (see below). +#jobmanager.archive.fs.dir: hdfs:///completed-jobs/ + +# The address under which the web-based HistoryServer listens. +#historyserver.web.address: 0.0.0.0 + +# The port under which the web-based HistoryServer listens. +#historyserver.web.port: 8082 + +# Comma separated list of directories to monitor for completed jobs. +#historyserver.archive.fs.dir: hdfs:///completed-jobs/ + +# Interval in milliseconds for refreshing the monitored directories. +#historyserver.archive.fs.refresh-interval: 10000 + +# The directory where the PID files are stored +{{#env.pid.dir}}env.pid.dir: {{env.pid.dir}}{{/env.pid.dir}}{{^env.pid.dir}}env.pid.dir: /tmp{{/env.pid.dir}} diff --git a/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j.properties.mustache b/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j.properties.mustache new file mode 100644 index 00000000..78f7a22e --- /dev/null +++ b/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j.properties.mustache @@ -0,0 +1,38 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This affects logging for both user code and Flink +log4j.rootLogger=INFO, file + +# The following lines keep the log level of common libraries/connectors on +# log level INFO. The root logger does not override this. You have to manually +# change the log levels here. +log4j.logger.akka=INFO +log4j.logger.org.apache.kafka=INFO +log4j.logger.org.apache.hadoop=INFO +log4j.logger.org.apache.zookeeper=INFO + +# Log all infos in the given file +{{#log4j.appender.file._root_}}log4j.appender.file={{log4j.appender.file._root_}}{{/log4j.appender.file._root_}}{{^log4j.appender.file._root_}}log4j.appender.file=org.apache.log4j.FileAppender{{/log4j.appender.file._root_}} +{{#log4j.appender.file.file}}log4j.appender.file.file={{log4j.appender.file.file}}{{/log4j.appender.file.file}}{{^log4j.appender.file.file}}log4j.appender.file.file=${log.file}{{/log4j.appender.file.file}} +{{#log4j.appender.file.append}}log4j.appender.file.append={{log4j.appender.file.append}}{{/log4j.appender.file.append}}{{^log4j.appender.file.append}}log4j.appender.file.append=false{{/log4j.appender.file.append}} 
+{{#log4j.appender.file.layout._root_}}log4j.appender.file.layout={{log4j.appender.file.layout._root_}}{{/log4j.appender.file.layout._root_}}{{^log4j.appender.file.layout._root_}}log4j.appender.file.layout=org.apache.log4j.PatternLayout{{/log4j.appender.file.layout._root_}} +{{#log4j.appender.file.layout.ConversionPattern}}log4j.appender.file.layout.ConversionPattern={{log4j.appender.file.layout.ConversionPattern}}{{/log4j.appender.file.layout.ConversionPattern}}{{^log4j.appender.file.layout.ConversionPattern}}log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n{{/log4j.appender.file.layout.ConversionPattern}} + +# suppress the irrelevant (wrong) warnings from the netty channel handler +{{#log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline={{log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}{{/log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}{{^log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file{{/log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}} diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala index 60393578..7126fdc1 100644 --- a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala @@ -62,6 +62,8 @@ class Flink( override protected def logFilePatterns(): Seq[Regex] = { val user = Pattern.quote(config.getString(s"system.$configKey.user")) hosts.map(Pattern.quote).flatMap(host => Seq( + s"flink-$user-standalonesession-\\d+-$host\\.log".r, + s"flink-$user-standalonesession-\\d+-$host\\.out".r, s"flink-$user-jobmanager-\\d+-$host\\.log".r, s"flink-$user-jobmanager-\\d+-$host\\.out".r, 
s"flink-$user-taskmanager-\\d+-$host\\.log".r, @@ -120,12 +122,13 @@ class Flink( // wait a bit Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) // get new values - if (Version(version) <= Version("0.6")) + if (Version(version) < Version("0.6")) { curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Creating instance' | wc -l""").trim()) - else if (Version(version) <= Version("1.6")) + } else if (Version(version) < Version("1.6")) { curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Registered TaskManager' | wc -l""").trim()) - else + } else { curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-standalonesession-*.log | grep 'Registering TaskManager' | wc -l""").trim()) + } // timeout if counter goes below zero cntr = cntr - 1 if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") From 02983c598749fe4129645723a584394f26667276 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Thu, 15 Aug 2019 17:21:03 +0200 Subject: [PATCH 09/25] Traverse dependency graphs in BFS order MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Let’s assume that Flink is setup with YARN and HDFS as dependencies. I.e., the dependency graph looks like this: flink-1.7.2 -> yarn-3.1.1 flink-1.7.2 -> hdfs-3.1.1 hdfs-3.1.1 -> () yarn-3.1.1 -> () Do determine the order in which dependencies are setup, the graph is reversed. hdfs-3.1.1 -> flink-1.7.2 yarn-3.1.1 -> flink-1.7.2 flink-1.7.2 -> () The graph is then traversed by starting with the nodes with in-degree > 0 and adding their dependencies to the list of nodes to visit. If DFS order is used, the following activation order is possible: hdfs-3.1.1, flink-1.7.2, yarn-3.1.1 That is because the traversal starts with hdfs-3.1.1 and then follows the edge to flink-1.7.2 before continuing with yarn-3.1.1. 
However, if a Flink YARN session is used, then Flink needs to connect to YARN at startup. Therefore, all dependencies of Flink have to be activated before it. This is achieved by traversing the graph in BFS order. --- .../scala/org/peelframework/core/graph/DependencyGraph.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/peel-core/src/main/scala/org/peelframework/core/graph/DependencyGraph.scala b/peel-core/src/main/scala/org/peelframework/core/graph/DependencyGraph.scala index 465173c4..03867783 100644 --- a/peel-core/src/main/scala/org/peelframework/core/graph/DependencyGraph.scala +++ b/peel-core/src/main/scala/org/peelframework/core/graph/DependencyGraph.scala @@ -173,7 +173,7 @@ class DependencyGraph[T: ClassTag] { throw new Exception("Cannot reverse empty Graph!") } - /** Collects descendants in a depth-first manner starting from the given set. + /** Collects descendants in a breadth-first manner starting from the given set. * * @param toVisit A set of nodes that are yet to be visited. * @param visited A list of already visited nodes. 
@@ -191,7 +191,7 @@ class DependencyGraph[T: ClassTag] { case x: Any => !visited.contains(x) } - collect(children ++ toVisit.tail, next :: visited, excluded) + collect(toVisit.tail ++ children, next :: visited, excluded) } } From e3ffb26eaad1c461b3584f4899751c780c95fa90 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Thu, 15 Aug 2019 17:26:35 +0200 Subject: [PATCH 10/25] Introduce common base class for Flink systems --- .../scala/org/peelframework/extensions.scala | 43 ++--- .../flink/beans/system/Flink.scala | 118 +------------ .../beans/system/FlinkStandaloneCluster.scala | 158 ++++++++++++++++++ 3 files changed, 183 insertions(+), 136 deletions(-) create mode 100644 peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkStandaloneCluster.scala diff --git a/peel-extensions/src/main/scala/org/peelframework/extensions.scala b/peel-extensions/src/main/scala/org/peelframework/extensions.scala index a5583be2..17b38245 100644 --- a/peel-extensions/src/main/scala/org/peelframework/extensions.scala +++ b/peel-extensions/src/main/scala/org/peelframework/extensions.scala @@ -19,6 +19,7 @@ import com.samskivert.mustache.Mustache import org.peelframework.core.beans.system.Lifespan import org.peelframework.dstat.beans.system.Dstat import org.peelframework.flink.beans.system.Flink +import org.peelframework.flink.beans.system.FlinkStandaloneCluster import org.peelframework.hadoop.beans.system.{HDFS2, Yarn} import org.peelframework.spark.beans.system.Spark import org.peelframework.zookeeper.beans.system.Zookeeper @@ -120,7 +121,7 @@ class extensions extends ApplicationContextAware { // Flink @Bean(name = Array("flink-0.8.0")) - def `flink-0.8.0`: Flink = new Flink( + def `flink-0.8.0`: FlinkStandaloneCluster = new FlinkStandaloneCluster( version = "0.8.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -128,7 +129,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-0.8.1")) - def `flink-0.8.1`: Flink = new Flink( + 
def `flink-0.8.1`: Flink = new FlinkStandaloneCluster( version = "0.8.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -136,7 +137,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-0.9.0")) - def `flink-0.9.0`: Flink = new Flink( + def `flink-0.9.0`: Flink = new FlinkStandaloneCluster( version = "0.9.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -144,7 +145,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-0.10.0")) - def `flink-0.10.0`: Flink = new Flink( + def `flink-0.10.0`: Flink = new FlinkStandaloneCluster( version = "0.10.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -152,7 +153,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-0.10.1")) - def `flink-0.10.1`: Flink = new Flink( + def `flink-0.10.1`: Flink = new FlinkStandaloneCluster( version = "0.10.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -160,7 +161,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-0.10.2")) - def `flink-0.10.2`: Flink = new Flink( + def `flink-0.10.2`: Flink = new FlinkStandaloneCluster( version = "0.10.2", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -168,7 +169,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.0.0")) - def `flink-1.0.0`: Flink = new Flink( + def `flink-1.0.0`: Flink = new FlinkStandaloneCluster( version = "1.0.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -176,7 +177,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.0.1")) - def `flink-1.0.1`: Flink = new Flink( + def `flink-1.0.1`: Flink = new FlinkStandaloneCluster( version = "1.0.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -184,7 +185,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.0.2")) - def `flink-1.0.2`: Flink = new Flink( + def `flink-1.0.2`: Flink = new 
FlinkStandaloneCluster( version = "1.0.2", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -192,7 +193,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.0.3")) - def `flink-1.0.3`: Flink = new Flink( + def `flink-1.0.3`: Flink = new FlinkStandaloneCluster( version = "1.0.3", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -200,7 +201,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.1.0")) - def `flink-1.1.0`: Flink = new Flink( + def `flink-1.1.0`: Flink = new FlinkStandaloneCluster( version = "1.1.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -208,7 +209,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.1.1")) - def `flink-1.1.1`: Flink = new Flink( + def `flink-1.1.1`: Flink = new FlinkStandaloneCluster( version = "1.1.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -216,7 +217,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.1.2")) - def `flink-1.1.2`: Flink = new Flink( + def `flink-1.1.2`: Flink = new FlinkStandaloneCluster( version = "1.1.2", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -224,7 +225,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.1.3")) - def `flink-1.1.3`: Flink = new Flink( + def `flink-1.1.3`: Flink = new FlinkStandaloneCluster( version = "1.1.3", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -232,7 +233,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.1.4")) - def `flink-1.1.4`: Flink = new Flink( + def `flink-1.1.4`: Flink = new FlinkStandaloneCluster( version = "1.1.4", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -240,7 +241,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.2.0")) - def `flink-1.2.0`: Flink = new Flink( + def `flink-1.2.0`: Flink = new FlinkStandaloneCluster( version = "1.2.0", configKey = 
"flink", lifespan = Lifespan.EXPERIMENT, @@ -248,7 +249,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.2.1")) - def `flink-1.2.1`: Flink = new Flink( + def `flink-1.2.1`: Flink = new FlinkStandaloneCluster( version = "1.2.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -256,7 +257,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.3.0")) - def `flink-1.3.0`: Flink = new Flink( + def `flink-1.3.0`: Flink = new FlinkStandaloneCluster( version = "1.3.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -264,7 +265,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.3.1")) - def `flink-1.3.1`: Flink = new Flink( + def `flink-1.3.1`: Flink = new FlinkStandaloneCluster( version = "1.3.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -272,7 +273,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.3.2")) - def `flink-1.3.2`: Flink = new Flink( + def `flink-1.3.2`: Flink = new FlinkStandaloneCluster( version = "1.3.2", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -280,7 +281,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.4.0")) - def `flink-1.4.0`: Flink = new Flink( + def `flink-1.4.0`: Flink = new FlinkStandaloneCluster( version = "1.4.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala index 7126fdc1..3c6afada 100644 --- a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala @@ -30,9 +30,9 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Await, Future} import scala.util.matching.Regex -/** Wrapper class for Flink. 
+/** Base class for Flink systems. * - * Implements Flink as a Peel `System` and provides setup and teardown methods. + * Common base class to implement Flink Cluster and Flink YARN session environments as a Peel `System` and provides setup and teardown methods. * * @param version Version of the system (e.g. "7.1") * @param configKey The system configuration resides under `system.\${configKey}` @@ -40,7 +40,7 @@ import scala.util.matching.Regex * @param dependencies Set of dependencies that this system needs * @param mc The moustache compiler to compile the templates that are used to generate property files for the system */ -class Flink( +abstract class Flink( version : String, configKey : String, lifespan : Lifespan, @@ -48,116 +48,4 @@ class Flink( mc : Mustache.Compiler) extends System("flink", version, configKey, lifespan, dependencies, mc) with DistributedLogCollection { - // --------------------------------------------------- - // LogCollection. - // --------------------------------------------------- - - override def hosts = { - val master = config.getString(s"system.$configKey.config.yaml.jobmanager.rpc.address") - val slaves = config.getStringList(s"system.$configKey.config.slaves").asScala - master +: slaves - } - - /** The patterns of the log files to watch. */ - override protected def logFilePatterns(): Seq[Regex] = { - val user = Pattern.quote(config.getString(s"system.$configKey.user")) - hosts.map(Pattern.quote).flatMap(host => Seq( - s"flink-$user-standalonesession-\\d+-$host\\.log".r, - s"flink-$user-standalonesession-\\d+-$host\\.out".r, - s"flink-$user-jobmanager-\\d+-$host\\.log".r, - s"flink-$user-jobmanager-\\d+-$host\\.out".r, - s"flink-$user-taskmanager-\\d+-$host\\.log".r, - s"flink-$user-taskmanager-\\d+-$host\\.out".r)) - } - - // --------------------------------------------------- - // System. 
- // --------------------------------------------------- - - override def configuration() = SystemConfig(config, { - val conf = config.getString(s"system.$configKey.path.config") - List( - SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.slaves", s"$conf/slaves", templatePath("conf/hosts"), mc), - SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.yaml", s"$conf/flink-conf.yaml", templatePath("conf/flink-conf.yaml"), mc), - SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.log4j", s"$conf/log4j.properties", templatePath("conf/log4j.properties"), mc) - ) - }) - - override protected def start(): Unit = { - val user = config.getString(s"system.$configKey.user") - val logDir = config.getString(s"system.$configKey.path.log") - - val init = (host: String, paths: Seq[String]) => { - val cmd = paths.map(path => s"rm -Rf $path && mkdir -p $path").mkString(" && ") - s""" ssh $user@$host "$cmd" """ - } - - val hosts = config.getStringList(s"system.$configKey.config.slaves").asScala - val paths = config.getString(s"system.$configKey.config.yaml.taskmanager.tmp.dirs").split(':') - - val futureInitOps = Future.traverse(hosts)(host => Future { - logger.info(s"Initializing Flink tmp directories '${paths.mkString(":")}' at $host") - shell ! (init(host, paths), s"Unable to initialize Flink tmp directories '${paths.mkString(":")}' at $host.") - }) - - // await for all futureInitOps to finish - Await.result(futureInitOps, Math.max(30, 5 * hosts.size).seconds) - - var failedStartUpAttempts = 0 - while (!isUp) { - try { - val totl = config.getStringList(s"system.$configKey.config.slaves").size() - val init = 0 // Flink resets the job manager log on startup - - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/start-cluster.sh" - if (Version(version) < Version("1.0")) { - shell ! 
s"${config.getString(s"system.$configKey.path.home")}/bin/start-webclient.sh" - } - logger.info(s"Waiting for nodes to connect") - - var curr = init - var cntr = config.getInt(s"system.$configKey.startup.polling.counter") - while (curr - init < totl) { - logger.info(s"Connected ${curr - init} from $totl nodes") - // wait a bit - Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) - // get new values - if (Version(version) < Version("0.6")) { - curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Creating instance' | wc -l""").trim()) - } else if (Version(version) < Version("1.6")) { - curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Registered TaskManager' | wc -l""").trim()) - } else { - curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-standalonesession-*.log | grep 'Registering TaskManager' | wc -l""").trim()) - } - // timeout if counter goes below zero - cntr = cntr - 1 - if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") - } - isUp = true - } catch { - case e: SetUpTimeoutException => - failedStartUpAttempts = failedStartUpAttempts + 1 - if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-cluster.sh" - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-webclient.sh" - logger.info(s"Could not bring system '$toString' up in time, trying again...") - } else { - throw e - } - } - } - } - - override protected def stop() = { - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-cluster.sh" - if (Version(version) < Version("1.0")) { - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-webclient.sh" - } - shell ! 
s"rm -f ${config.getString(s"system.$configKey.config.yaml.env.pid.dir")}/flink-*.pid" - isUp = false - } - - def isRunning = { - (shell ! s"""ps -p `cat ${config.getString(s"system.$configKey.config.yaml.env.pid.dir")}/flink-*.pid`""") == 0 - } } diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkStandaloneCluster.scala b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkStandaloneCluster.scala new file mode 100644 index 00000000..a8ad31a1 --- /dev/null +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkStandaloneCluster.scala @@ -0,0 +1,158 @@ +/** + * Copyright (C) 2014 TU Berlin (peel@dima.tu-berlin.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.peelframework.flink.beans.system + +import java.util.regex.Pattern + +import com.samskivert.mustache.Mustache +import org.peelframework.core.beans.system.Lifespan.Lifespan +import org.peelframework.core.beans.system.{DistributedLogCollection, SetUpTimeoutException, System} +import org.peelframework.core.config.{Model, SystemConfig} +import org.peelframework.core.util.{Version, shell} + +import scala.collection.JavaConverters._ +import scala.collection.Seq +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.matching.Regex + +/** Wrapper class for Flink. 
+ * + * Implements a Flink standalone cluster as a Peel `System` and provides setup and teardown methods. + * + * @param version Version of the system (e.g. "7.1") + * @param configKey The system configuration resides under `system.\${configKey}` + * @param lifespan `Lifespan` of the system + * @param dependencies Set of dependencies that this system needs + * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + */ +class FlinkStandaloneCluster( + version : String, + configKey : String, + lifespan : Lifespan, + dependencies : Set[System] = Set(), + mc : Mustache.Compiler) extends Flink(version, configKey, lifespan, dependencies, mc) { + + // --------------------------------------------------- + // LogCollection. + // --------------------------------------------------- + + override def hosts = { + val master = config.getString(s"system.$configKey.config.yaml.jobmanager.rpc.address") + val slaves = config.getStringList(s"system.$configKey.config.slaves").asScala + master +: slaves + } + + /** The patterns of the log files to watch. */ + override protected def logFilePatterns(): Seq[Regex] = { + val user = Pattern.quote(config.getString(s"system.$configKey.user")) + hosts.map(Pattern.quote).flatMap(host => Seq( + s"flink-$user-standalonesession-\\d+-$host\\.log".r, + s"flink-$user-standalonesession-\\d+-$host\\.out".r, + s"flink-$user-jobmanager-\\d+-$host\\.log".r, + s"flink-$user-jobmanager-\\d+-$host\\.out".r, + s"flink-$user-taskmanager-\\d+-$host\\.log".r, + s"flink-$user-taskmanager-\\d+-$host\\.out".r)) + } + + // --------------------------------------------------- + // System. 
+ // --------------------------------------------------- + + override def configuration() = SystemConfig(config, { + val conf = config.getString(s"system.$configKey.path.config") + List( + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.slaves", s"$conf/slaves", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.yaml", s"$conf/flink-conf.yaml", templatePath("conf/flink-conf.yaml"), mc), + SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.log4j", s"$conf/log4j.properties", templatePath("conf/log4j.properties"), mc) + ) + }) + + override protected def start(): Unit = { + val user = config.getString(s"system.$configKey.user") + val logDir = config.getString(s"system.$configKey.path.log") + + val init = (host: String, paths: Seq[String]) => { + val cmd = paths.map(path => s"rm -Rf $path && mkdir -p $path").mkString(" && ") + s""" ssh $user@$host "$cmd" """ + } + + val hosts = config.getStringList(s"system.$configKey.config.slaves").asScala + val paths = config.getString(s"system.$configKey.config.yaml.taskmanager.tmp.dirs").split(':') + + val futureInitOps = Future.traverse(hosts)(host => Future { + logger.info(s"Initializing Flink tmp directories '${paths.mkString(":")}' at $host") + shell ! (init(host, paths), s"Unable to initialize Flink tmp directories '${paths.mkString(":")}' at $host.") + }) + + // await for all futureInitOps to finish + Await.result(futureInitOps, Math.max(30, 5 * hosts.size).seconds) + + var failedStartUpAttempts = 0 + while (!isUp) { + try { + val totl = config.getStringList(s"system.$configKey.config.slaves").size() + val init = 0 // Flink resets the job manager log on startup + + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/start-cluster.sh" + shell ! 
s"${config.getString(s"system.$configKey.path.home")}/bin/start-webclient.sh" + logger.info(s"Waiting for nodes to connect") + + var curr = init + var cntr = config.getInt(s"system.$configKey.startup.polling.counter") + while (curr - init < totl) { + logger.info(s"Connected ${curr - init} from $totl nodes") + // wait a bit + Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) + // get new values + if (Version(version) < Version("0.6")) { + curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Creating instance' | wc -l""").trim()) + } else if (Version(version) < Version("1.6")) { + curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Registered TaskManager' | wc -l""").trim()) + } else { + curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-standalonesession-*.log | grep 'Registering TaskManager' | wc -l""").trim()) + } + // timeout if counter goes below zero + cntr = cntr - 1 + if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") + } + isUp = true + } catch { + case e: SetUpTimeoutException => + failedStartUpAttempts = failedStartUpAttempts + 1 + if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-cluster.sh" + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-webclient.sh" + logger.info(s"Could not bring system '$toString' up in time, trying again...") + } else { + throw e + } + } + } + } + + override protected def stop() = { + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-cluster.sh" + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-webclient.sh" + shell ! s"rm -f ${config.getString(s"system.$configKey.config.yaml.env.pid.dir")}/flink-*.pid" + isUp = false + } + + def isRunning = { + (shell ! 
s"""ps -p `cat ${config.getString(s"system.$configKey.config.yaml.env.pid.dir")}/flink-*.pid`""") == 0 + } +} From c90894f80d4b73d852f4ae064d6e9593194fc8c3 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Thu, 15 Aug 2019 17:26:50 +0200 Subject: [PATCH 11/25] Support for Flink YARN sessions --- .../resources/reference.flink-yarn-1.7.2.conf | 2 + .../log4j-yarn-session.properties.mustache | 43 +++++ .../flink/beans/system/FlinkYarnSession.scala | 148 ++++++++++++++++++ 3 files changed, 193 insertions(+) create mode 100644 peel-extensions/src/main/resources/reference.flink-yarn-1.7.2.conf create mode 100644 peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j-yarn-session.properties.mustache create mode 100644 peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala diff --git a/peel-extensions/src/main/resources/reference.flink-yarn-1.7.2.conf b/peel-extensions/src/main/resources/reference.flink-yarn-1.7.2.conf new file mode 100644 index 00000000..1e7c71c8 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.flink-yarn-1.7.2.conf @@ -0,0 +1,2 @@ +# include common flink configuration +include "reference.flink-1.7.2.conf" \ No newline at end of file diff --git a/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j-yarn-session.properties.mustache b/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j-yarn-session.properties.mustache new file mode 100644 index 00000000..63331c44 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j-yarn-session.properties.mustache @@ -0,0 +1,43 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This affects logging for both user code and Flink +log4j.rootLogger=INFO, file + +# The following lines keep the log level of common libraries/connectors on +# log level INFO. The root logger does not override this. You have to manually +# change the log levels here. +log4j.logger.akka=INFO +log4j.logger.org.apache.kafka=INFO +log4j.logger.org.apache.hadoop=INFO +log4j.logger.org.apache.zookeeper=INFO + +# Log all infos in the given file +{{#log4j.appender.file._root_}}log4j.appender.file={{log4j.appender.file._root_}}{{/log4j.appender.file._root_}}{{^log4j.appender.file._root_}}log4j.appender.file=org.apache.log4j.FileAppender{{/log4j.appender.file._root_}} +{{#log4j.appender.file.file}}log4j.appender.file.file={{log4j.appender.file.file}}{{/log4j.appender.file.file}}{{^log4j.appender.file.file}}log4j.appender.file.file=${log.file}{{/log4j.appender.file.file}} +{{#log4j.appender.file.append}}log4j.appender.file.append={{log4j.appender.file.append}}{{/log4j.appender.file.append}}{{^log4j.appender.file.append}}log4j.appender.file.append=false{{/log4j.appender.file.append}} 
+{{#log4j.appender.file.layout._root_}}log4j.appender.file.layout={{log4j.appender.file.layout._root_}}{{/log4j.appender.file.layout._root_}}{{^log4j.appender.file.layout._root_}}log4j.appender.file.layout=org.apache.log4j.PatternLayout{{/log4j.appender.file.layout._root_}}
+{{#log4j.appender.file.layout.ConversionPattern}}log4j.appender.file.layout.ConversionPattern={{log4j.appender.file.layout.ConversionPattern}}{{/log4j.appender.file.layout.ConversionPattern}}{{^log4j.appender.file.layout.ConversionPattern}}log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n{{/log4j.appender.file.layout.ConversionPattern}}
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+{{#log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline={{log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}{{/log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}{{^log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file{{/log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}
+{{#log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline={{log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline}}{{/log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline}}{{^log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file{{/log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline}}
+{{#log4j.logger.org.apache.zookeeper}}log4j.logger.org.apache.zookeeper={{log4j.logger.org.apache.zookeeper}}{{/log4j.logger.org.apache.zookeeper}}{{^log4j.logger.org.apache.zookeeper}}log4j.logger.org.apache.zookeeper=WARN, stdout{{/log4j.logger.org.apache.zookeeper}} +{{#log4j.logger.org.apache.flink.shaded.org.apache.curator.framework}}log4j.logger.org.apache.flink.shaded.org.apache.curator.framework={{log4j.logger.org.apache.flink.shaded.org.apache.curator.framework}}{{/log4j.logger.org.apache.flink.shaded.org.apache.curator.framework}}{{^log4j.logger.org.apache.flink.shaded.org.apache.curator.framework}}log4j.logger.org.apache.flink.shaded.org.apache.curator.framework=WARN, stdout{{/log4j.logger.org.apache.flink.shaded.org.apache.curator.framework}} +{{#log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils}}log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils={{log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils}}{{/log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils}}{{^log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils}}log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils=WARN, stdout{{/log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils}} +{{#log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService}}log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService={{log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService}}{{/log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService}}{{^log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService}}log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService=WARN, stdout{{/log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService}} diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala 
b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala new file mode 100644 index 00000000..e55a5d58 --- /dev/null +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala @@ -0,0 +1,148 @@ +/** + * Copyright (C) 2014 TU Berlin (peel@dima.tu-berlin.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.peelframework.flink.beans.system + +import java.util.regex.Pattern + +import com.samskivert.mustache.Mustache +import org.peelframework.core.beans.system.Lifespan.Lifespan +import org.peelframework.core.beans.system.{DistributedLogCollection, SetUpTimeoutException, System} +import org.peelframework.core.config.{Model, SystemConfig} +import org.peelframework.core.util.{Version, shell} + +import scala.collection.JavaConverters._ +import scala.collection.Seq +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.matching.Regex + +/** Wrapper class for a Flink YARN session. + * + * Implements a Flink YARN session as a Peel `System` and provides setup and teardown methods. + * + * @param version Version of the system (e.g. 
"7.1") + * @param configKey The system configuration resides under `system.\${configKey}` + * @param lifespan `Lifespan` of the system + * @param dependencies Set of dependencies that this system needs + * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + */ +class FlinkYarnSession( + version : String, + configKey : String, + lifespan : Lifespan, + dependencies : Set[System] = Set(), + mc : Mustache.Compiler) extends Flink(version, configKey, lifespan, dependencies, mc) { + + // --------------------------------------------------- + // LogCollection. + // --------------------------------------------------- + + override def hosts: Seq[String] = Seq(config.getString(s"system.$configKey.config.yaml.jobmanager.rpc.address")) + + /** The patterns of the log files to watch. */ + override protected def logFilePatterns(): Seq[Regex] = { + val user = Pattern.quote(config.getString(s"system.$configKey.user")) + hosts.map(Pattern.quote).flatMap(host => Seq( + s"flink-$user-yarn-session-\\d+-$host\\.log".r, + s"flink-$user-yarn-session-\\d+-$host\\.out".r)) + } + + // --------------------------------------------------- + // System. 
+ // --------------------------------------------------- + + override def configuration() = SystemConfig(config, { + val conf = config.getString(s"system.$configKey.path.config") + List( + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.slaves", s"$conf/slaves", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.yaml", s"$conf/flink-conf.yaml", templatePath("conf/flink-conf.yaml"), mc), + SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.log4j", s"$conf/log4j-yarn-session.properties", templatePath("conf/log4j-yarn-session.properties"), mc) + ) + }) + + override protected def start(): Unit = { + val user = config.getString(s"system.$configKey.user") + val logDir = config.getString(s"system.$configKey.path.log") + + val init = (host: String, paths: Seq[String]) => { + val cmd = paths.map(path => s"rm -Rf $path && mkdir -p $path").mkString(" && ") + s""" ssh $user@$host "$cmd" """ + } + + val hosts = config.getStringList(s"system.$configKey.config.slaves").asScala + val paths = config.getString(s"system.$configKey.config.yaml.taskmanager.tmp.dirs").split(':') + + val futureInitOps = Future.traverse(hosts)(host => Future { + logger.info(s"Initializing Flink tmp directories '${paths.mkString(":")}' at $host") + shell ! (init(host, paths), s"Unable to initialize Flink tmp directories '${paths.mkString(":")}' at $host.") + }) + + // await for all futureInitOps to finish + Await.result(futureInitOps, Math.max(30, 5 * hosts.size).seconds) + + var failedStartUpAttempts = 0 + while (!isUp) { + try { + var done = false + + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/yarn-session.sh -d" + + var cntr = config.getInt(s"system.$configKey.startup.polling.counter") + while (!done) { + logger.info(s"Waiting for session to start") + // wait a bit + Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) + // get new values + done = Integer.parseInt((shell !! 
s"""cat $logDir/flink-$user-yarn-session-*.log | grep 'YARN application has been deployed successfully.' | wc -l""").trim) == 1 + // timeout if counter goes below zero + cntr = cntr - 1 + if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") + } + // Flink yarn-session.sh does not create the Flink PID directory (that happens in config.sh and flink-daemon.sh). + // However, the application ID is stored in /tmp/.yarn-properties-$user + isUp = true + } catch { + case e: SetUpTimeoutException => + failedStartUpAttempts = failedStartUpAttempts + 1 + if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { + // should Peel try to stop the system here? + logger.info(s"Could not bring system '$toString' up in time, trying again...") + } else { + shell ! s"""rm -f /tmp/.yarn-properties-$user""" + throw e + } + } + } + } + + override protected def stop(): Unit = { + val user = config.getString(s"system.$configKey.user") + val appId = (shell !! s"""grep applicationID /tmp/.yarn-properties-$user | sed -e 's/applicationID=\\(.*\\).*/\\1/'""").trim() + shell ! s"""echo quit | ${config.getString(s"system.$configKey.path.home")}/bin/yarn-session.sh -id $appId""" + if (isRunning) { + logger.warn(s"Flink YARN session still appears to be running after attempted shutdown (file /tmp/.yarn-properties-$user exists)") + shell ! s"rm -f /tmp/.yarn-properties-$user" + } + isUp = false + } + + def isRunning: Boolean = { + // maybe query YARN rest API + val user = config.getString(s"system.$configKey.user") + (shell ! 
s"""ls /tmp/.yarn-properties-$user""") == 0 + } +} From 6432d67d4a5ae601beb15d1fe262109dd359a84f Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Mon, 2 Sep 2019 18:17:13 +0200 Subject: [PATCH 12/25] Predefined beans for Hadoop 3.1.1 and Flink 1.7.2 --- .../scala/org/peelframework/extensions.scala | 39 +++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/peel-extensions/src/main/scala/org/peelframework/extensions.scala b/peel-extensions/src/main/scala/org/peelframework/extensions.scala index 17b38245..5bf8c692 100644 --- a/peel-extensions/src/main/scala/org/peelframework/extensions.scala +++ b/peel-extensions/src/main/scala/org/peelframework/extensions.scala @@ -18,9 +18,8 @@ package org.peelframework import com.samskivert.mustache.Mustache import org.peelframework.core.beans.system.Lifespan import org.peelframework.dstat.beans.system.Dstat -import org.peelframework.flink.beans.system.Flink -import org.peelframework.flink.beans.system.FlinkStandaloneCluster -import org.peelframework.hadoop.beans.system.{HDFS2, Yarn} +import org.peelframework.flink.beans.system.{Flink, FlinkStandaloneCluster, FlinkYarnSession} +import org.peelframework.hadoop.beans.system.{HDFS2, HDFS3, Yarn, Yarn3} import org.peelframework.spark.beans.system.Spark import org.peelframework.zookeeper.beans.system.Zookeeper import org.springframework.context.annotation.{Bean, Configuration} @@ -100,6 +99,14 @@ class extensions extends ApplicationContextAware { mc = ctx.getBean(classOf[Mustache.Compiler]) ) + @Bean(name = Array("hdfs-3.1.1")) + def `hdfs-3.1.1`: HDFS3 = new HDFS3( + version = "3.1.1", + configKey = "hadoop-3", + lifespan = Lifespan.SUITE, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + // Yarn @Bean(name = Array("yarn-2.4.1")) @@ -118,6 +125,14 @@ class extensions extends ApplicationContextAware { mc = ctx.getBean(classOf[Mustache.Compiler]) ) + @Bean(name = Array("yarn-3.1.1")) + def `yarn-3.1.1`: Yarn3 = new Yarn3( + version = "3.1.1", + 
configKey = "hadoop-3", + lifespan = Lifespan.SUITE, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + // Flink @Bean(name = Array("flink-0.8.0")) @@ -288,6 +303,24 @@ class extensions extends ApplicationContextAware { mc = ctx.getBean(classOf[Mustache.Compiler]) ) + @Bean(name = Array("flink-1.7.2")) + def `flink-1.7.2`: Flink = new FlinkStandaloneCluster( + version = "1.7.2", + configKey = "flink", + lifespan = Lifespan.EXPERIMENT, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + + // Flink YARN session + + @Bean(name = Array("flink-yarn-1.7.2")) + def `flink-yarn-1.7.2`: Flink = new FlinkYarnSession( + version = "1.7.2", + configKey = "flink", + lifespan = Lifespan.EXPERIMENT, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + // Spark @Bean(name = Array("spark-1.3.1")) From 96ebad0df20aceb9f0340c720f2aec06e22cc6eb Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Thu, 5 Sep 2019 11:39:38 +0200 Subject: [PATCH 13/25] Make Model.Yaml an alias for Model.KeyValue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The key-value model is useful for any configuration file where keys are pre-defined and have to be present. Hadoop’s capacity-scheduler.xml is such a file. If it is empty, YARN will not start. However, if we use Model.Site to pre-define default values for the different configuration keys, we lose the helpful descriptions that are stored in the file. Actually, Model.Yaml should be similar to Model.Site and use the key-value pairs stored in the properties HashMap but with different syntax. The flink-conf.yaml file should be generated with Model.KeyValue to keep the helpful comments. 
--- .../main/scala/org/peelframework/core/config/Model.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/peel-core/src/main/scala/org/peelframework/core/config/Model.scala b/peel-core/src/main/scala/org/peelframework/core/config/Model.scala index aa84d3f0..f4409169 100644 --- a/peel-core/src/main/scala/org/peelframework/core/config/Model.scala +++ b/peel-core/src/main/scala/org/peelframework/core/config/Model.scala @@ -99,7 +99,7 @@ object Model { } } - /** A model for (key, value) based yaml files (e.g. conf/flink-conf.yaml). + /** A model for (key, value) based files (e.g. conf/flink-conf.yaml or etc/hadoop/capacity-scheduler.xml). * * Consists of multiple (key, value) entries which are constructed by collecting all values under * the specified `prefix` path. Intermediate paths are thereby flattened, i.e. @@ -118,7 +118,7 @@ object Model { * @param c The HOCON config to use when constructing the model. * @param prefix The prefix path which has to be rendered. */ - class Yaml(val c: Config, val prefix: String) extends util.HashMap[String, Object] with Model { + class KeyValue(val c: Config, val prefix: String) extends util.HashMap[String, Object] with Model { // constructor { @@ -139,6 +139,11 @@ object Model { } } + /** Alias for Model.KeyValue. + * + */ + class Yaml(override val c: Config, override val prefix: String) extends KeyValue(c, prefix) { } + /** A model for list based hosts files (e.g. etc/hadoop/slaves). 
* * Consists of a single entry `hosts` which is constructed by extracting the string list value under the From 3023869493924270d2b6cb13e948d9aedf735ea8 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Thu, 5 Sep 2019 11:41:10 +0200 Subject: [PATCH 14/25] Generate capacity-scheduler.xml for YARN --- .../main/resources/reference.hadoop-3.x.conf | 2 + .../conf/capacity-scheduler.xml.mustache | 217 ++++++++++++++++++ .../hadoop/beans/system/Yarn3.scala | 3 +- 3 files changed, 221 insertions(+), 1 deletion(-) create mode 100644 peel-extensions/src/main/resources/templates/hadoop-3/conf/capacity-scheduler.xml.mustache diff --git a/peel-extensions/src/main/resources/reference.hadoop-3.x.conf b/peel-extensions/src/main/resources/reference.hadoop-3.x.conf index 74e08314..79f4fdd2 100644 --- a/peel-extensions/src/main/resources/reference.hadoop-3.x.conf +++ b/peel-extensions/src/main/resources/reference.hadoop-3.x.conf @@ -49,6 +49,8 @@ system { mapred { } # yarn-site.xml entries yarn { } + # capacity-scheduler.xml entries + capacity-scheduler { } } } } diff --git a/peel-extensions/src/main/resources/templates/hadoop-3/conf/capacity-scheduler.xml.mustache b/peel-extensions/src/main/resources/templates/hadoop-3/conf/capacity-scheduler.xml.mustache new file mode 100644 index 00000000..6bda9cac --- /dev/null +++ b/peel-extensions/src/main/resources/templates/hadoop-3/conf/capacity-scheduler.xml.mustache @@ -0,0 +1,217 @@ + + + + + yarn.scheduler.capacity.maximum-applications + {{#yarn.scheduler.capacity.maximum-applications}}{{yarn.scheduler.capacity.maximum-applications}}{{/yarn.scheduler.capacity.maximum-applications}}{{^yarn.scheduler.capacity.maximum-applications}}10000{{/yarn.scheduler.capacity.maximum-applications}} + + Maximum number of applications that can be pending and running. 
+ + + + + yarn.scheduler.capacity.maximum-am-resource-percent + {{#yarn.scheduler.capacity.maximum-am-resource-percent}}{{yarn.scheduler.capacity.maximum-am-resource-percent}}{{/yarn.scheduler.capacity.maximum-am-resource-percent}}{{^yarn.scheduler.capacity.maximum-am-resource-percent}}0.1{{/yarn.scheduler.capacity.maximum-am-resource-percent}} + + Maximum percent of resources in the cluster which can be used to run + application masters i.e. controls number of concurrent running + applications. + + + + + yarn.scheduler.capacity.resource-calculator + {{#yarn.scheduler.capacity.resource-calculator}}{{yarn.scheduler.capacity.resource-calculator}}{{/yarn.scheduler.capacity.resource-calculator}}{{^yarn.scheduler.capacity.resource-calculator}}org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator{{/yarn.scheduler.capacity.resource-calculator}} + + The ResourceCalculator implementation to be used to compare + Resources in the scheduler. + The default i.e. DefaultResourceCalculator only uses Memory while + DominantResourceCalculator uses dominant-resource to compare + multi-dimensional resources such as Memory, CPU etc. + + + + + yarn.scheduler.capacity.root.queues + {{#yarn.scheduler.capacity.root.queues}}{{yarn.scheduler.capacity.root.queues}}{{/yarn.scheduler.capacity.root.queues}}{{^yarn.scheduler.capacity.root.queues}}default{{/yarn.scheduler.capacity.root.queues}} + + The queues at the this level (root is the root queue). + + + + + yarn.scheduler.capacity.root.default.capacity + {{#yarn.scheduler.capacity.root.default.capacity}}{{yarn.scheduler.capacity.root.default.capacity}}{{/yarn.scheduler.capacity.root.default.capacity}}{{^yarn.scheduler.capacity.root.default.capacity}}100{{/yarn.scheduler.capacity.root.default.capacity}} + Default queue target capacity. 
+ + + + yarn.scheduler.capacity.root.default.user-limit-factor + {{#yarn.scheduler.capacity.root.default.user-limit-factor}}{{yarn.scheduler.capacity.root.default.user-limit-factor}}{{/yarn.scheduler.capacity.root.default.user-limit-factor}}{{^yarn.scheduler.capacity.root.default.user-limit-factor}}1{{/yarn.scheduler.capacity.root.default.user-limit-factor}} + + Default queue user limit a percentage from 0.0 to 1.0. + + + + + yarn.scheduler.capacity.root.default.maximum-capacity + {{#yarn.scheduler.capacity.root.default.maximum-capacity}}{{yarn.scheduler.capacity.root.default.maximum-capacity}}{{/yarn.scheduler.capacity.root.default.maximum-capacity}}{{^yarn.scheduler.capacity.root.default.maximum-capacity}}100{{/yarn.scheduler.capacity.root.default.maximum-capacity}} + + The maximum capacity of the default queue. + + + + + yarn.scheduler.capacity.root.default.state + {{#yarn.scheduler.capacity.root.default.state}}{{yarn.scheduler.capacity.root.default.state}}{{/yarn.scheduler.capacity.root.default.state}}{{^yarn.scheduler.capacity.root.default.state}}RUNNING{{/yarn.scheduler.capacity.root.default.state}} + + The state of the default queue. State can be one of RUNNING or STOPPED. + + + + + yarn.scheduler.capacity.root.default.acl_submit_applications + {{#yarn.scheduler.capacity.root.default.acl_submit_applications}}{{yarn.scheduler.capacity.root.default.acl_submit_applications}}{{/yarn.scheduler.capacity.root.default.acl_submit_applications}}{{^yarn.scheduler.capacity.root.default.acl_submit_applications}}*{{/yarn.scheduler.capacity.root.default.acl_submit_applications}} + + The ACL of who can submit jobs to the default queue. 
+ + + + + yarn.scheduler.capacity.root.default.acl_administer_queue + {{#yarn.scheduler.capacity.root.default.acl_administer_queue}}{{yarn.scheduler.capacity.root.default.acl_administer_queue}}{{/yarn.scheduler.capacity.root.default.acl_administer_queue}}{{^yarn.scheduler.capacity.root.default.acl_administer_queue}}*{{/yarn.scheduler.capacity.root.default.acl_administer_queue}} + + The ACL of who can administer jobs on the default queue. + + + + + yarn.scheduler.capacity.root.default.acl_application_max_priority + {{#yarn.scheduler.capacity.root.default.acl_application_max_priority}}{{yarn.scheduler.capacity.root.default.acl_application_max_priority}}{{/yarn.scheduler.capacity.root.default.acl_application_max_priority}}{{^yarn.scheduler.capacity.root.default.acl_application_max_priority}}*{{/yarn.scheduler.capacity.root.default.acl_application_max_priority}} + + The ACL of who can submit applications with configured priority. + For e.g, [user={name} group={name} max_priority={priority} default_priority={priority}] + + + + + yarn.scheduler.capacity.root.default.maximum-application-lifetime + {{#yarn.scheduler.capacity.root.default.maximum-application-lifetime}}{{yarn.scheduler.capacity.root.default.maximum-application-lifetime}}{{/yarn.scheduler.capacity.root.default.maximum-application-lifetime}}{{^yarn.scheduler.capacity.root.default.maximum-application-lifetime}}-1{{/yarn.scheduler.capacity.root.default.maximum-application-lifetime}} + + Maximum lifetime of an application which is submitted to a queue + in seconds. Any value less than or equal to zero will be considered as + disabled. + This will be a hard time limit for all applications in this + queue. If positive value is configured then any application submitted + to this queue will be killed after exceeds the configured lifetime. + User can also specify lifetime per application basis in + application submission context. But user lifetime will be + overridden if it exceeds queue maximum lifetime. 
It is point-in-time + configuration. + Note : Configuring too low value will result in killing application + sooner. This feature is applicable only for leaf queue. + + + + + yarn.scheduler.capacity.root.default.default-application-lifetime + {{#yarn.scheduler.capacity.root.default.default-application-lifetime}}{{yarn.scheduler.capacity.root.default.default-application-lifetime}}{{/yarn.scheduler.capacity.root.default.default-application-lifetime}}{{^yarn.scheduler.capacity.root.default.default-application-lifetime}}-1{{/yarn.scheduler.capacity.root.default.default-application-lifetime}} + + Default lifetime of an application which is submitted to a queue + in seconds. Any value less than or equal to zero will be considered as + disabled. + If the user has not submitted application with lifetime value then this + value will be taken. It is point-in-time configuration. + Note : Default lifetime can't exceed maximum lifetime. This feature is + applicable only for leaf queue. + + + + + yarn.scheduler.capacity.node-locality-delay + {{#yarn.scheduler.capacity.node-locality-delay}}{{yarn.scheduler.capacity.node-locality-delay}}{{/yarn.scheduler.capacity.node-locality-delay}}{{^yarn.scheduler.capacity.node-locality-delay}}40{{/yarn.scheduler.capacity.node-locality-delay}} + + Number of missed scheduling opportunities after which the CapacityScheduler + attempts to schedule rack-local containers. + When setting this parameter, the size of the cluster should be taken into account. + We use 40 as the default value, which is approximately the number of nodes in one rack. + Note, if this value is -1, the locality constraint in the container request + will be ignored, which disables the delay scheduling. 
+ + + + + yarn.scheduler.capacity.rack-locality-additional-delay + {{#yarn.scheduler.capacity.rack-locality-additional-delay}}{{yarn.scheduler.capacity.rack-locality-additional-delay}}{{/yarn.scheduler.capacity.rack-locality-additional-delay}}{{^yarn.scheduler.capacity.rack-locality-additional-delay}}-1{{/yarn.scheduler.capacity.rack-locality-additional-delay}} + + Number of additional missed scheduling opportunities over the node-locality-delay + ones, after which the CapacityScheduler attempts to schedule off-switch containers, + instead of rack-local ones. + Example: with node-locality-delay=40 and rack-locality-delay=20, the scheduler will + attempt rack-local assignments after 40 missed opportunities, and off-switch assignments + after 40+20=60 missed opportunities. + When setting this parameter, the size of the cluster should be taken into account. + We use -1 as the default value, which disables this feature. In this case, the number + of missed opportunities for assigning off-switch containers is calculated based on + the number of containers and unique locations specified in the resource request, + as well as the size of the cluster. + + + + + yarn.scheduler.capacity.queue-mappings + {{#yarn.scheduler.capacity.queue-mappings}}{{yarn.scheduler.capacity.queue-mappings}}{{/yarn.scheduler.capacity.queue-mappings}}{{^yarn.scheduler.capacity.queue-mappings}}{{/yarn.scheduler.capacity.queue-mappings}} + + A list of mappings that will be used to assign jobs to queues + The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]* + Typically this list will be used to map users to queues, + for example, u:%user:%user maps all users to queues with the same name + as the user. 
+ + + + + yarn.scheduler.capacity.queue-mappings-override.enable + {{#yarn.scheduler.capacity.queue-mappings-override.enable}}{{yarn.scheduler.capacity.queue-mappings-override.enable}}{{/yarn.scheduler.capacity.queue-mappings-override.enable}}{{^yarn.scheduler.capacity.queue-mappings-override.enable}}false{{/yarn.scheduler.capacity.queue-mappings-override.enable}} + + If a queue mapping is present, will it override the value specified + by the user? This can be used by administrators to place jobs in queues + that are different than the one specified by the user. + The default is false. + + + + + yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments + {{#yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments}}{{yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments}}{{/yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments}}{{^yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments}}1{{/yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments}} + + Controls the number of OFF_SWITCH assignments allowed + during a node's heartbeat. Increasing this value can improve + scheduling rate for OFF_SWITCH containers. Lower values reduce + "clumping" of applications on particular nodes. The default is 1. + Legal values are 1-MAX_INT. This config is refreshable. + + + + + yarn.scheduler.capacity.application.fail-fast + {{#yarn.scheduler.capacity.application.fail-fast}}{{yarn.scheduler.capacity.application.fail-fast}}{{/yarn.scheduler.capacity.application.fail-fast}}{{^yarn.scheduler.capacity.application.fail-fast}}false{{/yarn.scheduler.capacity.application.fail-fast}} + + Whether RM should fail during recovery if previous applications' + queue is no longer valid. 
+ + + + diff --git a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala index f39b0c2a..510be4fc 100644 --- a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala +++ b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala @@ -67,7 +67,8 @@ class Yarn3( SystemConfig.Entry[Model.Env](s"system.$configKey.config.env", s"$conf/hadoop-env.sh", templatePath("conf/hadoop-env.sh"), mc), SystemConfig.Entry[Model.Site](s"system.$configKey.config.core", s"$conf/core-site.xml", templatePath("conf/site.xml"), mc), SystemConfig.Entry[Model.Site](s"system.$configKey.config.yarn", s"$conf/yarn-site.xml", templatePath("conf/site.xml"), mc), - SystemConfig.Entry[Model.Site](s"system.$configKey.config.mapred", s"$conf/mapred-site.xml", templatePath("conf/site.xml"), mc) + SystemConfig.Entry[Model.Site](s"system.$configKey.config.mapred", s"$conf/mapred-site.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.KeyValue](s"system.$configKey.config.capacity-scheduler", s"$conf/capacity-scheduler.xml", templatePath("conf/capacity-scheduler.xml"), mc) ) }) From dc9e800c67a3d42c0e617c0fb767657c1bfe2898 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Thu, 5 Sep 2019 11:40:49 +0200 Subject: [PATCH 15/25] Specify Hadoop configuration in flink-conf.yaml Unfortunately, this does not fix the warning that HADOOP_CONF_DIR or YARN_CONF_DIR are missing in the log file. The Flink documentation prefers to use environment variables over this configuration option. 
--- .../resources/templates/flink/1.7/conf/flink-conf.yaml.mustache | 1 + 1 file changed, 1 insertion(+) diff --git a/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache b/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache index 7f6b2fc0..cef261f4 100644 --- a/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache +++ b/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache @@ -245,3 +245,4 @@ # The directory where the PID files are stored {{#env.pid.dir}}env.pid.dir: {{env.pid.dir}}{{/env.pid.dir}}{{^env.pid.dir}}env.pid.dir: /tmp{{/env.pid.dir}} +{{#env.hadoop.conf.dir}}env.hadoop.conf.dir: {{env.hadoop.conf.dir}}{{/env.hadoop.conf.dir}} \ No newline at end of file From 3465887ced41cb29fe01baa3a8cb2c6965d58233 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 19 Nov 2019 17:09:43 +0100 Subject: [PATCH 16/25] Bugfix: Definition of system.hadoop-3.config.slaves configuration option --- peel-extensions/src/main/resources/reference.hadoop-3.x.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peel-extensions/src/main/resources/reference.hadoop-3.x.conf b/peel-extensions/src/main/resources/reference.hadoop-3.x.conf index 79f4fdd2..1f1445d5 100644 --- a/peel-extensions/src/main/resources/reference.hadoop-3.x.conf +++ b/peel-extensions/src/main/resources/reference.hadoop-3.x.conf @@ -24,7 +24,7 @@ system { # put list of workers workers = ${system.default.config.slaves} # unfortunately, the slaves config key is hard-coded in Java code - slaves = ${system.hadoop-3.config.workers} + slaves = ${system.default.config.slaves} # hadoop-env.sh entries env { JAVA_HOME = ${system.default.config.java.home} From b554da8905068faeccdfdf3d28998c43a85e6cfd Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 19 Nov 2019 17:12:24 +0100 Subject: [PATCH 17/25] Generate Flink configuration option: taskmanager.useAccelerators --- 
.../templates/flink/1.7/conf/flink-conf.yaml.mustache | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache b/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache index cef261f4..a00b60a7 100644 --- a/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache +++ b/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache @@ -245,4 +245,9 @@ # The directory where the PID files are stored {{#env.pid.dir}}env.pid.dir: {{env.pid.dir}}{{/env.pid.dir}}{{^env.pid.dir}}env.pid.dir: /tmp{{/env.pid.dir}} -{{#env.hadoop.conf.dir}}env.hadoop.conf.dir: {{env.hadoop.conf.dir}}{{/env.hadoop.conf.dir}} \ No newline at end of file + +# Hadoop / YARN configuration directory +{{#env.hadoop.conf.dir}}env.hadoop.conf.dir: {{env.hadoop.conf.dir}}{{/env.hadoop.conf.dir}} + +# Use accelerators provided by Hadoop +{{#taskmanager.useAccelerators}}taskmanager.useAccelerators: {{taskmanager.useAccelerators}}{{/taskmanager.useAccelerators}} \ No newline at end of file From fd29d53ca9308eaf70f40401517b625677c9fd23 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 19 Nov 2019 17:12:56 +0100 Subject: [PATCH 18/25] Generate INI-type configuration files --- .../org/peelframework/core/config/Model.scala | 33 +++++++++++++++++++ .../templates/hadoop-3/conf/ini.mustache | 7 ++++ pom.xml | 2 +- 3 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 peel-extensions/src/main/resources/templates/hadoop-3/conf/ini.mustache diff --git a/peel-core/src/main/scala/org/peelframework/core/config/Model.scala b/peel-core/src/main/scala/org/peelframework/core/config/Model.scala index f4409169..e52c0fcb 100644 --- a/peel-core/src/main/scala/org/peelframework/core/config/Model.scala +++ b/peel-core/src/main/scala/org/peelframework/core/config/Model.scala @@ -76,6 +76,39 @@ object Model { } } + class INI(val c: 
Config, val prefix: String) extends Model { + + val sections = { + val sectionBuffer = ListBuffer[Section]() + + def sanitize(s: String) = + s.stripPrefix(s"$prefix.") // remove prefix + + def fixRoot(s: String) = if (s == "_root_") null else s + + def collectPairs(c: Config, name: String): Unit = { + val buffer = ListBuffer[Pair]() + + for (e <- c.entrySet().asScala) { + val key = sanitize(e.getKey) + .replace("\"_root_\"", "_root_") + .stripPrefix(s"$name.") + buffer += Pair(key, c.getString(e.getKey)) + } + + sectionBuffer += Section(fixRoot(name), buffer.toList.asJava) + } + + for (e <- c.getObject(prefix).entrySet().asScala) { + val name = sanitize(e.getKey) + collectPairs(c.withOnlyPath(s"$prefix.$name"), name) + } + + sectionBuffer.toList.asJava + } + + } + /** A model for environment files (e.g., etc/hadoop/hadoop-env.sh). * * The children of the specified `prefix` path in the given `c` config are converted as (key, value) pairs in a diff --git a/peel-extensions/src/main/resources/templates/hadoop-3/conf/ini.mustache b/peel-extensions/src/main/resources/templates/hadoop-3/conf/ini.mustache new file mode 100644 index 00000000..9119b7b6 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/hadoop-3/conf/ini.mustache @@ -0,0 +1,7 @@ +{{#sections}}{{#name}}[{{name}}] +{{/name}} +{{#entries}} +{{name}}={{value}} +{{/entries}} + +{{/sections}} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 657c3f07..66b7c2bc 100644 --- a/pom.xml +++ b/pom.xml @@ -120,7 +120,7 @@ 1.3.2 3.4 - 1.9 + 1.15 0.6.0 From c1a8a2281d3248b811aa3652bcce20e42aafaf87 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 19 Nov 2019 17:13:29 +0100 Subject: [PATCH 19/25] Generate container-executor.cfg and resource-types.xml configuration files --- .../scala/org/peelframework/hadoop/beans/system/Yarn3.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala 
b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala index 510be4fc..84fc3b95 100644 --- a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala +++ b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala @@ -68,7 +68,9 @@ class Yarn3( SystemConfig.Entry[Model.Site](s"system.$configKey.config.core", s"$conf/core-site.xml", templatePath("conf/site.xml"), mc), SystemConfig.Entry[Model.Site](s"system.$configKey.config.yarn", s"$conf/yarn-site.xml", templatePath("conf/site.xml"), mc), SystemConfig.Entry[Model.Site](s"system.$configKey.config.mapred", s"$conf/mapred-site.xml", templatePath("conf/site.xml"), mc), - SystemConfig.Entry[Model.KeyValue](s"system.$configKey.config.capacity-scheduler", s"$conf/capacity-scheduler.xml", templatePath("conf/capacity-scheduler.xml"), mc) + SystemConfig.Entry[Model.Site](s"system.$configKey.config.resource-types", s"$conf/resource-types.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.KeyValue](s"system.$configKey.config.capacity-scheduler", s"$conf/capacity-scheduler.xml", templatePath("conf/capacity-scheduler.xml"), mc), + SystemConfig.Entry[Model.INI](s"system.$configKey.config.container-executor", s"$conf/container-executor.cfg", templatePath("conf/ini"), mc) ) }) From e964278cd6c40b5ac67c03b0198e219fa7002fdb Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 19 Nov 2019 17:27:37 +0100 Subject: [PATCH 20/25] Add support for Flink 1.7.0 --- README.md | 6 ++++++ .../main/resources/reference.flink-1.7.0.conf | 19 +++++++++++++++++++ .../resources/reference.flink-yarn-1.7.0.conf | 2 ++ .../scala/org/peelframework/extensions.scala | 16 ++++++++++++++++ 4 files changed, 43 insertions(+) create mode 100644 peel-extensions/src/main/resources/reference.flink-1.7.0.conf create mode 100644 peel-extensions/src/main/resources/reference.flink-yarn-1.7.0.conf diff --git a/README.md b/README.md index cf8e4e89..3374933e 100644 --- 
a/README.md +++ b/README.md @@ -35,6 +35,8 @@ Peel offers the following features for your experiments. | HDFS | 2.7.2 | `hdfs-2.7.2` | | HDFS | 2.7.3 | `hdfs-2.7.3` | | HDFS | 2.8.0 | `hdfs-2.8.0` | +| HDFS | 3.1.1 | `hdfs-3.1.1` | +| Yarn | 3.1.1 | `yarn-3.1.1` | | Flink | 0.8.0 | `flink-0.8.0` | | Flink | 0.8.1 | `flink-0.8.1` | | Flink | 0.9.0 | `flink-0.9.0` | @@ -56,6 +58,10 @@ Peel offers the following features for your experiments. | Flink | 1.3.1 | `flink-1.3.1` | | Flink | 1.3.2 | `flink-1.3.2` | | Flink | 1.4.0 | `flink-1.4.0` | +| Flink Standalone Cluster | 1.7.0 | `flink-1.7.0` | +| Flink Standalone Cluster | 1.7.2 | `flink-1.7.2` | +| Flink Yarn Session | 1.7.0 | `flink-yarn-1.7.0` | +| Flink Yarn Session | 1.7.2 | `flink-yarn-1.7.2` | | MapReduce | 1.2.1 | `mapred-1.2.1` | | MapReduce | 2.4.1 | `mapred-2.4.1` | | Spark | 1.3.1 | `spark-1.3.1` | diff --git a/peel-extensions/src/main/resources/reference.flink-1.7.0.conf b/peel-extensions/src/main/resources/reference.flink-1.7.0.conf new file mode 100644 index 00000000..e0731977 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.flink-1.7.0.conf @@ -0,0 +1,19 @@ +# include common flink configuration +include "reference.flink.conf" + +system { + flink { + path { + archive.url = "http://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.12.tgz" + archive.md5 = "b66459379703adb25c64fbe20bf8f4b1" + archive.src = ${app.path.downloads}"/flink-1.7.0-bin-hadoop28-scala_2.12.tgz" + home = ${system.flink.path.archive.dst}"/flink-1.7.0" + } + config { + # flink.yaml entries + yaml { + env.pid.dir = "/tmp/flink-1.7.0-pid" + } + } + } +} diff --git a/peel-extensions/src/main/resources/reference.flink-yarn-1.7.0.conf b/peel-extensions/src/main/resources/reference.flink-yarn-1.7.0.conf new file mode 100644 index 00000000..ec2dd218 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.flink-yarn-1.7.0.conf @@ -0,0 +1,2 @@ +# include common flink configuration +include 
"reference.flink-1.7.0.conf" \ No newline at end of file diff --git a/peel-extensions/src/main/scala/org/peelframework/extensions.scala b/peel-extensions/src/main/scala/org/peelframework/extensions.scala index 5bf8c692..d1d8d94e 100644 --- a/peel-extensions/src/main/scala/org/peelframework/extensions.scala +++ b/peel-extensions/src/main/scala/org/peelframework/extensions.scala @@ -303,6 +303,14 @@ class extensions extends ApplicationContextAware { mc = ctx.getBean(classOf[Mustache.Compiler]) ) + @Bean(name = Array("flink-1.7.0")) + def `flink-1.7.0`: Flink = new FlinkStandaloneCluster( + version = "1.7.0", + configKey = "flink", + lifespan = Lifespan.EXPERIMENT, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + @Bean(name = Array("flink-1.7.2")) def `flink-1.7.2`: Flink = new FlinkStandaloneCluster( version = "1.7.2", @@ -313,6 +321,14 @@ class extensions extends ApplicationContextAware { // Flink YARN session + @Bean(name = Array("flink-yarn-1.7.0")) + def `flink-yarn-1.7.0`: Flink = new FlinkYarnSession( + version = "1.7.0", + configKey = "flink", + lifespan = Lifespan.EXPERIMENT, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + @Bean(name = Array("flink-yarn-1.7.2")) def `flink-yarn-1.7.2`: Flink = new FlinkYarnSession( version = "1.7.2", From 031ae710ceaab248fbaf8e6773118d7408f680b6 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 19 Nov 2019 17:59:36 +0100 Subject: [PATCH 21/25] Fix missing class definition --- .../src/main/scala/org/peelframework/core/config/Model.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/peel-core/src/main/scala/org/peelframework/core/config/Model.scala b/peel-core/src/main/scala/org/peelframework/core/config/Model.scala index e52c0fcb..bd0e5747 100644 --- a/peel-core/src/main/scala/org/peelframework/core/config/Model.scala +++ b/peel-core/src/main/scala/org/peelframework/core/config/Model.scala @@ -26,6 +26,7 @@ import scala.collection.mutable.ListBuffer trait Model { case class Pair(name: String, value: Any) 
{} + case class Section(name: String, entries: util.List[Pair]) {} } From abf51811556adda4c1b077662b3be24fc08f92ad Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Tue, 19 Nov 2019 18:35:21 +0100 Subject: [PATCH 22/25] Add configuration defaults for Hadoop 3.x Yarn container-executor.cfg --- peel-extensions/src/main/resources/reference.hadoop-3.x.conf | 2 ++ 1 file changed, 2 insertions(+) diff --git a/peel-extensions/src/main/resources/reference.hadoop-3.x.conf b/peel-extensions/src/main/resources/reference.hadoop-3.x.conf index 1f1445d5..adc827ca 100644 --- a/peel-extensions/src/main/resources/reference.hadoop-3.x.conf +++ b/peel-extensions/src/main/resources/reference.hadoop-3.x.conf @@ -51,6 +51,8 @@ system { yarn { } # capacity-scheduler.xml entries capacity-scheduler { } + # container-executor.cfg entries + container-executor { } } } } From e62f4172005e5b0c5d6ee6aafa97cfda0a6888e3 Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Thu, 21 Nov 2019 17:37:41 +0100 Subject: [PATCH 23/25] Add documentation for INI model --- .../org/peelframework/core/config/Model.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/peel-core/src/main/scala/org/peelframework/core/config/Model.scala b/peel-core/src/main/scala/org/peelframework/core/config/Model.scala index bd0e5747..acfae2aa 100644 --- a/peel-core/src/main/scala/org/peelframework/core/config/Model.scala +++ b/peel-core/src/main/scala/org/peelframework/core/config/Model.scala @@ -77,6 +77,21 @@ object Model { } } + /** A model for INI type configuration files (e.g. etc/hadoop/container-executor.cfg). + * + * Consists of a single entry `sections` which is constructed by collecting + * all direct children under the specified `prefix` path. Each child + * represents a section in the INI file and collects the (key, value) pairs + * below it. + * + * (key, value) pairs that should not appear in a section have to be + * listed under the child with a special name "_root_" (without quotes).
+ * + * See https://en.wikipedia.org/wiki/INI_file for details. + * + * @param c The HOCON config to use when constructing the model. + * @param prefix The prefix path which has to be rendered. + */ class INI(val c: Config, val prefix: String) extends Model { val sections = { From f72d40436cf68ec98f0965c1ad5e60f5c59234bc Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Thu, 16 Jan 2020 09:35:59 +0100 Subject: [PATCH 24/25] Set number of task managers and task slots when running Flink YARN session --- .../peelframework/flink/beans/system/FlinkYarnSession.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala index e55a5d58..661e1108 100644 --- a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala @@ -93,13 +93,14 @@ class FlinkYarnSession( // await for all futureInitOps to finish Await.result(futureInitOps, Math.max(30, 5 * hosts.size).seconds) + val numberOfTaskSlots = config.getString(s"system.$configKey.config.yaml.taskmanager.numberOfTaskSlots") var failedStartUpAttempts = 0 while (!isUp) { try { var done = false - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/yarn-session.sh -d" + shell ! 
s"${config.getString(s"system.$configKey.path.home")}/bin/yarn-session.sh -n ${hosts.size} -s $numberOfTaskSlots -d" var cntr = config.getInt(s"system.$configKey.startup.polling.counter") while (!done) { From 8ef9476e8e94519821dca56df2020b3df7304dcc Mon Sep 17 00:00:00 2001 From: Viktor Rosenfeld Date: Thu, 16 Jan 2020 09:36:16 +0100 Subject: [PATCH 25/25] Disambiguate HADOOP workers when running HDFS and YARN --- .../main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala | 1 + .../main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala index 183353ac..4309054b 100644 --- a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala +++ b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala @@ -78,6 +78,7 @@ class HDFS3( val conf = config.getString(s"system.$configKey.path.config") List( SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.workers", s"$conf/workers", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.workers", s"$conf/hdfs-workers", templatePath("conf/hosts"), mc), SystemConfig.Entry[Model.Env](s"system.$configKey.config.env", s"$conf/hadoop-env.sh", templatePath("conf/hadoop-env.sh"), mc), SystemConfig.Entry[Model.Site](s"system.$configKey.config.core", s"$conf/core-site.xml", templatePath("conf/site.xml"), mc), SystemConfig.Entry[Model.Site](s"system.$configKey.config.hdfs", s"$conf/hdfs-site.xml", templatePath("conf/site.xml"), mc) diff --git a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala index 84fc3b95..3c94ea27 100644 --- a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala +++ 
b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala @@ -64,6 +64,7 @@ class Yarn3( val conf = config.getString(s"system.$configKey.path.config") List( SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.workers", s"$conf/workers", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.workers", s"$conf/yarn-workers", templatePath("conf/hosts"), mc), SystemConfig.Entry[Model.Env](s"system.$configKey.config.env", s"$conf/hadoop-env.sh", templatePath("conf/hadoop-env.sh"), mc), SystemConfig.Entry[Model.Site](s"system.$configKey.config.core", s"$conf/core-site.xml", templatePath("conf/site.xml"), mc), SystemConfig.Entry[Model.Site](s"system.$configKey.config.yarn", s"$conf/yarn-site.xml", templatePath("conf/site.xml"), mc),