]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
task/cephfs: generalise Filesystem for multi-MDS
authorJohn Spray <jspray@redhat.com>
Thu, 17 Jul 2014 20:35:01 +0000 (21:35 +0100)
committerJohn Spray <jspray@redhat.com>
Fri, 25 Jul 2014 12:33:31 +0000 (13:33 +0100)
This enables tasks like mds_journal_migration to be
run in an environment with standby-replay MDSs present.

Signed-off-by: John Spray <john.spray@redhat.com>
teuthology/task/cephfs/filesystem.py
teuthology/task/mds_journal_migration.py

index 901d2ecb149a0ec033976cdc95d92c1217dff09f..1b83d81554da428d7a06e75625929caf40eac4db 100644 (file)
@@ -5,12 +5,16 @@ import logging
 import time
 
 from teuthology import misc
+from teuthology.parallel import parallel
 from teuthology.task import ceph_manager
 
 
 log = logging.getLogger(__name__)
 
 
+DAEMON_WAIT_TIMEOUT = 120
+
+
 class Filesystem(object):
     """
     This object is for driving a CephFS filesystem.
@@ -23,51 +27,112 @@ class Filesystem(object):
         self._ctx = ctx
         self._config = config
 
-        mds_list = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
-        if len(mds_list) != 1:
-            # Require exactly one MDS, the code path for creation failure when
-            # a standby is available is different
-            raise RuntimeError("This task requires exactly one MDS")
+        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
+        if len(self.mds_ids) == 0:
+            raise RuntimeError("This task requires at least one MDS")
 
-        self.mds_id = mds_list[0]
-
-        (mds_remote,) = ctx.cluster.only('mds.{_id}'.format(_id=self.mds_id)).remotes.iterkeys()
-        manager = ceph_manager.CephManager(
-            mds_remote, ctx=ctx, logger=log.getChild('ceph_manager'),
-        )
-        self.mds_manager = manager
+        first_mon = misc.get_first_mon(ctx, config)
+        (mon_remote,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+        self.mon_manager = ceph_manager.CephManager(mon_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
+        self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
 
         client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
         self.client_id = client_list[0]
         self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
 
-    def mds_stop(self):
+    def are_daemons_healthy(self):
+        """
+        Return true if all daemons are in one of active, standby, standby-replay
+        :return:
+        """
+        status = self.mon_manager.get_mds_status_all()
+        for mds_id, mds_status in status['info'].items():
+            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
+                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
+                return False
+
+        return True
+
+    def wait_for_daemons(self, timeout=None):
+        """
+        Wait until all daemons are healthy
+        :return:
         """
-        Stop the MDS daemon process.  If it held a rank, that rank
+
+        if timeout is None:
+            timeout = DAEMON_WAIT_TIMEOUT
+
+        elapsed = 0
+        while True:
+            if self.are_daemons_healthy():
+                return
+            else:
+                time.sleep(1)
+                elapsed += 1
+
+            if elapsed > timeout:
+                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
+
+    def get_lone_mds_id(self):
+        if len(self.mds_ids) != 1:
+            raise ValueError("Explicit MDS argument required when multiple MDSs in use")
+        else:
+            return self.mds_ids[0]
+
+    def _one_or_all(self, mds_id, cb):
+        """
+        Call a callback for a single named MDS, or for all
+
+        :param mds_id: MDS daemon name, or None
+        :param cb: Callback taking single argument of MDS daemon name
+        """
+        if mds_id is None:
+            with parallel() as p:
+                for mds_id in self.mds_ids:
+                    p.spawn(cb, mds_id)
+        else:
+            cb(mds_id)
+
+    def mds_stop(self, mds_id=None):
+        """
+        Stop the MDS daemon process(se).  If it held a rank, that rank
         will eventually go laggy.
         """
-        mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
-        mds.stop()
+        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
 
-    def mds_fail(self):
+    def mds_fail(self, mds_id=None):
         """
