--- /dev/null
+import logging
+import gevent
+import random
+import time
+
+from ..orchestra import run
+
+log = logging.getLogger(__name__)
+
+class ProcThrasher:
+ """ Kills and restarts some number of the specified process on the specified
+ remote
+ """
+ def __init__(self, config, remote, *proc_args, **proc_kwargs):
+ self.proc_kwargs = proc_kwargs
+ self.proc_args = proc_args
+ self.config = config
+ self.greenlet = None
+ self.logger = proc_kwargs.get("logger", log.getChild('proc_thrasher'))
+ self.remote = remote
+
+ # config:
+ self.num_procs = self.config.get("num_procs", 5)
+ self.rest_period = self.config.get("rest_period", 100) # seconds
+ self.run_time = self.config.get("run_time", 1000) # seconds
+
+ def log(self, msg):
+ self.logger.info(msg)
+
+ def start(self):
+ if self.greenlet is not None:
+ return
+ self.greenlet = gevent.Greenlet(self.loop)
+ self.greenlet.start()
+
+ def join(self):
+ self.greenlet.join()
+
+ def loop(self):
+ time_started = time.time()
+ procs = []
+ self.log("Starting")
+ while time_started + self.run_time > time.time():
+ if len(procs) > 0:
+ self.log("Killing proc")
+ proc = random.choice(procs)
+ procs.remove(proc)
+ proc.stdin.close()
+ self.log("About to wait")
+ run.wait([proc])
+ self.log("Killed proc")
+
+ while len(procs) < self.num_procs:
+ self.log("Creating proc " + str(len(procs) + 1))
+ self.log("args are " + str(self.proc_args) + " kwargs: " + str(self.proc_kwargs))
+ procs.append(self.remote.run(
+ *self.proc_args,
+ ** self.proc_kwargs))
+ self.log("About to sleep")
+ time.sleep(self.rest_period)
+ self.log("Just woke")
+
+ run.wait(procs)
import contextlib
import logging
+import proc_thrasher
from ..orchestra import run
log.info('Beginning test_stress_watch...')
assert isinstance(config, dict), \
"please list clients to run on"
- testsnaps = {}
+ testwatch = {}
(mon,) = ctx.cluster.only('mon.0').remotes.iterkeys()
remotes = []
(remote,) = ctx.cluster.only(role).remotes.iterkeys()
remotes.append(remote)
- remote.run(
- args=[
- 'cp',
- '/tmp/cephtest/ceph.conf',
- '/tmp/cephtest/data/ceph.conf',
- ],
- logger=log.getChild('test_stress_watch.{id}'.format(id=id_)),
- wait=True,
- )
-
- args =[ '/bin/sh', '-c',
- " ".join([
- 'cd', '/tmp/cephtest/data;',
- 'export CEPH_CLIENT_ID={id_}; export CEPH_CONF=ceph.conf; export CEPH_ARGS="{flags}"; LD_PRELOAD=/tmp/cephtest/binary/usr/local/lib/librados.so.2 /tmp/cephtest/binary/usr/local/bin/test_stress_watch {flags}'.format(
- id_=id_,
- flags=config.get('flags', ''),
- )
- ])
- ]
+ args =['CEPH_CLIENT_ID={id_}'.format(id_=id_),
+ 'CEPH_CONF=/tmp/cephtest/ceph.conf',
+ 'CEPH_ARGS="{flags}"'.format(flags=config.get('flags', '')),
+ 'LD_PRELOAD=/tmp/cephtest/binary/usr/local/lib/librados.so.2',
+ '/tmp/cephtest/daemon-helper', 'kill',
+ '/tmp/cephtest/binary/usr/local/bin/multi_stress_watch foo foo'
+ ]
log.info("args are %s" % (args,))
- proc = remote.run(
- args=args,
- logger=log.getChild('testsnaps.{id}'.format(id=id_)),
+ proc = proc_thrasher.ProcThrasher({}, remote,
+ args=[run.Raw(i) for i in args],
+ logger=log.getChild('testwatch.{id}'.format(id=id_)),
stdin=run.PIPE,
wait=False
)
- testsnaps[id_] = proc
+ proc.start()
+ testwatch[id_] = proc
try:
yield
finally:
- for i in remotes:
- i.run(
- args=[
- 'rm',
- '/tmp/cephtest/data/ceph.conf'
- ],
- logger=log.getChild('testsnaps.{id}'.format(id=id_)),
- wait=True,
- )
-
log.info('joining watch_notify_stress')
- run.wait(testsnaps.itervalues())
+ for i in testwatch.itervalues():
+ i.join()