From c04f67fa6969505fd37068dbd3bb191303c3aed1 Mon Sep 17 00:00:00 2001 From: Zack Cerza Date: Mon, 22 Jan 2024 17:15:34 -0700 Subject: [PATCH] async parallel updates --- scripts/reimage.py | 3 ++- teuthology/lock/cli.py | 4 ++-- teuthology/lock/ops.py | 4 ++-- teuthology/orchestra/cluster.py | 4 ++-- teuthology/parallel.py | 4 ++-- teuthology/reimage.py | 4 ++-- teuthology/task/install/redhat.py | 8 ++++---- teuthology/task/internal/redhat.py | 12 ++++++------ teuthology/task/internal/vm_setup.py | 4 ++-- teuthology/task/kernel.py | 10 +++++----- teuthology/task/parallel.py | 4 ++-- teuthology/task/pexec.py | 7 ++++--- teuthology/test/test_repo_utils.py | 15 +++++++++------ 13 files changed, 44 insertions(+), 39 deletions(-) diff --git a/scripts/reimage.py b/scripts/reimage.py index 42ec6e8ffe..47d1054a06 100644 --- a/scripts/reimage.py +++ b/scripts/reimage.py @@ -1,3 +1,4 @@ +import asyncio import docopt import sys @@ -22,4 +23,4 @@ Standard arguments: def main(argv=sys.argv[1:]): args = docopt.docopt(doc, argv=argv) - return teuthology.reimage.main(args) + asyncio.run(teuthology.reimage.main(args)) diff --git a/teuthology/lock/cli.py b/teuthology/lock/cli.py index 9cc8210a22..295b7094df 100644 --- a/teuthology/lock/cli.py +++ b/teuthology/lock/cli.py @@ -22,7 +22,7 @@ from teuthology.lock import ( log = logging.getLogger(__name__) -def main(ctx): +async def main(ctx): if ctx.verbose: teuthology.log.setLevel(logging.DEBUG) @@ -184,7 +184,7 @@ def main(ctx): ctx, misc.canonicalize_hostname(machine), ) - with teuthology.parallel.parallel() as p: + async with teuthology.parallel.parallel() as p: ops.update_nodes(reimage_machines, True) for machine in reimage_machines: p.spawn(teuthology.provision.reimage, ctx, machine, machine_types[machine]) diff --git a/teuthology/lock/ops.py b/teuthology/lock/ops.py index 2d24b68e62..f5a437d8fb 100644 --- a/teuthology/lock/ops.py +++ b/teuthology/lock/ops.py @@ -303,7 +303,7 @@ def push_new_keys(keys_dict, reference): return ret -def reimage_machines(ctx, machines, machine_type): +async def reimage_machines(ctx, machines, machine_type): reimage_types = teuthology.provision.get_reimage_types() if machine_type not in reimage_types: log.info(f"Skipping reimage of {machines.keys()} because {machine_type} is not in {reimage_types}") @@ -316,7 +316,7 @@ def reimage_machines(ctx, machines, machine_type): for machine in machines], ) with console_log.task(ctx, console_log_conf): - with teuthology.parallel.parallel() as p: + async with teuthology.parallel.parallel() as p: for machine in machines: log.info("Start node '%s' reimaging", machine) update_nodes([machine], True) diff --git a/teuthology/orchestra/cluster.py b/teuthology/orchestra/cluster.py index 654ef0c3de..d3a0ff17ff 100644 --- a/teuthology/orchestra/cluster.py +++ b/teuthology/orchestra/cluster.py @@ -49,7 +49,7 @@ class Cluster(object): ) self.remotes[remote] = list(roles) - def run(self, wait=True, parallel=False, **kwargs): + async def run(self, wait=True, parallel=False, **kwargs): """ Run a command on all the nodes in this cluster. @@ -88,7 +88,7 @@ class Cluster(object): # we have run sequentially and all processes are complete. if parallel and wait: - run.wait(procs) + await run.wait(procs) return procs def sh(self, script, **kwargs): diff --git a/teuthology/parallel.py b/teuthology/parallel.py index 88c3614e90..7e28565b45 100644 --- a/teuthology/parallel.py +++ b/teuthology/parallel.py @@ -13,13 +13,13 @@ class parallel(object): You add functions to be run with the spawn method:: - with parallel() as p: + async with parallel() as p: for foo in bar: p.spawn(quux, foo, baz=True) You can iterate over the results (which are in arbitrary order):: - with parallel() as p: + async with parallel() as p: for foo in bar: p.spawn(quux, foo, baz=True) for result in p: diff --git a/teuthology/reimage.py b/teuthology/reimage.py index fdc90543a6..81a1955d5d 100644 --- a/teuthology/reimage.py +++ b/teuthology/reimage.py @@ -11,7 +11,7 @@ from teuthology.misc import decanonicalize_hostname as shortname log = logging.getLogger(__name__) -def main(args): +async def main(args): if (args['--verbose']): teuthology.log.setLevel(logging.DEBUG) @@ -51,7 +51,7 @@ def main(args): ops.update_nodes([machine_name]) log.debug("Node '%s' reimaging is complete", machine_name) - with parallel() as p: + async with parallel() as p: for node in statuses: log.debug("Start node '%s' reimaging", node['name']) p.spawn(reimage_node, ctx, shortname(node['name']), node['machine_type']) diff --git a/teuthology/task/install/redhat.py b/teuthology/task/install/redhat.py index 5118088655..8d5340a18d 100644 --- a/teuthology/task/install/redhat.py +++ b/teuthology/task/install/redhat.py @@ -11,8 +11,8 @@ from teuthology.config import config as teuth_config log = logging.getLogger(__name__) -@contextlib.contextmanager -def install(ctx, config): +@contextlib.asynccontextmanager +async def install(ctx, config): """ Installs rh ceph on all hosts in ctx. @@ -60,7 +60,7 @@ def install(ctx, config): log.info("%s is a supported version", version) else: raise RuntimeError("Unsupported RH Ceph version %s", version) - with parallel() as p: + async with parallel() as p: for remote in ctx.cluster.remotes.keys(): if remote.os.name == 'rhel': log.info("Installing on RHEL node: %s", remote.shortname) @@ -75,7 +75,7 @@ def install(ctx, config): if config.get('skip_uninstall'): log.info("Skipping uninstall of Ceph") else: - with parallel() as p: + async with parallel() as p: for remote in ctx.cluster.remotes.keys(): p.spawn(uninstall_pkgs, ctx, remote, downstream_config) diff --git a/teuthology/task/internal/redhat.py b/teuthology/task/internal/redhat.py index 3f2030bd67..a8ddf22da0 100644 --- a/teuthology/task/internal/redhat.py +++ b/teuthology/task/internal/redhat.py @@ -16,8 +16,8 @@ from teuthology.exceptions import CommandFailedError, ConfigError log = logging.getLogger(__name__) -@contextlib.contextmanager -def setup_stage_cdn(ctx, config): +@contextlib.asynccontextmanager +async def setup_stage_cdn(ctx, config): """ Configure internal stage cdn """ @@ -31,7 +31,7 @@ def setup_stage_cdn(ctx, config): raise ConfigError('Provide rhbuild attribute') teuthconfig.rhbuild = str(rhbuild) - with parallel() as p: + async with parallel() as p: for remote in ctx.cluster.remotes.keys(): if remote.os.name == 'rhel': log.info("subscribing stage cdn on : %s", remote.shortname) @@ -39,7 +39,7 @@ def setup_stage_cdn(ctx, config): try: yield finally: - with parallel() as p: + async with parallel() as p: for remote in ctx.cluster.remotes.keys(): p.spawn(_unsubscribe_stage_cdn, remote) @@ -184,11 +184,11 @@ def setup_base_repo(ctx, config): ], check_status=False) -def _setup_latest_repo(ctx, config): +async def _setup_latest_repo(ctx, config): """ Setup repo based on redhat nodes """ - with parallel(): + async with parallel(): for remote in ctx.cluster.remotes.keys(): if remote.os.package_type == 'rpm': # pre-cleanup diff --git a/teuthology/task/internal/vm_setup.py b/teuthology/task/internal/vm_setup.py index f210bc7f41..5d5a4c66c5 100644 --- a/teuthology/task/internal/vm_setup.py +++ b/teuthology/task/internal/vm_setup.py @@ -9,7 +9,7 @@ from teuthology.exceptions import CommandFailedError log = logging.getLogger(__name__) -def vm_setup(ctx, config): +async def vm_setup(ctx, config): """ Look for virtual machines and handle their initialization """ @@ -18,7 +18,7 @@ def vm_setup(ctx, config): if 'kernel' in all_tasks and 'ansible.cephlab' not in all_tasks: need_ansible = True ansible_hosts = set() - with parallel(): + async with parallel(): editinfo = os.path.join(os.path.dirname(__file__), 'edit_sudoers.sh') for rem in ctx.cluster.remotes.keys(): if rem.is_vm: diff --git a/teuthology/task/kernel.py b/teuthology/task/kernel.py index 70d49059c6..f449c12ac9 100644 --- a/teuthology/task/kernel.py +++ b/teuthology/task/kernel.py @@ -449,7 +449,7 @@ def _no_grub_link(in_file, remote, kernel_ver): ) -def install_latest_rh_kernel(ctx, config): +async def install_latest_rh_kernel(ctx, config): """ Installs the lastest z stream kernel Reboot for the new kernel to take effect @@ -458,7 +458,7 @@ def install_latest_rh_kernel(ctx, config): config = {} if config.get('skip'): return - with parallel() as p: + async with parallel() as p: for remote in ctx.cluster.remotes.keys(): p.spawn(update_rh_kernel, remote) @@ -1131,8 +1131,8 @@ def get_sha1_from_pkg_name(path): return sha1 -@contextlib.contextmanager -def task(ctx, config): +@contextlib.asynccontextmanager +async def task(ctx, config): """ Make sure the specified kernel is installed. This can be a branch, tag, or sha1 of ceph-client.git or a local @@ -1232,7 +1232,7 @@ def task(ctx, config): validate_config(ctx, config) log.info('config %s, timeout %d' % (config, timeout)) - with parallel() as p: + async with parallel() as p: for role, role_config in config.items(): p.spawn(process_role, ctx, config, timeout, role, role_config) diff --git a/teuthology/task/parallel.py b/teuthology/task/parallel.py index 6999c0aae3..9024d5a80d 100644 --- a/teuthology/task/parallel.py +++ b/teuthology/task/parallel.py @@ -10,7 +10,7 @@ from teuthology import parallel log = logging.getLogger(__name__) -def task(ctx, config): +async def task(ctx, config): """ Run a group of tasks in parallel. @@ -45,7 +45,7 @@ def task(ctx, config): """ log.info('starting parallel...') - with parallel.parallel() as p: + async with parallel.parallel() as p: for entry in config: if not isinstance(entry, dict): entry = ctx.config.get(entry, {}) diff --git a/teuthology/task/pexec.py b/teuthology/task/pexec.py index 5832044465..1192b1465b 100644 --- a/teuthology/task/pexec.py +++ b/teuthology/task/pexec.py @@ -5,7 +5,7 @@ import asyncio import logging from teuthology import misc as teuthology -from teuthology.orchestra.run import PIPE, wait +from teuthology.orchestra.run import PIPE log = logging.getLogger(__name__) @@ -52,7 +52,7 @@ def _generate_remotes(ctx, config): (remote,) = ctx.cluster.only(role).remotes.keys() yield (remote, ls) -def task(ctx, config): +async def task(ctx, config): """ Execute commands on multiple hosts in parallel @@ -97,8 +97,9 @@ def task(ctx, config): tasks = set() for remote in remotes: task = _exec_host(remote[0], sudo, testdir, remote[1]) + # FIXME # task = asyncio.create_task( # _exec_host(remote[0], sudo, testdir, remote[1]) # ) tasks.add(task) - asyncio.gather(list(tasks)) + await asyncio.gather(*tasks) diff --git a/teuthology/test/test_repo_utils.py b/teuthology/test/test_repo_utils.py index a155fd410e..0d68753802 100644 --- a/teuthology/test/test_repo_utils.py +++ b/teuthology/test/test_repo_utils.py @@ -2,11 +2,12 @@ import logging import unittest.mock as mock import os import os.path -from pytest import raises, mark import shutil import subprocess import tempfile + from packaging.version import parse +from pytest import raises, mark from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError from teuthology import repo_utils @@ -192,20 +193,22 @@ class TestRepoUtils(object): with raises(ValueError): repo_utils.enforce_repo_state(self.repo_url, self.dest_path, 'a b', self.commit) - def test_simultaneous_access(self): + @mark.asyncio + async def test_simultaneous_access(self): count = 5 - with parallel.parallel() as p: + async with parallel.parallel() as p: for i in range(count): p.spawn(repo_utils.enforce_repo_state, self.repo_url, self.dest_path, 'main', self.commit) for result in p: assert result is None - def test_simultaneous_access_different_branches(self): + @mark.asyncio + async def test_simultaneous_access_different_branches(self): branches = [('main', self.commit), ('main', self.commit), ('nobranch', 'nocommit'), ('nobranch', 'nocommit'), ('main', self.commit), ('nobranch', 'nocommit')] - with parallel.parallel() as p: + async with parallel.parallel() as p: for branch, commit in branches: if branch == 'main': p.spawn(repo_utils.enforce_repo_state, self.repo_url, @@ -239,4 +242,4 @@ class TestRepoUtils(object): def test_current_branch(self): repo_utils.clone_repo(self.repo_url, self.dest_path, 'main', self.commit) - assert repo_utils.current_branch(self.dest_path) == "main" \ No newline at end of file + assert repo_utils.current_branch(self.dest_path) == "main" -- 2.39.5