-        Inform MDSMonitor that the daemon process is dead.  If it held
+        Inform MDSMonitor of the death of the daemon process(es).  If it held
         a rank, that rank will be relinquished.
         """
-        self.mds_manager.raw_cluster_cmd("mds", "fail", "0")
+        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
 
-    def mds_restart(self):
-        mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
-        mds.restart()
+    def mds_restart(self, mds_id=None):
+        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
+
+    def mds_fail_restart(self, mds_id=None):
+        """
+        Variation on restart that includes marking MDSs as failed, so that doing this
+        operation followed by waiting for healthy daemon states guarantees that they
+        have gone down and come up, rather than potentially seeing the healthy states
+        that existed before the restart.
+        """
+        def _fail_restart(id_):
+            self.mds_daemons[id_].stop()
+            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
+            self.mds_daemons[id_].restart()
+
+        self._one_or_all(mds_id, _fail_restart)
 
     def reset(self):
         log.info("Creating new filesystem")
 
-        assert not self._ctx.daemons.get_daemon('mds', self.mds_id).running()
-        self.mds_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "0")
-        self.mds_manager.raw_cluster_cmd_result('mds', 'fail', self.mds_id)
-        self.mds_manager.raw_cluster_cmd_result('fs', 'rm', "default", "--yes-i-really-mean-it")
-        self.mds_manager.raw_cluster_cmd_result('fs', 'new', "default", "metadata", "data")
+        self.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "0")
+        for mds_id in self.mds_ids:
+            assert not self._ctx.daemons.get_daemon('mds', mds_id).running()
+            self.mon_manager.raw_cluster_cmd_result('mds', 'fail', mds_id)
+        self.mon_manager.raw_cluster_cmd_result('fs', 'rm', "default", "--yes-i-really-mean-it")
+        self.mon_manager.raw_cluster_cmd_result('fs', 'new', "default", "metadata", "data")
 
     def get_metadata_object(self, object_type, object_id):
         """
@@ -110,8 +175,10 @@ class Filesystem(object):
 
         return version
 
-    def mds_asok(self, command):
-        proc = self.mds_manager.admin_socket('mds', self.mds_id, command)
+    def mds_asok(self, command, mds_id=None):
+        if mds_id is None:
+            mds_id = self.get_lone_mds_id()
+        proc = self.mon_manager.admin_socket('mds', mds_id, command)
         response_data = proc.stdout.getvalue()
         log.info("mds_asok output: {0}".format(response_data))
         if response_data.strip():
@@ -119,7 +186,7 @@ class Filesystem(object):
         else:
             return None
 
-    def wait_for_state(self, goal_state, reject=None, timeout=None):
+    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None):
         """
         Block until the MDS reaches a particular state, or a failure condition
         is met.
@@ -130,10 +197,13 @@ class Filesystem(object):
         :return: number of seconds waited, rounded down to integer
         """
 
+        if mds_id is None:
+            mds_id = self.get_lone_mds_id()
+
         elapsed = 0
         while True:
             # mds_info is None if no daemon currently claims this rank
-            mds_info = self.mds_manager.get_mds_status(self.mds_id)
+            mds_info = self.mon_manager.get_mds_status(mds_id)
             current_state = mds_info['state'] if mds_info else None
 
             if current_state == goal_state:
index 53ee53e0cf0036b81ec93475c3cefa2ca14e46e8..d4ff03392b74cdca7a787897d2868b3953b0429b 100644 (file)
@@ -80,7 +80,11 @@ def task(ctx, config):
     write_conf(ctx)
 
     # Restart the MDS.
-    fs.mds_restart()
+    fs.mds_fail_restart()
+    fs.wait_for_daemons()
+
+    # This ensures that all daemons come up into a valid state
+    fs.wait_for_daemons()
 
     # Check that files created in the initial client workload are still visible
     # in a client mount.
@@ -94,8 +98,6 @@ def task(ctx, config):
             new_journal_version, journal_version()
         ))
 
-    # Check that all MDS daemons are still up and running an in expected state
-
     # Leave all MDSs and clients running for any child tasks
     for mount in ctx.mounts.values():
         mount.mount()