+import asyncio
import docopt
import sys
def main(argv=sys.argv[1:]):
args = docopt.docopt(doc, argv=argv)
- return teuthology.reimage.main(args)
+ return asyncio.run(teuthology.reimage.main(args))
log = logging.getLogger(__name__)
-def main(ctx):
+async def main(ctx):
if ctx.verbose:
teuthology.log.setLevel(logging.DEBUG)
ctx,
misc.canonicalize_hostname(machine),
)
- with teuthology.parallel.parallel() as p:
+ async with teuthology.parallel.parallel() as p:
ops.update_nodes(reimage_machines, True)
for machine in reimage_machines:
p.spawn(teuthology.provision.reimage, ctx, machine, machine_types[machine])
return ret
-def reimage_machines(ctx, machines, machine_type):
+async def reimage_machines(ctx, machines, machine_type):
reimage_types = teuthology.provision.get_reimage_types()
if machine_type not in reimage_types:
log.info(f"Skipping reimage of {machines.keys()} because {machine_type} is not in {reimage_types}")
for machine in machines],
)
with console_log.task(ctx, console_log_conf):
- with teuthology.parallel.parallel() as p:
+ async with teuthology.parallel.parallel() as p:
for machine in machines:
log.info("Start node '%s' reimaging", machine)
update_nodes([machine], True)
)
self.remotes[remote] = list(roles)
- def run(self, wait=True, parallel=False, **kwargs):
+ async def run(self, wait=True, parallel=False, **kwargs):
"""
Run a command on all the nodes in this cluster.
# we have run sequentially and all processes are complete.
if parallel and wait:
- run.wait(procs)
+ await run.wait(procs)
return procs
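A minimal caller sketch (not part of the diff; check_uptime is a hypothetical helper): once run() is a coroutine, call sites have to await it, e.g.

    async def check_uptime(cluster):
        # run the command on every remote and wait for all of them to finish
        procs = await cluster.run(args=['uptime'], wait=True)
        return procs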
def sh(self, script, **kwargs):
You add functions to be run with the spawn method::
- with parallel() as p:
+ async with parallel() as p:
for foo in bar:
p.spawn(quux, foo, baz=True)
You can iterate over the results (which are in arbitrary order)::
- with parallel() as p:
+ async with parallel() as p:
for foo in bar:
p.spawn(quux, foo, baz=True)
for result in p:
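Since parallel() is now an asynchronous context manager it can only be entered from a coroutine; a minimal sketch of bridging from synchronous code, reusing the quux/bar/baz names from the docstring (run_all is a hypothetical wrapper):

    async def run_all(bar):
        async with parallel() as p:
            for foo in bar:
                p.spawn(quux, foo, baz=True)

    asyncio.run(run_all(bar))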
log = logging.getLogger(__name__)
-def main(args):
+async def main(args):
if (args['--verbose']):
teuthology.log.setLevel(logging.DEBUG)
ops.update_nodes([machine_name])
log.debug("Node '%s' reimaging is complete", machine_name)
- with parallel() as p:
+ async with parallel() as p:
for node in statuses:
log.debug("Start node '%s' reimaging", node['name'])
p.spawn(reimage_node, ctx, shortname(node['name']), node['machine_type'])
log = logging.getLogger(__name__)
-@contextlib.contextmanager
-def install(ctx, config):
+@contextlib.asynccontextmanager
+async def install(ctx, config):
"""
Installs RH Ceph on all hosts in ctx.
log.info("%s is a supported version", version)
else:
raise RuntimeError("Unsupported RH Ceph version %s" % version)
- with parallel() as p:
+ async with parallel() as p:
for remote in ctx.cluster.remotes.keys():
if remote.os.name == 'rhel':
log.info("Installing on RHEL node: %s", remote.shortname)
if config.get('skip_uninstall'):
log.info("Skipping uninstall of Ceph")
else:
- with parallel() as p:
+ async with parallel() as p:
for remote in ctx.cluster.remotes.keys():
p.spawn(uninstall_pkgs, ctx, remote, downstream_config)
log = logging.getLogger(__name__)
-@contextlib.contextmanager
-def setup_stage_cdn(ctx, config):
+@contextlib.asynccontextmanager
+async def setup_stage_cdn(ctx, config):
"""
Configure the internal stage CDN
"""
raise ConfigError('Provide rhbuild attribute')
teuthconfig.rhbuild = str(rhbuild)
- with parallel() as p:
+ async with parallel() as p:
for remote in ctx.cluster.remotes.keys():
if remote.os.name == 'rhel':
log.info("subscribing stage cdn on : %s", remote.shortname)
try:
yield
finally:
- with parallel() as p:
+ async with parallel() as p:
for remote in ctx.cluster.remotes.keys():
p.spawn(_unsubscribe_stage_cdn, remote)
], check_status=False)
-def _setup_latest_repo(ctx, config):
+async def _setup_latest_repo(ctx, config):
"""
Set up repos based on the Red Hat nodes
"""
- with parallel():
+ async with parallel():
for remote in ctx.cluster.remotes.keys():
if remote.os.package_type == 'rpm':
# pre-cleanup
log = logging.getLogger(__name__)
-def vm_setup(ctx, config):
+async def vm_setup(ctx, config):
"""
Look for virtual machines and handle their initialization
"""
if 'kernel' in all_tasks and 'ansible.cephlab' not in all_tasks:
need_ansible = True
ansible_hosts = set()
- with parallel():
+ async with parallel():
editinfo = os.path.join(os.path.dirname(__file__), 'edit_sudoers.sh')
for rem in ctx.cluster.remotes.keys():
if rem.is_vm:
)
-def install_latest_rh_kernel(ctx, config):
+async def install_latest_rh_kernel(ctx, config):
"""
Installs the latest z-stream kernel
Reboot for the new kernel to take effect
config = {}
if config.get('skip'):
return
- with parallel() as p:
+ async with parallel() as p:
for remote in ctx.cluster.remotes.keys():
p.spawn(update_rh_kernel, remote)
return sha1
-@contextlib.contextmanager
-def task(ctx, config):
+@contextlib.asynccontextmanager
+async def task(ctx, config):
"""
Make sure the specified kernel is installed.
This can be a branch, tag, or sha1 of ceph-client.git or a local
validate_config(ctx, config)
log.info('config %s, timeout %d' % (config, timeout))
- with parallel() as p:
+ async with parallel() as p:
for role, role_config in config.items():
p.spawn(process_role, ctx, config, timeout, role, role_config)
log = logging.getLogger(__name__)
-def task(ctx, config):
+async def task(ctx, config):
"""
Run a group of tasks in parallel.
"""
log.info('starting parallel...')
- with parallel.parallel() as p:
+ async with parallel.parallel() as p:
for entry in config:
if not isinstance(entry, dict):
entry = ctx.config.get(entry, {})
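A hedged illustration of the config shape this loop assumes (the YAML below is an assumption, not taken from this diff): each entry is either an inline task mapping or a string naming a top-level section of the job config, which is why non-dict entries are resolved through ctx.config.get(entry, {}):

    # tasks:
    # - parallel:
    #   - exec:
    #       client.0:
    #       - date
    #   - extra_section   # looked up via ctx.config.get('extra_section', {})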
+import asyncio
import logging
from teuthology import misc as teuthology
-from teuthology.orchestra.run import PIPE, wait
+from teuthology.orchestra.run import PIPE
log = logging.getLogger(__name__)
(remote,) = ctx.cluster.only(role).remotes.keys()
yield (remote, ls)
-def task(ctx, config):
+async def task(ctx, config):
"""
Execute commands on multiple hosts in parallel
tasks = set()
for remote in remotes:
task = _exec_host(remote[0], sudo, testdir, remote[1])
+ # FIXME: decide whether to schedule each coroutine eagerly with asyncio.create_task:
# task = asyncio.create_task(
# _exec_host(remote[0], sudo, testdir, remote[1])
# )
tasks.add(task)
- asyncio.gather(list(tasks))
+ await asyncio.gather(*tasks)
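For reference (not part of the diff): asyncio.gather() wraps bare coroutines in Tasks itself, so the awaited call above already runs all hosts concurrently; the commented-out asyncio.create_task() variant would differ only in scheduling each coroutine as soon as it is created. A small sketch with a hypothetical run_hosts wrapper:

    async def run_hosts(remotes, sudo, testdir):
        # remotes is assumed to be an iterable of (remote, commands) pairs
        coros = [_exec_host(remote, sudo, testdir, ls) for remote, ls in remotes]
        await asyncio.gather(*coros)  # waits for every host to finish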
import unittest.mock as mock
import os
import os.path
-from pytest import raises, mark
import shutil
import subprocess
import tempfile
+
from packaging.version import parse
+from pytest import raises, mark
from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError
from teuthology import repo_utils
with raises(ValueError):
repo_utils.enforce_repo_state(self.repo_url, self.dest_path, 'a b', self.commit)
- def test_simultaneous_access(self):
+ @mark.asyncio
+ async def test_simultaneous_access(self):
count = 5
- with parallel.parallel() as p:
+ async with parallel.parallel() as p:
for i in range(count):
p.spawn(repo_utils.enforce_repo_state, self.repo_url,
self.dest_path, 'main', self.commit)
for result in p:
assert result is None
- def test_simultaneous_access_different_branches(self):
+ @mark.asyncio
+ async def test_simultaneous_access_different_branches(self):
branches = [('main', self.commit), ('main', self.commit), ('nobranch', 'nocommit'),
('nobranch', 'nocommit'), ('main', self.commit), ('nobranch', 'nocommit')]
- with parallel.parallel() as p:
+ async with parallel.parallel() as p:
for branch, commit in branches:
if branch == 'main':
p.spawn(repo_utils.enforce_repo_state, self.repo_url,
def test_current_branch(self):
repo_utils.clone_repo(self.repo_url, self.dest_path, 'main', self.commit)
- assert repo_utils.current_branch(self.dest_path) == "main"
\ No newline at end of file
+ assert repo_utils.current_branch(self.dest_path) == "main"
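Note (outside the diff): the @mark.asyncio decorator is provided by the pytest-asyncio plugin, so it would presumably need to appear in the test requirements, e.g.

    # assumed addition to the test requirements, not part of this diff
    pytest-asyncio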