Skip to content

Commit

Permalink
Refactor setting up network storage and run slurm sync script as root
Browse files Browse the repository at this point in the history
user
  • Loading branch information
harshthakkar01 committed May 13, 2024
1 parent f1fbe7e commit 07f2b24
Show file tree
Hide file tree
Showing 4 changed files with 329 additions and 285 deletions.
1 change: 1 addition & 0 deletions ansible/roles/slurm/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
- conf.py
- resume.py
- setup.py
- setup_network_storage.py
- startup.sh
- slurmsync.py
- suspend.py
Expand Down
294 changes: 9 additions & 285 deletions scripts/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@
import sys
import stat
import time
from concurrent.futures import as_completed

from functools import partialmethod
from itertools import chain
from pathlib import Path

from addict import Dict as NSDict

import util
from util import (
run,
separate,
blob_list,
)
from util import lkp, cfg, dirs, slurmdirs
Expand All @@ -50,6 +47,11 @@
login_nodeset,
)
import slurmsync
from setup_network_storage import (
setup_network_storage,
setup_nfs_exports,
)


# Path of this script file and its bare file name.
SETUP_SCRIPT = Path(__file__)
filename = SETUP_SCRIPT.name
Expand Down Expand Up @@ -235,284 +237,6 @@ def run_custom_scripts():
raise e


def mounts_by_local(mounts):
    """Index mounts by their resolved local mount path.

    Each key is ``str(Path(m.local_mount).resolve())`` and each value is the
    mount object itself. When several mounts resolve to the same path, the
    one appearing last in ``mounts`` is kept.
    """
    indexed = {}
    for entry in mounts:
        resolved = Path(entry.local_mount).resolve()
        indexed[str(resolved)] = entry
    return indexed


def resolve_network_storage(nodeset=None):
    """Merge every applicable network_storage source into a single list.

    Sources are layered in order and keyed by resolved local mount path, so
    a later source replaces an earlier entry for the same path:

    1. default NFS mounts of ``dirs.home`` and ``dirs.apps`` served by the
       controller (skipped when ``cfg.disable_default_mounts`` is set),
    2. ``cfg.network_storage``,
    3. ``cfg.login_network_storage`` (login and controller roles only),
    4. the nodeset's ``network_storage`` (when a nodeset is known).

    On compute instances the ``nodeset`` argument is replaced by the result
    of ``lkp.node_nodeset()``; if that lookup raises, no nodeset is used.
    """
    if lkp.instance_role == "compute":
        try:
            nodeset = lkp.node_nodeset()
        except Exception:
            # External nodename, skip lookup
            nodeset = None

    # seed mounts with the default controller mounts
    default_mounts = []
    if not cfg.disable_default_mounts:
        for path in (dirs.home, dirs.apps):
            spec = {
                "server_ip": lkp.control_addr or lkp.control_host,
                "remote_mount": str(path),
                "local_mount": str(path),
                "fs_type": "nfs",
                "mount_options": "defaults,hard,intr",
            }
            default_mounts.append(NSDict(spec))

    # On non-controller instances, entries in network_storage could overwrite
    # default exports from the controller. Be careful, of course
    layers = [default_mounts, cfg.network_storage]
    if lkp.instance_role in ("login", "controller"):
        layers.append(cfg.login_network_storage)
    if nodeset is not None:
        layers.append(nodeset.network_storage)

    # fold the layers into one dict, local_mount: mount_info; later layers win
    merged = {}
    for layer in layers:
        merged.update(mounts_by_local(layer))
    return list(merged.values())


def separate_extenral_internal_mounts(mounts):
    """Partition mounts by whether the controller is their server.

    A mount is treated as cluster-internal when the host part of its
    ``server_ip`` resolves, via ``util.host_lookup``, to
    ``lkp.control_host_addr``. The result is whatever ``separate`` returns
    for that predicate; callers in this file unpack it as
    ``(external, internal)``.
    """

    def served_by_controller(mount):
        # NOTE: Valid Lustre server_ip can take the form of '<IP>@tcp'
        host = mount.server_ip.split("@", 1)[0]
        return util.host_lookup(host) == lkp.control_host_addr

    return separate(served_by_controller, mounts)


