]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
squid: qa/tasks: the quiescer task and a waiter task to test it
authorLeonid Usov <leonid.usov@ibm.com>
Sat, 16 Mar 2024 15:42:11 +0000 (11:42 -0400)
committerLeonid Usov <leonid.usov@ibm.com>
Tue, 28 May 2024 16:06:19 +0000 (19:06 +0300)
Fixes: https://tracker.ceph.com/issues/63669
Signed-off-by: Leonid Usov <leonid.usov@ibm.com>
(cherry picked from commit 7773c5db6312064b4247ac0311572e7bcd752140)
Fixes: https://tracker.ceph.com/issues/66103
qa/tasks/quiescer.py [new file with mode: 0644]
qa/tasks/waiter.py [new file with mode: 0644]

diff --git a/qa/tasks/quiescer.py b/qa/tasks/quiescer.py
new file mode 100644 (file)
index 0000000..dfec411
--- /dev/null
@@ -0,0 +1,420 @@
+"""
+Thrash mds by randomly quiescing the fs root
+"""
+import logging
+import contextlib
+
+from teuthology import misc
+
+from tasks.cephfs.filesystem import MDSCluster, Filesystem
+from tasks.thrasher import ThrasherGreenlet
+
+import random
+import math
+import errno
+import json
+import time
+
+from io import StringIO
+
+log = logging.getLogger(__name__)
+
+class Quiescer(ThrasherGreenlet):
+    """
+    The Quiescer does periodic quiescing of the configured paths, by default - the root '/'.
+
+    quiesce_timeout: [1..)                      default: 60
+     :: maximum time in seconds to wait for the quiesce to succeed
+    quiesce_factor: [0.005..0.5]                default: 0.35
+     :: the fraction of the total runtime we want the system quiesced
+    min_quiesce: [1..)                          default: 10
+     :: the minimum pause time in seconds
+    max_quiesce: [1..)                          default: 60
+     :: the maximum pause time in seconds
+    initial_delay: [0..)                        default: 15
+     :: the time in seconds before the first quiesce
+    seed:                                       default: None
+     :: an optional seed to a pseudorandom sequence of quiesce durations
+    roots: List[String]                         default: ["/"]
+     :: the roots to quiesce
+    cancelations_cap: [-1..)                    default: 10
+     :: the number of times we ignore canceled quiesce sets
+    split_if_longer: int                        default: mean(min_quiesce, max_quiesce)
+     :: if the duration is longer than this,
+        it will be split into two back-to-back half durations
+    ops_dump_interval: [1..quiesce_timeout)     default: 0.7*quiesce_timeout
+     :: during the quiesce phase, the quiescer will dump current ops from all
+        ranks until the quiesce terminates. values outside the allowed
+        range (1 <= x < quiesce_timeout) disable the dump
+    """
+
+    MAX_QUIESCE_FACTOR      = 0.5    # 50%
+    MIN_QUIESCE_FACTOR      = 0.005  # 0.5%
+    QDB_CMD_TIMEOUT_GUARD   = 15     # sec (will be added to the configured quiesce_timeout)
+
+    def __init__(self, ctx, fscid,
+                 cluster_name='ceph',
+                 quiesce_timeout=60,
+                 quiesce_factor=0.35,
+                 min_quiesce=10,
+                 max_quiesce=60,
+                 initial_delay=15,
+                 cancelations_cap=10,
+                 seed=None,
+                 roots=None,
+                 split_if_longer=None,
+                 ops_dump_interval=None,
+                 **other_config):
+        super(Quiescer, self).__init__()
+
+        fs = Filesystem(ctx, fscid=fscid, cluster_name=cluster_name)
+        self.run_dir = fs.get_config("run_dir")
+        self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
+        self.name = 'quiescer.fs.[{f}]'.format(f=fs.name)
+        self.archive_path = ctx.archive.strip("/")
+        self.fs = fs
+        try:
+            self.cluster_fsid = ctx.ceph[cluster_name].fsid
+        except Exception as e:
+            self.logger.error(f"Couldn't get cluster fsid with exception: {e}")
+            self.cluster_fsid = ''
+
+        if seed is None:
+            # try to inherit the teuthology seed,
+            # otherwise, 1M seems sufficient and avoids possible huge numbers
+            seed = ctx.config.get('seed', random.randint(0, 999999))
+        self.logger.info(f"Initializing Quiescer with seed {seed}")
+        self.rnd = random.Random(seed)
+
+        self.quiesce_timeout = quiesce_timeout
+
+        if (quiesce_factor > self.MAX_QUIESCE_FACTOR):
+            self.logger.warn("Capping the quiesce factor at %f (requested: %f)" % (self.MAX_QUIESCE_FACTOR, quiesce_factor))
+            quiesce_factor = self.MAX_QUIESCE_FACTOR
+
+        if quiesce_factor < self.MIN_QUIESCE_FACTOR:
+            self.logger.warn("Setting the quiesce factor to %f (requested: %f)" % (self.MIN_QUIESCE_FACTOR, quiesce_factor))
+            quiesce_factor = self.MIN_QUIESCE_FACTOR
+
+        self.quiesce_factor = quiesce_factor
+        self.min_quiesce = max(1, min_quiesce)
+        self.max_quiesce = max(1, max_quiesce)
+        self.initial_delay = max(0, initial_delay)
+        self.roots = roots or ["/"]
+        self.cancelations_cap = cancelations_cap
+
+        if ops_dump_interval is None:
+            ops_dump_interval = 0.7 * self.quiesce_timeout
+
+        if ops_dump_interval < 1 or ops_dump_interval >= self.quiesce_timeout:
+            self.logger.warn(f"ops_dump_interval ({ops_dump_interval}) is outside the valid range [1..{self.quiesce_timeout}), disabling the dump")
+            self.ops_dump_interval = None
+        else:
+            self.ops_dump_interval = ops_dump_interval
+
+        # this can be used to exercise repeated quiesces with minimal delay between them 
+        self.split_if_longer = split_if_longer if split_if_longer is not None else (self.min_quiesce + self.max_quiesce) / 2
+
+    def next_quiesce_duration(self):
+        """Generate the next quiesce duration
+
+        This function is using a gauss distribution on self.rnd around the
+        midpoint of the requested quiesce duration range [min_quiesce..max_quiesce]
+        For that, the mu is set to mean(min_quiesce, max_quiesce) and the sigma
+        is chosen so as to increase the chance of getting close to the edges of the range.
+        Empirically, 3 * √(max-min), gave good results. Feel free to update this math.
+
+        Note: self.rnd is seeded, so as to allow for repeatable sequence of durations
+        Note: the duration returned by this funciton may be further split into two half-time
+              quiesces, subject to the self.split_if_longer logic"""
+        mu = (self.min_quiesce + self.max_quiesce) / 2
+        sigma = 3 * math.sqrt(self.max_quiesce - self.min_quiesce)
+        duration = round(self.rnd.gauss(mu, sigma), 1)
+        duration = max(duration, self.min_quiesce)
+        duration = min(duration, self.max_quiesce)
+        return duration
+
+    def tell_quiesce_leader(self, *args):
+        leader = None
+        rc = None
+        stdout = None
+
+        while leader is None and not self.is_stopped:
+            leader = self.fs.get_var('qdb_leader')
+            if leader is None:
+                self.logger.warn("Couldn't get quiesce db leader from the mds map")
+                self.sleep_unless_stopped(5)
+
+        while leader is not None and not self.is_stopped:
+            command = ['tell', f"mds.{leader}", 'quiesce', 'db']
+            command.extend(args)
+            self.logger.debug("Running ceph command: '%s'" % " ".join(command))
+            result = self.fs.run_ceph_cmd(args=command, check_status=False, stdout=StringIO(),
+                                          # (quiesce_timeout + guard) is a sensible cmd timeout
+                                          # for both `--quiesce --await` and `--release --await`
+                                          # It is an overkill for a query,
+                                          # but since it's just a safety net, we use it unconditionally
+                                          timeoutcmd=self.quiesce_timeout+self.QDB_CMD_TIMEOUT_GUARD)
+            rc, stdout = result.exitstatus, result.stdout.getvalue()
+            if rc == errno.ENOTTY:
+                try:
+                    resp = json.loads(stdout)
+                    leader = int(resp['leader'])
+                    self.logger.info("Retrying a quiesce db command with leader %d" % leader)
+                except Exception as e:
+                    self.logger.error("Couldn't parse ENOTTY response from an mds with error: %s\n%s" % (str(e), stdout))
+                    self.sleep_unless_stopped(5)
+            else:
+                break
+
+        return (rc, stdout)
+
+    def dump_ops_all_ranks(self, dump_tag):
+        remote_dumps = []
+
+        # begin by executing dump on all ranks
+        for info in self.fs.get_ranks():
+            name = info['name']
+            rank = info['rank']
+
+            dump_file = f"ops-{dump_tag}-mds.{name}.json"
+            daemon_path = f"{self.run_dir}/{dump_file}"
+            # This gets ugly due to the current state of cephadm support
+            remote_path = daemon_path
+            if self.fs.mon_manager.cephadm:
+                remote_path = f"{self.run_dir}/{self.cluster_fsid}/{dump_file}"
+
+            self.logger.debug(f"Dumping ops on rank {rank} ({name}) to a remote file {remote_path}")
+            try:
+                _ = self.fs.rank_tell(['ops', '--flags=locks', f'--path={daemon_path}'], rank=rank)
+                remote_dumps.append((info, remote_path))
+            except Exception as e:
+                self.logger.error(f"Couldn't execute ops dump on rank {rank}, error: {e}")
+
+        # now get the ops from the files
+        for info, remote_path in remote_dumps:
+            try:
+                name = info['name']
+                rank = info['rank']
+                mds_remote = self.fs.mon_manager.find_remote('mds', name)
+                blob = misc.get_file(mds_remote, remote_path, sudo=True).decode('utf-8')
+                self.logger.debug(f"read {len(blob)}B of ops from '{remote_path}' on mds.{rank} ({name})")
+                ops_dump = json.loads(blob)
+                out_name = f"{self.archive_path}/ops-{dump_tag}-mds.{name}.json"
+                with open(out_name, "wt") as out:
+                    out.write("{\n")
+                    out.write(f'\n"info":\n{json.dumps(info, indent=2)},\n\n"ops":[\n')
+                    first_op = True
+                    for op in ops_dump['ops']:
+                        type_data = op['type_data']
+                        flag_point = type_data['flag_point']
+                        if 'quiesce complete' not in flag_point:
+                            self.logger.debug(f"Outstanding op at rank {rank} ({name}) for {dump_tag}: '{op['description']}'")
+                        if not first_op:
+                            out.write(",\n")
+                        first_op = False
+                        json.dump(op, fp=out, indent=2)
+                    out.write("\n]}")
+                self.logger.info(f"Pulled {len(ops_dump['ops'])} ops from rank {rank} ({name}) into {out_name}")
+            except Exception as e:
+                self.logger.error(f"Couldn't pull ops dump at '{remote_path}' on rank {info['rank']} ({info['name']}), error: {e}")
+            misc.delete_file(mds_remote, remote_path, sudo=True, check=False)
+
+    def get_set_state_name(self, response, set_id = None):
+        if isinstance(response, (str, bytes, bytearray)):
+            response = json.loads(response)
+
+        sets = response['sets']
+        if len(sets) == 0:
+            raise ValueError("response has no sets")
+
+        if set_id is None:
+            if len(sets) > 1:
+                raise ValueError("set_id must be provided for a multiset response")
+            else:
+                set_id = next(iter(sets.keys()))
+
+        return response['sets'][set_id]['state']['name']
+
+    def check_canceled(self, response, set_id = None):
+        if 'CANCELED' == self.get_set_state_name(response, set_id):
+            if self.cancelations_cap == 0:
+                raise RuntimeError("Reached the cap of canceled quiesces")
+            else:
+                self.logger.warn(f"Quiesce set got cancelled (cap = {self.cancelations_cap})."
+                                "Won't raise an error since this could be a failover, "
+                                "will wait for the next quiesce attempt")
+
+            if self.cancelations_cap > 0:
+                self.cancelations_cap -= 1
+
+            return True
+        return False
+            
+    
+    def do_quiesce(self, duration):
+        
+        start_time = time.time()
+        self.logger.debug(f"Going to quiesce for duration: {duration}")
+
+        if self.ops_dump_interval is None:
+           await_args = ["--await"]
+        else:
+           await_args = ["--await-for", str(self.ops_dump_interval)]
+
+        set_id = None
+        iteration = 0
+
+        def rcinfo(rc):
+            return f"{rc} ({errno.errorcode.get(rc, 'Unknown')})"
+
+        while True:
+            iteration += 1
+            if set_id is None:
+                # quiesce the root
+                rc, stdout = self.tell_quiesce_leader(
+                    *self.roots,
+                    "--timeout", str(self.quiesce_timeout),
+                    "--expiration", str(duration + 60), # give us a minute to run the release command
+                    *await_args
+                )
+            else:
+                # await the set
+                rc, stdout = self.tell_quiesce_leader(
+                    "--set-id", set_id,
+                    *await_args
+                )
+
+            self.proceed_unless_stopped()
+
+            try:
+                response = json.loads(stdout)
+                set_id = next(iter(response["sets"].keys()))
+            except Exception as e:
+                self.logger.error(f"Couldn't parse response with error {e}; rc: {rcinfo(rc)}; stdout:\n{stdout}")
+                raise RuntimeError(f"Error parsing quiesce response: {e}")
+
+            elapsed = round(time.time() - start_time, 1)
+
+            if rc == errno.EINPROGRESS:
+                self.logger.warn(f"Set '{set_id}' hasn't quiesced after {elapsed} seconds (timeout: {self.quiesce_timeout}). Dumping ops with locks from all ranks.")
+                self.dump_ops_all_ranks(f'{set_id}-{iteration}')
+            else:
+                break
+
+        if self.check_canceled(response):
+            return
+
+        if rc != 0:
+            self.logger.error(f"Couldn't quiesce root with rc: {rcinfo(rc)}, stdout:\n{stdout}")
+            raise RuntimeError(f"Error quiescing set '{set_id}': {rcinfo(rc)}")
+
+        elapsed = round(time.time() - start_time, 1)
+        self.logger.info(f"Successfully quiesced set '{set_id}', quiesce took {elapsed} seconds. Will release after: {duration}")
+        self.sleep_unless_stopped(duration)
+
+        # release the root
+        rc, stdout = self.tell_quiesce_leader(
+            "--set-id", set_id,
+            "--release",
+            "--await"
+        )
+
+        self.proceed_unless_stopped()
+        
+        if rc != 0:
+            if self.check_canceled(stdout, set_id):
+                return
+
+            self.logger.error(f"Couldn't release set '{set_id}' with rc: {rcinfo(rc)}, stdout:\n{stdout}")
+            raise RuntimeError(f"Error releasing set '{set_id}': {rcinfo(rc)}")
+        else:
+            elapsed = round(time.time() - start_time, 1)
+            self.logger.info(f"Successfully released set '{set_id}', seconds elapsed: {elapsed}")
+
+
+    def _run(self):
+        try:
+            self.fs.wait_for_daemons()
+            log.info(f'Ready to start quiesce thrashing; initial delay: {self.initial_delay} sec')
+
+            self.sleep_unless_stopped(self.initial_delay)
+
+            while not self.is_stopped:
+                duration = self.next_quiesce_duration()
+
+                if duration > self.split_if_longer:
+                    self.logger.info(f"Total duration ({duration}) is longer than `split_if_longer` ({self.split_if_longer}), "
+                                     "will split into two consecutive quiesces")
+                    durations = [duration/2, duration/2]
+                else:
+                    durations = [duration]
+
+                for d in durations:
+                    self.do_quiesce(d)
+
+                # now we sleep to maintain the quiesce factor
+                self.sleep_unless_stopped((duration/self.quiesce_factor) - duration)
+
+        except Exception as e:
+            if not isinstance(e, self.Stopped):
+                self.set_thrasher_exception(e)
+                self.logger.exception("exception:")
+            # allow successful completion so gevent doesn't see an exception...
+
+    def stop(self):
+        log.warn('The quiescer is requested to stop, running cancel all')
+        self.tell_quiesce_leader( "--cancel", "--all" )
+        super(Quiescer, self).stop()
+
+
+def stop_all_quiescers(thrashers):
+    for thrasher in thrashers:
+        if not isinstance(thrasher, Quiescer):
+            continue
+        thrasher.stop()
+        thrasher.join()
+        if thrasher.exception is not None:
+            raise RuntimeError(f"error during quiesce thrashing: {thrasher.exception}")
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Stress test the mds by randomly quiescing the whole FS while another task/workunit
+    is running.
+    Example config (see Quiescer initializer for all available options):
+
+    - quiescer:
+        quiesce_factor: 0.2
+        max_quiesce: 30
+        quiesce_timeout: 10
+    """
+
+    if config is None:
+        config = {}
+    assert isinstance(config, dict), \
+        'quiescer task only accepts a dict for configuration'
+    mdslist = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
+    assert len(mdslist) > 0, \
+        'quiescer task requires at least 1 metadata server'
+
+    cluster_name = config.get('cluster', 'ceph')
+    # the manager should be there
+    manager = ctx.managers[cluster_name]
+    manager.wait_for_clean()
+    assert manager.is_clean()
+
+    mds_cluster = MDSCluster(ctx)
+    for fs in mds_cluster.status().get_filesystems():
+        quiescer = Quiescer(ctx=ctx, fscid=fs['id'], cluster_name=cluster_name, **config)
+        quiescer.start()
+        ctx.ceph[cluster_name].thrashers.append(quiescer)
+
+    try:
+        log.debug('Yielding')
+        yield
+    finally:
+        log.info('joining Quiescers')
+        stop_all_quiescers(ctx.ceph[cluster_name].thrashers)
+        log.info('done joining Quiescers')
diff --git a/qa/tasks/waiter.py b/qa/tasks/waiter.py
new file mode 100644 (file)
index 0000000..8cbf311
--- /dev/null
@@ -0,0 +1,13 @@
+
+import contextlib
+import gevent
+
+@contextlib.contextmanager
+def task(ctx, config):
+    on_enter = config.get("on_enter", 0)
+    on_exit = config.get("on_exit", 0)
+
+    gevent.sleep(on_enter)
+    yield
+    gevent.sleep(on_exit)
\ No newline at end of file