tasks: generalise cephfs classes for multi-fs
author    John Spray <john.spray@redhat.com>
          Sun, 3 Jan 2016 18:35:17 +0000 (18:35 +0000)
committer John Spray <john.spray@redhat.com>
          Fri, 11 Mar 2016 10:36:43 +0000 (10:36 +0000)
Signed-off-by: John Spray <john.spray@redhat.com>
tasks/cephfs/cephfs_test_case.py
tasks/cephfs/filesystem.py
tasks/cephfs/test_failover.py
tasks/cephfs/vstart_runner.py
tasks/cephfs_test_runner.py

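For orientation, a minimal usage sketch of the reworked classes, based only on calls introduced in this diff (the teuthology context `ctx` and the filesystem name "alpha" are placeholders):

    from tasks.cephfs.filesystem import MDSCluster

    mds_cluster = MDSCluster(ctx)              # collective operations on all MDS daemons
    mds_cluster.mds_stop()                     # stop every MDS daemon process
    mds_cluster.delete_all_filesystems()       # remove existing filesystems and their pools

    fs_a = mds_cluster.get_filesystem("alpha")  # per-filesystem driver (a Filesystem instance)
    fs_a.create()                              # creates alpha_metadata/alpha_data pools and the fs
    mds_cluster.mds_restart()
    fs_a.wait_for_daemons()                    # wait for the MDS daemons to reach healthy states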
diff --git a/tasks/cephfs/cephfs_test_case.py b/tasks/cephfs/cephfs_test_case.py
index ed63921a3ded00039280ee6af5dcfce9863a499f..93aef36c278d6f17c6a94881f957752f8282ce6c 100644
@@ -8,6 +8,7 @@ import re
 from StringIO import StringIO
 
 from tasks.cephfs.fuse_mount import FuseMount
+
 from teuthology.orchestra import run
 from teuthology.orchestra.run import CommandFailedError
 
@@ -42,6 +43,7 @@ class CephFSTestCase(unittest.TestCase):
     # Environment references
     mounts = None
     fs = None
+    mds_cluster = None
     ctx = None
 
     # FIXME weird explicit naming
@@ -56,6 +58,9 @@ class CephFSTestCase(unittest.TestCase):
     REQUIRE_ONE_CLIENT_REMOTE = False
     REQUIRE_MEMSTORE = False
 
+    # Whether to create the default filesystem during setUp
+    REQUIRE_FILESYSTEM = True
+
     LOAD_SETTINGS = []
 
     def setUp(self):
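To illustrate the new REQUIRE_FILESYSTEM flag, a hypothetical test case (a sketch, not part of this diff) that manages its own filesystems through the injected mds_cluster handle instead of relying on the default one from setUp:

    class TestWithoutDefaultFs(CephFSTestCase):
        # Skip creation of the default filesystem in setUp; this test
        # builds whatever filesystems it needs itself.
        REQUIRE_FILESYSTEM = False

        def test_example(self):
            fs_alpha = self.mds_cluster.get_filesystem("alpha")  # "alpha" is a placeholder name
            fs_alpha.create()
            self.mds_cluster.mds_restart()
            fs_alpha.wait_for_daemons()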
@@ -111,12 +116,8 @@ class CephFSTestCase(unittest.TestCase):
 
         # To avoid any issues with e.g. unlink bugs, we destroy and recreate
         # the filesystem rather than just doing a rm -rf of files
-        self.fs.mds_stop()
-        if self.fs.exists():
-            self.fs.mon_manager.raw_cluster_cmd('mds', 'cluster_down')
-            self.fs.mds_fail()
-        self.fs.delete_all()
-        self.fs.create()
+        self.mds_cluster.mds_stop()
+        self.mds_cluster.delete_all_filesystems()
 
         # In case the previous filesystem had filled up the RADOS cluster, wait for that
         # flag to pass.
@@ -153,16 +154,18 @@ class CephFSTestCase(unittest.TestCase):
             if ent_type == "client" and ent_id not in client_mount_ids and ent_id != "admin":
                 self.fs.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])
 
-        self.fs.mds_restart()
-        self.fs.wait_for_daemons()
-        if not self.mount_a.is_mounted():
-            self.mount_a.mount()
-            self.mount_a.wait_until_mounted()
+        if self.REQUIRE_FILESYSTEM:
+            self.fs.create()
+            self.fs.mds_restart()
+            self.fs.wait_for_daemons()
+            if not self.mount_a.is_mounted():
+                self.mount_a.mount()
+                self.mount_a.wait_until_mounted()
 
-        if self.mount_b:
-            if not self.mount_b.is_mounted():
-                self.mount_b.mount()
-                self.mount_b.wait_until_mounted()
+            if self.mount_b:
+                if not self.mount_b.is_mounted():
+                    self.mount_b.mount()
+                    self.mount_b.wait_until_mounted()
 
         # Load any config settings of interest
         for setting in self.LOAD_SETTINGS:
@@ -181,11 +184,11 @@ class CephFSTestCase(unittest.TestCase):
             m.client_id = self._original_client_ids[i]
 
         for subsys, key in self.configs_set:
-            self.fs.clear_ceph_conf(subsys, key)
+            self.mds_cluster.clear_ceph_conf(subsys, key)
 
     def set_conf(self, subsys, key, value):
         self.configs_set.add((subsys, key))
-        self.fs.set_ceph_conf(subsys, key, value)
+        self.mds_cluster.set_ceph_conf(subsys, key, value)
 
     def auth_list(self):
         """
@@ -263,6 +266,33 @@ class CephFSTestCase(unittest.TestCase):
 
         log.debug("wait_until_true: success")
 
+    def wait_for_daemon_start(self, daemon_ids=None):
+        """
+        Wait until all the daemons appear in the FSMap, either assigned
+        MDS ranks or in the list of standbys
+        """
+        def get_daemon_names():
+            fs_map = self.mds_cluster.get_fs_map()
+            names = [m['name'] for m in fs_map['standbys']]
+            for fs in fs_map['filesystems']:
+                names.extend([info['name'] for info in fs['mdsmap']['info'].values()])
+
+            return names
+
+        if daemon_ids is None:
+            daemon_ids = self.mds_cluster.mds_ids
+
+        try:
+            self.wait_until_true(
+                lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids),
+                timeout=30
+            )
+        except RuntimeError:
+            log.warn("Timeout waiting for daemons {0}, while we have {1}".format(
+                daemon_ids, get_daemon_names()
+            ))
+            raise
+
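As a usage sketch (assumed, not shown in this hunk), a test that restarts daemons would typically follow up with:

    self.mds_cluster.mds_restart()
    # Wait for every known MDS id to show up in the FSMap (as ranks or standbys);
    # a subset could be passed instead, e.g. daemon_ids=["a", "b"] (hypothetical ids).
    self.wait_for_daemon_start()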
     def assert_mds_crash(self, daemon_id):
         """
         Assert that a particular MDS daemon crashes (block until
diff --git a/tasks/cephfs/filesystem.py b/tasks/cephfs/filesystem.py
index 85d480bf0e06775174004e62fa6b63cebaad5971..d0dea405b95487504b19881d0cbe948cd14e6444 100644
@@ -31,31 +31,221 @@ class ObjectNotFound(Exception):
         return "Object not found: '{0}'".format(self._object_name)
 
 
-class Filesystem(object):
+class MDSCluster(object):
     """
-    This object is for driving a CephFS filesystem.
+    Collective operations on all the MDS daemons in the Ceph cluster.  These
+    daemons may be in use by various Filesystems.
 
-    Limitations:
-     * Assume a single filesystem+cluster
-     * Assume a single MDS
+    For the benefit of pre-multi-filesystem tests, this class is also
+    a parent of Filesystem.  The correct way to use MDSCluster going forward is
+    as a separate instance outside of your (multiple) Filesystem instances.
     """
-    def __init__(self, ctx, admin_remote=None):
-        self._ctx = ctx
 
+    @property
+    def admin_remote(self):
+        first_mon = misc.get_first_mon(self._ctx, None)
+        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
+        return result
+
+    def __init__(self, ctx):
         self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
+        self._ctx = ctx
+
         if len(self.mds_ids) == 0:
             raise RuntimeError("This task requires at least one MDS")
 
-        first_mon = misc.get_first_mon(ctx, None)
-        if admin_remote is None:
-            (self.admin_remote,) = ctx.cluster.only(first_mon).remotes.iterkeys()
-        else:
-            self.admin_remote = admin_remote
         self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
         if hasattr(self._ctx, "daemons"):
             # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
             self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
 
+    def _one_or_all(self, mds_id, cb, in_parallel=True):
+        """
+        Call a callback for a single named MDS, or for all.
+
+        Note that the parallelism here isn't for performance, it's to avoid being overly kind
+        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
+        avoid being overly kind by executing them in a particular order.  However, some actions
+        don't cope with being done in parallel, so it's optional (`in_parallel`)
+
+        :param mds_id: MDS daemon name, or None
+        :param cb: Callback taking single argument of MDS daemon name
+        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
+        """
+        if mds_id is None:
+            if in_parallel:
+                with parallel() as p:
+                    for mds_id in self.mds_ids:
+                        p.spawn(cb, mds_id)
+            else:
+                for mds_id in self.mds_ids:
+                    cb(mds_id)
+        else:
+            cb(mds_id)
+
+    def mds_stop(self, mds_id=None):
+        """
+        Stop the MDS daemon process(es).  If it held a rank, that rank
+        will eventually go laggy.
+        """
+        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
+
+    def mds_fail(self, mds_id=None):
+        """
+        Inform MDSMonitor of the death of the daemon process(es).  If it held
+        a rank, that rank will be relinquished.
+        """
+        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
+
+    def mds_restart(self, mds_id=None):
+        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
+
+    def mds_fail_restart(self, mds_id=None):
+        """
+        Variation on restart that includes marking MDSs as failed, so that doing this
+        operation followed by waiting for healthy daemon states guarantees that they
+        have gone down and come up, rather than potentially seeing the healthy states
+        that existed before the restart.
+        """
+        def _fail_restart(id_):
+            self.mds_daemons[id_].stop()
+            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
+            self.mds_daemons[id_].restart()
+
+        self._one_or_all(mds_id, _fail_restart)
+
+    def get_filesystem(self, name):
+        return Filesystem(self._ctx, name)
+
+    def get_fs_map(self):
+        fs_map = json.loads(self.mon_manager.raw_cluster_cmd("fs", "dump", "--format=json-pretty"))
+        return fs_map
+
+    def delete_all_filesystems(self):
+        """
+        Remove all filesystems that exist, and any pools in use by them.
+        """
+        fs_ls = json.loads(self.mon_manager.raw_cluster_cmd("fs", "ls", "--format=json-pretty"))
+        for fs in fs_ls:
+            self.mon_manager.raw_cluster_cmd("fs", "set", fs['name'], "cluster_down", "true")
+            mds_map = json.loads(
+                self.mon_manager.raw_cluster_cmd(
+                    "fs", "get", fs['name'], "--format=json-pretty"))['mdsmap']
+
+            for gid in mds_map['up'].values():
+                self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
+
+            self.mon_manager.raw_cluster_cmd('fs', 'rm', fs['name'], '--yes-i-really-mean-it')
+            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
+                                             fs['metadata_pool'],
+                                             fs['metadata_pool'],
+                                             '--yes-i-really-really-mean-it')
+            for data_pool in fs['data_pools']:
+                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
+                                                 data_pool, data_pool,
+                                                 '--yes-i-really-really-mean-it')
+
+    def get_standby_daemons(self):
+        return set([s['name'] for s in self.get_fs_map()['standbys']])
+
+    def get_mds_hostnames(self):
+        result = set()
+        for mds_id in self.mds_ids:
+            mds_remote = self.mon_manager.find_remote('mds', mds_id)
+            result.add(mds_remote.hostname)
+
+        return list(result)
+
+    def get_config(self, key, service_type=None):
+        """
+        Get config from mon by default, or a specific service if caller asks for it
+        """
+        if service_type is None:
+            service_type = 'mon'
+
+        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
+        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
+
+    def set_ceph_conf(self, subsys, key, value):
+        if subsys not in self._ctx.ceph.conf:
+            self._ctx.ceph.conf[subsys] = {}
+        self._ctx.ceph.conf[subsys][key] = value
+        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
+                               # used a different config path this won't work.
+
+    def clear_ceph_conf(self, subsys, key):
+        del self._ctx.ceph.conf[subsys][key]
+        write_conf(self._ctx)
+
+    def json_asok(self, command, service_type, service_id):
+        proc = self.mon_manager.admin_socket(service_type, service_id, command)
+        response_data = proc.stdout.getvalue()
+        log.info("_json_asok output: {0}".format(response_data))
+        if response_data.strip():
+            return json.loads(response_data)
+        else:
+            return None
+
+    def set_clients_block(self, blocked, mds_id=None):
+        """
+        Block (using iptables) client communications to this MDS.  Be careful: if
+        other services are running on this MDS, or other MDSs try to talk to this
+        MDS, their communications may also be blocked as collateral damage.
+
+        :param mds_id: Optional ID of MDS to block, default to all
+        :return:
+        """
+        da_flag = "-A" if blocked else "-D"
+
+        def set_block(_mds_id):
+            remote = self.mon_manager.find_remote('mds', _mds_id)
+
+            addr = self.get_mds_addr(_mds_id)
+            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
+
+            remote.run(
+                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
+                      "comment", "--comment", "teuthology"])
+            remote.run(
+                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+                      "comment", "--comment", "teuthology"])
+
+        self._one_or_all(mds_id, set_block, in_parallel=False)
+
+    def clear_firewall(self):
+        clear_firewall(self._ctx)
+
+    def get_mds_addr(self, mds_id):
+        """
+        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
+        """
+        fs_map = self.get_fs_map()
+        infos = fs_map['standbys']
+        for fs in fs_map['filesystems']:
+            infos += fs['mdsmap']['info'].values()
+        for mds_info in infos:
+            if mds_info['name'] == mds_id:
+                return mds_info['addr']
+
+        log.warn(json.dumps(infos, indent=2))  # dump for debugging
+        raise RuntimeError("MDS id '{0}' not found in map".format(mds_id))
+
+
+class Filesystem(MDSCluster):
+    """
+    This object is for driving a CephFS filesystem.  The MDS daemons driven by
+    MDSCluster may be shared with other Filesystems.
+    """
+    def __init__(self, ctx, name=None):
+        super(Filesystem, self).__init__(ctx)
+
+        if name is None:
+            name = "cephfs"
+
+        self.name = name
+        self.metadata_pool_name = "{0}_metadata".format(name)
+        self.data_pool_name = "{0}_data".format(name)
+
         client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
         self.client_id = client_list[0]
         self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
@@ -72,33 +262,23 @@ class Filesystem(object):
         return pg_warn_min_per_osd * osd_count
 
     def create(self):
+        log.info("Creating filesystem '{0}'".format(self.name))
+
         pgs_per_fs_pool = self.get_pgs_per_fs_pool()
 
-        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', 'metadata', pgs_per_fs_pool.__str__())
-        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', 'data', pgs_per_fs_pool.__str__())
-        self.mon_manager.raw_cluster_cmd('fs', 'new', 'default', 'metadata', 'data')
+        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
+                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
+        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
+                                         self.data_pool_name, pgs_per_fs_pool.__str__())
+        self.mon_manager.raw_cluster_cmd('fs', 'new',
+                                         self.name, self.metadata_pool_name, self.data_pool_name)
 
     def exists(self):
         """
-        Whether a filesystem is enabled at all
+        Whether a filesystem exists in the mon's filesystem list
         """
         fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
-        return len(fs_list) > 0
-
-    def delete_all(self):
-        """
-        Remove all filesystems that exist, and any pools in use by them.
-        """
-        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
-        for fs_data in fs_list:
-            self.mon_manager.raw_cluster_cmd('fs', 'rm', fs_data['name'], '--yes-i-really-mean-it')
-            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
-                                             fs_data['metadata_pool'], fs_data['metadata_pool'],
-                                             '--yes-i-really-really-mean-it')
-            for data_pool in fs_data['data_pools']:
-                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
-                                                 data_pool, data_pool,
-                                                 '--yes-i-really-really-mean-it')
+        return self.name in [fs['name'] for fs in fs_list]
 
     def legacy_configured(self):
         """
@@ -122,27 +302,22 @@ class Filesystem(object):
     def _df(self):
         return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
 
-    def _fs_ls(self):
-        fs_list = json.loads(self.mon_manager.raw_cluster_cmd("fs", "ls", "--format=json-pretty"))
-        assert len(fs_list) == 1  # we don't handle multiple filesystems yet
-        return fs_list[0]
+    def get_mds_map(self):
+        fs = json.loads(self.mon_manager.raw_cluster_cmd("fs", "get", self.name, "--format=json-pretty"))
+        return fs['mdsmap']
 
     def get_data_pool_name(self):
-        """
-        Return the name of the data pool if there is only one, else raise exception -- call
-        this in tests where there will only be one data pool.
-        """
-        names = self.get_data_pool_names()
-        if len(names) > 1:
-            raise RuntimeError("Multiple data pools found")
-        else:
-            return names[0]
+        return self.data_pool_name
 
     def get_data_pool_names(self):
-        return self._fs_ls()['data_pools']
+        return self.get_mds_map()['data_pools']
 
     def get_metadata_pool_name(self):
-        return self._fs_ls()['metadata_pool']
+        return self.metadata_pool_name
+
+    def get_namespace_id(self):
+        fs = json.loads(self.mon_manager.raw_cluster_cmd("fs", "get", self.name, "--format=json-pretty"))
+        return fs['id']
 
     def get_pool_df(self, pool_name):
         """
@@ -158,35 +333,6 @@ class Filesystem(object):
     def get_usage(self):
         return self._df()['stats']['total_used_bytes']
 
-    def get_mds_hostnames(self):
-        result = set()
-        for mds_id in self.mds_ids:
-            mds_remote = self.mon_manager.find_remote('mds', mds_id)
-            result.add(mds_remote.hostname)
-
-        return list(result)
-
-    def get_config(self, key, service_type=None):
-        """
-        Get config from mon by default, or a specific service if caller asks for it
-        """
-        if service_type is None:
-            service_type = 'mon'
-
-        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
-        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
-
-    def set_ceph_conf(self, subsys, key, value):
-        if subsys not in self._ctx.ceph.conf:
-            self._ctx.ceph.conf[subsys] = {}
-        self._ctx.ceph.conf[subsys][key] = value
-        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
-                         # used a different config path this won't work.
-
-    def clear_ceph_conf(self, subsys, key):
-        del self._ctx.ceph.conf[subsys][key]
-        write_conf(self._ctx)
-
     def are_daemons_healthy(self):
         """
         Return true if all daemons are in one of active, standby, standby-replay, and
@@ -196,7 +342,7 @@ class Filesystem(object):
         """
 
         active_count = 0
-        status = self.mon_manager.get_mds_status_all()
+        status = self.get_mds_map()
         for mds_id, mds_status in status['info'].items():
             if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
@@ -228,7 +374,7 @@ class Filesystem(object):
         :param state:
         :return:
         """
-        status = self.mon_manager.get_mds_status_all()
+        status = self.get_mds_map()
         result = []
         for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
             if mds_status['state'] == state:
@@ -252,7 +398,7 @@ class Filesystem(object):
         as well as active, but does not include standby or
         standby-replay.
         """
-        status = self.mon_manager.get_mds_status_all()
+        status = self.get_mds_map()
         result = []
         for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
             if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
@@ -295,61 +441,6 @@ class Filesystem(object):
         else:
             return self.mds_ids[0]
 
-    def _one_or_all(self, mds_id, cb, in_parallel=True):
-        """
-        Call a callback for a single named MDS, or for all.
-
-        Note that the parallelism here isn't for performance, it's to avoid being overly kind
-        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
-        avoid being overly kind by executing them in a particular order.  However, some actions
-        don't cope with being done in parallel, so it's optional (`in_parallel`)
-
-        :param mds_id: MDS daemon name, or None
-        :param cb: Callback taking single argument of MDS daemon name
-        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
-        """
-        if mds_id is None:
-            if in_parallel:
-                with parallel() as p:
-                    for mds_id in self.mds_ids:
-                        p.spawn(cb, mds_id)
-            else:
-                for mds_id in self.mds_ids:
-                    cb(mds_id)
-        else:
-            cb(mds_id)
-
-    def mds_stop(self, mds_id=None):
-        """
-        Stop the MDS daemon process(se).  If it held a rank, that rank
-        will eventually go laggy.
-        """
-        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
-
-    def mds_fail(self, mds_id=None):
-        """
-        Inform MDSMonitor of the death of the daemon process(es).  If it held
-        a rank, that rank will be relinquished.
-        """
-        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
-
-    def mds_restart(self, mds_id=None):
-        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
-
-    def mds_fail_restart(self, mds_id=None):
-        """
-        Variation on restart that includes marking MDSs as failed, so that doing this
-        operation followed by waiting for healthy daemon states guarantees that they
-        have gone down and come up, rather than potentially seeing the healthy states
-        that existed before the restart.
-        """
-        def _fail_restart(id_):
-            self.mds_daemons[id_].stop()
-            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
-            self.mds_daemons[id_].restart()
-
-        self._one_or_all(mds_id, _fail_restart)
-
     def reset(self):
         log.info("Creating new filesystem")
 
@@ -368,9 +459,8 @@ class Filesystem(object):
         """
         temp_bin_path = '/tmp/out.bin'
 
-        # FIXME get the metadata pool name from mdsmap instead of hardcoding
         self.client_remote.run(args=[
-            'sudo', os.path.join(self._prefix, 'rados'), '-p', 'metadata', 'get', object_id, temp_bin_path
+            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
         ])
 
         stdout = StringIO()
@@ -403,69 +493,12 @@ class Filesystem(object):
 
         return version
 
-    def json_asok(self, command, service_type, service_id):
-        proc = self.mon_manager.admin_socket(service_type, service_id, command)
-        response_data = proc.stdout.getvalue()
-        log.info("_json_asok output: {0}".format(response_data))
-        if response_data.strip():
-            return json.loads(response_data)
-        else:
-            return None
-
     def mds_asok(self, command, mds_id=None):
         if mds_id is None:
             mds_id = self.get_lone_mds_id()
 
         return self.json_asok(command, 'mds', mds_id)
 
-    def get_mds_map(self):
-        """
-        Return the MDS map, as a JSON-esque dict from 'mds dump'
-        """
-        return json.loads(self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json-pretty'))
-
-    def get_mds_addr(self, mds_id):
-        """
-        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
-        """
-        mds_map = self.get_mds_map()
-        for gid_string, mds_info in mds_map['info'].items():
-            # For some reason
-            if mds_info['name'] == mds_id:
-                return mds_info['addr']
-
-        log.warn(json.dumps(mds_map, indent=2))  # dump map for debugging
-        raise RuntimeError("MDS id '{0}' not found in MDS map".format(mds_id))
-
-    def set_clients_block(self, blocked, mds_id=None):
-        """
-        Block (using iptables) client communications to this MDS.  Be careful: if
-        other services are running on this MDS, or other MDSs try to talk to this
-        MDS, their communications may also be blocked as collatoral damage.
-
-        :param mds_id: Optional ID of MDS to block, default to all
-        :return:
-        """
-        da_flag = "-A" if blocked else "-D"
-
-        def set_block(_mds_id):
-            remote = self.mon_manager.find_remote('mds', _mds_id)
-
-            addr = self.get_mds_addr(_mds_id)
-            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
-
-            remote.run(
-                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
-                      "comment", "--comment", "teuthology"])
-            remote.run(
-                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
-                      "comment", "--comment", "teuthology"])
-
-        self._one_or_all(mds_id, set_block, in_parallel=False)
-
-    def clear_firewall(self):
-        clear_firewall(self._ctx)
-
     def is_full(self):
         flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
         return 'full' in flags
@@ -501,7 +534,7 @@ class Filesystem(object):
                 log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
             else:
                 # In general, look for a single MDS
-                mds_status = self.mon_manager.get_mds_status_all()
+                mds_status = self.get_mds_map()
                 states = [m['state'] for m in mds_status['info'].values()]
                 if [s for s in states if s == goal_state] == [goal_state]:
                     current_state = goal_state
@@ -518,7 +551,7 @@ class Filesystem(object):
             elif reject is not None and current_state == reject:
                 raise RuntimeError("MDS in reject state {0}".format(current_state))
             elif timeout is not None and elapsed > timeout:
-                log.error("MDS status at timeout: {0}".format(self.mon_manager.get_mds_status_all()))
+                log.error("MDS status at timeout: {0}".format(self.get_mds_map()))
                 raise RuntimeError(
                     "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                         elapsed, goal_state, current_state
diff --git a/tasks/cephfs/test_failover.py b/tasks/cephfs/test_failover.py
index d3871d087bb2b29f66c45728f9ea9fa9e23f2b37..c9e8a33e4f87cf36e100002b50bb4cc341f7545c 100644
@@ -223,15 +223,15 @@ class TestMultiFilesystems(CephFSTestCase):
         log.info("Using MDS daemons: {0}".format(use_daemons))
 
         def set_standby_for(leader, follower, replay):
-            self.set_conf("mds.{0}".format(follower), "standby_for_name", leader)
+            self.set_conf("mds.{0}".format(follower), "mds_standby_for_name", leader)
             if replay:
-                self.set_conf("mds.{0}".format(follower), "standby_replay", "true")
+                self.set_conf("mds.{0}".format(follower), "mds_standby_replay", "true")
 
         # Configure two pairs of MDSs that are standby for each other
         set_standby_for(mds_a, mds_b, True)
-        set_standby_for(mds_b, mds_a, True)
+        set_standby_for(mds_b, mds_a, False)
         set_standby_for(mds_c, mds_d, True)
-        set_standby_for(mds_d, mds_c, True)
+        set_standby_for(mds_d, mds_c, False)
 
         # Create FS alpha and get mds_a to come up as active
         fs_a = self.mds_cluster.get_filesystem("alpha")
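For reference, the corrected call set_standby_for(mds_a, mds_b, True) amounts, via set_conf, to ceph.conf entries along these lines (daemon names are the test's placeholders, resolved from use_daemons at runtime):

    [mds.<mds_b>]
    mds_standby_for_name = <mds_a>
    mds_standby_replay = true

while set_standby_for(mds_b, mds_a, False) sets only mds_standby_for_name, so mds_b is a standby-replay daemon for mds_a and mds_a is a plain standby for mds_b; the previous keys lacked the "mds_" prefix.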
diff --git a/tasks/cephfs/vstart_runner.py b/tasks/cephfs/vstart_runner.py
index 34a567d215b07eb6a3c12e51ad3cd4672ed5fd08..fc60daab64e7ff1775be88299c1305787a3e709f 100644
@@ -54,7 +54,7 @@ try:
     from teuthology.exceptions import CommandFailedError
     from tasks.ceph_manager import CephManager
     from tasks.cephfs.fuse_mount import FuseMount
-    from tasks.cephfs.filesystem import Filesystem
+    from tasks.cephfs.filesystem import Filesystem, MDSCluster
     from teuthology.contextutil import MaxWhileTries
     from teuthology.task import interactive
 except ImportError:
@@ -508,36 +508,20 @@ class LocalCephManager(CephManager):
         return j
 
 
-class LocalFilesystem(Filesystem):
+class LocalMDSCluster(MDSCluster):
     def __init__(self, ctx):
         # Deliberately skip calling parent constructor
         self._ctx = ctx
 
-        self.admin_remote = LocalRemote()
-
         self.mds_ids = ctx.daemons.daemons['mds'].keys()
         if not self.mds_ids:
             raise RuntimeError("No MDSs found in ceph.conf!")
 
         self.mon_manager = LocalCephManager()
-
-        self.mds_daemons = ctx.daemons.daemons["mds"]
-
-        self.client_remote = LocalRemote()
+        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
 
         self._conf = defaultdict(dict)
 
-    @property
-    def _prefix(self):
-        return BIN_PREFIX
-
-    def set_clients_block(self, blocked, mds_id=None):
-        raise NotImplementedError()
-
-    def get_pgs_per_fs_pool(self):
-        # FIXME: assuming there are 3 OSDs
-        return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
-
     def get_config(self, key, service_type=None):
         if service_type is None:
             service_type = 'mon'
@@ -601,6 +585,59 @@ class LocalFilesystem(Filesystem):
         # FIXME: unimplemented
         pass
 
+    def get_filesystem(self, name):
+        return LocalFilesystem(self._ctx, name)
+
+
+class LocalFilesystem(Filesystem, LocalMDSCluster):
+    @property
+    def admin_remote(self):
+        return LocalRemote()
+
+    def __init__(self, ctx, name=None):
+        # Deliberately skip calling parent constructor
+        self._ctx = ctx
+
+        if name is None:
+            name = "cephfs"
+
+        self.name = name
+        self.metadata_pool_name = "{0}_metadata".format(name)
+        self.data_pool_name = "{0}_data".format(name)
+
+        # Hack: cheeky inspection of ceph.conf to see what MDSs exist
+        self.mds_ids = set()
+        for line in open("ceph.conf").readlines():
+            match = re.match("^\[mds\.(.+)\]$", line)
+            if match:
+                self.mds_ids.add(match.group(1))
+
+        if not self.mds_ids:
+            raise RuntimeError("No MDSs found in ceph.conf!")
+
+        self.mds_ids = list(self.mds_ids)
+
+        log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
+
+        self.mon_manager = LocalCephManager()
+
+        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
+
+        self.client_remote = LocalRemote()
+
+        self._conf = defaultdict(dict)
+
+    @property
+    def _prefix(self):
+        return BIN_PREFIX
+
+    def set_clients_block(self, blocked, mds_id=None):
+        raise NotImplementedError()
+
+    def get_pgs_per_fs_pool(self):
+        # FIXME: assuming there are 3 OSDs
+        return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
+
 
 class InteractiveFailureResult(unittest.TextTestResult):
     """
@@ -711,6 +748,7 @@ def exec_test():
             if os.path.exists(mount.mountpoint):
                 os.rmdir(mount.mountpoint)
     filesystem = LocalFilesystem(ctx)
+    mds_cluster = LocalMDSCluster(ctx)
 
     from tasks.cephfs_test_runner import DecoratingLoader
 
@@ -734,7 +772,8 @@ def exec_test():
     decorating_loader = DecoratingLoader({
         "ctx": ctx,
         "mounts": mounts,
-        "fs": filesystem
+        "fs": filesystem,
+        "mds_cluster": mds_cluster
     })
 
     # For the benefit of polling tests like test_full -- in teuthology land we set this
diff --git a/tasks/cephfs_test_runner.py b/tasks/cephfs_test_runner.py
index 912f355a6b16d9ba4d7a73710528406a82799979..61d2acfb7e1b1e81217adfeaf6509ac5c9128c81 100644
@@ -4,8 +4,7 @@ import os
 import unittest
 from unittest import suite, loader, case
 from teuthology.task import interactive
-from tasks.cephfs.filesystem import Filesystem
-
+from tasks.cephfs.filesystem import Filesystem, MDSCluster
 
 log = logging.getLogger(__name__)
 
@@ -116,6 +115,7 @@ def task(ctx, config):
 
     """
     fs = Filesystem(ctx)
+    mds_cluster = MDSCluster(ctx)
 
     # Mount objects, sorted by ID
     mounts = [v for k, v in sorted(ctx.mounts.items(), lambda a, b: cmp(a[0], b[0]))]