def setup_network_storage():
    """Write network filesystem mounts to /etc/fstab and mount them.

    Resolves the mounts that apply to this instance, creates each local
    mount directory, and builds one fstab line per mount (``_netdev`` is
    added to the options when absent). /etc/fstab is restored from
    /etc/fstab.bak, which is created from the current fstab the first time
    this runs, and the new lines are appended. Finally the mounts are
    mounted through ``mount_fstab`` and ``munge_mount_handler`` is called.
    """
    log.info("Set up network storage")

    # split into cluster-external and cluster-internal mounts; the controller
    # only takes the external ones
    external, internal = separate_extenral_internal_mounts(resolve_network_storage())
    mounts = external if lkp.instance_role == "controller" else external + internal

    # Determine fstab entries
    fstab_entries = []
    for mount in mounts:
        target = Path(mount.local_mount)
        source = mount.remote_mount
        fs_type = mount.fs_type
        server = mount.server_ip or ""
        target.mkdirp()

        is_gcsfuse = fs_type == "gcsfuse"
        log.info(
            "Setting up mount ({}) {}{} to {}".format(
                fs_type,
                "" if is_gcsfuse else server + ":",
                source,
                target,
            )
        )

        options = mount.mount_options.split(",") if mount.mount_options else []
        if "_netdev" not in options:
            options.append("_netdev")
        joined_options = ",".join(options)

        # gcsfuse entries carry no "<server>:" prefix on the device field
        if is_gcsfuse:
            entry = "{0} {1} {2} {3} 0 0".format(
                source, target, fs_type, joined_options
            )
        else:
            entry = "{0}:{1} {2} {3} {4} 0 0".format(
                server, source, target, fs_type, joined_options
            )
        fstab_entries.append(entry)

    # Always start from the pristine backup so reruns do not pile up entries
    fstab = Path("/etc/fstab")
    backup = fstab.with_suffix(".bak")
    if not backup.is_file():
        shutil.copy2(fstab, backup)
    shutil.copy2(backup, fstab)
    with open(fstab, "a") as f:
        f.write("\n" + "".join(entry + "\n" for entry in fstab_entries))

    mount_fstab(mounts_by_local(mounts))
    munge_mount_handler()


def mount_fstab(mounts):
    """Run ``mount <path>`` for every path in ``mounts`` on a thread pool.

    Iterating ``mounts`` must yield the local mount paths (callers pass the
    dict produced by ``mounts_by_local``). Each mount is submitted to a
    ``more_executors`` thread pool wrapped with a timeout of
    ``MAX_MOUNT_TIMEOUT`` and an ``ExceptionRetryPolicy``; the first
    exception surfaced by a completed future is propagated to the caller.
    """
    from more_executors import Executors, ExceptionRetryPolicy

    MAX_MOUNT_TIMEOUT = 60 * 5

    def mount_path(path):
        log.info(f"Waiting for '{path}' to be mounted...")
        try:
            run(f"mount {path}", timeout=120)
        except Exception as e:
            exc_type = type(e)
            log.error(f"mount of path '{path}' failed: {exc_type}: {e}")
            raise
        log.info(f"Mount point '{path}' was mounted.")

    retry_policy = ExceptionRetryPolicy(
        max_attempts=40, exponent=1.6, sleep=1.0, max_sleep=16.0
    )
    pool = (
        Executors.thread_pool()
        .with_timeout(MAX_MOUNT_TIMEOUT)
        .with_retry(retry_policy=retry_policy)
    )
    with pool as exe:
        pending = [exe.submit(mount_path, path) for path in mounts]

        # Iterate over futures; result() re-raises any mount failure
        for future in as_completed(pending):
            future.result()


def munge_mount_handler():
    """Copy munge.key from the munge share through a temporary mount.

    The share described by ``cfg.munge_mount`` is mounted at /mnt/munge
    (retrying with backoff), ``munge.key`` is copied into ``dirs.munge``,
    chowned to munge:munge and made owner-read-only, and the temporary mount
    is then unmounted and its directory removed.

    Defaults when a field of ``cfg.munge_mount`` is unset: the server falls
    back to ``cfg.slurm_control_addr or cfg.slurm_control_host``, ``fs_type``
    to "nfs", ``mount_options`` to "defaults,hard,intr,_netdev", and the
    remote path to "" (gcsfuse) or /etc/munge (everything else).

    Raises:
        FileExistsError: if /mnt/munge already exists.
        Exception: the last mount error once every retry has failed, or any
            error from copying the key, fixing its permissions or unmounting.
    """
    # NOTE(review): when munge_mount is missing this only logs and carries on;
    # the controller early-return is reached only when munge_mount is set.
    # Confirm that this is the intended control flow.
    if not cfg.munge_mount:
        log.error("Missing munge_mount in cfg")
    elif lkp.instance_role == "controller":
        return

    mount = cfg.munge_mount
    server_ip = (
        mount.server_ip
        if mount.server_ip
        else (cfg.slurm_control_addr or cfg.slurm_control_host)
    )
    remote_mount = mount.remote_mount
    local_mount = Path("/mnt/munge")
    fs_type = mount.fs_type if mount.fs_type is not None else "nfs"
    mount_options = (
        mount.mount_options
        if mount.mount_options is not None
        else "defaults,hard,intr,_netdev"
    )
    is_gcsfuse = fs_type.lower() == "gcsfuse"

    munge_key = Path(dirs.munge / "munge.key")

    log.info(f"Mounting munge share to: {local_mount}")
    # Deliberately strict (no exist_ok): a pre-existing directory may be a
    # stale, still-mounted share that the rmtree below must never walk into.
    local_mount.mkdir()
    # NOTE(review): the command lists below may contain None placeholders;
    # presumably run() drops them - confirm against util.run.
    if is_gcsfuse:
        if remote_mount is None:
            remote_mount = ""
        cmd = [
            "gcsfuse",
            f"--only-dir={remote_mount}" if remote_mount != "" else None,
            server_ip,
            str(local_mount),
        ]
    else:
        if remote_mount is None:
            remote_mount = Path("/etc/munge")
        cmd = [
            "mount",
            f"--types={fs_type}",
            f"--options={mount_options}" if mount_options != "" else None,
            f"{server_ip}:{remote_mount}",
            str(local_mount),
        ]
    # wait max 120s for munge mount
    timeout = 120
    for retry, wait in enumerate(util.backoff_delay(0.5, timeout), 1):
        try:
            run(cmd, timeout=timeout)
            break
        except Exception as e:
            log.error(
                f"munge mount failed: '{cmd}' {e}, try {retry}, waiting {wait:0.2f}s"
            )
            time.sleep(wait)
            # keep the exception: the name bound by "as" is cleared on exit
            err = e
            continue
    else:
        # every attempt failed; surface the most recent error
        raise err

    try:
        log.info(f"Copy munge.key from: {local_mount}")
        shutil.copy2(Path(local_mount / "munge.key"), munge_key)

        log.info("Restrict permissions of munge.key")
        shutil.chown(munge_key, user="munge", group="munge")
        os.chmod(munge_key, stat.S_IRUSR)
    finally:
        # Release the temporary mount even when the copy or permission fix
        # fails, so the share is not left mounted and a later run does not
        # trip over a leftover /mnt/munge. rmtree stays after the unmount
        # call so it is only reached if that call returns without raising.
        log.info(f"Unmount {local_mount}")
        if is_gcsfuse:
            run(f"fusermount -u {local_mount}", timeout=120)
        else:
            run(f"umount {local_mount}", timeout=120)
        shutil.rmtree(local_mount)


