Skip to content

Commit

Permalink
Add CLI http comms method
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Dec 12, 2022
1 parent 65a8349 commit a378ca5
Show file tree
Hide file tree
Showing 25 changed files with 301 additions and 38 deletions.
26 changes: 25 additions & 1 deletion cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from cylc.flow.platforms import validate_platforms
from cylc.flow.exceptions import GlobalConfigError
from cylc.flow.hostuserutil import get_user_home
from cylc.flow.network.client_factory import CommsMeth
from cylc.flow.network.client_factory import CommsMeth, CLICommsMeth
from cylc.flow.parsec.config import (
ConfigNode as Conf,
ParsecConfig,
Expand Down Expand Up @@ -1198,6 +1198,9 @@ def default_for(
The workflow polls for task status (no task messaging)
ssh
Use non-interactive ssh for task communications
http
Via the running Hub proxy and/or UI-Server (requires UI
Server installation)
.. versionchanged:: 8.0.0
Expand Down Expand Up @@ -1767,6 +1770,27 @@ def default_for(
"[runtime][<namespace>][events]submission timeout"
)
))
with Conf('CLI', desc='''
Configure command line interface options.
.. versionadded:: 8.1.0
'''):
Conf('communication method',
VDR.V_STRING, 'zmq',
options=[meth.value for meth in CLICommsMeth], desc='''
The communication method used to interact with a running workflow
via the command line interface.
Options:
zmq
Direct client-server TCP communication via network ports
http
Via the running Hub proxy and/or UI-Server (requires UI
Server installation)
.. versionchanged:: 8.1.0
''')


def upg(cfg, descr):
Expand Down
41 changes: 34 additions & 7 deletions cylc/flow/network/client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import os
from typing import TYPE_CHECKING, Union

from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.exceptions import ClientError

if TYPE_CHECKING:
from cylc.flow.network.client import WorkflowRuntimeClientBase

Expand All @@ -28,13 +31,24 @@ class CommsMeth(Enum):
POLL = 'poll'
SSH = 'ssh'
ZMQ = 'zmq'
HTTP = 'http'


class CLICommsMeth(Enum):
"""String literals used for identifying CLI communication methods"""

ZMQ = 'zmq'
HTTP = 'http'


def get_comms_method() -> CommsMeth:
def get_comms_method(comms_method: Union[str, None] = None) -> CommsMeth:
""""Return Communication Method from environment variable, default zmq"""
return CommsMeth(
os.getenv('CYLC_TASK_COMMS_METHOD', CommsMeth.ZMQ.value)
)
if comms_method is None:
comms_method = os.getenv('CYLC_TASK_COMMS_METHOD')
# separate to avoid extra config file read
if comms_method is None:
comms_method = glbl_cfg().get(['CLI', 'communication method'])
return CommsMeth(comms_method)


