From a96a7b027ce6ad74348d52bc0da512fe66027f70 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Thu, 17 Jul 2014 21:35:01 +0100
Subject: [PATCH] task/cephfs: generalise Filesystem for multi-MDS

This enables tasks like mds_journal_migration to be run in an
environment with standby-replay MDSs present.

Signed-off-by: John Spray
---
 teuthology/task/cephfs/filesystem.py     | 132 +++++++++++++++++------
 teuthology/task/mds_journal_migration.py |   8 +-
 2 files changed, 106 insertions(+), 34 deletions(-)

diff --git a/teuthology/task/cephfs/filesystem.py b/teuthology/task/cephfs/filesystem.py
index 901d2ecb149a0..1b83d81554da4 100644
--- a/teuthology/task/cephfs/filesystem.py
+++ b/teuthology/task/cephfs/filesystem.py
@@ -5,12 +5,16 @@
 import logging
 import time
 
 from teuthology import misc
+from teuthology.parallel import parallel
 from teuthology.task import ceph_manager
 
 
 log = logging.getLogger(__name__)
 
+DAEMON_WAIT_TIMEOUT = 120
+
+
 class Filesystem(object):
     """
     This object is for driving a CephFS filesystem.
@@ -23,51 +27,112 @@ class Filesystem(object):
         self._ctx = ctx
         self._config = config
 
-        mds_list = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
-        if len(mds_list) != 1:
-            # Require exactly one MDS, the code path for creation failure when
-            # a standby is available is different
-            raise RuntimeError("This task requires exactly one MDS")
+        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
+        if len(self.mds_ids) == 0:
+            raise RuntimeError("This task requires at least one MDS")
 
-        self.mds_id = mds_list[0]
-
-        (mds_remote,) = ctx.cluster.only('mds.{_id}'.format(_id=self.mds_id)).remotes.iterkeys()
-        manager = ceph_manager.CephManager(
-            mds_remote, ctx=ctx, logger=log.getChild('ceph_manager'),
-        )
-        self.mds_manager = manager
+        first_mon = misc.get_first_mon(ctx, config)
+        (mon_remote,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+        self.mon_manager = ceph_manager.CephManager(mon_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
+        self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
 
         client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
         self.client_id = client_list[0]
         self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
 
-    def mds_stop(self):
+    def are_daemons_healthy(self):
+        """
+        Return true if all daemons are in one of active, standby, standby-replay
+        :return:
+        """
+        status = self.mon_manager.get_mds_status_all()
+        for mds_id, mds_status in status['info'].items():
+            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
+                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
+                return False
+
+        return True
+
+    def wait_for_daemons(self, timeout=None):
+        """
+        Wait until all daemons are healthy
+        :return:
         """
-        Stop the MDS daemon process.  If it held a rank, that rank
+
+        if timeout is None:
+            timeout = DAEMON_WAIT_TIMEOUT
+
+        elapsed = 0
+        while True:
+            if self.are_daemons_healthy():
+                return
+            else:
+                time.sleep(1)
+                elapsed += 1
+
+            if elapsed > timeout:
+                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
+
+    def get_lone_mds_id(self):
+        if len(self.mds_ids) != 1:
+            raise ValueError("Explicit MDS argument required when multiple MDSs in use")
+        else:
+            return self.mds_ids[0]
+
+    def _one_or_all(self, mds_id, cb):
+        """
+        Call a callback for a single named MDS, or for all
+
+        :param mds_id: MDS daemon name, or None
+        :param cb: Callback taking single argument of MDS daemon name
+        """
+        if mds_id is None:
+            with parallel() as p:
+                for mds_id in self.mds_ids:
+                    p.spawn(cb, mds_id)
+        else:
+            cb(mds_id)
+
+    def mds_stop(self, mds_id=None):
+        """
+        Stop the MDS daemon process(es).  If it held a rank, that rank
         will eventually go laggy.
         """
-        mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
-        mds.stop()
+        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
 
-    def mds_fail(self):
+    def mds_fail(self, mds_id=None):
         """
-        Inform MDSMonitor that the daemon process is dead.  If it held
+        Inform MDSMonitor of the death of the daemon process(es).  If it held
         a rank, that rank will be relinquished.
         """
-        self.mds_manager.raw_cluster_cmd("mds", "fail", "0")
+        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
 
-    def mds_restart(self):
-        mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
-        mds.restart()
+    def mds_restart(self, mds_id=None):
+        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
+
+    def mds_fail_restart(self, mds_id=None):
+        """
+        Variation on restart that includes marking MDSs as failed, so that doing this
+        operation followed by waiting for healthy daemon states guarantees that they
+        have gone down and come up, rather than potentially seeing the healthy states
+        that existed before the restart.
+        """
+        def _fail_restart(id_):
+            self.mds_daemons[id_].stop()
+            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
+            self.mds_daemons[id_].restart()
+
+        self._one_or_all(mds_id, _fail_restart)
 
     def reset(self):
         log.info("Creating new filesystem")
 
-        assert not self._ctx.daemons.get_daemon('mds', self.mds_id).running()
-        self.mds_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "0")
-        self.mds_manager.raw_cluster_cmd_result('mds', 'fail', self.mds_id)
-        self.mds_manager.raw_cluster_cmd_result('fs', 'rm', "default", "--yes-i-really-mean-it")
-        self.mds_manager.raw_cluster_cmd_result('fs', 'new', "default", "metadata", "data")
+        self.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "0")
+        for mds_id in self.mds_ids:
+            assert not self._ctx.daemons.get_daemon('mds', mds_id).running()
+            self.mon_manager.raw_cluster_cmd_result('mds', 'fail', mds_id)
+        self.mon_manager.raw_cluster_cmd_result('fs', 'rm', "default", "--yes-i-really-mean-it")
+        self.mon_manager.raw_cluster_cmd_result('fs', 'new', "default", "metadata", "data")
 
     def get_metadata_object(self, object_type, object_id):
         """
@@ -110,8 +175,10 @@ class Filesystem(object):
 
         return version
 
-    def mds_asok(self, command):
-        proc = self.mds_manager.admin_socket('mds', self.mds_id, command)
+    def mds_asok(self, command, mds_id=None):
+        if mds_id is None:
+            mds_id = self.get_lone_mds_id()
+        proc = self.mon_manager.admin_socket('mds', mds_id, command)
         response_data = proc.stdout.getvalue()
         log.info("mds_asok output: {0}".format(response_data))
         if response_data.strip():
@@ -119,7 +186,7 @@ class Filesystem(object):
         else:
             return None
 
-    def wait_for_state(self, goal_state, reject=None, timeout=None):
+    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None):
         """
         Block until the MDS reaches a particular state, or a failure condition
         is met.
@@ -130,10 +197,13 @@ class Filesystem(object):
         :return: number of seconds waited, rounded down to integer
         """
 
+        if mds_id is None:
+            mds_id = self.get_lone_mds_id()
+
         elapsed = 0
         while True:
             # mds_info is None if no daemon currently claims this rank
-            mds_info = self.mds_manager.get_mds_status(self.mds_id)
+            mds_info = self.mon_manager.get_mds_status(mds_id)
             current_state = mds_info['state'] if mds_info else None
 
             if current_state == goal_state:
diff --git a/teuthology/task/mds_journal_migration.py b/teuthology/task/mds_journal_migration.py
index 53ee53e0cf003..d4ff03392b74c 100644
--- a/teuthology/task/mds_journal_migration.py
+++ b/teuthology/task/mds_journal_migration.py
@@ -80,7 +80,11 @@ def task(ctx, config):
     write_conf(ctx)
 
     # Restart the MDS.
-    fs.mds_restart()
+    fs.mds_fail_restart()
+    fs.wait_for_daemons()
+
+    # This ensures that all daemons come up into a valid state
+    fs.wait_for_daemons()
 
     # Check that files created in the initial client workload are still visible
     # in a client mount.
@@ -94,8 +98,6 @@ def task(ctx, config):
             new_journal_version, journal_version()
         ))
 
-    # Check that all MDS daemons are still up and running an in expected state
-
     # Leave all MDSs and clients running for any child tasks
     for mount in ctx.mounts.values():
         mount.mount()
-- 
2.39.5
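
Usage sketch (not part of the patch): a minimal illustration of how a task might drive the generalised Filesystem API added above. The wrapper function and its name are hypothetical; the Filesystem methods, the mds_ids attribute and the daemon states are the ones introduced in this change.

from teuthology.task.cephfs.filesystem import Filesystem

def restart_and_wait(ctx, config):
    # Hypothetical helper: ctx/config are the usual teuthology task arguments.
    fs = Filesystem(ctx, config)

    # Restart every MDS (daemon stop, "ceph mds fail <id>", daemon start)...
    fs.mds_fail_restart()

    # ...then block until all daemons report up:active, up:standby or
    # up:standby-replay, or DAEMON_WAIT_TIMEOUT (120s) elapses.
    fs.wait_for_daemons()

    # A single daemon can be targeted instead. With more than one MDS
    # configured, helpers that need a lone target (mds_asok, wait_for_state)
    # require an explicit mds_id; get_lone_mds_id() raises ValueError otherwise.
    fs.mds_fail_restart(mds_id=fs.mds_ids[0])
    fs.wait_for_daemons()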