def setup_nfs_exports():
    """NFS-export every controller-served path that is mounted in the cluster.

    Collects the cluster-internal mounts seen by this instance (with the
    munge mount added by hand) and by a node of every nodeset, keyed by
    remote path. Each path is created, any line mentioning it is deleted
    from /etc/exports, and an export line for it is written to
    /etc/exports.d/slurm.exports before ``exportfs -a`` is run.
    """

    def internal_by_remote(candidates):
        # keep only cluster-internal mounts, keyed by the path to export
        _, internal = separate_extenral_internal_mounts(candidates)
        return {m.remote_mount: m for m in internal}

    # The controller only needs to set up exports for cluster-internal mounts
    own_mounts = resolve_network_storage()
    # manually add munge_mount
    munge_spec = {
        "server_ip": cfg.munge_mount.server_ip,
        "remote_mount": cfg.munge_mount.remote_mount,
        "local_mount": Path(f"{dirs.munge}_tmp"),
        "fs_type": cfg.munge_mount.fs_type,
        "mount_options": cfg.munge_mount.mount_options,
    }
    own_mounts.append(NSDict(munge_spec))

    # controller mounts first, then the internal mounts of each nodeset,
    # resolved as if from a node in that nodeset
    export_map = internal_by_remote(own_mounts)
    for nodeset in cfg.nodeset.values():
        export_map.update(
            internal_by_remote(resolve_network_storage(nodeset=nodeset))
        )

    # create each path, drop stale /etc/exports lines, collect export lines
    export_lines = []
    for path in export_map:
        Path(path).mkdirp()
        run(rf"sed -i '\#{path}#d' /etc/exports", timeout=30)
        export_lines.append(f"{path} *(rw,no_subtree_check,no_root_squash)")

    exports_dir = Path("/etc/exports.d")
    exports_dir.mkdirp()
    with (exports_dir / "slurm.exports").open("w") as f:
        f.write("\n" + "\n".join(export_lines))
    run("exportfs -a", timeout=30)


def setup_secondary_disks():
"""Format and mount secondary disk"""
run(
Expand Down Expand Up @@ -675,7 +399,7 @@ def setup_controller(args):

if cfg.controller_secondary_disk:
setup_secondary_disks()
setup_network_storage()
setup_network_storage(log)

run_custom_scripts()

Expand Down Expand Up @@ -740,7 +464,7 @@ def setup_login(args):
update_system_config("slurmd", sysconf)
install_custom_scripts()

setup_network_storage()
setup_network_storage(log)
setup_sudoers()
run("systemctl restart munge")
run("systemctl enable slurmd", timeout=30)
Expand Down Expand Up @@ -774,7 +498,7 @@ def setup_compute(args):
install_custom_scripts()

setup_nss_slurm()
setup_network_storage()
setup_network_storage(log)

has_gpu = run("lspci | grep --ignore-case 'NVIDIA' | wc -l", shell=True).returncode
if has_gpu:
Expand Down
Loading

0 comments on commit 07f2b24

Please sign in to comment.