import socket
import time
-from gevent import sleep
-from gevent.greenlet import Greenlet
-from gevent.event import Event
-
from teuthology.exceptions import CommandFailedError
from teuthology.orchestra import run
-from tasks.thrasher import Thrasher
+from tasks.thrasher import ThrasherGreenlet
log = logging.getLogger(__name__)
-class CephFSMirrorThrasher(Thrasher, Greenlet):
+class CephFSMirrorThrasher(ThrasherGreenlet):
"""
CephFSMirrorThrasher::
self.logger = log
self.name = 'thrasher.cephfs_mirror.[{cluster}]'.format(cluster = cluster)
- self.stopping = Event()
self.randomize = bool(self.config.get('randomize', True))
self.max_thrash = int(self.config.get('max_thrash', 1))
"""Write data to logger assigned to this CephFSMirrorThrasher"""
self.logger.info(x)
- def stop(self):
- self.stopping.set()
-
def do_thrash(self):
"""
Perform the random thrashing action
"kill": 0,
}
- while not self.stopping.is_set():
+ while not self.is_stopped:
delay = self.max_thrash_delay
if self.randomize:
delay = random.randrange(self.min_thrash_delay, self.max_thrash_delay)
if delay > 0.0:
self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
- self.stopping.wait(delay)
- if self.stopping.is_set():
- continue
+ self.sleep_unless_stopped(delay)
killed_daemons = []
delay = random.randrange(0.0, self.max_revive_delay)
self.log('waiting for {delay} secs before reviving daemons'.format(delay=delay))
- sleep(delay)
+ self.sleep_unless_stopped(delay)
for daemon in killed_daemons:
self.log('waiting for {label}'.format(label=daemon.id_))
import logging
import contextlib
-from gevent import sleep, GreenletExit
-from gevent.greenlet import Greenlet
-from gevent.event import Event
+from gevent import sleep
from teuthology import misc as teuthology
from tasks import ceph_manager
from tasks.cephfs.filesystem import MDSCluster, Filesystem
-from tasks.thrasher import Thrasher
+from tasks.thrasher import ThrasherGreenlet
log = logging.getLogger(__name__)
-class ForwardScrubber(Thrasher, Greenlet):
+class ForwardScrubber(ThrasherGreenlet):
"""
ForwardScrubber::
self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
self.fs = fs
self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
- self.stopping = Event()
self.scrub_timeout = scrub_timeout
self.sleep_between_iterations = sleep_between_iterations
self.logger.exception("exception:")
# allow successful completion so gevent doesn't see an exception...
- def stop(self):
- self.stopping.set()
-
def do_scrub(self):
"""
Perform the file-system scrubbing
"""
self.logger.info(f'start scrubbing fs: {self.fs.name}')
- try:
- while not self.stopping.is_set():
- self._scrub()
- sleep(self.sleep_between_iterations)
- except GreenletExit:
- pass
+ while not self.is_stopped:
+ self._scrub()
+ self.sleep_unless_stopped(self.sleep_between_iterations)
self.logger.info(f'end scrubbing fs: {self.fs.name}')
import time
from gevent import sleep
-from gevent.greenlet import Greenlet
-from gevent.event import Event
from teuthology import misc as teuthology
from tasks import ceph_manager
from tasks.cephfs.filesystem import MDSCluster, Filesystem, FSMissing
-from tasks.thrasher import Thrasher
+from tasks.thrasher import ThrasherGreenlet
log = logging.getLogger(__name__)
-class MDSThrasher(Thrasher, Greenlet):
+class MDSThrasher(ThrasherGreenlet):
"""
MDSThrasher::
self.manager = manager
self.max_mds = max_mds
self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
- self.stopping = Event()
self.randomize = bool(self.config.get('randomize', True))
self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
"""Write data to the logger assigned to MDSThrasher"""
self.logger.info(x)
- def stop(self):
- self.stopping.set()
-
def kill_mds(self, mds):
if self.config.get('powercycle'):
(remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
"kill": 0,
}
- while not self.stopping.is_set():
+ while not self.is_stopped:
delay = self.max_thrash_delay
if self.randomize:
delay = random.randrange(0.0, self.max_thrash_delay)
if delay > 0.0:
self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
- self.stopping.wait(delay)
- if self.stopping.is_set():
- continue
+ self.sleep_unless_stopped(delay)
status = self.fs.status()
self.log('waiting for {delay} secs before reviving {label}'.format(
delay=delay, label=label))
- sleep(delay)
+ self.sleep_unless_stopped(delay)
self.log('reviving {label}'.format(label=label))
self.revive_mds(name)
break
self.log(
'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
- sleep(2)
+ self.sleep_unless_stopped(2)
for stat in stats:
self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))
"""
Thrasher base class
"""
+
+
+from gevent.greenlet import Greenlet
+from gevent.event import Event
+
class Thrasher(object):
def __init__(self):
def set_thrasher_exception(self, e):
self._exception = e
+
+class ThrasherGreenlet(Thrasher, Greenlet):
+
+ class Stopped(Exception): ...
+
+ def __init__(self):
+ super(ThrasherGreenlet, self).__init__()
+ self._should_stop_event = Event()
+
+ @property
+ def is_stopped(self):
+ return self._should_stop_event.is_set()
+
+ def stop(self):
+ self._should_stop_event.set()
+
+ def set_thrasher_exception(self, e):
+ if not isinstance(e, self.Stopped):
+ super(ThrasherGreenlet, self).set_thrasher_exception(e)
+
+ def proceed_unless_stopped(self):
+ self.sleep_unless_stopped(0, raise_stopped=True)
+
+ def sleep_unless_stopped(self, seconds, raise_stopped = True):
+ self._should_stop_event.wait(seconds)
+ if self.is_stopped and raise_stopped:
+ raise self.Stopped()
+ return not self.is_stopped