tasks/mds_thrash: support multimds
author    Patrick Donnelly <batrick@batbytes.com>
          Tue, 30 Aug 2016 21:23:46 +0000 (17:23 -0400)
committer Patrick Donnelly <pdonnell@redhat.com>
          Tue, 8 Nov 2016 02:24:09 +0000 (21:24 -0500)
This commit amends the MDS thrasher task to also work on multimds
clusters. Main changes:

o New FSStatus class in tasks/cephfs/filesystem.py which captures a snapshot
  of the fsmap (`ceph fs dump`). This allows consecutive operations on the
  same fsmap without repeated fs dumps; a usage sketch follows this list.

o Only one MDSThrasher is started for each file system.

o The MDSThrasher operates on ranks instead of names (and instead of groups
  of standbys following the initial active).

o The MDSThrasher will also change the max_mds for the cluster to a new
  value in [1, current) or (current, starting max_mds]. When reduced,
  randomly selected MDSs other than rank 0 are deactivated to reach the
  new max_mds. The likelihood of changing max_mds in a given cycle of the
  MDSThrasher is set by the "thrash_max_mds" config; a condensed sketch of
  the selection follows the commit message.

o The MDSThrasher prints out stats on completion, e.g. the number of MDSs
  deactivated and the number of times max_mds was changed.
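
For orientation, a minimal sketch of the snapshot-based access pattern that
FSStatus introduces (the real class is in the tasks/cephfs/filesystem.py hunk
below); the sample map here is invented for illustration and trimmed to the
fields the class actually reads:

    import json

    # Illustrative `ceph fs dump --format=json-pretty` output, trimmed to the
    # fields FSStatus consumes; the names, ids and states are made up.
    SAMPLE_FS_DUMP = '''
    {
      "standbys": [{"name": "c", "rank": -1, "state": "up:standby"}],
      "filesystems": [
        {"id": 1,
         "mdsmap": {
           "fs_name": "cephfs",
           "max_mds": 2,
           "info": {
             "gid_4001": {"name": "a", "rank": 0, "state": "up:active"},
             "gid_4002": {"name": "b", "rank": 1, "state": "up:active"}
           }
         }}
      ]
    }
    '''

    fsmap = json.loads(SAMPLE_FS_DUMP)

    def get_ranks(fsmap, fscid):
        # Yield every mds_info entry holding a rank in the given filesystem,
        # mirroring FSStatus.get_ranks() in the hunk below.
        for fs in fsmap['filesystems']:
            if fs['id'] == fscid:
                for info in fs['mdsmap']['info'].values():
                    if info['rank'] >= 0:
                        yield info

    for info in get_ranks(fsmap, 1):
        print("rank %d is mds.%s (%s)" % (info['rank'], info['name'], info['state']))
    for info in fsmap['standbys']:
        print("standby: mds.%s" % info['name'])

Every lookup above works on the one dump held in memory, which is the point:
the thrasher can inspect ranks, standbys and max_mds consistently without
re-running `ceph fs dump` between checks.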

Prerequisite for: http://tracker.ceph.com/issues/10792
Partially fixes: http://tracker.ceph.com/issues/15134

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
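
A condensed sketch of the max_mds selection and deactivation described above,
under the same constraints the new do_thrash() loop enforces (the full logic
is in the tasks/mds_thrash.py hunk at the end of this diff); the helper names
here are illustrative only:

    import random

    def pick_new_max_mds(current, starting_max_mds):
        # Choose a new max_mds from [1, current) or (current, starting max_mds],
        # i.e. any legal value other than the current one; None if there is none.
        options = list(range(1, current)) + list(range(current + 1, starting_max_mds + 1))
        return random.choice(options) if options else None

    def ranks_to_deactivate(current, new_max_mds):
        # When shrinking, deactivate (current - new_max_mds) randomly chosen
        # ranks, never rank 0; when growing, nothing is deactivated.
        return random.sample(range(1, current), max(0, current - new_max_mds))

    current, starting = 3, 4
    new = pick_new_max_mds(current, starting)
    if new is not None:
        print("max_mds: %d -> %d, deactivating ranks %s"
              % (current, new, ranks_to_deactivate(current, new)))

Whether this step runs at all in a given cycle is controlled by the
"thrash_max_mds" probability (default 0.25 in the hunk below).
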
tasks/ceph.py
tasks/cephfs/cephfs_test_case.py
tasks/cephfs/filesystem.py
tasks/cephfs/test_backtrace.py
tasks/cephfs/test_data_scan.py
tasks/cephfs/test_failover.py
tasks/cephfs/test_pool_perm.py
tasks/mds_thrash.py

index a829b4bfb2f390125b2ae532707371a50968003a..1d1238b90905606c1974fa3e1fbd361f11275273 100644 (file)
@@ -316,9 +316,7 @@ def cephfs_setup(ctx, config):
     if mdss.remotes:
         log.info('Setting up CephFS filesystem...')
 
-        ceph_fs = Filesystem(ctx) # TODO: make Filesystem cluster-aware
-        if not ceph_fs.legacy_configured():
-            ceph_fs.create()
+        Filesystem(ctx, create='cephfs') # TODO: make Filesystem cluster-aware
 
         is_active_mds = lambda role: 'mds.' in role and not role.endswith('-s') and '-s-' not in role
         all_roles = [item for remote_roles in mdss.remotes.values() for item in remote_roles]
index ae9c0d636d8ee139000e4ba3f4bd069a07e711c3..d91777b17ae2035c78162ff6de45219308cbc816 100644 (file)
@@ -58,9 +58,9 @@ class CephFSTestCase(CephTestCase):
     LOAD_SETTINGS = []
 
     def setUp(self):
-        if len(self.fs.mds_ids) < self.MDSS_REQUIRED:
+        if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
             raise case.SkipTest("Only have {0} MDSs, require {1}".format(
-                len(self.fs.mds_ids), self.MDSS_REQUIRED
+                len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
             ))
 
         if len(self.mounts) < self.CLIENTS_REQUIRED:
@@ -76,11 +76,11 @@ class CephFSTestCase(CephTestCase):
                     raise case.SkipTest("kclient clients must be on separate nodes")
 
         if self.REQUIRE_ONE_CLIENT_REMOTE:
-            if self.mounts[0].client_remote.hostname in self.fs.get_mds_hostnames():
+            if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
                 raise case.SkipTest("Require first client to be on separate server from MDSs")
 
         if self.REQUIRE_MEMSTORE:
-            objectstore = self.fs.get_config("osd_objectstore", "osd")
+            objectstore = self.mds_cluster.get_config("osd_objectstore", "osd")
             if objectstore != "memstore":
                 # You certainly *could* run this on a real OSD, but you don't want to sit
                 # here for hours waiting for the test to fill up a 1TB drive!
@@ -96,7 +96,7 @@ class CephFSTestCase(CephTestCase):
         for i in range(0, self.CLIENTS_REQUIRED):
             setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])
 
-        self.fs.clear_firewall()
+        self.mds_cluster.clear_firewall()
 
         # Unmount in order to start each test on a fresh mount, such
         # that test_barrier can have a firm expectation of what OSD
@@ -112,50 +112,52 @@ class CephFSTestCase(CephTestCase):
         # the filesystem rather than just doing a rm -rf of files
         self.mds_cluster.mds_stop()
         self.mds_cluster.delete_all_filesystems()
+        self.fs = None # is now invalid!
 
         # In case the previous filesystem had filled up the RADOS cluster, wait for that
         # flag to pass.
-        osd_mon_report_interval_max = int(self.fs.get_config("osd_mon_report_interval_max", service_type='osd'))
-        self.wait_until_true(lambda: not self.fs.is_full(),
+        osd_mon_report_interval_max = int(self.mds_cluster.get_config("osd_mon_report_interval_max", service_type='osd'))
+        self.wait_until_true(lambda: not self.mds_cluster.is_full(),
                              timeout=osd_mon_report_interval_max * 5)
 
         # In case anything is in the OSD blacklist list, clear it out.  This is to avoid
         # the OSD map changing in the background (due to blacklist expiry) while tests run.
         try:
-            self.fs.mon_manager.raw_cluster_cmd("osd", "blacklist", "clear")
+            self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "clear")
         except CommandFailedError:
             # Fallback for older Ceph cluster
-            blacklist = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd",
+            blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
                                   "dump", "--format=json-pretty"))['blacklist']
             log.info("Removing {0} blacklist entries".format(len(blacklist)))
             for addr, blacklisted_at in blacklist.items():
-                self.fs.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)
+                self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)
 
-        # In case some test messed with auth caps, reset them
         client_mount_ids = [m.client_id for m in self.mounts]
-        for client_id in client_mount_ids:
-            self.fs.mon_manager.raw_cluster_cmd_result(
-                'auth', 'caps', "client.{0}".format(client_id),
-                'mds', 'allow',
-                'mon', 'allow r',
-                'osd', 'allow rw pool={0}'.format(self.fs.get_data_pool_name()))
-
-        log.info(client_mount_ids)
-
         # In case the test changes the IDs of clients, stash them so that we can
         # reset in tearDown
         self._original_client_ids = client_mount_ids
+        log.info(client_mount_ids)
 
         # In case there were any extra auth identities around from a previous
         # test, delete them
         for entry in self.auth_list():
             ent_type, ent_id = entry['entity'].split(".")
             if ent_type == "client" and ent_id not in client_mount_ids and ent_id != "admin":
-                self.fs.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])
+                self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])
 
         if self.REQUIRE_FILESYSTEM:
-            self.fs.create()
+            self.fs = self.mds_cluster.newfs(True)
             self.fs.mds_restart()
+
+            # In case some test messed with auth caps, reset them
+            for client_id in client_mount_ids:
+                self.mds_cluster.mon_manager.raw_cluster_cmd_result(
+                    'auth', 'caps', "client.{0}".format(client_id),
+                    'mds', 'allow',
+                    'mon', 'allow r',
+                    'osd', 'allow rw pool={0}'.format(self.fs.get_data_pool_name()))
+
+            # wait for mds restart to complete...
             self.fs.wait_for_daemons()
             if not self.mount_a.is_mounted():
                 self.mount_a.mount()
@@ -169,13 +171,13 @@ class CephFSTestCase(CephTestCase):
         # Load an config settings of interest
         for setting in self.LOAD_SETTINGS:
             setattr(self, setting, int(self.fs.mds_asok(
-                ['config', 'get', setting], self.fs.mds_ids[0]
+                ['config', 'get', setting], self.mds_cluster.mds_ids[0]
             )[setting]))
 
         self.configs_set = set()
 
     def tearDown(self):
-        self.fs.clear_firewall()
+        self.mds_cluster.clear_firewall()
         for m in self.mounts:
             m.teardown()
 
@@ -193,7 +195,7 @@ class CephFSTestCase(CephTestCase):
         """
         Convenience wrapper on "ceph auth list"
         """
-        return json.loads(self.fs.mon_manager.raw_cluster_cmd(
+        return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
             "auth", "list", "--format=json-pretty"
         ))['auth_dump']
 
@@ -234,12 +236,7 @@ class CephFSTestCase(CephTestCase):
         MDS ranks or in the list of standbys
         """
         def get_daemon_names():
-            fs_map = self.mds_cluster.get_fs_map()
-            names = [m['name'] for m in fs_map['standbys']]
-            for fs in fs_map['filesystems']:
-                names.extend([info['name'] for info in fs['mdsmap']['info'].values()])
-
-            return names
+            return [info['name'] for info in self.mds_cluster.status().get_all()]
 
         if daemon_ids is None:
             daemon_ids = self.mds_cluster.mds_ids
@@ -261,14 +258,14 @@ class CephFSTestCase(CephTestCase):
         it does)
         """
         try:
-            self.fs.mds_daemons[daemon_id].proc.wait()
+            self.mds_cluster.mds_daemons[daemon_id].proc.wait()
         except CommandFailedError as e:
             log.info("MDS '{0}' crashed with status {1} as expected".format(daemon_id, e.exitstatus))
-            self.fs.mds_daemons[daemon_id].proc = None
+            self.mds_cluster.mds_daemons[daemon_id].proc = None
 
             # Go remove the coredump from the crash, otherwise teuthology.internal.coredump will
             # catch it later and treat it as a failure.
-            p = self.fs.mds_daemons[daemon_id].remote.run(args=[
+            p = self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                 "sudo", "sysctl", "-n", "kernel.core_pattern"], stdout=StringIO())
             core_pattern = p.stdout.getvalue().strip()
             if os.path.dirname(core_pattern):  # Non-default core_pattern with a directory in it
@@ -278,7 +275,7 @@ class CephFSTestCase(CephTestCase):
 
                 # Determine the PID of the crashed MDS by inspecting the MDSMap, it had
                 # to talk to the mons to get assigned a rank to reach the point of crashing
-                addr = self.fs.mon_manager.get_mds_status(daemon_id)['addr']
+                addr = self.mds_cluster.mon_manager.get_mds_status(daemon_id)['addr']
                 pid_str = addr.split("/")[1]
                 log.info("Determined crasher PID was {0}".format(pid_str))
 
@@ -287,7 +284,7 @@ class CephFSTestCase(CephTestCase):
                 core_glob = re.sub("%[a-z]", "*", core_glob)  # Match all for all other % tokens
 
                 # Verify that we see the expected single coredump matching the expected pattern
-                ls_proc = self.fs.mds_daemons[daemon_id].remote.run(args=[
+                ls_proc = self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                     "sudo", "ls", run.Raw(core_glob)
                 ], stdout=StringIO())
                 cores = [f for f in ls_proc.stdout.getvalue().strip().split("\n") if f]
@@ -296,7 +293,7 @@ class CephFSTestCase(CephTestCase):
 
                 log.info("Found core file {0}, deleting it".format(cores[0]))
 
-                self.fs.mds_daemons[daemon_id].remote.run(args=[
+                self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                     "sudo", "rm", "-f", cores[0]
                 ])
             else:
index fa4507f02feddd6d008ea1d22b3bb89756e07ef1..6b73ed4f2f42794822310740599a7c234aae011e 100644 (file)
@@ -31,6 +31,112 @@ class ObjectNotFound(Exception):
     def __str__(self):
         return "Object not found: '{0}'".format(self._object_name)
 
+class FSStatus(object):
+    """
+    Operations on a snapshot of the FSMap.
+    """
+    def __init__(self, mon_manager):
+        self.mon = mon_manager
+        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json-pretty"))
+
+    def __str__(self):
+        return json.dumps(self.map, indent = 2, sort_keys = True)
+
+    # Expose the fsmap for manual inspection.
+    def __getitem__(self, key):
+        """
+        Get a field from the fsmap.
+        """
+        return self.map[key]
+
+    def get_filesystems(self):
+        """
+        Iterator for all filesystems.
+        """
+        for fs in self.map['filesystems']:
+            yield fs
+
+    def get_all(self):
+        """
+        Iterator for all the mds_info components in the FSMap.
+        """
+        for info in self.get_standbys():
+            yield info
+        for fs in self.map['filesystems']:
+            for info in fs['mdsmap']['info'].values():
+                yield info
+
+    def get_standbys(self):
+        """
+        Iterator for all standbys.
+        """
+        for info in self.map['standbys']:
+            yield info
+
+    def get_fsmap(self, fscid):
+        """
+        Get the fsmap for the given FSCID.
+        """
+        for fs in self.map['filesystems']:
+            if fscid is None or fs['id'] == fscid:
+                return fs
+        raise RuntimeError("FSCID {0} not in map".format(fscid))
+
+    def get_fsmap_byname(self, name):
+        """
+        Get the fsmap for the given file system name.
+        """
+        for fs in self.map['filesystems']:
+            if name is None or fs['mdsmap']['fs_name'] == name:
+                return fs
+        raise RuntimeError("FS {0} not in map".format(name))
+
+    def get_replays(self, fscid):
+        """
+        Get the standby:replay MDS for the given FSCID.
+        """
+        fs = self.get_fsmap(fscid)
+        for info in fs['mdsmap']['info'].values():
+            if info['state'] == 'up:standby-replay':
+                yield info
+
+    def get_ranks(self, fscid):
+        """
+        Get the ranks for the given FSCID.
+        """
+        fs = self.get_fsmap(fscid)
+        for info in fs['mdsmap']['info'].values():
+            if info['rank'] >= 0:
+                yield info
+
+    def get_rank(self, fscid, rank):
+        """
+        Get the rank for the given FSCID.
+        """
+        for info in self.get_ranks(fscid):
+            if info['rank'] == rank:
+                return info
+        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
+
+    def get_mds(self, name):
+        """
+        Get the info for the given MDS name.
+        """
+        for info in self.get_all():
+            if info['name'] == name:
+                return info
+        return None
+
+    def get_mds_addr(self, name):
+        """
+        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
+        """
+        info = self.get_mds(name)
+        if info:
+            return info['addr']
+        else:
+            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
+            raise RuntimeError("MDS id '{0}' not found in map".format(name))
 
 class CephCluster(object):
     @property
@@ -150,39 +256,43 @@ class MDSCluster(CephCluster):
 
         self._one_or_all(mds_id, _fail_restart)
 
-    def get_filesystem(self, name):
-        return Filesystem(self._ctx, name)
+    def newfs(self, name):
+        return Filesystem(self._ctx, create=name)
 
-    def get_fs_map(self):
-        fs_map = json.loads(self.mon_manager.raw_cluster_cmd("fs", "dump", "--format=json-pretty"))
-        return fs_map
+    def status(self):
+        return FSStatus(self.mon_manager)
 
     def delete_all_filesystems(self):
         """
         Remove all filesystems that exist, and any pools in use by them.
         """
-        fs_ls = json.loads(self.mon_manager.raw_cluster_cmd("fs", "ls", "--format=json-pretty"))
-        for fs in fs_ls:
-            self.mon_manager.raw_cluster_cmd("fs", "set", fs['name'], "cluster_down", "true")
-            mds_map = json.loads(
-                self.mon_manager.raw_cluster_cmd(
-                    "fs", "get", fs['name'], "--format=json-pretty"))['mdsmap']
+        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
+        pool_id_name = {}
+        for pool in pools:
+            pool_id_name[pool['pool']] = pool['pool_name']
+        status = self.status()
+        for fs in status.get_filesystems():
+            mdsmap = fs['mdsmap']
+            name = mdsmap['fs_name']
+            metadata_pool = pool_id_name[mdsmap['metadata_pool']]
+
+            self.mon_manager.raw_cluster_cmd("fs", "set", name, "cluster_down", "true")
 
-            for gid in mds_map['up'].values():
+            for gid in mdsmap['up'].values():
                 self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
 
-            self.mon_manager.raw_cluster_cmd('fs', 'rm', fs['name'], '--yes-i-really-mean-it')
+            self.mon_manager.raw_cluster_cmd('fs', 'rm', name, '--yes-i-really-mean-it')
             self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
-                                             fs['metadata_pool'],
-                                             fs['metadata_pool'],
+                                             metadata_pool, metadata_pool,
                                              '--yes-i-really-really-mean-it')
-            for data_pool in fs['data_pools']:
+            for data_pool in mdsmap['data_pools']:
+                data_pool = pool_id_name[data_pool]
                 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                  data_pool, data_pool,
                                                  '--yes-i-really-really-mean-it')
 
     def get_standby_daemons(self):
-        return set([s['name'] for s in self.get_fs_map()['standbys']])
+        return set([s['name'] for s in self.status().get_standbys()])
 
     def get_mds_hostnames(self):
         result = set()
@@ -205,8 +315,9 @@ class MDSCluster(CephCluster):
 
         def set_block(_mds_id):
             remote = self.mon_manager.find_remote('mds', _mds_id)
+            status = self.status()
 
-            addr = self.get_mds_addr(_mds_id)
+            addr = status.get_mds_addr(_mds_id)
             ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
 
             remote.run(
@@ -221,62 +332,80 @@ class MDSCluster(CephCluster):
     def clear_firewall(self):
         clear_firewall(self._ctx)
 
-    def _all_info(self):
-        """
-        Iterator for all the mds_info components in the FSMap
-        """
-        fs_map = self.get_fs_map()
-        for i in fs_map['standbys']:
-            yield i
-        for fs in fs_map['filesystems']:
-            for i in fs['mdsmap']['info'].values():
-                yield i
-
-    def get_mds_addr(self, mds_id):
-        """
-        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
-        """
-        for mds_info in self._all_info():
-            if mds_info['name'] == mds_id:
-                return mds_info['addr']
-
-        log.warn(json.dumps(list(self._all_info()), indent=2))  # dump for debugging
-        raise RuntimeError("MDS id '{0}' not found in map".format(mds_id))
-
     def get_mds_info(self, mds_id):
-        for mds_info in self._all_info():
-            if mds_info['name'] == mds_id:
-                return mds_info
+        return FSStatus(self.mon_manager).get_mds(mds_id)
 
-        return None
-
-    def get_mds_info_by_rank(self, mds_rank):
-        for mds_info in self._all_info():
-            if mds_info['rank'] == mds_rank:
-                return mds_info
+    def is_full(self):
+        flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
+        return 'full' in flags
 
-        return None
+    def is_pool_full(self, pool_name):
+        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
+        for pool in pools:
+            if pool['pool_name'] == pool_name:
+                return 'full' in pool['flags_names'].split(",")
 
+        raise RuntimeError("Pool not found '{0}'".format(pool_name))
 
 class Filesystem(MDSCluster):
     """
     This object is for driving a CephFS filesystem.  The MDS daemons driven by
     MDSCluster may be shared with other Filesystems.
     """
-    def __init__(self, ctx, name=None):
+    def __init__(self, ctx, fscid=None, create=None):
         super(Filesystem, self).__init__(ctx)
 
-        if name is None:
-            name = "cephfs"
-
-        self.name = name
-        self.metadata_pool_name = "{0}_metadata".format(name)
-        self.data_pool_name = "{0}_data".format(name)
+        self.id = None
+        self.name = None
+        self.metadata_pool_name = None
+        self.data_pools = None
 
         client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
         self.client_id = client_list[0]
         self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
 
+        if create is not None:
+            if fscid is not None:
+                raise RuntimeError("cannot specify fscid when creating fs")
+            if create is True:
+                self.name = 'cephfs'
+            else:
+                self.name = create
+            if not self.legacy_configured():
+                self.create()
+        elif fscid is not None:
+            self.id = fscid
+        self.getinfo(refresh = True)
+
+    def getinfo(self, refresh = False):
+        status = self.status()
+        if self.id is not None:
+            fsmap = status.get_fsmap(self.id)
+        elif self.name is not None:
+            fsmap = status.get_fsmap_byname(self.name)
+        else:
+            fss = [fs for fs in status.get_filesystems()]
+            if len(fss) == 1:
+                fsmap = fss[0]
+            elif len(fss) == 0:
+                raise RuntimeError("no file system available")
+            else:
+                raise RuntimeError("more than one file system available")
+        self.id = fsmap['id']
+        self.name = fsmap['mdsmap']['fs_name']
+        self.get_pool_names(status = status, refresh = refresh)
+        return status
+
+    def deactivate(self, rank):
+        if rank < 0:
+            raise RuntimeError("invalid rank")
+        elif rank == 0:
+            raise RuntimeError("cannot deactivate rank 0")
+        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
+
+    def set_max_mds(self, max_mds):
+        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)
+
     def get_pgs_per_fs_pool(self):
         """
         Calculate how many PGs to use when creating a pool, in order to avoid raising any
@@ -288,10 +417,13 @@ class Filesystem(MDSCluster):
         osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
         return pg_warn_min_per_osd * osd_count
 
-    def get_all_mds_info(self):
-        return self.get_mds_info()
-
     def create(self):
+        if self.name is None:
+            self.name = "cephfs"
+        if self.metadata_pool_name is None:
+            self.metadata_pool_name = "{0}_metadata".format(self.name)
+        data_pool_name = "{0}_data".format(self.name)
+
         log.info("Creating filesystem '{0}'".format(self.name))
 
         pgs_per_fs_pool = self.get_pgs_per_fs_pool()
@@ -299,9 +431,11 @@ class Filesystem(MDSCluster):
         self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                          self.metadata_pool_name, pgs_per_fs_pool.__str__())
         self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
-                                         self.data_pool_name, pgs_per_fs_pool.__str__())
+                                         data_pool_name, pgs_per_fs_pool.__str__())
         self.mon_manager.raw_cluster_cmd('fs', 'new',
-                                         self.name, self.metadata_pool_name, self.data_pool_name)
+                                         self.name, self.metadata_pool_name, data_pool_name)
+
+        self.getinfo(refresh = True)
 
     def exists(self):
         """
@@ -319,6 +453,8 @@ class Filesystem(MDSCluster):
             out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
             pools = json.loads(out_text)
             metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
+            if metadata_pool_exists:
+                self.metadata_pool_name = 'metadata'
         except CommandFailedError as e:
             # For use in upgrade tests, Ceph cuttlefish and earlier don't support
             # structured output (--format) from the CLI.
@@ -333,35 +469,59 @@ class Filesystem(MDSCluster):
         return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
 
     def get_mds_map(self):
-        fs = json.loads(self.mon_manager.raw_cluster_cmd("fs", "get", self.name, "--format=json-pretty"))
-        return fs['mdsmap']
-
-    def get_data_pool_name(self):
-        return self.data_pool_name
-
-    def get_data_pool_id(self):
+        return self.status().get_fsmap(self.id)['mdsmap']
+
+    def add_data_pool(self, name):
+        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
+        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
+        self.get_pool_names(refresh = True)
+        for poolid, fs_name in self.data_pools.items():
+            if name == fs_name:
+                return poolid
+        raise RuntimeError("could not get just created pool '{0}'".format(name))
+
+    def get_pool_names(self, refresh = False, status = None):
+        if refresh or self.metadata_pool_name is None or self.data_pools is None:
+            if status is None:
+                status = self.status()
+            fsmap = status.get_fsmap(self.id)
+
+            osd_map = self.mon_manager.get_osd_dump_json()
+            id_to_name = {}
+            for p in osd_map['pools']:
+                id_to_name[p['pool']] = p['pool_name']
+
+            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
+            self.data_pools = {}
+            for data_pool in fsmap['mdsmap']['data_pools']:
+                self.data_pools[data_pool] = id_to_name[data_pool]
+
+    def get_data_pool_name(self, refresh = False):
+        if refresh or self.data_pools is None:
+            self.get_pool_names(refresh = True)
+        assert(len(self.data_pools) == 1)
+        return self.data_pools.values()[0]
+
+    def get_data_pool_id(self, refresh = False):
         """
         Don't call this if you have multiple data pools
         :return: integer
         """
-        pools = self.get_mds_map()['data_pools']
-        assert(len(pools) == 1)
-        return pools[0]
-
-    def get_data_pool_names(self):
-        osd_map = self.mon_manager.get_osd_dump_json()
-        id_to_name = {}
-        for p in osd_map['pools']:
-            id_to_name[p['pool']] = p['pool_name']
+        if refresh or self.data_pools is None:
+            self.get_pool_names(refresh = True)
+        assert(len(self.data_pools) == 1)
+        return self.data_pools.keys()[0]
 
-        return [id_to_name[pool_id] for pool_id in self.get_mds_map()['data_pools']]
+    def get_data_pool_names(self, refresh = False):
+        if refresh or self.data_pools is None:
+            self.get_pool_names(refresh = True)
+        return self.data_pools.values()
 
     def get_metadata_pool_name(self):
         return self.metadata_pool_name
 
     def get_namespace_id(self):
-        fs = json.loads(self.mon_manager.raw_cluster_cmd("fs", "get", self.name, "--format=json-pretty"))
-        return fs['id']
+        return self.id
 
     def get_pool_df(self, pool_name):
         """
@@ -521,6 +681,7 @@ class Filesystem(MDSCluster):
     def recreate(self):
         log.info("Creating new filesystem")
         self.delete_all_filesystems()
+        self.id = None
         self.create()
 
     def put_metadata_object_raw(self, object_id, infile):
@@ -591,18 +752,6 @@ class Filesystem(MDSCluster):
 
         return self.json_asok(command, 'mds', mds_id)
 
-    def is_full(self):
-        flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
-        return 'full' in flags
-
-    def is_pool_full(self, pool_name):
-        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
-        for pool in pools:
-            if pool['pool_name'] == pool_name:
-                return 'full' in pool['flags_names'].split(",")
-
-        raise RuntimeError("Pool not found '{0}'".format(pool_name))
-
     def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None):
         """
         Block until the MDS reaches a particular state, or a failure condition
@@ -619,15 +768,15 @@ class Filesystem(MDSCluster):
 
         started_at = time.time()
         while True:
+            status = self.status()
             if mds_id is not None:
                 # mds_info is None if no daemon with this ID exists in the map
-                mds_info = self.mon_manager.get_mds_status(mds_id)
+                mds_info = status.get_mds(mds_id)
                 current_state = mds_info['state'] if mds_info else None
                 log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
             else:
                 # In general, look for a single MDS
-                mds_status = self.get_mds_map()
-                states = [m['state'] for m in mds_status['info'].values()]
+                states = [m['state'] for m in status.get_ranks(self.id)]
                 if [s for s in states if s == goal_state] == [goal_state]:
                     current_state = goal_state
                 elif reject in states:
@@ -643,7 +792,7 @@ class Filesystem(MDSCluster):
             elif reject is not None and current_state == reject:
                 raise RuntimeError("MDS in reject state {0}".format(current_state))
             elif timeout is not None and elapsed > timeout:
-                log.error("MDS status at timeout: {0}".format(self.get_mds_map()))
+                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                 raise RuntimeError(
                     "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                         elapsed, goal_state, current_state
index 6d7308ec70d1f79b57b219b16a0662a8150258ae..98255b8ff432f2a188e0fe2114ea5ae92c421b46 100644 (file)
@@ -9,11 +9,8 @@ class TestBacktrace(CephFSTestCase):
         are updated correctly.
         """
 
-        def get_pool_id(name):
-            return self.fs.mon_manager.get_pool_dump(name)['pool']
-
         old_data_pool_name = self.fs.get_data_pool_name()
-        old_pool_id = get_pool_id(old_data_pool_name)
+        old_pool_id = self.fs.get_data_pool_id()
 
         # Create a file for subsequent checks
         self.mount_a.run_shell(["mkdir", "parent_a"])
@@ -44,10 +41,7 @@ class TestBacktrace(CephFSTestCase):
 
         # Create a new data pool
         new_pool_name = "data_new"
-        self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', new_pool_name,
-                                            self.fs.get_pgs_per_fs_pool().__str__())
-        self.fs.mon_manager.raw_cluster_cmd('mds', 'add_data_pool', new_pool_name)
-        new_pool_id = get_pool_id(new_pool_name)
+        new_pool_id = self.fs.add_data_pool(new_pool_name)
 
         # That an object which has switched pools gets its backtrace updated
         self.mount_a.run_shell(["setfattr", "-n", "ceph.file.layout.pool", "-v", new_pool_name, "./parent_b/alpha"])
index cf84b023cd0e672b4010a52e8e95439c482d5fd4..44280398821ffc11df842f311f072c0b43523c6f 100644 (file)
@@ -376,8 +376,7 @@ class TestDataScan(CephFSTestCase):
         # Start the MDS
         self.fs.mds_restart()
         self.fs.wait_for_daemons()
-        import json
-        log.info(json.dumps(self.mds_cluster.get_fs_map(), indent=2))
+        log.info(str(self.mds_cluster.status()))
 
         # Mount a client
         self.mount_a.mount()
index f8e71b0299bd8df3beb1257a2497a8a9a8e3f3e9..ff9919dce957c5aa1eca12702eefa1c6e3397844 100644 (file)
@@ -98,17 +98,14 @@ class TestStandbyReplay(CephFSTestCase):
         if replay:
             self.set_conf("mds.{0}".format(follower), "mds_standby_replay", "true")
 
-    def get_info_by_name(self, fs, mds_name):
-        if fs is None:
-            mds_info = self.mds_cluster.get_fs_map()['standbys']
+    def get_info_by_name(self, mds_name):
+        status = self.mds_cluster.status()
+        info = status.get_mds(mds_name)
+        if info is None:
+            log.warn(str(status))
+            raise RuntimeError("MDS '{0}' not found".format(mds_name))
         else:
-            mds_info = fs.get_mds_map()['info'].values()
-        for info in mds_info:
-            if info['name'] == mds_name:
-                return info
-
-        log.warn(json.dumps(mds_info, indent=2))
-        raise RuntimeError("MDS '{0}' not found".format(mds_name))
+            return info
 
     def test_standby_replay_unused(self):
         # Pick out exactly 3 daemons to be run during test
@@ -122,8 +119,7 @@ class TestStandbyReplay(CephFSTestCase):
         self.set_standby_for(mds_a, mds_c, True)
 
         # Create FS and start A
-        fs_a = self.mds_cluster.get_filesystem("alpha")
-        fs_a.create()
+        fs_a = self.mds_cluster.newfs("alpha")
         self.mds_cluster.mds_restart(mds_a)
         fs_a.wait_for_daemons()
         self.assertEqual(fs_a.get_active_names(), [mds_a])
@@ -131,7 +127,7 @@ class TestStandbyReplay(CephFSTestCase):
         # Start B, he should go into standby replay
         self.mds_cluster.mds_restart(mds_b)
         self.wait_for_daemon_start([mds_b])
-        info_b = self.get_info_by_name(fs_a, mds_b)
+        info_b = self.get_info_by_name(mds_b)
         self.assertEqual(info_b['state'], "up:standby-replay")
         self.assertEqual(info_b['standby_for_name'], mds_a)
         self.assertEqual(info_b['rank'], 0)
@@ -139,7 +135,7 @@ class TestStandbyReplay(CephFSTestCase):
         # Start C, he should go into standby (*not* replay)
         self.mds_cluster.mds_restart(mds_c)
         self.wait_for_daemon_start([mds_c])
-        info_c = self.get_info_by_name(None, mds_c)
+        info_c = self.get_info_by_name(mds_c)
         self.assertEqual(info_c['state'], "up:standby")
         self.assertEqual(info_c['standby_for_name'], mds_a)
         self.assertEqual(info_c['rank'], -1)
@@ -148,10 +144,10 @@ class TestStandbyReplay(CephFSTestCase):
         self.mds_cluster.mds_stop(mds_b)
         self.mds_cluster.mds_fail(mds_b)
         self.wait_until_equal(
-                lambda: self.get_info_by_name(fs_a, mds_c)['state'],
+                lambda: self.get_info_by_name(mds_c)['state'],
                 "up:standby-replay",
                 60)
-        info_c = self.get_info_by_name(fs_a, mds_c)
+        info_c = self.get_info_by_name(mds_c)
         self.assertEqual(info_c['state'], "up:standby-replay")
         self.assertEqual(info_c['standby_for_name'], mds_a)
         self.assertEqual(info_c['rank'], 0)
@@ -171,8 +167,7 @@ class TestStandbyReplay(CephFSTestCase):
         self.set_standby_for(mds_b, mds_a, False)
 
         # Create FS alpha and get mds_a to come up as active
-        fs_a = self.mds_cluster.get_filesystem("alpha")
-        fs_a.create()
+        fs_a = self.mds_cluster.newfs("alpha")
         self.mds_cluster.mds_restart(mds_a)
         fs_a.wait_for_daemons()
         self.assertEqual(fs_a.get_active_names(), [mds_a])
@@ -182,7 +177,7 @@ class TestStandbyReplay(CephFSTestCase):
         self.wait_for_daemon_start([mds_b])
 
         # See the standby come up as the correct rank
-        info_b = self.get_info_by_name(fs_a, mds_b)
+        info_b = self.get_info_by_name(mds_b)
         self.assertEqual(info_b['state'], "up:standby-replay")
         self.assertEqual(info_b['standby_for_name'], mds_a)
         self.assertEqual(info_b['rank'], 0)
@@ -214,8 +209,7 @@ class TestStandbyReplay(CephFSTestCase):
         self.set_standby_for(mds_b, mds_b_s, True)
 
         # Create FS alpha and get mds_a to come up as active
-        fs_a = self.mds_cluster.get_filesystem("alpha")
-        fs_a.create()
+        fs_a = self.mds_cluster.newfs("alpha")
         fs_a.mon_manager.raw_cluster_cmd('fs', 'set', fs_a.name,
                                          'allow_multimds', "true",
                                          "--yes-i-really-mean-it")
@@ -232,9 +226,9 @@ class TestStandbyReplay(CephFSTestCase):
         self.wait_for_daemon_start([mds_b_s])
         self.mds_cluster.mds_restart(mds_a_s)
         self.wait_for_daemon_start([mds_a_s])
-        info_b_s = self.get_info_by_name(fs_a, mds_b_s)
+        info_b_s = self.get_info_by_name(mds_b_s)
         self.assertEqual(info_b_s['state'], "up:standby-replay")
-        info_a_s = self.get_info_by_name(fs_a, mds_a_s)
+        info_a_s = self.get_info_by_name(mds_a_s)
         self.assertEqual(info_a_s['state'], "up:standby-replay")
 
         # Shrink the cluster
@@ -261,14 +255,13 @@ class TestMultiFilesystems(CephFSTestCase):
 
     def setUp(self):
         super(TestMultiFilesystems, self).setUp()
-        self.fs.mon_manager.raw_cluster_cmd("fs", "flag", "set",
-                                            "enable_multiple", "true",
-                                            "--yes-i-really-mean-it")
+        self.mds_cluster.mon_manager.raw_cluster_cmd("fs", "flag", "set",
+            "enable_multiple", "true",
+            "--yes-i-really-mean-it")
+
     def _setup_two(self):
-        fs_a = self.mds_cluster.get_filesystem("alpha")
-        fs_b = self.mds_cluster.get_filesystem("bravo")
-        fs_a.create()
-        fs_b.create()
+        fs_a = self.mds_cluster.newfs("alpha")
+        fs_b = self.mds_cluster.newfs("bravo")
 
         self.mds_cluster.mds_restart()
 
@@ -278,7 +271,7 @@ class TestMultiFilesystems(CephFSTestCase):
 
         # Reconfigure client auth caps
         for mount in self.mounts:
-            self.fs.mon_manager.raw_cluster_cmd_result(
+            self.mds_cluster.mon_manager.raw_cluster_cmd_result(
                 'auth', 'caps', "client.{0}".format(mount.client_id),
                 'mds', 'allow',
                 'mon', 'allow r',
@@ -429,15 +422,13 @@ class TestMultiFilesystems(CephFSTestCase):
         set_standby_for(mds_d, mds_c, False)
 
         # Create FS alpha and get mds_a to come up as active
-        fs_a = self.mds_cluster.get_filesystem("alpha")
-        fs_a.create()
+        fs_a = self.mds_cluster.newfs("alpha")
         self.mds_cluster.mds_restart(mds_a)
         fs_a.wait_for_daemons()
         self.assertEqual(fs_a.get_active_names(), [mds_a])
 
         # Create FS bravo and get mds_c to come up as active
-        fs_b = self.mds_cluster.get_filesystem("bravo")
-        fs_b.create()
+        fs_b = self.mds_cluster.newfs("bravo")
         self.mds_cluster.mds_restart(mds_c)
         fs_b.wait_for_daemons()
         self.assertEqual(fs_b.get_active_names(), [mds_c])
@@ -501,10 +492,8 @@ class TestMultiFilesystems(CephFSTestCase):
             self.set_conf("mds.{0}".format(follower_id),
                           "mds_standby_for_fscid", fscid)
 
-        fs_a = self.mds_cluster.get_filesystem("alpha")
-        fs_a.create()
-        fs_b = self.mds_cluster.get_filesystem("bravo")
-        fs_b.create()
+        fs_a = self.mds_cluster.newfs("alpha")
+        fs_b = self.mds_cluster.newfs("bravo")
         set_standby_for(0, fs_a, mds_a)
         set_standby_for(0, fs_a, mds_b)
         set_standby_for(0, fs_b, mds_c)
@@ -548,14 +537,12 @@ class TestMultiFilesystems(CephFSTestCase):
                           "mds_standby_for_fscid", fscid)
 
         # Create two filesystems which should have two ranks each
-        fs_a = self.mds_cluster.get_filesystem("alpha")
-        fs_a.create()
+        fs_a = self.mds_cluster.newfs("alpha")
         fs_a.mon_manager.raw_cluster_cmd("fs", "set", fs_a.name,
                                          "allow_multimds", "true",
                                          "--yes-i-really-mean-it")
 
-        fs_b = self.mds_cluster.get_filesystem("bravo")
-        fs_b.create()
+        fs_b = self.mds_cluster.newfs("bravo")
         fs_b.mon_manager.raw_cluster_cmd("fs", "set", fs_b.name,
                                          "allow_multimds", "true",
                                          "--yes-i-really-mean-it")
index d625afde8d2febb2bd40d512ff7ebdad06a3b7a1..edaa8ea4a66f7ad8b6811dad08e2d953141df029 100644 (file)
@@ -62,9 +62,7 @@ class TestPoolPerm(CephFSTestCase):
         # Set up
         client_name = "client.{0}".format(self.mount_a.client_id)
         new_pool_name = "data_new"
-        self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', new_pool_name,
-                                            self.fs.get_pgs_per_fs_pool().__str__())
-        self.fs.mon_manager.raw_cluster_cmd('mds', 'add_data_pool', new_pool_name)
+        self.fs.add_data_pool(new_pool_name)
 
         self.mount_a.run_shell(["touch", "layoutfile"])
         self.mount_a.run_shell(["mkdir", "layoutdir"])
index a68eeddd06a3c0b64821bfbe024c80fa09f68335..f1db6ac8db1a7cc0f771dc1784efc9a69c3eb1e7 100644 (file)
@@ -26,33 +26,40 @@ class MDSThrasher(Greenlet):
     to use when selecting a random value from a range.  To always use the maximum
     value, set no_random to true.  The config is a dict containing some or all of:
 
-    seed: [no default] seed the random number generator
-
-    randomize: [default: true] enables randomization and use the max/min values
-
-    max_thrash: [default: 1] the maximum number of MDSs that will be thrashed at
+    max_thrash: [default: 1] the maximum number of active MDSs per FS that will be thrashed at
       any given time.
 
     max_thrash_delay: [default: 30] maximum number of seconds to delay before
       thrashing again.
 
+    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
+      the replay state before thrashing.
+
     max_revive_delay: [default: 10] maximum number of seconds to delay before
-      bringing back a thrashed MDS
+      bringing back a thrashed MDS.
 
-    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
-      during replay.  Value should be between 0.0 and 1.0
+    randomize: [default: true] enables randomization and use the max/min values
 
-    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
-      the replay state before thrashing
+    seed: [no default] seed the random number generator
 
-    thrash_weights: allows specific MDSs to be thrashed more/less frequently.  This option
-      overrides anything specified by max_thrash.  This option is a dict containing
-      mds.x: weight pairs.  For example, [mds.a: 0.7, mds.b: 0.3, mds.c: 0.0].  Each weight
-      is a value from 0.0 to 1.0.  Any MDSs not specified will be automatically
-      given a weight of 0.0.  For a given MDS, by default the trasher delays for up
-      to max_thrash_delay, trashes, waits for the MDS to recover, and iterates.  If a non-zero
-      weight is specified for an MDS, for each iteration the thrasher chooses whether to thrash
-      during that iteration based on a random value [0-1] not exceeding the weight of that MDS.
+    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
+      during replay.  Value should be between 0.0 and 1.0.
+
+    thrash_max_mds: [default: 0.25] likelihood that the max_mds of the mds
+      cluster will be modified to a value [1, current) or (current, starting
+      max_mds]. When reduced, randomly selected MDSs other than rank 0 will be
+      deactivated to reach the new max_mds.  Value should be between 0.0 and 1.0.
+
+    thrash_weights: allows specific MDSs to be thrashed more/less frequently.
+      This option overrides anything specified by max_thrash.  This option is a
+      dict containing mds.x: weight pairs.  For example, [mds.a: 0.7, mds.b:
+      0.3, mds.c: 0.0].  Each weight is a value from 0.0 to 1.0.  Any MDSs not
+      specified will be automatically given a weight of 0.0 (not thrashed).
+      For a given MDS, by default the thrasher delays for up to
+      max_thrash_delay, thrashes, waits for the MDS to recover, and iterates.
+      If a non-zero weight is specified for an MDS, for each iteration the
+      thrasher chooses whether to thrash during that iteration based on a
+      random value [0-1] not exceeding the weight of that MDS.
 
     Examples::
 
@@ -85,35 +92,29 @@ class MDSThrasher(Greenlet):
 
     """
 
-    def __init__(self, ctx, manager, mds_cluster, config, logger, failure_group, weight):
+    def __init__(self, ctx, manager, config, logger, fs, max_mds):
         super(MDSThrasher, self).__init__()
 
         self.ctx = ctx
         self.manager = manager
         assert self.manager.is_clean()
-        self.mds_cluster = mds_cluster
+        self.config = config
+        self.logger = logger
+        self.fs = fs
+        self.max_mds = max_mds
 
         self.stopping = Event()
-        self.logger = logger
-        self.config = config
 
         self.randomize = bool(self.config.get('randomize', True))
-        self.max_thrash_delay = float(self.config.get('thrash_delay', 30.0))
+        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.25))
+        self.max_thrash = int(self.config.get('max_thrash', 1))
+        self.max_thrash_delay = float(self.config.get('thrash_delay', 120.0))
         self.thrash_in_replay = float(self.config.get('thrash_in_replay', False))
         assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
             v=self.thrash_in_replay)
-
         self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
-
         self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
 
-        self.failure_group = failure_group
-        self.weight = weight
-
-        # TODO support multiple filesystems: will require behavioural change to select
-        # which filesystem to act on when doing rank-ish things
-        self.fs = Filesystem(self.ctx)
-
     def _run(self):
         try:
             self.do_thrash()
@@ -147,13 +148,6 @@ class MDSThrasher(Greenlet):
             "powercycling requested but RemoteConsole is not "
             "initialized.  Check ipmi config.")
 
-    def kill_mds_by_rank(self, rank):
-        """
-        kill_mds wrapper to kill based on rank passed.
-        """
-        status = self.mds_cluster.get_mds_info_by_rank(rank)
-        self.kill_mds(status['name'])
-
     def revive_mds(self, mds, standby_for_rank=None):
         """
         Revive mds -- do an ipmpi powercycle (if indicated by the config)
@@ -172,23 +166,44 @@ class MDSThrasher(Greenlet):
             args.extend(['--hot-standby', standby_for_rank])
         self.ctx.daemons.get_daemon('mds', mds).restart(*args)
 
-    def revive_mds_by_rank(self, rank, standby_for_rank=None):
-        """
-        revive_mds wrapper to revive based on rank passed.
-        """
-        status = self.mds_cluster.get_mds_info_by_rank(rank)
-        self.revive_mds(status['name'], standby_for_rank)
-
-    def get_mds_status_all(self):
-        return self.fs.get_mds_map()
+    def wait_for_stable(self, rank = None, gid = None):
+        self.log('waiting for mds cluster to stabilize...')
+        status = self.fs.status()
+        itercount = 0
+        while True:
+            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
+            if rank is not None:
+                try:
+                    info = status.get_rank(self.fs.id, rank)
+                    if info['gid'] != gid:
+                        self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
+                        return status, info['name']
+                except:
+                    pass # no rank present
+            else:
+                ranks = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, list(status.get_ranks(self.fs.id)))
+                count = len(ranks)
+                if count >= max_mds:
+                    self.log('mds cluster has {count} alive and active, now stable!'.format(count = count))
+                    return status, None
+            itercount = itercount + 1
+            if itercount > 10:
+                self.log('mds map: {status}'.format(status=self.fs.status()))
+            time.sleep(2)
+            status = self.fs.status()
 
     def do_thrash(self):
         """
         Perform the random thrashing action
         """
 
-        self.log('starting mds_do_thrash for failure group: ' + ', '.join(
-            ['mds.{_id}'.format(_id=_f) for _f in self.failure_group]))
+        self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
+        stats = {
+            "max_mds": 0,
+            "deactivate": 0,
+            "kill": 0,
+        }
+
         while not self.stopping.is_set():
             delay = self.max_thrash_delay
             if self.randomize:
@@ -200,112 +215,128 @@ class MDSThrasher(Greenlet):
                 if self.stopping.is_set():
                     continue
 
-            skip = random.randrange(0.0, 1.0)
-            if self.weight < 1.0 and skip > self.weight:
-                self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip,
-                                                                                                   weight=self.weight))
-                continue
-
-            # find the active mds in the failure group
-            statuses = [self.mds_cluster.get_mds_info(m) for m in self.failure_group]
-            actives = filter(lambda s: s and s['state'] == 'up:active', statuses)
-            assert len(actives) == 1, 'Can only have one active in a failure group'
-
-            active_mds = actives[0]['name']
-            active_rank = actives[0]['rank']
-
-            self.log('kill mds.{id} (rank={r})'.format(id=active_mds, r=active_rank))
-            self.kill_mds_by_rank(active_rank)
-
-            # wait for mon to report killed mds as crashed
-            last_laggy_since = None
-            itercount = 0
-            while True:
-                failed = self.fs.get_mds_map()['failed']
-                status = self.mds_cluster.get_mds_info(active_mds)
-                if not status:
-                    break
-                if 'laggy_since' in status:
-                    last_laggy_since = status['laggy_since']
-                    break
-                if any([(f == active_mds) for f in failed]):
-                    break
-                self.log(
-                    'waiting till mds map indicates mds.{_id} is laggy/crashed, in failed state, or mds.{_id} is removed from mdsmap'.format(
-                        _id=active_mds))
-                itercount = itercount + 1
-                if itercount > 10:
-                    self.log('mds map: {status}'.format(status=self.mds_cluster.get_fs_map()))
-                time.sleep(2)
-            if last_laggy_since:
-                self.log(
-                    'mds.{_id} reported laggy/crashed since: {since}'.format(_id=active_mds, since=last_laggy_since))
-            else:
-                self.log('mds.{_id} down, removed from mdsmap'.format(_id=active_mds, since=last_laggy_since))
-
-            # wait for a standby mds to takeover and become active
-            takeover_mds = None
-            takeover_rank = None
-            itercount = 0
-            while True:
-                statuses = [self.mds_cluster.get_mds_info(m) for m in self.failure_group]
-                actives = filter(lambda s: s and s['state'] == 'up:active', statuses)
-                if len(actives) > 0:
-                    assert len(actives) == 1, 'Can only have one active in failure group'
-                    takeover_mds = actives[0]['name']
-                    takeover_rank = actives[0]['rank']
-                    break
-                itercount = itercount + 1
-                if itercount > 10:
-                    self.log('mds map: {status}'.format(status=self.mds_cluster.get_fs_map()))
-
-            self.log('New active mds is mds.{_id}'.format(_id=takeover_mds))
-
-            # wait for a while before restarting old active to become new
-            # standby
-            delay = self.max_revive_delay
-            if self.randomize:
-                delay = random.randrange(0.0, self.max_revive_delay)
-
-            self.log('waiting for {delay} secs before reviving mds.{id}'.format(
-                delay=delay, id=active_mds))
-            time.sleep(delay)
-
-            self.log('reviving mds.{id}'.format(id=active_mds))
-            self.revive_mds(active_mds, standby_for_rank=takeover_rank)
-
-            status = {}
-            while True:
-                status = self.mds_cluster.get_mds_info(active_mds)
-                if status and (status['state'] == 'up:standby' or status['state'] == 'up:standby-replay'):
+            status = self.fs.status()
+
+            if random.randrange(0.0, 1.0) <= self.thrash_max_mds:
+                max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
+                options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
+                if len(options) > 0:
+                    sample = random.sample(options, 1)
+                    new_max_mds = sample[0]
+                    self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
+                    self.fs.set_max_mds(new_max_mds)
+                    stats['max_mds'] += 1
+
+                    # Now randomly deactivate mds if we shrank
+                    for rank in random.sample(range(1, max_mds), max(0, max_mds-new_max_mds)):
+                        self.fs.deactivate(rank)
+                        stats['deactivate'] += 1
+
+                    status = self.wait_for_stable()[0]
+
+            count = 0
+            for info in status.get_ranks(self.fs.id):
+                name = info['name']
+                label = 'mds.' + name
+                rank = info['rank']
+                gid = info['gid']
+
+                # if thrash_weights isn't specified and we've reached max_thrash,
+                # we're done
+                count = count + 1
+                if 'thrash_weights' not in self.config and count > self.max_thrash:
                     break
-                self.log(
-                    'waiting till mds map indicates mds.{_id} is in standby or standby-replay'.format(_id=active_mds))
-                time.sleep(2)
-            self.log('mds.{_id} reported in {state} state'.format(_id=active_mds, state=status['state']))
 
-            # don't do replay thrashing right now
-            continue
-            # this might race with replay -> active transition...
-            if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
-
-                delay = self.max_replay_thrash_delay
-                if self.randomize:
-                    delay = random.randrange(0.0, self.max_replay_thrash_delay)
-                time.sleep(delay)
-                self.log('kill replaying mds.{id}'.format(id=self.to_kill))
-                self.kill_mds(self.to_kill)
+                weight = 1.0
+                if 'thrash_weights' in self.config:
+                    weight = self.config['thrash_weights'].get(label, '0.0')
+                skip = random.randrange(0.0, 1.0)
+                if weight <= skip:
+                    self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=weight))
+                    continue
 
+                self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
+                self.kill_mds(name)
+                stats['kill'] += 1
+
+                # wait for mon to report killed mds as crashed
+                last_laggy_since = None
+                itercount = 0
+                while True:
+                    status = self.fs.status()
+                    info = status.get_mds(name)
+                    if not info:
+                        break
+                    if 'laggy_since' in info:
+                        last_laggy_since = info['laggy_since']
+                        break
+                    if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
+                        break
+                    self.log(
+                        'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
+                            label=label))
+                    itercount = itercount + 1
+                    if itercount > 10:
+                        self.log('mds map: {status}'.format(status=status))
+                    time.sleep(2)
+
+                if last_laggy_since:
+                    self.log(
+                        '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
+                else:
+                    self.log('{label} down, removed from mdsmap'.format(label=label))
+
+                # wait for a standby mds to takeover and become active
+                status, takeover_mds = self.wait_for_stable(rank, gid)
+                self.log('New active mds is mds.{_id}'.format(_id=takeover_mds))
+
+                # wait for a while before restarting old active to become new
+                # standby
                 delay = self.max_revive_delay
                 if self.randomize:
                     delay = random.randrange(0.0, self.max_revive_delay)
 
-                self.log('waiting for {delay} secs before reviving mds.{id}'.format(
-                    delay=delay, id=self.to_kill))
+                self.log('waiting for {delay} secs before reviving {label}'.format(
+                    delay=delay, label=label))
                 time.sleep(delay)
 
-                self.log('revive mds.{id}'.format(id=self.to_kill))
-                self.revive_mds(self.to_kill)
+                self.log('reviving {label}'.format(label=label))
+                self.revive_mds(name)
+
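+                # poll the fs map until the revived daemon re-registers as a
+                # standby or standby-replay before moving on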
+                while True:
+                    status = self.fs.status()
+                    info = status.get_mds(name)
+                    if info and info['state'] in ('up:standby', 'up:standby-replay'):
+                        self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
+                        break
+                    self.log(
+                        'waiting till mds map indicates {label} is in standby or standby-replay'.format(label=label))
+                    time.sleep(2)
+
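+        # the main loop has exited; log a summary of what this thrasher did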
+        for stat in stats:
+            self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))
+
+            # don't do replay thrashing right now
+#            for info in status.get_replays(self.fs.id):
+#                # this might race with replay -> active transition...
+#                if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
+#                    delay = self.max_replay_thrash_delay
+#                    if self.randomize:
+#                        delay = random.randrange(0.0, self.max_replay_thrash_delay)
+#                    time.sleep(delay)
+#                    self.log('kill replaying mds.{id}'.format(id=self.to_kill))
+#                    self.kill_mds(self.to_kill)
+#
+#                    delay = self.max_revive_delay
+#                    if self.randomize:
+#                        delay = random.randrange(0.0, self.max_revive_delay)
+#
+#                    self.log('waiting for {delay} secs before reviving mds.{id}'.format(
+#                        delay=delay, id=self.to_kill))
+#                    time.sleep(delay)
+#
+#                    self.log('revive mds.{id}'.format(id=self.to_kill))
+#                    self.revive_mds(self.to_kill)
 
 
 @contextlib.contextmanager
@@ -336,9 +367,6 @@ def task(ctx, config):
     log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
     random.seed(seed)
 
-    max_thrashers = config.get('max_thrash', 1)
-    thrashers = {}
-
     (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
     manager = ceph_manager.CephManager(
         first, ctx=ctx, logger=log.getChild('ceph_manager'),
@@ -346,71 +374,41 @@ def task(ctx, config):
 
     # make sure everyone is in active, standby, or standby-replay
     log.info('Wait for all MDSs to reach steady state...')
-    statuses = None
-    statuses_by_rank = None
+    status = mds_cluster.status()
     while True:
-        statuses = {m: mds_cluster.get_mds_info(m) for m in mdslist}
-        statuses_by_rank = {}
-        for _, s in statuses.iteritems():
-            if isinstance(s, dict):
-                statuses_by_rank[s['rank']] = s
-
-        ready = filter(lambda (_, s): s is not None and (s['state'] == 'up:active'
-                                                         or s['state'] == 'up:standby'
-                                                         or s['state'] == 'up:standby-replay'),
-                       statuses.items())
-        if len(ready) == len(statuses):
+        steady = True
+        for info in status.get_all():
+            state = info['state']
+            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
+                steady = False
+                break
+        if steady:
             break
         time.sleep(2)
+        status = mds_cluster.status()
     log.info('Ready to start thrashing')
 
-    # setup failure groups
-    failure_groups = {}
-    actives = {s['name']: s for (_, s) in statuses.iteritems() if s['state'] == 'up:active'}
-    log.info('Actives is: {d}'.format(d=actives))
-    log.info('Statuses is: {d}'.format(d=statuses_by_rank))
-    for active in actives:
-        for (r, s) in statuses.iteritems():
-            if s['standby_for_name'] == active:
-                if not active in failure_groups:
-                    failure_groups[active] = []
-                log.info('Assigning mds rank {r} to failure group {g}'.format(r=r, g=active))
-                failure_groups[active].append(r)
-
     manager.wait_for_clean()
-    for (active, standbys) in failure_groups.iteritems():
-        weight = 1.0
-        if 'thrash_weights' in config:
-            weight = int(config['thrash_weights'].get('mds.{_id}'.format(_id=active), '0.0'))
-
-        failure_group = [active]
-        failure_group.extend(standbys)
-
+    thrashers = {}
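+    # start one MDSThrasher per file system in the fs map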
+    for fs in status.get_filesystems():
+        name = fs['mdsmap']['fs_name']
+        log.info('Running thrasher against FS {f}'.format(f = name))
         thrasher = MDSThrasher(
-            ctx, manager, mds_cluster, config,
-            logger=log.getChild('mds_thrasher.failure_group.[{a}, {sbs}]'.format(
-                a=active,
-                sbs=', '.join(standbys)
+            ctx, manager, config,
+            log.getChild('fs.[{f}]'.format(f = name)),
+            Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds']
             )
-            ),
-            failure_group=failure_group,
-            weight=weight)
         thrasher.start()
-        thrashers[active] = thrasher
-
-        # if thrash_weights isn't specified and we've reached max_thrash,
-        # we're done
-        if 'thrash_weights' not in config and len(thrashers) == max_thrashers:
-            break
+        thrashers[name] = thrasher
 
     try:
         log.debug('Yielding')
         yield
     finally:
         log.info('joining mds_thrashers')
-        for t in thrashers:
-            log.info('join thrasher for failure group [{fg}]'.format(fg=', '.join(failure_group)))
-            thrashers[t].stop()
-            thrashers[t].get()  # Raise any exception from _run()
-            thrashers[t].join()
+        for name in thrashers:
+            log.info('join thrasher mds_thrasher.fs.[{f}]'.format(f=name))
+            thrashers[name].stop()
+            thrashers[name].get()  # Raise any exception from _run()
+            thrashers[name].join()
         log.info('done joining')