]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
async parallel updates
authorZack Cerza <zack@redhat.com>
Tue, 23 Jan 2024 00:15:34 +0000 (17:15 -0700)
committerZack Cerza <zack@redhat.com>
Tue, 23 Jan 2024 00:15:34 +0000 (17:15 -0700)
13 files changed:
scripts/reimage.py
teuthology/lock/cli.py
teuthology/lock/ops.py
teuthology/orchestra/cluster.py
teuthology/parallel.py
teuthology/reimage.py
teuthology/task/install/redhat.py
teuthology/task/internal/redhat.py
teuthology/task/internal/vm_setup.py
teuthology/task/kernel.py
teuthology/task/parallel.py
teuthology/task/pexec.py
teuthology/test/test_repo_utils.py

index 42ec6e8ffe08df75e51692fe754edc01108a4328..47d1054a06a826ac2a35e702778499141f6b67ab 100644 (file)
@@ -1,3 +1,4 @@
+import asyncio
 import docopt
 import sys
 
@@ -22,4 +23,4 @@ Standard arguments:
 
 def main(argv=sys.argv[1:]):
     args = docopt.docopt(doc, argv=argv)
-    return teuthology.reimage.main(args)
+    asyncio.run(teuthology.reimage.main(args))
index 9cc8210a22d20ccd964671305fee3a41f0568669..295b7094dffe7108f0b27b970bdbd5b2cdc5520a 100644 (file)
@@ -22,7 +22,7 @@ from teuthology.lock import (
 log = logging.getLogger(__name__)
 
 
-def main(ctx):
+async def main(ctx):
     if ctx.verbose:
         teuthology.log.setLevel(logging.DEBUG)
 
@@ -184,7 +184,7 @@ def main(ctx):
                     ctx,
                     misc.canonicalize_hostname(machine),
                 )
-        with teuthology.parallel.parallel() as p:
+        async with teuthology.parallel.parallel() as p:
             ops.update_nodes(reimage_machines, True)
             for machine in reimage_machines:
                 p.spawn(teuthology.provision.reimage, ctx, machine, machine_types[machine])
index 2d24b68e62876879a4c38ce757fd4acb80fb2360..f5a437d8fb232f5eff99e976e297372707a5c3e2 100644 (file)
@@ -303,7 +303,7 @@ def push_new_keys(keys_dict, reference):
     return ret
 
 
-def reimage_machines(ctx, machines, machine_type):
+async def reimage_machines(ctx, machines, machine_type):
     reimage_types = teuthology.provision.get_reimage_types()
     if machine_type not in reimage_types:
         log.info(f"Skipping reimage of {machines.keys()} because {machine_type} is not in {reimage_types}")
@@ -316,7 +316,7 @@ def reimage_machines(ctx, machines, machine_type):
                  for machine in machines],
     )
     with console_log.task(ctx, console_log_conf):
-        with teuthology.parallel.parallel() as p:
+        async with teuthology.parallel.parallel() as p:
             for machine in machines:
                 log.info("Start node '%s' reimaging", machine)
                 update_nodes([machine], True)
index 654ef0c3dec553d16991476e8ab706bdbab045af..d3a0ff17ffa4a32d1aa79c082b7e7ba0eddd1cc0 100644 (file)
@@ -49,7 +49,7 @@ class Cluster(object):
                 )
         self.remotes[remote] = list(roles)
 
-    def run(self, wait=True, parallel=False, **kwargs):
+    async def run(self, wait=True, parallel=False, **kwargs):
         """
         Run a command on all the nodes in this cluster.
 
@@ -88,7 +88,7 @@ class Cluster(object):
         # we have run sequentially and all processes are complete.
 
         if parallel and wait:
-            run.wait(procs)
+            await run.wait(procs)
         return procs
 
     def sh(self, script, **kwargs):
index 88c3614e9004c5d5b59638ee34b6a3c40fc70c7f..7e28565b451e1632e50ee0c85b7c4232c26de0fd 100644 (file)
@@ -13,13 +13,13 @@ class parallel(object):
 
     You add functions to be run with the spawn method::
 
-        with parallel() as p:
+        async with parallel() as p:
             for foo in bar:
                 p.spawn(quux, foo, baz=True)
 
     You can iterate over the results (which are in arbitrary order)::
 
-        with parallel() as p:
+        async with parallel() as p:
             for foo in bar:
                 p.spawn(quux, foo, baz=True)
             for result in p:
index fdc90543a6c0cdcfa048de1d732743f57d390a50..81a1955d5d14cbdd24d7fe19a208d7c4abb7ac80 100644 (file)
@@ -11,7 +11,7 @@ from teuthology.misc import decanonicalize_hostname as shortname
 
 log = logging.getLogger(__name__)
 
-def main(args):
+async def main(args):
     if (args['--verbose']):
         teuthology.log.setLevel(logging.DEBUG)
 
@@ -51,7 +51,7 @@ def main(args):
         ops.update_nodes([machine_name])
         log.debug("Node '%s' reimaging is complete", machine_name)
 
-    with parallel() as p:
+    async with parallel() as p:
         for node in statuses:
             log.debug("Start node '%s' reimaging", node['name'])
             p.spawn(reimage_node, ctx, shortname(node['name']), node['machine_type'])
index 5118088655fb74adeff82c6797f6a325bfee4930..8d5340a18d1a37aff6a194ab60637b17ff82af72 100644 (file)
@@ -11,8 +11,8 @@ from teuthology.config import config as teuth_config
 log = logging.getLogger(__name__)
 
 
-@contextlib.contextmanager
-def install(ctx, config):
+@contextlib.asynccontextmanager
+async def install(ctx, config):
     """
     Installs rh ceph on all hosts in ctx.
 
