]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
task/watch_notify_stress: watch_notify_stress now thrashes clients
authorSamuel Just <samuel.just@dreamhost.com>
Thu, 6 Oct 2011 20:33:17 +0000 (13:33 -0700)
committerSamuel Just <samuel.just@dreamhost.com>
Thu, 6 Oct 2011 21:34:44 +0000 (14:34 -0700)
This should exercise the watch notify timeout code.

Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
teuthology/task/proc_thrasher.py [new file with mode: 0644]
teuthology/task/watch_notify_stress.py

diff --git a/teuthology/task/proc_thrasher.py b/teuthology/task/proc_thrasher.py
new file mode 100644 (file)
index 0000000..472b6ec
--- /dev/null
@@ -0,0 +1,63 @@
+import logging
+import gevent
+import random
+import time
+
+from ..orchestra import run
+
+log = logging.getLogger(__name__)
+
+class ProcThrasher:
+    """ Kills and restarts some number of the specified process on the specified
+        remote
+    """
+    def __init__(self, config, remote, *proc_args, **proc_kwargs):
+        self.proc_kwargs = proc_kwargs
+        self.proc_args = proc_args
+        self.config = config
+        self.greenlet = None
+        self.logger = proc_kwargs.get("logger", log.getChild('proc_thrasher'))
+        self.remote = remote
+
+        # config:
+        self.num_procs = self.config.get("num_procs", 5)
+        self.rest_period = self.config.get("rest_period", 100) # seconds
+        self.run_time = self.config.get("run_time", 1000) # seconds
+
+    def log(self, msg):
+        self.logger.info(msg)
+
+    def start(self):
+        if self.greenlet is not None:
+            return
+        self.greenlet = gevent.Greenlet(self.loop)
+        self.greenlet.start()
+
+    def join(self):
+        self.greenlet.join()
+
+    def loop(self):
+        time_started = time.time()
+        procs = []
+        self.log("Starting")
+        while time_started + self.run_time > time.time():
+            if len(procs) > 0:
+                self.log("Killing proc")
+                proc = random.choice(procs)
+                procs.remove(proc)
+                proc.stdin.close()
+                self.log("About to wait")
+                run.wait([proc])
+                self.log("Killed proc")
+                
+            while len(procs) < self.num_procs:
+                self.log("Creating proc " + str(len(procs) + 1))
+                self.log("args are " + str(self.proc_args) + " kwargs: " + str(self.proc_kwargs))
+                procs.append(self.remote.run(
+                        *self.proc_args,
+                        ** self.proc_kwargs))
+            self.log("About to sleep")
+            time.sleep(self.rest_period)
+            self.log("Just woke")
+
+        run.wait(procs)
index baea5e5a5ac69219fb6f333f5d74abbef64cec23..efc97369b7366127353d23e521878b9ace8e4dd4 100644 (file)
@@ -1,5 +1,6 @@
 import contextlib
 import logging
+import proc_thrasher
 
 from ..orchestra import run
 
@@ -26,7 +27,7 @@ def task(ctx, config):
     log.info('Beginning test_stress_watch...')
     assert isinstance(config, dict), \
         "please list clients to run on"
-    testsnaps = {}
+    testwatch = {}
 
     (mon,) = ctx.cluster.only('mon.0').remotes.iterkeys()
     remotes = []
@@ -38,48 +39,28 @@ def task(ctx, config):
         (remote,) = ctx.cluster.only(role).remotes.iterkeys()
         remotes.append(remote)
 
-        remote.run(
-            args=[
-                'cp',
-                '/tmp/cephtest/ceph.conf',
-                '/tmp/cephtest/data/ceph.conf',
-                ],
-            logger=log.getChild('test_stress_watch.{id}'.format(id=id_)),
-            wait=True,
-            )
-
-        args =[ '/bin/sh', '-c',
-                " ".join([
-                    'cd', '/tmp/cephtest/data;',
-                    'export CEPH_CLIENT_ID={id_}; export CEPH_CONF=ceph.conf; export CEPH_ARGS="{flags}"; LD_PRELOAD=/tmp/cephtest/binary/usr/local/lib/librados.so.2 /tmp/cephtest/binary/usr/local/bin/test_stress_watch {flags}'.format(
-                        id_=id_,
-                        flags=config.get('flags', ''),
-                        )
-                    ])
-                ]
+        args =['CEPH_CLIENT_ID={id_}'.format(id_=id_),
+               'CEPH_CONF=/tmp/cephtest/ceph.conf',
+               'CEPH_ARGS="{flags}"'.format(flags=config.get('flags', '')),
+               'LD_PRELOAD=/tmp/cephtest/binary/usr/local/lib/librados.so.2',
+               '/tmp/cephtest/daemon-helper', 'kill',
+               '/tmp/cephtest/binary/usr/local/bin/multi_stress_watch foo foo'
+               ]
 
         log.info("args are %s" % (args,))
 
-        proc = remote.run(
-            args=args,
-            logger=log.getChild('testsnaps.{id}'.format(id=id_)),
+        proc = proc_thrasher.ProcThrasher({}, remote,
+            args=[run.Raw(i) for i in args],
+            logger=log.getChild('testwatch.{id}'.format(id=id_)),
             stdin=run.PIPE,
             wait=False
             )
-        testsnaps[id_] = proc
+        proc.start()
+        testwatch[id_] = proc
 
     try:
         yield
     finally:
-        for i in remotes:
-            i.run(
-                args=[
-                    'rm',
-                    '/tmp/cephtest/data/ceph.conf'
-                    ],
-                logger=log.getChild('testsnaps.{id}'.format(id=id_)),
-                wait=True,
-                )
-
         log.info('joining watch_notify_stress')
-        run.wait(testsnaps.itervalues())
+        for i in testwatch.itervalues():
+            i.join()