From: Leonid Usov
Date: Sat, 16 Mar 2024 15:39:51 +0000 (-0400)
Subject: squid: qa/tasks: introduce ThrasherGreenlet
X-Git-Tag: v19.1.1~299^2~31
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=862cbda1712e7cc96cd4ca3bfcb641d4a2520d54;p=ceph.git

squid: qa/tasks: introduce ThrasherGreenlet

Signed-off-by: Leonid Usov
(cherry picked from commit ed6e3f8a34f3e314d87a1fe38446d0176bc55aba)
Fixes: https://tracker.ceph.com/issues/66103
---

diff --git a/qa/tasks/cephfs_mirror_thrash.py b/qa/tasks/cephfs_mirror_thrash.py
index 91f60ac50137c..b69d41e88bfb4 100644
--- a/qa/tasks/cephfs_mirror_thrash.py
+++ b/qa/tasks/cephfs_mirror_thrash.py
@@ -9,18 +9,14 @@ import signal
 import socket
 import time
 
-from gevent import sleep
-from gevent.greenlet import Greenlet
-from gevent.event import Event
-
 from teuthology.exceptions import CommandFailedError
 from teuthology.orchestra import run
-from tasks.thrasher import Thrasher
+from tasks.thrasher import ThrasherGreenlet
 
 log = logging.getLogger(__name__)
 
 
-class CephFSMirrorThrasher(Thrasher, Greenlet):
+class CephFSMirrorThrasher(ThrasherGreenlet):
     """
     CephFSMirrorThrasher::
 
@@ -71,7 +67,6 @@ class CephFSMirrorThrasher(Thrasher, Greenlet):
 
         self.logger = log
         self.name = 'thrasher.cephfs_mirror.[{cluster}]'.format(cluster = cluster)
-        self.stopping = Event()
 
         self.randomize = bool(self.config.get('randomize', True))
         self.max_thrash = int(self.config.get('max_thrash', 1))
@@ -93,9 +88,6 @@ class CephFSMirrorThrasher(Thrasher, Greenlet):
         """Write data to logger assigned to this CephFSMirrorThrasher"""
         self.logger.info(x)
 
-    def stop(self):
-        self.stopping.set()
-
     def do_thrash(self):
         """
         Perform the random thrashing action
@@ -106,16 +98,14 @@ class CephFSMirrorThrasher(Thrasher, Greenlet):
             "kill": 0,
         }
 
-        while not self.stopping.is_set():
+        while not self.is_stopped:
             delay = self.max_thrash_delay
             if self.randomize:
                 delay = random.randrange(self.min_thrash_delay, self.max_thrash_delay)
 
             if delay > 0.0:
                 self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
-                self.stopping.wait(delay)
-                if self.stopping.is_set():
-                    continue
+                self.sleep_unless_stopped(delay)
 
             killed_daemons = []
 
@@ -149,7 +139,7 @@ class CephFSMirrorThrasher(Thrasher, Greenlet):
                     delay = random.randrange(0.0, self.max_revive_delay)
 
                 self.log('waiting for {delay} secs before reviving daemons'.format(delay=delay))
-                sleep(delay)
+                self.sleep_unless_stopped(delay)
 
                 for daemon in killed_daemons:
                     self.log('waiting for {label}'.format(label=daemon.id_))
diff --git a/qa/tasks/fwd_scrub.py b/qa/tasks/fwd_scrub.py
index c1e0059cdafc7..2ac92439de62c 100644
--- a/qa/tasks/fwd_scrub.py
+++ b/qa/tasks/fwd_scrub.py
@@ -4,18 +4,16 @@ Thrash mds by simulating failures
 import logging
 import contextlib
 
-from gevent import sleep, GreenletExit
-from gevent.greenlet import Greenlet
-from gevent.event import Event
+from gevent import sleep
 from teuthology import misc as teuthology
 
 from tasks import ceph_manager
 from tasks.cephfs.filesystem import MDSCluster, Filesystem
-from tasks.thrasher import Thrasher
+from tasks.thrasher import ThrasherGreenlet
 
 log = logging.getLogger(__name__)
 
