]> git-server-git.apps.pok.os.sepia.ceph.com Git - teuthology.git/commitdiff
task/pexec: Add barrier capability
authorSam Lang <sam.lang@inktank.com>
Thu, 27 Dec 2012 23:33:07 +0000 (17:33 -0600)
committerSam Lang <sam.lang@inktank.com>
Tue, 8 Jan 2013 15:32:08 +0000 (09:32 -0600)
This patch adds the ability to barrier between
parallel exec tasks so that all tasks will perform
the following step (after the barrier) at the same
time.

Signed-off-by: Sam Lang <sam.lang@inktank.com>
teuthology/task/pexec.py

index 0667dbfa473ab30de0bae60c5e64a010ff5bc206..5e7b7be1986f28eca293222aae1c133981fb6886 100644 (file)
@@ -9,7 +9,29 @@ from teuthology.orchestra import run as tor
 
 log = logging.getLogger(__name__)
 
-def _exec_host(remote, sudo, ls):
+from gevent import queue as queue
+from gevent import event as event
+
+def _init_barrier(barrier_queue, remote):
+    barrier_queue.put(remote)
+
+def _do_barrier(barrier, barrier_queue, remote):
+    # special case for barrier
+    barrier_queue.get()
+    if barrier_queue.empty():
+        barrier.set()
+        barrier.clear()
+    else:
+        barrier.wait()
+
+    barrier_queue.put(remote)
+    if barrier_queue.full():
+        barrier.set()
+        barrier.clear()
+    else:
+        barrier.wait()
+
+def _exec_host(barrier, barrier_queue, remote, sudo, ls):
     log.info('Running commands on host %s', remote.name)
     args = ['bash', '-s']
     if sudo:
@@ -18,6 +40,10 @@ def _exec_host(remote, sudo, ls):
     r.stdin.writelines(['set -e\n'])
     r.stdin.flush()
     for l in ls:
+        if l == "barrier":
+            _do_barrier(barrier, barrier_queue, remote)
+            continue
+
         r.stdin.writelines([l, '\n'])
         r.stdin.flush()
     r.stdin.writelines(['\n'])
@@ -25,9 +51,28 @@ def _exec_host(remote, sudo, ls):
     r.stdin.close()
     tor.wait([r])
 
+def _generate_remotes(ctx, config):
+    if 'all' in config and len(config) == 1:
+        ls = config['all']
+        for remote in ctx.cluster.remotes.iterkeys():
+            yield (remote, ls)
+    elif 'clients' in config:
+        ls = config['clients']
+        for role in teuthology.all_roles_of_type(ctx.cluster, 'client'):
+            (remote,) = ctx.cluster.only('client.{r}'.format(r=role)).remotes.iterkeys()
+            yield (remote, ls)
+        del config['clients']
+        for role, ls in config.iteritems():
+            (remote,) = ctx.cluster.only(role).remotes.iterkeys()
+            yield (remote, ls)
+    else:
+        for role, ls in config.iteritems():
+            (remote,) = ctx.cluster.only(role).remotes.iterkeys()
+            yield (remote, ls)
+
 def task(ctx, config):
     """
-    Execute commands on multiple roles in parallel
+    Execute commands on multiple hosts in parallel
 
         tasks:
         - ceph:
@@ -40,6 +85,38 @@ def task(ctx, config):
               - tail -f bar
         - interactive:
 
+    Execute commands on all hosts in the cluster in parallel.  This
+    is useful if there are many hosts and you want to run the same
+    command on all:
+
+        tasks:
+        - pexec:
+            all:
+              - grep FAIL /tmp/cephtest/archive/log/*
+
+    Or if you want to run in parallel on all clients:
+
+        tasks:
+        - pexec:
+            clients:
+              - dd if=/dev/zero of=/tmp/cephtest/mnt.* count=1024 bs=1024
+
+    You can also ensure that parallel commands are synchronized with the
+    special 'barrier' statement:
+
+    tasks:
+    - pexec:
+        clients:
+          - cd /tmp/cephtest/mnt.*
+          - while true; do
+          -   barrier
+          -   dd if=/dev/zero of=./foo count=1024 bs=1024
+          - done
+
+    The above writes to the file foo on all clients over and over, but ensures that
+    all clients perform each write command in sync.  If one client takes longer to
+    write, all the other clients will wait.
+
     """
     log.info('Executing custom commands...')
     assert isinstance(config, dict), "task pexec got invalid config"
@@ -49,13 +126,14 @@ def task(ctx, config):
         sudo = config['sudo']
         del config['sudo']
 
-    if 'all' in config and len(config) == 1:
-        ls = config['all']
-        with parallel() as p:
-            for remote in ctx.cluster.remotes.iterkeys():
-                p.spawn(_exec_host, remote, sudo, ls)
-    else:
-        with parallel() as p:
-            for role, ls in config.iteritems():
-                (remote,) = ctx.cluster.only(role).remotes.iterkeys()
-                p.spawn(_exec_host, remote, sudo, ls)
+
+    remotes = list(_generate_remotes(ctx, config))
+    count = len(remotes)
+    barrier_queue = queue.Queue(count)
+    barrier = event.Event()
+
+    for remote in remotes:
+        _init_barrier(barrier_queue, remote[0])
+    with parallel() as p:
+        for remote in remotes:
+            p.spawn(_exec_host, barrier, barrier_queue, remote[0], sudo, remote[1])