Add support for Hadoop 3.x and Flink Yarn sessions #115

Open

he-sk wants to merge 26 commits into master from flink-yarn

Commits (26)
35857d6
Fix path of stop-dfs.sh script
he-sk Aug 13, 2019
6eeb90a
Fixed wrong spelling
he-sk Aug 13, 2019
cae26ea
Fix: Don’t start/stop Flink webclient
he-sk Aug 13, 2019
539256c
Update registration message of task managers
he-sk Dec 20, 2018
b37c58e
Added support for Hadoop 3.x
he-sk Aug 13, 2019
b8e2d7b
Added reference configuration for HDFS-3.1.1
he-sk Aug 13, 2019
c7235f7
Added support for Hadoop 3.x YARN
he-sk Aug 14, 2019
9871cf4
Added support for Flink 1.7.2
he-sk Aug 14, 2019
02983c5
Traverse dependency graphs in BFS order
he-sk Aug 15, 2019
e3ffb26
Introduce common base class for Flink systems
he-sk Aug 15, 2019
c90894f
Support for Flink YARN sessions
he-sk Aug 15, 2019
6432d67
Predefined beans for Hadoop 3.1.1 and Flink 1.7.2
he-sk Sep 2, 2019
96ebad0
Make Model.Yaml an alias for Model.KeyValue
he-sk Sep 5, 2019
3023869
Generate capacity-scheduler.xml for YARN
he-sk Sep 5, 2019
dc9e800
Specify Hadoop configuration in flink-conf.yaml
he-sk Sep 5, 2019
3465887
Bugfix: Definition of system.hadoop-3.config.slaves configuration option
he-sk Nov 19, 2019
b554da8
Generate Flink configuration option: taskmanager.useAccelerators
he-sk Nov 19, 2019
fd29d53
Generate INI-type configuration files
he-sk Nov 19, 2019
c1a8a22
Generate container-executor.cfg and resource-types.xml configuration …
he-sk Nov 19, 2019
e964278
Add support for Flink 1.7.0
he-sk Nov 19, 2019
031ae71
Fix missing class definition
he-sk Nov 19, 2019
abf5181
Add configuration defaults for Hadoop 3.x Yarn container-executor.cfg
he-sk Nov 19, 2019
e62f417
Add documentation for INI model
he-sk Nov 21, 2019
f72d404
Set number of task managers and task slots when running Flink YARN se…
he-sk Jan 16, 2020
8ef9476
Disambiguate HADOOP workers when running HDFS and YARN
he-sk Jan 16, 2020
bd8f301
Merge branch 'master' into flink-yarn
he-sk Sep 28, 2020
6 changes: 6 additions & 0 deletions README.md
@@ -35,6 +35,8 @@ Peel offers the following features for your experiments.
| HDFS | 2.7.2 | `hdfs-2.7.2` |
| HDFS | 2.7.3 | `hdfs-2.7.3` |
| HDFS | 2.8.0 | `hdfs-2.8.0` |
| HDFS | 3.1.1 | `hdfs-3.1.1` |
| Yarn | 3.1.1 | `yarn-3.1.1` |
| Flink | 0.8.0 | `flink-0.8.0` |
| Flink | 0.8.1 | `flink-0.8.1` |
| Flink | 0.9.0 | `flink-0.9.0` |
@@ -56,6 +58,10 @@ Peel offers the following features for your experiments.
| Flink | 1.3.1 | `flink-1.3.1` |
| Flink | 1.3.2 | `flink-1.3.2` |
| Flink | 1.4.0 | `flink-1.4.0` |
| Flink Standalone Cluster | 1.7.0 | `flink-1.7.0` |
| Flink Standalone Cluster | 1.7.2 | `flink-1.7.2` |
| Flink Yarn Session | 1.7.0 | `flink-yarn-1.7.0` |
| Flink Yarn Session | 1.7.2 | `flink-yarn-1.7.2` |
| MapReduce | 1.2.1 | `mapred-1.2.1` |
| MapReduce | 2.4.1 | `mapred-2.4.1` |
| Spark | 1.3.1 | `spark-1.3.1` |
58 changes: 56 additions & 2 deletions peel-core/src/main/scala/org/peelframework/core/config/Model.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ListBuffer
trait Model {

case class Pair(name: String, value: Any) {}
case class Section(name: String, entries: util.List[Pair]) {}

}

@@ -76,6 +77,54 @@ object Model {
}
}

/** A model for INI type configuration files (e.g. etc/hadoop/container-executor.cfg).
*
* Consists of a single entry `sections` which is constructed by collecting
* all direct children under the specified `prefix` path. Each child
* represents a section in the INI file and collects the (key, value) pairs
* below it.
*
* (key, value) pairs that should not appear in any section must be
* listed under a child with the special name "_root_" (without quotes).
*
* See https://en.wikipedia.org/wiki/INI_file for details.
*
* @param c The HOCON config to use when constructing the model.
* @param prefix The prefix path which has to be rendered.
*/
class INI(val c: Config, val prefix: String) extends Model {

val sections = {
val sectionBuffer = ListBuffer[Section]()

def sanitize(s: String) =
s.stripPrefix(s"$prefix.") // remove prefix

def fixRoot(s: String) = if (s == "_root_") null else s

def collectPairs(c: Config, name: String): Unit = {
val buffer = ListBuffer[Pair]()

for (e <- c.entrySet().asScala) {
val key = sanitize(e.getKey)
.replace("\"_root_\"", "_root_")
.stripPrefix(s"$name.")
buffer += Pair(key, c.getString(e.getKey))
}

sectionBuffer += Section(fixRoot(name), buffer.toList.asJava)
}

for (e <- c.getObject(prefix).entrySet().asScala) {
val name = sanitize(e.getKey)
collectPairs(c.withOnlyPath(s"$prefix.$name"), name)
}

sectionBuffer.toList.asJava
}

}
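To illustrate the mapping (a sketch with hypothetical keys, assuming the model is instantiated with the prefix system.hadoop-3.config.container-executor), the HOCON fragment

system.hadoop-3.config.container-executor {
  "_root_" {
    yarn.nodemanager.linux-container-executor.group = "hadoop"
  }
  docker {
    module.enabled = "false"
  }
}

produces one section named null (the "_root_" child, which the template can render without a section header) and one section named docker, i.e. an INI file roughly like:

yarn.nodemanager.linux-container-executor.group=hadoop

[docker]
module.enabled=false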

/** A model for environment files (e.g., etc/hadoop/hadoop-env.sh).
*
* The children of the specified `prefix` path in the given `c` config are converted as (key, value) pairs in a
@@ -99,7 +148,7 @@ object Model {
}
}

- /** A model for (key, value) based yaml files (e.g. conf/flink-conf.yaml).
+ /** A model for (key, value) based files (e.g. conf/flink-conf.yaml or etc/hadoop/capacity-scheduler.xml).
*
* Consists of multiple (key, value) entries which are constructed by collecting all values under
* the specified `prefix` path. Intermediate paths are thereby flattened, i.e.
@@ -118,7 +167,7 @@
* @param c The HOCON config to use when constructing the model.
* @param prefix The prefix path which has to be rendered.
*/
- class Yaml(val c: Config, val prefix: String) extends util.HashMap[String, Object] with Model {
+ class KeyValue(val c: Config, val prefix: String) extends util.HashMap[String, Object] with Model {
Member:

If we are using KeyValue as a base class for only one concrete subclass (Yaml), what is the purpose of factoring it out now?

Author:

Hmm, I'm not sure anymore. It could just be that I wanted to distinguish different types of usages. For example, Model.Yaml is also used for log4j.properties, which isn't really a Yaml file.


// constructor
{
@@ -139,6 +188,11 @@
}
}

/** Alias for Model.KeyValue. */
class Yaml(override val c: Config, override val prefix: String) extends KeyValue(c, prefix) { }
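The flattening behaviour that KeyValue (and thus Yaml) implements can be illustrated as follows (a sketch, assuming the prefix system.flink.config.yaml and standard Flink keys). The nested HOCON

system.flink.config.yaml {
  taskmanager {
    heap.mb = 512
    numberOfTaskSlots = 1
  }
}

is flattened into the entries

taskmanager.heap.mb = 512
taskmanager.numberOfTaskSlots = 1

which a template can then render line by line, e.g. as "taskmanager.heap.mb: 512" in conf/flink-conf.yaml.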

/** A model for list based hosts files (e.g. etc/hadoop/slaves).
*
* Consists of a single entry `hosts` which is constructed by extracting the string list value under the
peel-core/src/main/scala/org/peelframework/core/graph/DependencyGraph.scala
@@ -173,7 +173,7 @@ class DependencyGraph[T: ClassTag] {
throw new Exception("Cannot reverse empty Graph!")
}

- /** Collects descendants in a depth-first manner starting from the given set.
+ /** Collects descendants in a breadth-first manner starting from the given set.
*
* @param toVisit A set of nodes that are yet to be visited.
* @param visited A list of already visited nodes.
@@ -191,7 +191,7 @@
case x: Any => !visited.contains(x)
}

- collect(children ++ toVisit.tail, next :: visited, excluded)
+ collect(toVisit.tail ++ children, next :: visited, excluded)
}
}
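The single reordering above is what turns the traversal from depth-first into breadth-first: prepending the children makes them be visited before the rest of the frontier, while appending them after toVisit.tail defers them until the current level is exhausted. A standalone Scala sketch of the queueing idea (not Peel's actual API):

// Standalone illustration: the position of the children in the work list
// decides the traversal order (cs ++ rest = DFS, rest ++ cs = BFS).
def traverse[T](frontier: List[T], visited: List[T], children: T => List[T], bfs: Boolean): List[T] =
  frontier match {
    case Nil => visited.reverse
    case next :: rest if visited.contains(next) =>
      traverse(rest, visited, children, bfs)
    case next :: rest =>
      val cs = children(next).filterNot(c => visited.contains(c))
      val queue = if (bfs) rest ++ cs else cs ++ rest
      traverse(queue, next :: visited, children, bfs)
  }

// e.g. for children: a -> List(b, c), b -> List(d):
// traverse(List(a), Nil, children, bfs = true)  yields List(a, b, c, d)
// traverse(List(a), Nil, children, bfs = false) yields List(a, b, d, c)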

19 changes: 19 additions & 0 deletions peel-extensions/src/main/resources/reference.flink-1.7.0.conf
@@ -0,0 +1,19 @@
# include common flink configuration
include "reference.flink.conf"

system {
flink {
path {
archive.url = "http://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.12.tgz"
archive.md5 = "b66459379703adb25c64fbe20bf8f4b1"
archive.src = ${app.path.downloads}"/flink-1.7.0-bin-hadoop28-scala_2.12.tgz"
home = ${system.flink.path.archive.dst}"/flink-1.7.0"
}
config {
# flink.yaml entries
yaml {
env.pid.dir = "/tmp/flink-1.7.0-pid"
}
}
}
}
19 changes: 19 additions & 0 deletions peel-extensions/src/main/resources/reference.flink-1.7.2.conf
@@ -0,0 +1,19 @@
# include common flink configuration
include "reference.flink.conf"

system {
flink {
path {
archive.url = "http://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-hadoop28-scala_2.12.tgz"
archive.md5 = "b9469d4d166520ec767bac0d82c165a7"
archive.src = ${app.path.downloads}"/flink-1.7.2-bin-hadoop28-scala_2.12.tgz"
home = ${system.flink.path.archive.dst}"/flink-1.7.2"
}
config {
# flink.yaml entries
yaml {
env.pid.dir = "/tmp/flink-1.7.2-pid"
}
}
}
}
2 changes: 2 additions & 0 deletions peel-extensions/src/main/resources/reference.flink-yarn-1.7.0.conf
@@ -0,0 +1,2 @@
# include common flink configuration
include "reference.flink-1.7.0.conf"
2 changes: 2 additions & 0 deletions peel-extensions/src/main/resources/reference.flink-yarn-1.7.2.conf
@@ -0,0 +1,2 @@
# include common flink configuration
include "reference.flink-1.7.2.conf"
58 changes: 58 additions & 0 deletions peel-extensions/src/main/resources/reference.hadoop-3.x.conf
@@ -0,0 +1,58 @@
system {
hadoop-3 {
user = ${system.default.user}
group = ${system.default.group}
path {
isShared = ${system.default.path.isShared}
archive.dst = ${app.path.systems}
config = ${system.hadoop-3.path.home}"/etc/hadoop"
log = ${system.hadoop-3.path.home}"/logs"
input = ${system.hadoop-3.config.core.fs.default.name}"/tmp/input"
output = ${system.hadoop-3.config.core.fs.default.name}"/tmp/output"
}
format = true
startup {
max.attempts = ${system.default.startup.max.attempts}
polling {
counter = ${system.default.startup.polling.counter}
interval = ${system.default.startup.polling.interval}
}
}
config {
# put list of masters
masters = ${system.default.config.masters}
# put list of workers
workers = ${system.default.config.slaves}
# unfortunately, the slaves config key is hard-coded in Java code
slaves = ${system.default.config.slaves}
# hadoop-env.sh entries
env {
JAVA_HOME = ${system.default.config.java.home}
HADOOP_INSTALL = ${system.hadoop-3.path.home}
# directory where process IDs are stored
HADOOP_PID_DIR = "/tmp/hadoop-3/pid"
# avoids loading wrong native library in the default case
# override with /lib/native if lib exists
HADOOP_COMMON_LIB_NATIVE_DIR = "$HADOOP_INSTALL/lib/native"
}
# core-site.xml entries
core {
fs.default.name = "hdfs://localhost:9000"
}
# hdfs-site.xml entries
hdfs {
dfs.replication = 1
dfs.namenode.name.dir = "file:///tmp/hdfs-3/name"
dfs.datanode.data.dir = "file:///tmp/hdfs-3/data"
}
# mapred-site.xml entries
mapred { }
# yarn-site.xml entries
yarn { }
# capacity-scheduler.xml entries
capacity-scheduler { }
# container-executor.cfg entries
container-executor { }
}
}
}
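Because these are ordinary HOCON defaults, an experiment bundle can override individual keys without restating the whole file; for instance (hypothetical values, keys taken from the reference file above):

system {
  hadoop-3 {
    config {
      hdfs {
        dfs.replication = 3
        dfs.datanode.data.dir = "file:///data/hdfs-3/data"
      }
    }
  }
}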
20 changes: 20 additions & 0 deletions peel-extensions/src/main/resources/reference.hdfs-3.1.1.conf
@@ -0,0 +1,20 @@
# include common hadoop-3.x configuration
include "reference.hadoop-3.x.conf"

system {
hadoop-3 {
path {
archive.url = "http://archive.apache.org/dist/hadoop/core/hadoop-3.1.1/hadoop-3.1.1.tar.gz"
archive.md5 = "0b6ab06b59ae75f433de387783f19011"
archive.src = ${app.path.downloads}"/hadoop-3.1.1.tar.gz"
home = ${system.hadoop-3.path.archive.dst}"/hadoop-3.1.1"
}
config {
# hadoop-env.sh entries
env {
# directory where process IDs are stored
HADOOP_PID_DIR = "/tmp/hadoop-3.1.1-pid"
}
}
}
}
5 changes: 5 additions & 0 deletions peel-extensions/src/main/resources/reference.yarn-3.1.1.conf
@@ -0,0 +1,5 @@
# include common yarn-3.x configuration
include "reference.yarn-3.x.conf"

# pick up hdfs-3.1.1 configuration containing archive data
include "reference.hdfs-3.1.1.conf"
2 changes: 2 additions & 0 deletions peel-extensions/src/main/resources/reference.yarn-3.x.conf
@@ -0,0 +1,2 @@
# include common hadoop-3.x configuration
include "reference.hadoop-3.x.conf"