def get_runtime_client(
Expand All @@ -50,14 +64,27 @@ def get_runtime_client(
"""
if comms_method == CommsMeth.SSH:
from cylc.flow.network.ssh_client import WorkflowRuntimeClient
elif comms_method == CommsMeth.HTTP:
try:
from cylc.uiserver.client import ( # type: ignore[no-redef]
WorkflowRuntimeClient
)
except ImportError as exc:
raise ClientError(
'HTTP comms method requires UI Server installation',
f'{exc}'
)
else:
from cylc.flow.network.client import ( # type: ignore[no-redef]
WorkflowRuntimeClient
)
return WorkflowRuntimeClient(workflow, timeout=timeout)


def get_client(workflow, timeout=None):
def get_client(workflow, timeout=None, method=None):
"""Get communication method and return correct WorkflowRuntimeClient"""

return get_runtime_client(get_comms_method(), workflow, timeout=timeout)
return get_runtime_client(
get_comms_method(method),
workflow,
timeout=timeout
)
15 changes: 15 additions & 0 deletions cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,18 @@ class CylcOptionParser(OptionParser):
" site/user config file documentation."
),
action='store', default=None, dest='comms_timeout', useif='comms'),
OptionSettings(
['--comms-method'], metavar='SEC',
help=(
"Set the communications method for network connections"
" to the running workflow. The default is 'zmq'."
" Communications using 'http' via the Hub and/or"
" UI Server require UI Server installation and"
" associated API info file of the running server."
),
action='store', default=None, dest='comms_method',
useif='commsmethod'
),
OptionSettings(
['-s', '--set'], metavar='NAME=VALUE',
help=(
Expand Down Expand Up @@ -409,6 +421,7 @@ def __init__(
usage: str,
argdoc: Optional[List[Tuple[str, str]]] = None,
comms: bool = False,
commsmethod: bool = False,
jset: bool = False,
multitask: bool = False,
multiworkflow: bool = False,
Expand All @@ -423,6 +436,7 @@ def __init__(
argdoc: The args for the command, to be inserted into the usage
instructions. Optional list of tuples of (name, description).
comms: If True, allow the --comms-timeout option.
commsmethod: If True, allow the --comms-method option.
jset: If True, allow the Jinja2 --set option.
multitask: If True, insert the multitask text into the
usage instructions.
Expand All @@ -447,6 +461,7 @@ def __init__(
self.n_optional_args = 0
self.unlimited_args = False
self.comms = comms
self.commsmethod = commsmethod
self.jset = jset
self.color = color
# Whether to log messages that are below warning level to stdout
Expand Down
98 changes: 96 additions & 2 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
close_log,
RotatingLogFileHandler,
)
from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.network.client_factory import get_client, CommsMeth
from cylc.flow.option_parsers import (
WORKFLOW_ID_ARG_DOC,
CylcOptionParser as COP,
Expand Down Expand Up @@ -106,6 +106,45 @@
happened to them while the workflow was down.
"""

PLAY_MUTATION = '''
mutation (
$wFlows: [WorkflowID]!,
$iCP: CyclePoint,
$startCP: CyclePoint,
$fCP: CyclePoint,
$stopCP: CyclePoint,
$pauseOnStart: Boolean,
$holdCP: CyclePoint,
$runMode: RunMode,
$runHost: String,
$mainLoopPlugins: [String],
$abortIfTaskFail: Boolean,
$debug: Boolean,
$noTimestamp: Boolean,
$setJinja2: [String],
$setJinja2File: String,
) {
play (
workflows: $wFlows,
initialCyclePoint: $iCP,
startCyclePoint: $startCP,
finalCyclePoint: $fCP,
stopCyclePoint: $stopCP,
pause: $pauseOnStart,
holdCyclePoint: $holdCP,
mode: $runMode,
host: $runHost,
mainLoop: $mainLoopPlugins,
abortIfAnyTaskFails: $abortIfTaskFail,
debug: $debug,
noTimestamp: $noTimestamp,
set: $setJinja2,
setFile: $setJinja2File
) {
result
}
}
'''

RESUME_MUTATION = '''
mutation (
Expand Down Expand Up @@ -300,6 +339,7 @@ def get_option_parser(add_std_opts: bool = False) -> COP:
PLAY_DOC,
jset=True,
comms=True,
commsmethod=True,
argdoc=[WORKFLOW_ID_ARG_DOC]
)

Expand Down Expand Up @@ -370,6 +410,9 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
# warn_depr=False, # TODO
)

# start workflow with UI Server if comms method http
_http_play(workflow_id, options)

# resume the workflow if it is already running
_resume(workflow_id, options)

Expand Down Expand Up @@ -430,9 +473,10 @@ def _resume(workflow_id, options):
detect_old_contact_file(workflow_id)
except ServiceFileError as exc:
print(f"Resuming already-running workflow\n\n{exc}")
pclient = WorkflowRuntimeClient(
pclient = get_client(
workflow_id,
timeout=options.comms_timeout,
method=options.comms_method
)
mutation_kwargs = {
'request_string': RESUME_MUTATION,
Expand All @@ -444,6 +488,56 @@ def _resume(workflow_id, options):
sys.exit(0)


def _http_play(workflow_id, options):
"""Resume the workflow if it is already running."""
if options.comms_method == CommsMeth.HTTP.value:
print(
cparse(
"Requesting UI Server started workflow.\n"
"<orange>"
"Note: "
"</orange>"
" The workflow will be started at the same cylc version as"
" the UI Server. Any set files need to be accessible to the"
" UI Server."
" Options currently unavailable with UIS started runs:"
" - start-task"
" - no-detach"
" - profile"
" - reference-*"
" - format"
" - downgrade/upgrade"
)
)
pclient = get_client(
workflow_id,
timeout=options.comms_timeout,
method=options.comms_method
)
mutation_kwargs = {
'request_string': PLAY_MUTATION,
'variables': {
'wFlows': [workflow_id],
'iCP': options.icp,
'startCP': options.startcp,
'fCP': options.fcp,
'stopCP': options.stopcp,
'pauseOnStart': options.paused_start,
'holdCP': options.holdcp,
'runMode': options.run_mode or 'Live',
'runHost': options.host,
'mainLoopPlugins': options.main_loop,
'abortIfTaskFail': options.abort_if_any_task_fails,
'debug': options.verbosity,
'noTimestamp': options.log_timestamp,
'setJinja2': options.templatevars,
'setJinja2File': options.templatevars_file,
}
}
pclient('graphql', mutation_kwargs)
sys.exit(0)


def _version_check(
db_file: Path,
can_upgrade: bool,
Expand Down
7 changes: 6 additions & 1 deletion cylc/flow/scripts/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ def get_option_parser() -> COP:
parser = COP(
__doc__,
comms=True,
commsmethod=True,
multiworkflow=True,
argdoc=[WORKFLOW_ID_MULTI_ARG_DOC],
)
Expand Down Expand Up @@ -321,7 +322,11 @@ def get_option_parser() -> COP:

async def run(options: 'Values', workflow_id):
"""Implement cylc broadcast."""
pclient = get_client(workflow_id, timeout=options.comms_timeout)
pclient = get_client(
workflow_id,
timeout=options.comms_timeout,
method=options.comms_method
)

ret: Dict[str, Any] = {
'stdout': [],
Expand Down
10 changes: 7 additions & 3 deletions cylc/flow/scripts/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
WORKFLOW_ID_ARG_DOC,
CylcOptionParser as COP,
)
from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.network.client_factory import get_client
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.terminal import cli_function

Expand All @@ -47,7 +47,7 @@

def get_option_parser():
parser = COP(
__doc__, comms=True,
__doc__, comms=True, commsmethod=True,
argdoc=[
WORKFLOW_ID_ARG_DOC,
('METHOD', 'Network API function name')
Expand All @@ -68,7 +68,11 @@ def main(_, options: 'Values', workflow_id: str, func: str) -> None:
workflow_id,
constraint='workflows',
)
pclient = WorkflowRuntimeClient(workflow_id, timeout=options.comms_timeout)
pclient = get_client(
workflow_id,
timeout=options.comms_timeout,
method=options.comms_method
)
if options.no_input:
kwargs = {}
else:
Expand Down
7 changes: 6 additions & 1 deletion cylc/flow/scripts/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def get_option_parser():
parser = COP(
__doc__,
comms=True,
commsmethod=True,
argdoc=[WORKFLOW_ID_ARG_DOC],
)
parser.add_option(
Expand Down Expand Up @@ -184,7 +185,11 @@ def main(_, options: 'Values', workflow_id: str) -> None:
workflow_id,
constraint='workflows',
)
pclient = get_client(workflow_id, timeout=options.comms_timeout)
pclient = get_client(
workflow_id,
timeout=options.comms_timeout,
method=options.comms_method
)

if options.sort_by_cycle:
sort_args = {'keys': ['cyclePoint', 'name']}
Expand Down
7 changes: 6 additions & 1 deletion cylc/flow/scripts/ext_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def get_option_parser() -> COP:
parser = COP(
__doc__,
comms=True,
commsmethod=True,
argdoc=[
WORKFLOW_ID_ARG_DOC,
("MSG", "External trigger message"),
Expand Down Expand Up @@ -116,7 +117,11 @@ def main(
LOG.info(
'Send to workflow %s: "%s" (%s)', workflow_id, event_msg, event_id
)
pclient = get_client(workflow_id, timeout=options.comms_timeout)
pclient = get_client(
workflow_id,
timeout=options.comms_timeout,
method=options.comms_method
)

max_n_tries = int(options.max_n_tries)
retry_intvl_secs = float(options.retry_intvl_secs)
Expand Down
Loading

0 comments on commit a378ca5

Please sign in to comment.