"""
import logging
import contextlib
-import threading
-from gevent import sleep, killall, joinall, GreenletExit
+from gevent import sleep, GreenletExit
from gevent.greenlet import Greenlet
from gevent.event import Event
from teuthology import misc as teuthology
log = logging.getLogger(__name__)
+class ForwardScrubber(Thrasher, Greenlet):
+ """
+ ForwardScrubber::
+
+ The ForwardScrubber does forward scrubbing of file-systems during execution
+ of other tasks (workunits, etc).
+ """
+
+ def __init__(self, fs, scrub_timeout=300, sleep_between_iterations=1):
+ super(ForwardScrubber, self).__init__()
-class MDSRankScrubber(Thrasher, Greenlet):
- def __init__(self, fs, mds_rank, scrub_timeout=300):
- super(MDSRankScrubber, self).__init__()
self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
self.fs = fs
- self.mds_rank = mds_rank
+ self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
+ self.stopping = Event()
self.scrub_timeout = scrub_timeout
+ self.sleep_between_iterations = sleep_between_iterations
def _run(self):
try:
self.logger.exception("exception:")
# allow successful completion so gevent doesn't see an exception...
- def do_scrub(self, path="/", recursive=True):
+ def stop(self):
+ self.stopping.set()
+
+ def do_scrub(self):
+ """
+ Perform the file-system scrubbing
+ """
+ self.logger.info(f'start scrubbing fs: {self.fs.name}')
+
+ try:
+ while not self.stopping.is_set():
+ self._scrub()
+ sleep(self.sleep_between_iterations)
+ except GreenletExit:
+ pass
+
+ self.logger.info(f'end scrubbing fs: {self.fs.name}')
+
+ def _scrub(self, path="/", recursive=True):
+ self.logger.info(f"scrubbing fs: {self.fs.name}")
recopt = ["recursive", "force"] if recursive else ["force"]
- out_json = self.fs.rank_tell(["scrub", "start", path] + recopt,
- rank=self.mds_rank)
+ out_json = self.fs.rank_tell(["scrub", "start", path] + recopt)
assert out_json is not None
tag = out_json['scrub_tag']
assert out_json['return_code'] == 0
assert out_json['mode'] == 'asynchronous'
- self.wait_until_scrub_complete(tag)
+ return self._wait_until_scrub_complete(tag)
- def wait_until_scrub_complete(self, tag):
+ def _wait_until_scrub_complete(self, tag):
# time out after scrub_timeout seconds and assume as done
with contextutil.safe_while(sleep=30, tries=self.scrub_timeout//30) as proceed:
while proceed():
try:
- out_json = self.fs.rank_tell(["scrub", "status"],
- rank=self.mds_rank)
+ out_json = self.fs.rank_tell(["scrub", "status"])
assert out_json is not None
if out_json['status'] == "no active scrubs running":
self.logger.info("all active scrubs completed")
self.logger.info("retrying scrub status command in a while")
pass
- self.logger.info("timed out waiting for scrub to complete")
-
-
-class ForwardScrubber(Thrasher, Greenlet):
- """
- ForwardScrubber::
-
- The ForwardScrubber does forward scrubbing of file-systems during execution
- of other tasks (workunits, etc).
-
- """
- def __init__(self, fs, scrub_timeout=300, sleep_between_iterations=1):
- super(ForwardScrubber, self).__init__()
-
- self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
- self.fs = fs
- self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
- self.stopping = Event()
- self.lock = threading.Lock()
- self.scrubbers = []
- self.scrub_timeout = scrub_timeout
- self.sleep_between_iterations = sleep_between_iterations
-
- def _run(self):
- try:
- self.do_scrub()
- except Exception as e:
- self.set_thrasher_exception(e)
- self.logger.exception("exception:")
- # allow successful completion so gevent doesn't see an exception...
-
- def log(self, x):
- """Write data to the logger assigned to ForwardScrubber"""
- self.logger.info(x)
-
- def stop(self):
- self.stopping.set()
- self.lock.acquire()
- try:
- self.log("killing all scrubbers")
- killall(self.scrubbers)
- finally:
- self.lock.release()
-
- def do_scrub(self):
- """
- Perform the file-system scrubbing
- """
- self.log(f'starting do_scrub for fs: {self.fs.name}')
-
- try:
- while not self.stopping.is_set():
- ranks = self.fs.get_all_mds_rank()
-
- for r in ranks:
- scrubber = MDSRankScrubber(self.fs, r, self.scrub_timeout)
- self.lock.acquire()
- try:
- self.scrubbers.append(scrubber)
- finally:
- self.lock.release()
- scrubber.start()
-
- # wait for all scrubbers to complete
- self.log("joining all scrubbers")
- joinall(self.scrubbers)
-
- for s in self.scrubbers:
- if s.exception is not None:
- raise RuntimeError('error during scrub thrashing')
-
- self.lock.acquire()
- try:
- self.scrubbers.clear()
- finally:
- self.lock.release()
-
- sleep(self.sleep_between_iterations)
- except GreenletExit:
- pass
-
-
def stop_all_fwd_scrubbers(thrashers):
for thrasher in thrashers:
if not isinstance(thrasher, ForwardScrubber):
continue
thrasher.stop()
+ thrasher.join()
if thrasher.exception is not None:
raise RuntimeError(f"error during scrub thrashing: {thrasher.exception}")
- thrasher.join()
@contextlib.contextmanager