@@ -60,7 +60,7 @@ def install(ctx, config):
         log.info("%s is a supported version", version)
     else:
         raise RuntimeError("Unsupported RH Ceph version %s", version)
-    with parallel() as p:
+    async with parallel() as p:
         for remote in ctx.cluster.remotes.keys():
             if remote.os.name == 'rhel':
                 log.info("Installing on RHEL node: %s", remote.shortname)
@@ -75,7 +75,7 @@ def install(ctx, config):
         if config.get('skip_uninstall'):
             log.info("Skipping uninstall of Ceph")
         else:
-            with parallel() as p:
+            async with parallel() as p:
                 for remote in ctx.cluster.remotes.keys():
                     p.spawn(uninstall_pkgs, ctx, remote, downstream_config)
 
index 3f2030bd67209c44ccb5f1180fd06815e91b5d92..a8ddf22da0921f5d44df7af26cb50c8b7bec481c 100644 (file)
@@ -16,8 +16,8 @@ from teuthology.exceptions import CommandFailedError, ConfigError
 log = logging.getLogger(__name__)
 
 
-@contextlib.contextmanager
-def setup_stage_cdn(ctx, config):
+@contextlib.asynccontextmanager
+async def setup_stage_cdn(ctx, config):
     """
     Configure internal stage cdn
     """
@@ -31,7 +31,7 @@ def setup_stage_cdn(ctx, config):
         raise ConfigError('Provide rhbuild attribute')
     teuthconfig.rhbuild = str(rhbuild)
 
-    with parallel() as p:
+    async with parallel() as p:
         for remote in ctx.cluster.remotes.keys():
             if remote.os.name == 'rhel':
                 log.info("subscribing stage cdn on : %s", remote.shortname)
@@ -39,7 +39,7 @@ def setup_stage_cdn(ctx, config):
     try:
         yield
     finally:
-        with parallel() as p:
+        async with parallel() as p:
             for remote in ctx.cluster.remotes.keys():
                 p.spawn(_unsubscribe_stage_cdn, remote)
 
@@ -184,11 +184,11 @@ def setup_base_repo(ctx, config):
                                      ], check_status=False)
 
 
-def _setup_latest_repo(ctx, config):
+async def _setup_latest_repo(ctx, config):
     """
     Setup repo based on redhat nodes
     """
-    with parallel():
+    async with parallel():
         for remote in ctx.cluster.remotes.keys():
             if remote.os.package_type == 'rpm':
                 # pre-cleanup
index f210bc7f41636595be4f106554490271b42aa4b3..5d5a4c66c5321e2ce5100921e4d4cc8ef4f88bf5 100644 (file)
@@ -9,7 +9,7 @@ from teuthology.exceptions import CommandFailedError
 log = logging.getLogger(__name__)
 
 
-def vm_setup(ctx, config):
+async def vm_setup(ctx, config):
     """
     Look for virtual machines and handle their initialization
     """
@@ -18,7 +18,7 @@ def vm_setup(ctx, config):
     if 'kernel' in all_tasks and 'ansible.cephlab' not in all_tasks:
         need_ansible = True
     ansible_hosts = set()
-    with parallel():
+    async with parallel():
         editinfo = os.path.join(os.path.dirname(__file__), 'edit_sudoers.sh')
         for rem in ctx.cluster.remotes.keys():
             if rem.is_vm:
index 70d49059c6e65f334d596009e2e98e14119b62ed..f449c12ac99d5ab712bbe3344f01c94064bf1429 100644 (file)
@@ -449,7 +449,7 @@ def _no_grub_link(in_file, remote, kernel_ver):
     )
 
 
