log = logging.getLogger(__name__)
-def _exec_host(remote, sudo, ls):
+from gevent import queue
+from gevent import event
+
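+# The helpers below implement the 'barrier' statement: a gevent Queue holds
+# one token per remote, and a shared gevent Event wakes the hosts that are
+# waiting at a barrier.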
+def _init_barrier(barrier_queue, remote):
+    barrier_queue.put(remote)
+
+def _do_barrier(barrier, barrier_queue, remote):
+    # Phase 1: every host removes one token from the queue.  The queue was
+    # filled with one token per host (see _init_barrier), so the host that
+    # takes the last token knows all hosts have reached the barrier and wakes
+    # the waiters; the others block on the event.
+    barrier_queue.get()
+    if barrier_queue.empty():
+        barrier.set()
+        barrier.clear()
+    else:
+        barrier.wait()
+
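+    # Phase 2: every host puts its token back.  The host that refills the
+    # queue last wakes the others, leaving the queue full again for the
+    # next barrier.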
+    barrier_queue.put(remote)
+    if barrier_queue.full():
+        barrier.set()
+        barrier.clear()
+    else:
+        barrier.wait()
+
+def _exec_host(barrier, barrier_queue, remote, sudo, ls):
    log.info('Running commands on host %s', remote.name)
    args = ['bash', '-s']
    if sudo:
        args.insert(0, 'sudo')
    r = remote.run(args=args, stdin=tor.PIPE, wait=False)
    r.stdin.writelines(['set -e\n'])
    r.stdin.flush()
    for l in ls:
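+        # A literal "barrier" entry is not sent to the shell; it makes this
+        # host rendezvous with all the other hosts before continuing.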
+        if l == "barrier":
+            _do_barrier(barrier, barrier_queue, remote)
+            continue
+
        r.stdin.writelines([l, '\n'])
        r.stdin.flush()
    r.stdin.writelines(['\n'])
    r.stdin.close()
    tor.wait([r])
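+# Yield (remote, commands) pairs for the given config: a single 'all' key
+# fans the same command list out to every remote in the cluster; a 'clients'
+# key fans its list out to every client role (any remaining keys are treated
+# as role names); otherwise each key is a role name with its own command list.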
+def _generate_remotes(ctx, config):
+    if 'all' in config and len(config) == 1:
+        ls = config['all']
+        for remote in ctx.cluster.remotes.iterkeys():
+            yield (remote, ls)
+    elif 'clients' in config:
+        ls = config['clients']
+        for role in teuthology.all_roles_of_type(ctx.cluster, 'client'):
+            (remote,) = ctx.cluster.only('client.{r}'.format(r=role)).remotes.iterkeys()
+            yield (remote, ls)
+        del config['clients']
+        for role, ls in config.iteritems():
+            (remote,) = ctx.cluster.only(role).remotes.iterkeys()
+            yield (remote, ls)
+    else:
+        for role, ls in config.iteritems():
+            (remote,) = ctx.cluster.only(role).remotes.iterkeys()
+            yield (remote, ls)
+
def task(ctx, config):
"""
-    Execute commands on multiple roles in parallel
+    Execute commands on multiple hosts in parallel
tasks:
- ceph:
- tail -f bar
- interactive:
+    Execute commands on all hosts in the cluster in parallel. This
+    is useful if there are many hosts and you want to run the same
+    command on all of them:
+
+        tasks:
+        - pexec:
+            all:
+              - grep FAIL /tmp/cephtest/archive/log/*
+
+    Or if you want to run in parallel on all clients:
+
+        tasks:
+        - pexec:
+            clients:
+              - dd if=/dev/zero of=/tmp/cephtest/mnt.* count=1024 bs=1024
+
+    You can also keep the parallel commands synchronized with the special
+    'barrier' statement:
+
+        tasks:
+        - pexec:
+            clients:
+              - cd /tmp/cephtest/mnt.*
+              - while true; do
+              - barrier
+              - dd if=/dev/zero of=./foo count=1024 bs=1024
+              - done
+
+    The above writes the file foo on all clients over and over, but the barrier
+    keeps the clients in step: each client waits until every other client has
+    reached the barrier before issuing the next write, so a slow client makes
+    the rest wait.
+
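+    If the config contains a 'sudo: true' entry, the commands are run
+    through sudo.
+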
"""
    log.info('Executing custom commands...')
    assert isinstance(config, dict), "task pexec got invalid config"
    sudo = False
    if 'sudo' in config:
        sudo = config['sudo']
        del config['sudo']
-    if 'all' in config and len(config) == 1:
-        ls = config['all']
-        with parallel() as p:
-            for remote in ctx.cluster.remotes.iterkeys():
-                p.spawn(_exec_host, remote, sudo, ls)
-    else:
-        with parallel() as p:
-            for role, ls in config.iteritems():
-                (remote,) = ctx.cluster.only(role).remotes.iterkeys()
-                p.spawn(_exec_host, remote, sudo, ls)
+
+    remotes = list(_generate_remotes(ctx, config))
+    count = len(remotes)
+    barrier_queue = queue.Queue(count)
+    barrier = event.Event()
+
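+    # Seed the barrier queue with one token per remote so _do_barrier can tell
+    # when every host has reached a barrier (queue empty) and when every host
+    # has passed it (queue full again).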
+    for remote in remotes:
+        _init_barrier(barrier_queue, remote[0])
+    with parallel() as p:
+        for remote in remotes:
+            p.spawn(_exec_host, barrier, barrier_queue, remote[0], sudo, remote[1])