tasks/mds_client_recovery: network freeze test
author John Spray <jspray@redhat.com>
Wed, 20 Aug 2014 11:36:02 +0000 (12:36 +0100)
committer John Spray <jspray@redhat.com>
Thu, 21 Aug 2014 22:09:00 +0000 (23:09 +0100)
Test the CephFS client's handling of losing connectivity to the MDS.

Fixes: #7810
Signed-off-by: John Spray <john.spray@redhat.com>
tasks/ceph_manager.py
tasks/cephfs/filesystem.py
tasks/cephfs/fuse_mount.py
tasks/mds_client_recovery.py

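A condensed, standalone sketch of the mechanism this commit adds: to simulate the freeze, iptables REJECT rules are installed on the MDS port range (6800:6900) and removed again afterwards, as Filesystem.set_clients_block and Filesystem.clear_firewall do in the diff below. The sketch assumes local shell access via subprocess instead of a teuthology Remote, so the function name and calling convention are illustrative only.

import subprocess

def set_mds_block(blocked):
    # Mirrors Filesystem.set_clients_block below: append (-A) or delete (-D)
    # REJECT rules so traffic to and from the MDS port range is refused.
    flag = "-A" if blocked else "-D"
    for chain, port_opt in (("OUTPUT", "--sport"), ("INPUT", "--dport")):
        subprocess.check_call([
            "sudo", "iptables", flag, chain,
            "-p", "tcp", port_opt, "6800:6900",
            "-j", "REJECT",
            "-m", "comment", "--comment", "teuthology",
        ])

set_mds_block(True)   # simulate the network freeze
set_mds_block(False)  # restore connectivity

The "--comment teuthology" marker is presumably what lets the cleanup path (teuthology.nuke.clear_firewall, imported in the filesystem.py hunk) identify and strip only these rules.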
index 10e3bde4a6db549fb5a882d63cc9b43fe778e32d..65d2b816dcd5cf8dceadce1df3c45cef6a3b750c 100644
@@ -567,17 +567,27 @@ class CephManager:
     def osd_admin_socket(self, osd_id, command, check_status=True):
         return self.admin_socket('osd', osd_id, command, check_status)
 
-    def admin_socket(self, service_type, service_id, command, check_status=True):
+    def find_remote(self, service_type, service_id):
         """
-        Remotely start up ceph specifying the admin socket
+        Get the Remote for the host where a particular service runs.
+
+        :param service_type: 'mds', 'osd', 'client'
+        :param service_id: The second part of a role, e.g. '0' for the role 'client.0'
+        :return: a Remote instance for the host where the requested role is placed
         """
-        testdir = teuthology.get_testdir(self.ctx)
-        remote = None
         for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems():
             for id_ in teuthology.roles_of_type(roles_for_host, service_type):
                 if id_ == str(service_id):
-                    remote = _remote
-        assert remote is not None
+                    return _remote
+
+        raise KeyError("Service {0}.{1} not found".format(service_type, service_id))
+
+    def admin_socket(self, service_type, service_id, command, check_status=True):
+        """
+        Run a ceph command against the given service's admin socket, on the remote host where it runs
+        """
+        testdir = teuthology.get_testdir(self.ctx)
+        remote = self.find_remote(service_type, service_id)
         args = [
             'sudo',
             'adjust-ulimits',
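find_remote factors out the host lookup that admin_socket used to open-code, and is reused later in this commit (Filesystem.get_mds_hostnames, Filesystem.set_clients_block). A minimal usage sketch, assuming an existing CephManager instance called manager (the variable name is illustrative):

remote = manager.find_remote('mds', 'a')   # Remote for the host running mds.a
remote.run(args=['hostname'])              # run a command on that host
# A role that is not in the cluster map now raises KeyError rather than
# tripping the old "assert remote is not None".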
index e78680146460f5cd3b5cb2f9316f252c2ca89756..3cd6206dbc63301c53f82c4624869f5a70b2fd3b 100644
@@ -5,6 +5,7 @@ import logging
 import time
 
 from teuthology import misc
+from teuthology.nuke import clear_firewall
 from teuthology.parallel import parallel
 from tasks import ceph_manager
 
@@ -40,6 +41,14 @@ class Filesystem(object):
         self.client_id = client_list[0]
         self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
 
+    def get_mds_hostnames(self):
+        result = set()
+        for mds_id in self.mds_ids:
+            mds_remote = self.mon_manager.find_remote('mds', mds_id)
+            result.add(mds_remote.hostname)
+
+        return list(result)
+
     def are_daemons_healthy(self):
         """
         Return true if all daemons are in one of active, standby, standby-replay
@@ -186,6 +195,27 @@ class Filesystem(object):
         else:
             return None
 
+    def set_clients_block(self, blocked, mds_id=None):
+        """
+        Block (using iptables) client communications to this MDS.  Be careful: if
+        other services are running on this MDS, or other MDSs try to talk to this
+        MDS, their communications may also be blocked as collateral damage.
+
+        :param mds_id: Optional ID of MDS to block, default to all
+        :return:
+        """
+        da_flag = "-A" if blocked else "-D"
+
+        def set_block(_mds_id):
+            remote = self.mon_manager.find_remote('mds', _mds_id)
+            remote.run(args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", "6800:6900", "-j", "REJECT", "-m", "comment", "--comment", "teuthology"])
+            remote.run(args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", "6800:6900", "-j", "REJECT", "-m", "comment", "--comment", "teuthology"])
+
+        self._one_or_all(mds_id, set_block)
+
+    def clear_firewall(self):
+        clear_firewall(self._ctx)
+
     def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None):
         """
         Block until the MDS reaches a particular state, or a failure condition
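How the two new Filesystem helpers are intended to pair up in a test body, sketched on the assumption that self.fs is a Filesystem instance (as it is in TestClientRecovery below); the try/finally framing is illustrative only, since the actual test does its cleanup by calling clear_firewall from setUp/tearDown:

self.fs.set_clients_block(True)     # cut client<->MDS traffic on every MDS host
try:
    pass                            # exercise the client while it is isolated
finally:
    self.fs.clear_firewall()        # always remove the teuthology iptables rules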
index 9adba1c47295cf6f2f0281e10e679467ece55359..51ceccc496cfdead3a8a486aa9783291a461ec7e 100644
@@ -74,16 +74,21 @@ class FuseMount(CephFSMount):
         self.fuse_daemon = proc
 
     def is_mounted(self):
-        proc = self.client_remote.run(
-            args=[
-                'stat',
-                '--file-system',
-                '--printf=%T\n',
-                '--',
-                self.mountpoint,
-            ],
-            stdout=StringIO(),
-        )
+        try:
+            proc = self.client_remote.run(
+                args=[
+                    'stat',
+                    '--file-system',
+                    '--printf=%T\n',
+                    '--',
+                    self.mountpoint,
+                ],
+                stdout=StringIO(),
+            )
+        except CommandFailedError:
+            # This happens if the mount directory doesn't exist
+            return False
+
         fstype = proc.stdout.getvalue().rstrip('\n')
         if fstype == 'fuseblk':
             log.info('ceph-fuse is mounted on %s', self.mountpoint)
@@ -110,6 +115,9 @@ class FuseMount(CephFSMount):
         self.client_remote.run(
             args=['sudo', 'chmod', '1777', '{tdir}/mnt.{id}'.format(tdir=self.test_dir, id=self.client_id)], )
 
+    def _mountpoint_exists(self):
+        return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False).exitstatus == 0
+
     def umount(self):
         try:
             self.client_remote.run(
@@ -121,8 +129,6 @@ class FuseMount(CephFSMount):
                 ],
             )
         except run.CommandFailedError:
-            # FIXME: this will clobber all FUSE mounts, not just this one
-
             log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
             # abort the fuse mount, killing all hung processes
             self.client_remote.run(
@@ -137,15 +143,16 @@ class FuseMount(CephFSMount):
                 ],
             )
             # make sure it's unmounted
-            self.client_remote.run(
-                args=[
-                    'sudo',
-                    'umount',
-                    '-l',
-                    '-f',
-                    self.mountpoint,
-                ],
-            )
+            if self._mountpoint_exists():
+                self.client_remote.run(
+                    args=[
+                        'sudo',
+                        'umount',
+                        '-l',
+                        '-f',
+                        self.mountpoint,
+                    ],
+                )
 
     def umount_wait(self, force=False):
         """
@@ -197,7 +204,8 @@ class FuseMount(CephFSMount):
         """
         super(FuseMount, self).teardown()
 
-        self.umount()
+        if self.is_mounted():
+            self.umount()
         if not self.fuse_daemon.finished:
             self.fuse_daemon.stdin.close()
             try:
index 1032616977a80d8555d941766eb2a8c0942df202..b44fcd8bc07dc712ba3272afbb670aa3f05f2946 100644
@@ -30,8 +30,10 @@ class TestClientRecovery(unittest.TestCase):
     mount_b = None
     mds_session_timeout = None
     mds_reconnect_timeout = None
+    ms_max_backoff = None
 
     def setUp(self):
+        self.fs.clear_firewall()
         self.fs.mds_restart()
         self.mount_a.mount()
         self.mount_b.mount()
@@ -39,13 +41,9 @@ class TestClientRecovery(unittest.TestCase):
         self.mount_a.wait_until_mounted()
 
     def tearDown(self):
+        self.fs.clear_firewall()
         self.mount_a.teardown()
         self.mount_b.teardown()
-        # mount_a.umount()
-        # mount_b.umount()
-        # run.wait([mount_a.fuse_daemon, mount_b.fuse_daemon], timeout=600)
-        # mount_a.cleanup()
-        # mount_b.cleanup()
 
     def test_basic(self):
         # Check that two clients come up healthy and see each others' files
@@ -90,6 +88,12 @@ class TestClientRecovery(unittest.TestCase):
             expected, len(ls_data)
         ))
 
+    def assert_session_state(self, client_id, expected_state):
+        self.assertEqual(
+            self._session_by_id(
+                self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'],
+            expected_state)
+
     def _session_list(self):
         ls_data = self.fs.mds_asok(['session', 'ls'])
         ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
@@ -246,6 +250,52 @@ class TestClientRecovery(unittest.TestCase):
         self.mount_a.mount()
         self.mount_a.wait_until_mounted()
 
+    def test_network_death(self):
+        """
+        Simulate software freeze or temporary network failure.
+
+        Check that the client blocks I/O during failure, and completes
+        I/O after failure.
+        """
+
+        # We only need one client
+        self.mount_b.umount_wait()
+
+        # Initially our one client session should be visible
+        client_id = self.mount_a.get_client_id()
+        ls_data = self._session_list()
+        self.assert_session_count(1, ls_data)
+        self.assertEqual(ls_data[0]['id'], client_id)
+        self.assert_session_state(client_id, "open")
+
+        # ...and capable of doing I/O without blocking
+        self.mount_a.create_files()
+
+        # ...but if we turn off the network
+        self.fs.set_clients_block(True)
+
+        # ...and try and start an I/O
+        write_blocked = self.mount_a.write_background()
+
+        # ...then it should block
+        self.assertFalse(write_blocked.finished)
+        self.assert_session_state(client_id, "open")
+        time.sleep(self.mds_session_timeout * 1.5)  # Long enough for MDS to consider session stale
+        self.assertFalse(write_blocked.finished)
+        self.assert_session_state(client_id, "stale")
+
+        # ...until we re-enable I/O
+        self.fs.set_clients_block(False)
+
+        # ...when it should complete promptly
+        a = time.time()
+        write_blocked.wait()
+        b = time.time()
+        recovery_time = b - a
+        log.info("recovery time: {0}".format(recovery_time))
+        self.assertLess(recovery_time, self.ms_max_backoff * 2)
+        self.assert_session_state(client_id, "open")
+
 
 class LogStream(object):
     def __init__(self):
@@ -301,7 +351,12 @@ def task(ctx, config):
 
     client_b_id = client_list[1]
     client_b_role = "client.{0}".format(client_b_id)
-    client_b_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_a_id)]))[0][1]
+    client_b_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_b_id)]))[0][1]
+
+    # Check we have at least one remote client for use with network-dependent tests
+    # =============================================================================
+    if client_a_remote.hostname in fs.get_mds_hostnames():
+        raise RuntimeError("Require first client to be on a separate server from the MDSs")
 
     test_dir = misc.get_testdir(ctx)
 
@@ -319,6 +374,9 @@ def task(ctx, config):
     TestClientRecovery.mds_session_timeout = int(fs.mds_asok(
         ['config', 'get', 'mds_session_timeout']
     )['mds_session_timeout'])
+    TestClientRecovery.ms_max_backoff = int(fs.mds_asok(
+        ['config', 'get', 'ms_max_backoff']
+    )['ms_max_backoff'])
     TestClientRecovery.fs = fs
     TestClientRecovery.mount_a = mount_a
     TestClientRecovery.mount_b = mount_b
@@ -331,7 +389,12 @@ def task(ctx, config):
 
     # Execute test suite
     # ==================
-    suite = unittest.TestLoader().loadTestsFromTestCase(TestClientRecovery)
+    if 'test_name' in config:
+        suite = unittest.TestLoader().loadTestsFromName(
+            "teuthology.task.mds_client_recovery.{0}".format(config['test_name']))
+    else:
+        suite = unittest.TestLoader().loadTestsFromTestCase(TestClientRecovery)
+
     if ctx.config.get("interactive-on-error", False):
         InteractiveFailureResult.ctx = ctx
         result_class = InteractiveFailureResult
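The new test_name handling lets a job run a single case instead of the whole TestClientRecovery suite. A hedged sketch of the config dict that task(ctx, config) would then receive (the job YAML that produces it is not shown in this commit):

# Illustrative only: the effect of selecting one test by name in the task config.
config = {'test_name': 'TestClientRecovery.test_network_death'}
# With the key present, loadTestsFromName() loads just that test; without it,
# the full TestClientRecovery suite runs as before.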