-def install_latest_rh_kernel(ctx, config):
+async def install_latest_rh_kernel(ctx, config):
     """
     Installs the lastest z stream kernel
     Reboot for the new kernel to take effect
@@ -458,7 +458,7 @@ def install_latest_rh_kernel(ctx, config):
         config = {}
     if config.get('skip'):
         return
-    with parallel() as p:
+    async with parallel() as p:
         for remote in ctx.cluster.remotes.keys():
             p.spawn(update_rh_kernel, remote)
 
@@ -1131,8 +1131,8 @@ def get_sha1_from_pkg_name(path):
     return sha1
 
 
-@contextlib.contextmanager
-def task(ctx, config):
+@contextlib.asynccontextmanager
+async def task(ctx, config):
     """
     Make sure the specified kernel is installed.
     This can be a branch, tag, or sha1 of ceph-client.git or a local
@@ -1232,7 +1232,7 @@ def task(ctx, config):
     validate_config(ctx, config)
     log.info('config %s, timeout %d' % (config, timeout))
 
-    with parallel() as p:
+    async with parallel() as p:
         for role, role_config in config.items():
             p.spawn(process_role, ctx, config, timeout, role, role_config)
 
index 6999c0aae38cc36cd3e4f95401ea4ad725ed521f..9024d5a80dcfd2f46cef10d62cdb29e2db1ee452 100644 (file)
@@ -10,7 +10,7 @@ from teuthology import parallel
 log = logging.getLogger(__name__)
 
 
-def task(ctx, config):
+async def task(ctx, config):
     """
     Run a group of tasks in parallel.
 
@@ -45,7 +45,7 @@ def task(ctx, config):
     """
 
     log.info('starting parallel...')
-    with parallel.parallel() as p:
+    async with parallel.parallel() as p:
         for entry in config:
             if not isinstance(entry, dict):
                 entry = ctx.config.get(entry, {})
index 5832044465e63180967f414675bf55506b76b20e..1192b1465b20fa0a5c0c878564a70e7aa5af4d47 100644 (file)
@@ -5,7 +5,7 @@ import asyncio
 import logging
 
 from teuthology import misc as teuthology
-from teuthology.orchestra.run import PIPE, wait
+from teuthology.orchestra.run import PIPE
 
 log = logging.getLogger(__name__)
 
@@ -52,7 +52,7 @@ def _generate_remotes(ctx, config):
             (remote,) = ctx.cluster.only(role).remotes.keys()
             yield (remote, ls)
 
-def task(ctx, config):
+async def task(ctx, config):
     """
     Execute commands on multiple hosts in parallel
 
@@ -97,8 +97,9 @@ def task(ctx, config):
     tasks = set()
     for remote in remotes:
         task = _exec_host(remote[0], sudo, testdir, remote[1])
+        # FIXME
         # task = asyncio.create_task(
         #     _exec_host(remote[0], sudo, testdir, remote[1])
         # )
         tasks.add(task)
-    asyncio.gather(list(tasks))
+    await asyncio.gather(*tasks)
index a155fd410ef608ed1b91aae915b4248dbae725f9..0d68753802d41cad7baab9c4aaef6d1f1b75fc74 100644 (file)
@@ -2,11 +2,12 @@ import logging
 import unittest.mock as mock
 import os
 import os.path
-from pytest import raises, mark
 import shutil
 import subprocess
 import tempfile
+
 from packaging.version import parse
+from pytest import raises, mark
 
 from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError
 from teuthology import repo_utils
@@ -192,20 +193,22 @@ class TestRepoUtils(object):
         with raises(ValueError):
             repo_utils.enforce_repo_state(self.repo_url, self.dest_path, 'a b', self.commit)
 
-    def test_simultaneous_access(self):
+    @mark.asyncio
+    async def test_simultaneous_access(self):
         count = 5
-        with parallel.parallel() as p:
+        async with parallel.parallel() as p:
             for i in range(count):
                 p.spawn(repo_utils.enforce_repo_state, self.repo_url,
                         self.dest_path, 'main', self.commit)
             for result in p:
                 assert result is None
 
-    def test_simultaneous_access_different_branches(self):
+    @mark.asyncio
+    async def test_simultaneous_access_different_branches(self):
         branches = [('main', self.commit),  ('main', self.commit), ('nobranch', 'nocommit'),
                     ('nobranch', 'nocommit'), ('main', self.commit), ('nobranch', 'nocommit')]
 
-        with parallel.parallel() as p:
+        async with parallel.parallel() as p:
             for branch, commit in branches:
                 if branch == 'main':
                     p.spawn(repo_utils.enforce_repo_state, self.repo_url,
@@ -239,4 +242,4 @@ class TestRepoUtils(object):
 
     def test_current_branch(self):
         repo_utils.clone_repo(self.repo_url, self.dest_path, 'main', self.commit)
-        assert repo_utils.current_branch(self.dest_path) == "main"
\ No newline at end of file
+        assert repo_utils.current_branch(self.dest_path) == "main"