From 4722d468c645eced45b72f6c28431bb3b9084c93 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 6 Oct 2011 13:33:17 -0700 Subject: [PATCH] task/watch_notify_stress: watch_notify_stress now thrashes clients This should exercise the watch notify timeout code. Signed-off-by: Samuel Just --- teuthology/task/proc_thrasher.py | 63 ++++++++++++++++++++++++++ teuthology/task/watch_notify_stress.py | 51 +++++++-------------- 2 files changed, 79 insertions(+), 35 deletions(-) create mode 100644 teuthology/task/proc_thrasher.py diff --git a/teuthology/task/proc_thrasher.py b/teuthology/task/proc_thrasher.py new file mode 100644 index 0000000000000..472b6ecbdc8ff --- /dev/null +++ b/teuthology/task/proc_thrasher.py @@ -0,0 +1,63 @@ +import logging +import gevent +import random +import time + +from ..orchestra import run + +log = logging.getLogger(__name__) + +class ProcThrasher: + """ Kills and restarts some number of the specified process on the specified + remote + """ + def __init__(self, config, remote, *proc_args, **proc_kwargs): + self.proc_kwargs = proc_kwargs + self.proc_args = proc_args + self.config = config + self.greenlet = None + self.logger = proc_kwargs.get("logger", log.getChild('proc_thrasher')) + self.remote = remote + + # config: + self.num_procs = self.config.get("num_procs", 5) + self.rest_period = self.config.get("rest_period", 100) # seconds + self.run_time = self.config.get("run_time", 1000) # seconds + + def log(self, msg): + self.logger.info(msg) + + def start(self): + if self.greenlet is not None: + return + self.greenlet = gevent.Greenlet(self.loop) + self.greenlet.start() + + def join(self): + self.greenlet.join() + + def loop(self): + time_started = time.time() + procs = [] + self.log("Starting") + while time_started + self.run_time > time.time(): + if len(procs) > 0: + self.log("Killing proc") + proc = random.choice(procs) + procs.remove(proc) + proc.stdin.close() + self.log("About to wait") + run.wait([proc]) + self.log("Killed proc") + + while len(procs) < self.num_procs: + self.log("Creating proc " + str(len(procs) + 1)) + self.log("args are " + str(self.proc_args) + " kwargs: " + str(self.proc_kwargs)) + procs.append(self.remote.run( + *self.proc_args, + ** self.proc_kwargs)) + self.log("About to sleep") + time.sleep(self.rest_period) + self.log("Just woke") + + run.wait(procs) diff --git a/teuthology/task/watch_notify_stress.py b/teuthology/task/watch_notify_stress.py index baea5e5a5ac69..efc97369b7366 100644 --- a/teuthology/task/watch_notify_stress.py +++ b/teuthology/task/watch_notify_stress.py @@ -1,5 +1,6 @@ import contextlib import logging +import proc_thrasher from ..orchestra import run @@ -26,7 +27,7 @@ def task(ctx, config): log.info('Beginning test_stress_watch...') assert isinstance(config, dict), \ "please list clients to run on" - testsnaps = {} + testwatch = {} (mon,) = ctx.cluster.only('mon.0').remotes.iterkeys() remotes = [] @@ -38,48 +39,28 @@ def task(ctx, config): (remote,) = ctx.cluster.only(role).remotes.iterkeys() remotes.append(remote) - remote.run( - args=[ - 'cp', - '/tmp/cephtest/ceph.conf', - '/tmp/cephtest/data/ceph.conf', - ], - logger=log.getChild('test_stress_watch.{id}'.format(id=id_)), - wait=True, - ) - - args =[ '/bin/sh', '-c', - " ".join([ - 'cd', '/tmp/cephtest/data;', - 'export CEPH_CLIENT_ID={id_}; export CEPH_CONF=ceph.conf; export CEPH_ARGS="{flags}"; LD_PRELOAD=/tmp/cephtest/binary/usr/local/lib/librados.so.2 /tmp/cephtest/binary/usr/local/bin/test_stress_watch {flags}'.format( - id_=id_, - flags=config.get('flags', ''), - ) - ]) - ] + args =['CEPH_CLIENT_ID={id_}'.format(id_=id_), + 'CEPH_CONF=/tmp/cephtest/ceph.conf', + 'CEPH_ARGS="{flags}"'.format(flags=config.get('flags', '')), + 'LD_PRELOAD=/tmp/cephtest/binary/usr/local/lib/librados.so.2', + '/tmp/cephtest/daemon-helper', 'kill', + '/tmp/cephtest/binary/usr/local/bin/multi_stress_watch foo foo' + ] log.info("args are %s" % (args,)) - proc = remote.run( - args=args, - logger=log.getChild('testsnaps.{id}'.format(id=id_)), + proc = proc_thrasher.ProcThrasher({}, remote, + args=[run.Raw(i) for i in args], + logger=log.getChild('testwatch.{id}'.format(id=id_)), stdin=run.PIPE, wait=False ) - testsnaps[id_] = proc + proc.start() + testwatch[id_] = proc try: yield finally: - for i in remotes: - i.run( - args=[ - 'rm', - '/tmp/cephtest/data/ceph.conf' - ], - logger=log.getChild('testsnaps.{id}'.format(id=id_)), - wait=True, - ) - log.info('joining watch_notify_stress') - run.wait(testsnaps.itervalues()) + for i in testwatch.itervalues(): + i.join() -- 2.39.5