Skip to content

Commit

Permalink
Refactored Zookeeper system.
Browse files Browse the repository at this point in the history
- Deleting `system.zookeeper.config.dataDir` on setup / teardown.
- Using `/tmp/$user/zookeper/data` as default `[...].config.dataDir`.
- Introduced convenience object `c` for config values access.
  • Loading branch information
aalexandrov committed Oct 20, 2016
1 parent 06dd277 commit 2a38b02
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ system {
isShared = ${system.default.path.isShared}
archive.dst = ${app.path.systems}
config = ${system.zookeeper.path.home}"/conf"
data = ${system.zookeeper.path.home}"/data"
data = ${java.io.tmpdir}"/"${system.zookeeper.user}"/zookeeper/data"
}
startup {
max.attempts = ${system.default.startup.max.attempts}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,57 +38,82 @@ class Zookeeper(
dependencies : Set[System] = Set(),
mc : Mustache.Compiler) extends System("zookeeper", version, configKey, lifespan, dependencies, mc) {

override def configuration() = SystemConfig(config, {
val conf = config.getString(s"system.$configKey.path.config")
List(
SystemConfig.Entry[Model.Site](s"system.$configKey.config", s"$conf/zoo.cfg", templatePath("conf/zoo.cfg"), mc)
)
})
override def configuration() = SystemConfig(config, List(
SystemConfig.Entry[Model.Site](s"system.$configKey.config", s"${c.conf}/zoo.cfg", templatePath("conf/zoo.cfg"), mc)
))

override def start(): Unit = if (!isUp) {
this.servers.foreach(start)
c.servers.foreach(start)
isUp = true
}

override def stop(): Unit = this.servers.foreach(stop)
override def stop(): Unit =
c.servers.foreach(stop)

def isRunning = this.servers.forall(s => isRunning(s))
def isRunning =
c.servers.forall(s => isRunning(s))

def cli = new Zookeeper.Cli(config.getString(s"system.$configKey.path.home"), servers.head.host, config.getInt(s"system.$configKey.config.clientPort"))
def cli =
new Zookeeper.Cli(c.home, c.servers.head.host, c.clientPort)

private def start(s: Zookeeper.Server) = {
logger.info(s"Starting zookeeper at ${s.host}:${s.leaderPort}:${s.quorumPort}")
val user = config.getString(s"system.$configKey.user")
val user = c.user
shell !
s"""
|ssh -t -t "$user@${s.host}" << SSHEND
| ${config.getString(s"system.$configKey.path.home")}/bin/zkServer.sh start
| echo ${s.id} > ${config.getString(s"system.$configKey.config.dataDir")}/myid
| exit
|SSHEND
|ssh -t -t "$user@${s.host}" << SSHEND
| rm -Rf ${c.dataDir}
| mkdir -p ${c.dataDir}
| ${c.home}/bin/zkServer.sh start
| echo ${s.id} > ${c.dataDir}/myid
| exit
|SSHEND
""".stripMargin.trim
}

private def stop(s: Zookeeper.Server) = {
logger.info(s"Stopping zookeeper at ${s.host}:${s.leaderPort}:${s.quorumPort}")
val user = config.getString(s"system.$configKey.user")
shell ! s""" ssh $user@${s.host} ${config.getString(s"system.$configKey.path.home")}/bin/zkServer.sh stop """
shell !
s"""
|ssh -t -t "${c.user}@${s.host}" << SSHEND
| ${c.home}/bin/zkServer.sh stop
| rm -Rf ${c.dataDir}
| exit
|SSHEND
""".stripMargin
}

private def isRunning(s: Zookeeper.Server) = {
val user = config.getString(s"system.$configKey.user")
val pidFile = s"${config.getString(s"system.$configKey.config.dataDir")}/zookeeper_server.pid"

(shell !! s""" ssh $user@${s.host} "ps -p `cat $pidFile` >/dev/null 2>&1; echo $$?" """).stripLineEnd.toInt == 0
val pidFile = s"${c.dataDir}/zookeeper_server.pid"
(shell !! s""" ssh ${c.user}@${s.host} "ps -p `cat $pidFile` >/dev/null 2>&1; echo $$?" """).stripLineEnd.toInt == 0
}

private def servers = {
// grab servers from config
val serverConfigs = config.getConfig(s"system.$configKey.config.server").entrySet().asScala.map(v => v.getKey.substring(1, v.getKey.length() - 1) + ":" + v.getValue.unwrapped().toString)
// match and return valid server configs
serverConfigs.collect({
case Zookeeper.ServerConf(id, host, quorumPort, leaderPort) => Zookeeper.Server(id.toInt, host, quorumPort.toInt, leaderPort.toInt)
})
// config parameter shorthands

private object c {

def conf =
config.getString(s"system.$configKey.path.config")

def user =
config.getString(s"system.$configKey.user")

def home =
config.getString(s"system.$configKey.path.home")

def clientPort =
config.getInt(s"system.$configKey.config.clientPort")

def dataDir =
config.getString(s"system.$configKey.config.dataDir")

def servers =
config.getConfig(s"system.$configKey.config.server").entrySet().asScala
.map(v => v.getKey.substring(1, v.getKey.length() - 1) + ":" + v.getValue.unwrapped().toString)
.collect({
case Zookeeper.ServerConf(id, host, quorumPort, leaderPort) =>
Zookeeper.Server(id.toInt, host, quorumPort.toInt, leaderPort.toInt)
})
}
}

Expand Down

0 comments on commit 2a38b02

Please sign in to comment.