-class ForwardScrubber(Thrasher, Greenlet):
+class ForwardScrubber(ThrasherGreenlet):
     """
     ForwardScrubber::
 
@@ -29,7 +27,6 @@ class ForwardScrubber(Thrasher, Greenlet):
         self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
         self.fs = fs
         self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
-        self.stopping = Event()
         self.scrub_timeout = scrub_timeout
         self.sleep_between_iterations = sleep_between_iterations
 
@@ -41,21 +38,15 @@ class ForwardScrubber(Thrasher, Greenlet):
             self.logger.exception("exception:")
             # allow successful completion so gevent doesn't see an exception...
 
-    def stop(self):
-        self.stopping.set()
-
     def do_scrub(self):
         """
         Perform the file-system scrubbing
         """
         self.logger.info(f'start scrubbing fs: {self.fs.name}')
 
-        try:
-            while not self.stopping.is_set():
-                self._scrub()
-                sleep(self.sleep_between_iterations)
-        except GreenletExit:
-            pass
+        while not self.is_stopped:
+            self._scrub()
+            self.sleep_unless_stopped(self.sleep_between_iterations)
 
         self.logger.info(f'end scrubbing fs: {self.fs.name}')
 
diff --git a/qa/tasks/mds_thrash.py b/qa/tasks/mds_thrash.py
index 7b7b420f9ea58..e7b3023caf582 100644
--- a/qa/tasks/mds_thrash.py
+++ b/qa/tasks/mds_thrash.py
@@ -8,17 +8,15 @@ import random
 import time
 
 from gevent import sleep
-from gevent.greenlet import Greenlet
-from gevent.event import Event
 from teuthology import misc as teuthology
 
 from tasks import ceph_manager
 from tasks.cephfs.filesystem import MDSCluster, Filesystem, FSMissing
-from tasks.thrasher import Thrasher
+from tasks.thrasher import ThrasherGreenlet
 
 log = logging.getLogger(__name__)
 
-class MDSThrasher(Thrasher, Greenlet):
+class MDSThrasher(ThrasherGreenlet):
     """
     MDSThrasher::
 
@@ -107,7 +105,6 @@ class MDSThrasher(Thrasher, Greenlet):
         self.manager = manager
         self.max_mds = max_mds
         self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
-        self.stopping = Event()
 
         self.randomize = bool(self.config.get('randomize', True))
         self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
@@ -146,9 +143,6 @@ class MDSThrasher(Thrasher, Greenlet):
         """Write data to the logger assigned to MDSThrasher"""
         self.logger.info(x)
 
-    def stop(self):
-        self.stopping.set()
-
     def kill_mds(self, mds):
         if self.config.get('powercycle'):
             (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
@@ -233,16 +227,14 @@ class MDSThrasher(Thrasher, Greenlet):
             "kill": 0,
         }
 
-        while not self.stopping.is_set():
+        while not self.is_stopped:
             delay = self.max_thrash_delay
             if self.randomize:
                 delay = random.randrange(0.0, self.max_thrash_delay)
 
             if delay > 0.0:
                 self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
-                self.stopping.wait(delay)
-                if self.stopping.is_set():
-                    continue
+                self.sleep_unless_stopped(delay)
 
             status = self.fs.status()
 
@@ -319,7 +311,7 @@ class MDSThrasher(Thrasher, Greenlet):
 
                 self.log('waiting for {delay} secs before reviving {label}'.format(
                     delay=delay, label=label))
-                sleep(delay)
+                self.sleep_unless_stopped(delay)
 
                 self.log('reviving {label}'.format(label=label))
                 self.revive_mds(name)
@@ -334,7 +326,7 @@ class MDSThrasher(Thrasher, Greenlet):
                         break
                     self.log(
                         'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
-                    sleep(2)
+                    self.sleep_unless_stopped(2)
 
         for stat in stats:
             self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))
diff --git a/qa/tasks/thrasher.py b/qa/tasks/thrasher.py
index 0ea1bf0ee9bc4..e8fc14e404891 100644
--- a/qa/tasks/thrasher.py
+++ b/qa/tasks/thrasher.py
@@ -1,6 +1,11 @@
 """
 Thrasher base class
 """
+
+
+from gevent.greenlet import Greenlet
+from gevent.event import Event
+
 class Thrasher(object):
 
     def __init__(self):
@@ -13,3 +18,31 @@ class Thrasher(object):
 
     def set_thrasher_exception(self, e):
         self._exception = e
+
+class ThrasherGreenlet(Thrasher, Greenlet):
+
+    class Stopped(Exception): ...
+
+    def __init__(self):
+        super(ThrasherGreenlet, self).__init__()
+        self._should_stop_event = Event()
+
+    @property
+    def is_stopped(self):
+        return self._should_stop_event.is_set()
+
+    def stop(self):
+        self._should_stop_event.set()
+
+    def set_thrasher_exception(self, e):
+        if not isinstance(e, self.Stopped):
+            super(ThrasherGreenlet, self).set_thrasher_exception(e)
+
+    def proceed_unless_stopped(self):
+        self.sleep_unless_stopped(0, raise_stopped=True)
+
+    def sleep_unless_stopped(self, seconds, raise_stopped = True):
+        self._should_stop_event.wait(seconds)
+        if self.is_stopped and raise_stopped:
+            raise self.Stopped()
+        return not self.is_stopped