Skip to content

Commit

Permalink
Merge pull request #528 from cylc/1.4.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 1.4.x-sync into master
  • Loading branch information
wxtim authored Nov 30, 2023
2 parents 4ba4446 + f612296 commit c746aae
Show file tree
Hide file tree
Showing 90 changed files with 1,102 additions and 920 deletions.
10 changes: 10 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ $ towncrier create <PR-number>.<break|feat|fix>.md --content "Short description"

<!-- towncrier release notes start -->

## cylc-uiserver-1.4.2 (Released 2023-11-29)

[Updated cylc-ui to 2.3.0](https://github.com/cylc/cylc-ui/blob/master/CHANGES.md)

### 🔧 Fixes

[#525](https://github.com/cylc/cylc-uiserver/pull/525) - Fix bugs when playing workflows using the form:
- Specified Cylc version wasn't working
- Could not play multiple workflows

## cylc-uiserver-1.4.1 (Released 2023-11-03)

[Updated cylc-ui to 2.2.0](https://github.com/cylc/cylc-ui/blob/master/CHANGES.md)
Expand Down
97 changes: 39 additions & 58 deletions cylc/uiserver/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,6 @@ def snake_to_kebab(snake):
raise TypeError(type(snake))


def check_cylc_version(version):
"""Check the provided Cylc version is available on the CLI.
Sets CYLC_VERSION=version and tests the result of cylc --version
to make sure the requested version is installed and selectable via
the CYLC_VERSION environment variable.
"""
proc = Popen(
['cylc', '--version'],
env={**os.environ, 'CYLC_VERSION': version},
stdin=DEVNULL,
stdout=PIPE,
stderr=PIPE,
text=True
)
ret = proc.wait(timeout=5)
out, err = proc.communicate()
return ret or out.strip() == version


def _build_cmd(cmd: List, args: Dict) -> List:
"""Add args to command.
Expand Down Expand Up @@ -289,79 +269,80 @@ async def scan(
return cls._return("Scan requested")

@classmethod
async def play(cls, workflows, args, workflows_mgr, log):
async def play(
cls,
workflows: Iterable[Tokens],
args: Dict[str, Any],
workflows_mgr: 'WorkflowsManager',
log: 'Logger',
) -> List[Union[bool, str]]:
"""Calls `cylc play`."""
response = []
# get ready to run the command
try:
# check that the request cylc version is available
cylc_version = None
if 'cylc_version' in args:
cylc_version = args['cylc_version']
if not check_cylc_version(cylc_version):
return cls._error(
f'cylc version not available: {cylc_version}'
)
args = dict(args)
args.pop('cylc_version')

# build the command
cmd = ['cylc', 'play', '--color=never']
cmd = _build_cmd(cmd, args)

except Exception as exc:
# oh noes, something went wrong, send back confirmation
return cls._error(exc)
# start each requested flow
cylc_version = args.pop('cylc_version', None)
results: Dict[str, str] = {}
failed = False
for tokens in workflows:
try:
cmd = _build_cmd(['cylc', 'play', '--color=never'], args)

if tokens['user'] and tokens['user'] != getuser():
return cls._error(
'Cannot start workflows for other users.'
)
# Note: authorisation has already taken place.
# add the workflow to the command
cmd = [*cmd, tokens['workflow']]
wflow: str = tokens['workflow']
cmd = [*cmd, wflow]

# get a representation of the command being run
cmd_repr = ' '.join(cmd)
if cylc_version:
cmd_repr = f'CYLC_VERSION={cylc_version} {cmd_repr}'
log.info(f'$ {cmd_repr}')

# run cylc run
env = os.environ.copy()
env.pop('CYLC_ENV_NAME', None)
if cylc_version:
env['CYLC_VERSION'] = cylc_version

# run cylc play
proc = Popen(
cmd,
env=env,
stdin=DEVNULL,
stdout=PIPE,
stderr=PIPE,
text=True
)
ret = proc.wait(timeout=20)
ret_code = proc.wait(timeout=20)

if ret:
if ret_code:
# command failed
out, err = proc.communicate()
msg = err.strip() or out.strip() or (
f'Could not start {tokens["workflow"]}'
f' - {cmd_repr}'
)
raise Exception(
msg
results[wflow] = err.strip() or out.strip() or (
f'Command failed ({ret_code}): {cmd_repr}'
)
failed = True
else:
results[wflow] = 'started'

except Exception as exc:
# oh noes, something went wrong, send back confirmation
return cls._error(exc)

else:
# send a success message
return cls._return(
'Workflow started'
if failed:
if len(results) == 1:
return cls._error(results.popitem()[1])
# else log each workflow result on separate lines
return cls._error(
"\n\n" + "\n\n".join(
f"{wflow}: {msg}" for wflow, msg in results.items()
)
)

# trigger a re-scan
await workflows_mgr.scan()
return response
# send a success message
return cls._return('Workflow(s) started')

@staticmethod
async def enqueue(stream, queue):
Expand Down
180 changes: 176 additions & 4 deletions cylc/uiserver/tests/test_resolvers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
from typing import Any, Dict, List, Optional, Tuple
from async_timeout import timeout
import logging
import os
import pytest
from unittest import mock
from unittest.mock import MagicMock, Mock
from subprocess import Popen, TimeoutExpired

from cylc.flow import CYLC_LOG
from cylc.flow.id import Tokens
Expand All @@ -13,6 +16,7 @@
Services,
process_cat_log_stderr,
)
from cylc.uiserver.workflows_mgr import WorkflowsManager

services = Services()

Expand Down Expand Up @@ -47,6 +51,175 @@ def test_Services_anciliary_methods(func, message, expect):
assert func(message) == expect


@pytest.mark.parametrize(
'workflows, args, env, expected_ret, expected_env',
[
pytest.param(
[Tokens('wflow1'), Tokens('~murray/wflow2')],
{},
{},
[True, "Workflow(s) started"],
{},
id="multiple"
),
pytest.param(
[Tokens('~feynman/wflow1')],
{},
{},
[False, "Cannot start workflows for other users."],
{},
id="other user's wflow"
),
pytest.param(
[Tokens('wflow1')],
{'cylc_version': 'top'},
{'CYLC_VERSION': 'bottom', 'CYLC_ENV_NAME': 'quark'},
[True, "Workflow(s) started"],
{'CYLC_VERSION': 'top'},
id="cylc version overrides env"
),
]
)
async def test_play(
monkeypatch: pytest.MonkeyPatch,
workflows: List[Tokens],
args: Dict[str, Any],
env: Dict[str, str],
expected_ret: list,
expected_env: Dict[str, str],
):
"""It runs cylc play correctly.
Params:
workflows: list of workflow tokens
args: any args/options for cylc play
env: any environment variables
expected_ret: expected return value
expected_env: any expected environment variables
"""
for k, v in env.items():
monkeypatch.setenv(k, v)
monkeypatch.setattr('cylc.uiserver.resolvers.getuser', lambda: 'murray')
mock_popen = Mock(
spec=Popen,
return_value=Mock(
spec=Popen,
wait=Mock(return_value=0),
)
)
monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen)

ret = await Services.play(
workflows,
{'some': 'opt', **args},
workflows_mgr=Mock(spec=WorkflowsManager),
log=Mock(),
)

assert ret == expected_ret

expected_env = {**os.environ, **expected_env}
expected_env.pop('CYLC_ENV_NAME', None)

for i, call_args in enumerate(mock_popen.call_args_list):
cmd_str = ' '.join(call_args.args[0])
assert cmd_str.startswith('cylc play')
assert '--some opt' in cmd_str
assert workflows[i]['workflow'] in cmd_str

assert call_args.kwargs['env'] == expected_env


@pytest.mark.parametrize(
'workflows, popen_ret_codes, popen_communicate, expected',
[
pytest.param(
[Tokens('wflow1')],
[1],
("", "bad things!!"),
"bad things!!",
id="one"
),
pytest.param(
[Tokens('wflow1'), Tokens('wflow2')],
[1, 0],
("", "bad things!!"),
"\n\nwflow1: bad things!!\n\nwflow2: started",
id="multiple"
),
pytest.param(
[Tokens('wflow1')],
[1],
("something", ""),
"something",
id="uses stdout if stderr empty"
),
pytest.param(
[Tokens('wflow1')],
[4],
("", ""),
"Command failed (4): cylc play",
id="fallback msg if stdout/stderr empty"
),
]
)
async def test_play_fail(
monkeypatch: pytest.MonkeyPatch,
workflows: List[Tokens],
popen_ret_codes: List[int],
popen_communicate: Tuple[str, str],
expected: str,
):
"""It returns suitable error messages if cylc play fails.
Params:
workflows: list of workflow tokens
popen_ret_codes: cylc play return codes for each workflow
popen_communicate: stdout, stderr for cylc play
expected: (beginning of) expected returned error message
"""
mock_popen = Mock(
spec=Popen,
return_value=Mock(
spec=Popen,
wait=Mock(side_effect=lambda *a, **k: popen_ret_codes.pop(0)),
communicate=Mock(return_value=popen_communicate),
)
)
monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen)

status, message = await Services.play(
workflows,
{},
workflows_mgr=Mock(spec=WorkflowsManager),
log=Mock(),
)
assert status is False
assert message.startswith(expected)


async def test_play_timeout(monkeypatch: pytest.MonkeyPatch):
"""It returns an error if cylc play times out."""
def wait(timeout):
raise TimeoutExpired('cylc play wflow1', timeout)

mock_popen = Mock(
spec=Popen,
return_value=Mock(spec=Popen, wait=wait)
)
monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen)

ret = await Services.play(
[Tokens('wflow1')],
{},
workflows_mgr=Mock(spec=WorkflowsManager),
log=Mock(),
)
assert ret == [
False, "Command 'cylc play wflow1' timed out after 20 seconds"
]


async def test_cat_log(workflow_run_dir):
"""This is a functional test for cat_log subscription resolver.
Expand Down Expand Up @@ -79,8 +252,7 @@ async def test_cat_log(workflow_run_dir):
"""
log_file = log_dir / '01-start-01.log'
log_file.write_text(log_file_content)
expected = log_file.read_text()
info = mock.MagicMock()
info = MagicMock()
info.root_value = 2
# mock the context
info.context = {'sub_statuses': {2: "start"}}
Expand Down Expand Up @@ -115,7 +287,7 @@ async def test_cat_log(workflow_run_dir):
assert response['connected'] is False

# the other responses should contain the log file lines
assert actual.rstrip() == expected.rstrip()
assert actual.rstrip() == log_file_content.rstrip()


@pytest.mark.parametrize(
Expand Down
126 changes: 0 additions & 126 deletions cylc/uiserver/ui/2.2.0/assets/GraphiQL-598f1c25.js

This file was deleted.

Loading

0 comments on commit c746aae

Please sign in to comment.