From 0f5e91b7df829ddf59fb4559c6a5936f07706a83 Mon Sep 17 00:00:00 2001
From: Leonid Usov
Date: Sat, 16 Mar 2024 11:42:11 -0400
Subject: [PATCH] squid: qa/tasks: the quiescer task and a waiter task to test it

Fixes: https://tracker.ceph.com/issues/63669
Signed-off-by: Leonid Usov
(cherry picked from commit 7773c5db6312064b4247ac0311572e7bcd752140)
Fixes: https://tracker.ceph.com/issues/66103
---
 qa/tasks/quiescer.py | 420 +++++++++++++++++++++++++++++++++++++++++++
 qa/tasks/waiter.py   |  13 ++
 2 files changed, 433 insertions(+)
 create mode 100644 qa/tasks/quiescer.py
 create mode 100644 qa/tasks/waiter.py

diff --git a/qa/tasks/quiescer.py b/qa/tasks/quiescer.py
new file mode 100644
index 00000000000..dfec4112e66
--- /dev/null
+++ b/qa/tasks/quiescer.py
@@ -0,0 +1,420 @@
+"""
+Thrash mds by randomly quiescing the fs root
+"""
+import logging
+import contextlib
+
+from teuthology import misc
+
+from tasks.cephfs.filesystem import MDSCluster, Filesystem
+from tasks.thrasher import ThrasherGreenlet
+
+import random
+import math
+import errno
+import json
+import time
+
+from io import StringIO
+
+log = logging.getLogger(__name__)
+
+class Quiescer(ThrasherGreenlet):
+    """
+    The Quiescer does periodic quiescing of the configured paths, by default - the root '/'.
+
+    quiesce_timeout: [1..) default: 60
+      :: maximum time in seconds to wait for the quiesce to succeed
+    quiesce_factor: [0.005..0.5] default: 0.35
+      :: the fraction of the total runtime we want the system quiesced
+    min_quiesce: [1..) default: 10
+      :: the minimum pause time in seconds
+    max_quiesce: [1..) default: 60
+      :: the maximum pause time in seconds
+    initial_delay: [0..) default: 15
+      :: the time in seconds before the first quiesce
+    seed: default: None
+      :: an optional seed to a pseudorandom sequence of quiesce durations
+    roots: List[String] default: ["/"]
+      :: the roots to quiesce
+    cancelations_cap: [-1..) default: 10
+      :: the number of times we ignore canceled quiesce sets
+    split_if_longer: int default: mean(min_quiesce, max_quiesce)
+      :: if the duration is longer than this,
+         it will be split into two back-to-back half durations
+    ops_dump_interval: [1..quiesce_timeout) default: 0.7*quiesce_timeout
+      :: during the quiesce phase, the quiescer will dump current ops from all
+         ranks until the quiesce terminates. Values outside the allowed
+         range (1 <= x < quiesce_timeout) disable the dump
+    """
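Editorial note, not part of the patch: the sketch below works the default knobs through the two formulas that matter for this docstring, assuming the default values listed above. The default split_if_longer is the mean of min_quiesce and max_quiesce, and the unquiesced sleep is derived from quiesce_factor so that the quiesced share of the runtime converges to that factor (see _run() further down). The concrete duration is hypothetical.

    # Editorial sketch: how the default knobs combine (hypothetical numbers).
    min_quiesce, max_quiesce = 10, 60            # defaults from the docstring above
    quiesce_factor = 0.35                        # default fraction of runtime spent quiesced

    split_if_longer = (min_quiesce + max_quiesce) / 2   # default: 35.0 seconds

    duration = 20.0                              # an example quiesce duration
    sleep_after = duration / quiesce_factor - duration  # ~37.1 s of unquiesced time
    quiesced_share = duration / (duration + sleep_after)
    assert abs(quiesced_share - quiesce_factor) < 1e-9  # the quiesced share equals the factor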
+
+    MAX_QUIESCE_FACTOR = 0.5  # 50%
+    MIN_QUIESCE_FACTOR = 0.005  # 0.5%
+    QDB_CMD_TIMEOUT_GUARD = 15  # sec (will be added to the configured quiesce_timeout)
+
+    def __init__(self, ctx, fscid,
+                 cluster_name='ceph',
+                 quiesce_timeout=60,
+                 quiesce_factor=0.35,
+                 min_quiesce=10,
+                 max_quiesce=60,
+                 initial_delay=15,
+                 cancelations_cap=10,
+                 seed=None,
+                 roots=None,
+                 split_if_longer=None,
+                 ops_dump_interval=None,
+                 **other_config):
+        super(Quiescer, self).__init__()
+
+        fs = Filesystem(ctx, fscid=fscid, cluster_name=cluster_name)
+        self.run_dir = fs.get_config("run_dir")
+        self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
+        self.name = 'quiescer.fs.[{f}]'.format(f=fs.name)
+        self.archive_path = ctx.archive.strip("/")
+        self.fs = fs
+        try:
+            self.cluster_fsid = ctx.ceph[cluster_name].fsid
+        except Exception as e:
+            self.logger.error(f"Couldn't get cluster fsid with exception: {e}")
+            self.cluster_fsid = ''
+
+        if seed is None:
+            # try to inherit the teuthology seed,
+            # otherwise, 1M seems sufficient and avoids possible huge numbers
+            seed = ctx.config.get('seed', random.randint(0, 999999))
+        self.logger.info(f"Initializing Quiescer with seed {seed}")
+        self.rnd = random.Random(seed)
+
+        self.quiesce_timeout = quiesce_timeout
+
+        if (quiesce_factor > self.MAX_QUIESCE_FACTOR):
+            self.logger.warn("Capping the quiesce factor at %f (requested: %f)" % (self.MAX_QUIESCE_FACTOR, quiesce_factor))
+            quiesce_factor = self.MAX_QUIESCE_FACTOR
+
+        if quiesce_factor < self.MIN_QUIESCE_FACTOR:
+            self.logger.warn("Setting the quiesce factor to %f (requested: %f)" % (self.MIN_QUIESCE_FACTOR, quiesce_factor))
+            quiesce_factor = self.MIN_QUIESCE_FACTOR
+
+        self.quiesce_factor = quiesce_factor
+        self.min_quiesce = max(1, min_quiesce)
+        self.max_quiesce = max(1, max_quiesce)
+        self.initial_delay = max(0, initial_delay)
+        self.roots = roots or ["/"]
+        self.cancelations_cap = cancelations_cap
+
+        if ops_dump_interval is None:
+            ops_dump_interval = 0.7 * self.quiesce_timeout
+
+        if ops_dump_interval < 1 or ops_dump_interval >= self.quiesce_timeout:
+            self.logger.warn(f"ops_dump_interval ({ops_dump_interval}) is outside the valid range [1..{self.quiesce_timeout}), disabling the dump")
+            self.ops_dump_interval = None
+        else:
+            self.ops_dump_interval = ops_dump_interval
+
+        # this can be used to exercise repeated quiesces with minimal delay between them
+        self.split_if_longer = split_if_longer if split_if_longer is not None else (self.min_quiesce + self.max_quiesce) / 2
+
+    def next_quiesce_duration(self):
+        """Generate the next quiesce duration
+
+        This function uses a Gaussian distribution on self.rnd around the
+        midpoint of the requested quiesce duration range [min_quiesce..max_quiesce].
+        For that, the mu is set to mean(min_quiesce, max_quiesce) and the sigma
+        is chosen so as to increase the chance of getting close to the edges of the range.
+        Empirically, 3 * √(max-min) gave good results. Feel free to update this math.
+
+        Note: self.rnd is seeded, so as to allow for a repeatable sequence of durations
+        Note: the duration returned by this function may be further split into two half-time
+              quiesces, subject to the self.split_if_longer logic"""
+        mu = (self.min_quiesce + self.max_quiesce) / 2
+        sigma = 3 * math.sqrt(self.max_quiesce - self.min_quiesce)
+        duration = round(self.rnd.gauss(mu, sigma), 1)
+        duration = max(duration, self.min_quiesce)
+        duration = min(duration, self.max_quiesce)
+        return duration
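Editorial note, not part of the patch: the standalone sketch below reproduces the same clamped Gaussian outside the class (mu at the midpoint, sigma = 3 * sqrt(max - min)), so the edge-heavy shape of the distribution can be inspected directly. The seed and the sample count are hypothetical.

    # Editorial sketch: the same clamped Gaussian used by next_quiesce_duration().
    import math
    import random
    from collections import Counter

    min_q, max_q = 10, 60
    mu = (min_q + max_q) / 2                 # 35.0
    sigma = 3 * math.sqrt(max_q - min_q)     # ~21.2, wide enough to pile up at the clamps

    rnd = random.Random(42)                  # seeded, like Quiescer's self.rnd

    def next_duration():
        d = round(rnd.gauss(mu, sigma), 1)
        return min(max(d, min_q), max_q)     # clamp into [min_q, max_q]

    samples = [next_duration() for _ in range(10000)]
    hits = Counter('min' if s == min_q else 'max' if s == max_q else 'mid' for s in samples)
    print(hits)  # a sizable share of samples lands exactly on min_q and max_q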
+
+    def tell_quiesce_leader(self, *args):
+        leader = None
+        rc = None
+        stdout = None
+
+        while leader is None and not self.is_stopped:
+            leader = self.fs.get_var('qdb_leader')
+            if leader is None:
+                self.logger.warn("Couldn't get quiesce db leader from the mds map")
+                self.sleep_unless_stopped(5)
+
+        while leader is not None and not self.is_stopped:
+            command = ['tell', f"mds.{leader}", 'quiesce', 'db']
+            command.extend(args)
+            self.logger.debug("Running ceph command: '%s'" % " ".join(command))
+            result = self.fs.run_ceph_cmd(args=command, check_status=False, stdout=StringIO(),
+                                          # (quiesce_timeout + guard) is a sensible cmd timeout
+                                          # for both `--quiesce --await` and `--release --await`
+                                          # It is an overkill for a query,
+                                          # but since it's just a safety net, we use it unconditionally
+                                          timeoutcmd=self.quiesce_timeout+self.QDB_CMD_TIMEOUT_GUARD)
+            rc, stdout = result.exitstatus, result.stdout.getvalue()
+            if rc == errno.ENOTTY:
+                try:
+                    resp = json.loads(stdout)
+                    leader = int(resp['leader'])
+                    self.logger.info("Retrying a quiesce db command with leader %d" % leader)
+                except Exception as e:
+                    self.logger.error("Couldn't parse ENOTTY response from an mds with error: %s\n%s" % (str(e), stdout))
+                    self.sleep_unless_stopped(5)
+            else:
+                break
+
+        return (rc, stdout)
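Editorial note, not part of the patch: to make the helper above concrete, the snippet below assembles the same argument vector that do_quiesce() (further down) passes for the initial quiesce and prints the resulting ceph CLI invocation. The leader rank and the timings are hypothetical; the flags are the ones already used by this patch.

    # Editorial sketch: the argument vector tell_quiesce_leader() ends up running.
    leader = 4                                # hypothetical qdb_leader from the mds map
    roots = ["/"]
    quiesce_timeout = 60
    duration = 20.0

    args = [*roots,
            "--timeout", str(quiesce_timeout),
            "--expiration", str(duration + 60),   # a minute of slack for the release command
            "--await"]
    command = ['tell', f"mds.{leader}", 'quiesce', 'db', *args]
    print("ceph " + " ".join(command))
    # ceph tell mds.4 quiesce db / --timeout 60 --expiration 80.0 --await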
self.logger.info(f"Pulled {len(ops_dump['ops'])} ops from rank {rank} ({name}) into {out_name}") + except Exception as e: + self.logger.error(f"Couldn't pull ops dump at '{remote_path}' on rank {info['rank']} ({info['name']}), error: {e}") + misc.delete_file(mds_remote, remote_path, sudo=True, check=False) + + def get_set_state_name(self, response, set_id = None): + if isinstance(response, (str, bytes, bytearray)): + response = json.loads(response) + + sets = response['sets'] + if len(sets) == 0: + raise ValueError("response has no sets") + + if set_id is None: + if len(sets) > 1: + raise ValueError("set_id must be provided for a multiset response") + else: + set_id = next(iter(sets.keys())) + + return response['sets'][set_id]['state']['name'] + + def check_canceled(self, response, set_id = None): + if 'CANCELED' == self.get_set_state_name(response, set_id): + if self.cancelations_cap == 0: + raise RuntimeError("Reached the cap of canceled quiesces") + else: + self.logger.warn(f"Quiesce set got cancelled (cap = {self.cancelations_cap})." + "Won't raise an error since this could be a failover, " + "will wait for the next quiesce attempt") + + if self.cancelations_cap > 0: + self.cancelations_cap -= 1 + + return True + return False + + + def do_quiesce(self, duration): + + start_time = time.time() + self.logger.debug(f"Going to quiesce for duration: {duration}") + + if self.ops_dump_interval is None: + await_args = ["--await"] + else: + await_args = ["--await-for", str(self.ops_dump_interval)] + + set_id = None + iteration = 0 + + def rcinfo(rc): + return f"{rc} ({errno.errorcode.get(rc, 'Unknown')})" + + while True: + iteration += 1 + if set_id is None: + # quiesce the root + rc, stdout = self.tell_quiesce_leader( + *self.roots, + "--timeout", str(self.quiesce_timeout), + "--expiration", str(duration + 60), # give us a minute to run the release command + *await_args + ) + else: + # await the set + rc, stdout = self.tell_quiesce_leader( + "--set-id", set_id, + *await_args + ) + + self.proceed_unless_stopped() + + try: + response = json.loads(stdout) + set_id = next(iter(response["sets"].keys())) + except Exception as e: + self.logger.error(f"Couldn't parse response with error {e}; rc: {rcinfo(rc)}; stdout:\n{stdout}") + raise RuntimeError(f"Error parsing quiesce response: {e}") + + elapsed = round(time.time() - start_time, 1) + + if rc == errno.EINPROGRESS: + self.logger.warn(f"Set '{set_id}' hasn't quiesced after {elapsed} seconds (timeout: {self.quiesce_timeout}). Dumping ops with locks from all ranks.") + self.dump_ops_all_ranks(f'{set_id}-{iteration}') + else: + break + + if self.check_canceled(response): + return + + if rc != 0: + self.logger.error(f"Couldn't quiesce root with rc: {rcinfo(rc)}, stdout:\n{stdout}") + raise RuntimeError(f"Error quiescing set '{set_id}': {rcinfo(rc)}") + + elapsed = round(time.time() - start_time, 1) + self.logger.info(f"Successfully quiesced set '{set_id}', quiesce took {elapsed} seconds. 
+
+
+    def do_quiesce(self, duration):
+
+        start_time = time.time()
+        self.logger.debug(f"Going to quiesce for duration: {duration}")
+
+        if self.ops_dump_interval is None:
+            await_args = ["--await"]
+        else:
+            await_args = ["--await-for", str(self.ops_dump_interval)]
+
+        set_id = None
+        iteration = 0
+
+        def rcinfo(rc):
+            return f"{rc} ({errno.errorcode.get(rc, 'Unknown')})"
+
+        while True:
+            iteration += 1
+            if set_id is None:
+                # quiesce the root
+                rc, stdout = self.tell_quiesce_leader(
+                    *self.roots,
+                    "--timeout", str(self.quiesce_timeout),
+                    "--expiration", str(duration + 60),  # give us a minute to run the release command
+                    *await_args
+                )
+            else:
+                # await the set
+                rc, stdout = self.tell_quiesce_leader(
+                    "--set-id", set_id,
+                    *await_args
+                )
+
+            self.proceed_unless_stopped()
+
+            try:
+                response = json.loads(stdout)
+                set_id = next(iter(response["sets"].keys()))
+            except Exception as e:
+                self.logger.error(f"Couldn't parse response with error {e}; rc: {rcinfo(rc)}; stdout:\n{stdout}")
+                raise RuntimeError(f"Error parsing quiesce response: {e}")
+
+            elapsed = round(time.time() - start_time, 1)
+
+            if rc == errno.EINPROGRESS:
+                self.logger.warn(f"Set '{set_id}' hasn't quiesced after {elapsed} seconds (timeout: {self.quiesce_timeout}). Dumping ops with locks from all ranks.")
+                self.dump_ops_all_ranks(f'{set_id}-{iteration}')
+            else:
+                break
+
+        if self.check_canceled(response):
+            return
+
+        if rc != 0:
+            self.logger.error(f"Couldn't quiesce root with rc: {rcinfo(rc)}, stdout:\n{stdout}")
+            raise RuntimeError(f"Error quiescing set '{set_id}': {rcinfo(rc)}")
+
+        elapsed = round(time.time() - start_time, 1)
+        self.logger.info(f"Successfully quiesced set '{set_id}', quiesce took {elapsed} seconds. Will release after: {duration}")
+        self.sleep_unless_stopped(duration)
+
+        # release the root
+        rc, stdout = self.tell_quiesce_leader(
+            "--set-id", set_id,
+            "--release",
+            "--await"
+        )
+
+        self.proceed_unless_stopped()
+
+        if rc != 0:
+            if self.check_canceled(stdout, set_id):
+                return
+
+            self.logger.error(f"Couldn't release set '{set_id}' with rc: {rcinfo(rc)}, stdout:\n{stdout}")
+            raise RuntimeError(f"Error releasing set '{set_id}': {rcinfo(rc)}")
+        else:
+            elapsed = round(time.time() - start_time, 1)
+            self.logger.info(f"Successfully released set '{set_id}', seconds elapsed: {elapsed}")
+
+
+    def _run(self):
+        try:
+            self.fs.wait_for_daemons()
+            log.info(f'Ready to start quiesce thrashing; initial delay: {self.initial_delay} sec')
+
+            self.sleep_unless_stopped(self.initial_delay)
+
+            while not self.is_stopped:
+                duration = self.next_quiesce_duration()
+
+                if duration > self.split_if_longer:
+                    self.logger.info(f"Total duration ({duration}) is longer than `split_if_longer` ({self.split_if_longer}), "
+                                     "will split into two consecutive quiesces")
+                    durations = [duration/2, duration/2]
+                else:
+                    durations = [duration]
+
+                for d in durations:
+                    self.do_quiesce(d)
+
+                # now we sleep to maintain the quiesce factor
+                self.sleep_unless_stopped((duration/self.quiesce_factor) - duration)
+
+        except Exception as e:
+            if not isinstance(e, self.Stopped):
+                self.set_thrasher_exception(e)
+                self.logger.exception("exception:")
+            # allow successful completion so gevent doesn't see an exception...
+
+    def stop(self):
+        log.warn('The quiescer is requested to stop, running cancel all')
+        self.tell_quiesce_leader( "--cancel", "--all" )
+        super(Quiescer, self).stop()
+
+
+def stop_all_quiescers(thrashers):
+    for thrasher in thrashers:
+        if not isinstance(thrasher, Quiescer):
+            continue
+        thrasher.stop()
+        thrasher.join()
+        if thrasher.exception is not None:
+            raise RuntimeError(f"error during quiesce thrashing: {thrasher.exception}")
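Editorial note, not part of the patch: one subtlety of the _run() loop above is that the unquiesced sleep is computed from the full generated duration even when the quiesce itself is split in two. The standalone sketch below mirrors that decision path with hypothetical numbers.

    # Editorial sketch: one iteration of the _run() loop with a duration above split_if_longer.
    quiesce_factor = 0.35
    split_if_longer = 35.0        # default: mean(min_quiesce, max_quiesce)
    duration = 50.0               # hypothetical draw from next_quiesce_duration()

    if duration > split_if_longer:
        durations = [duration / 2, duration / 2]   # two back-to-back 25 s quiesces
    else:
        durations = [duration]

    unquiesced_sleep = duration / quiesce_factor - duration
    print(durations, round(unquiesced_sleep, 1))   # [25.0, 25.0] 92.9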
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Stress test the mds by randomly quiescing the whole FS while another task/workunit
+    is running.
+
+    Example config (see Quiescer initializer for all available options):
+
+    - quiescer:
+        quiesce_factor: 0.2
+        max_quiesce: 30
+        quiesce_timeout: 10
+    """
+
+    if config is None:
+        config = {}
+    assert isinstance(config, dict), \
+        'quiescer task only accepts a dict for configuration'
+    mdslist = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
+    assert len(mdslist) > 0, \
+        'quiescer task requires at least 1 metadata server'
+
+    cluster_name = config.get('cluster', 'ceph')
+    # the manager should be there
+    manager = ctx.managers[cluster_name]
+    manager.wait_for_clean()
+    assert manager.is_clean()
+
+    mds_cluster = MDSCluster(ctx)
+    for fs in mds_cluster.status().get_filesystems():
+        quiescer = Quiescer(ctx=ctx, fscid=fs['id'], cluster_name=cluster_name, **config)
+        quiescer.start()
+        ctx.ceph[cluster_name].thrashers.append(quiescer)
+
+    try:
+        log.debug('Yielding')
+        yield
+    finally:
+        log.info('joining Quiescers')
+        stop_all_quiescers(ctx.ceph[cluster_name].thrashers)
+        log.info('done joining Quiescers')
diff --git a/qa/tasks/waiter.py b/qa/tasks/waiter.py
new file mode 100644
index 00000000000..8cbf311a153
--- /dev/null
+++ b/qa/tasks/waiter.py
@@ -0,0 +1,13 @@
+
+import contextlib
+import gevent
+
+@contextlib.contextmanager
+def task(ctx, config):
+    on_enter = config.get("on_enter", 0)
+    on_exit = config.get("on_exit", 0)
+
+    gevent.sleep(on_enter)
+    yield
+    gevent.sleep(on_exit)
+
\ No newline at end of file
-- 
2.39.5
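Editorial note, not part of the patch: the subject says the waiter task exists to test the quiescer, i.e. to keep a job open while nothing but quiesce thrashing runs. The sketch below expresses such a job fragment as the Python structure teuthology loads from the YAML task list; the option names come from the two tasks in this patch, and the numbers are hypothetical.

    # Editorial sketch: a quiescer-only job, expressed as the loaded task list.
    tasks = [
        {'quiescer': {'quiesce_factor': 0.5, 'max_quiesce': 30, 'quiesce_timeout': 60}},
        {'waiter': {'on_enter': 300}},   # hold the job open for 5 minutes of pure quiesce thrashing
    ]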