git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
task: add mds_client_recovery
author    John Spray <jspray@redhat.com>
          Wed, 2 Jul 2014 18:25:14 +0000 (19:25 +0100)
committer John Spray <jspray@redhat.com>
          Tue, 15 Jul 2014 12:30:16 +0000 (13:30 +0100)
This task exercises the CephFS session recovery behavior.

Signed-off-by: John Spray <john.spray@redhat.com>
teuthology/task/cephfs/filesystem.py
teuthology/task/cephfs/fuse_mount.py
teuthology/task/cephfs/mount.py
teuthology/task/mds_client_recovery.py [new file with mode: 0644]

diff --git a/teuthology/task/cephfs/filesystem.py b/teuthology/task/cephfs/filesystem.py
index 6295ac04d34ef59a278399568fdf6f1b4bc7f234..901d2ecb149a0ec033976cdc95d92c1217dff09f 100644
@@ -2,6 +2,7 @@
 from StringIO import StringIO
 import json
 import logging
+import time
 
 from teuthology import misc
 from teuthology.task import ceph_manager
@@ -41,9 +42,20 @@ class Filesystem(object):
         self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
 
     def mds_stop(self):
+        """
+        Stop the MDS daemon process.  If it held a rank, that rank
+        will eventually go laggy.
+        """
         mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
         mds.stop()
 
+    def mds_fail(self):
+        """
+        Inform MDSMonitor that the daemon process is dead.  If it held
+        a rank, that rank will be relinquished.
+        """
+        self.mds_manager.raw_cluster_cmd("mds", "fail", "0")
+
     def mds_restart(self):
         mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
         mds.restart()
@@ -96,4 +108,44 @@ class Filesystem(object):
         version = journal_header_dump['journal_header']['stream_format']
         log.info("Read journal version {0}".format(version))
 
-        return version
\ No newline at end of file
+        return version
+
+    def mds_asok(self, command):
+        proc = self.mds_manager.admin_socket('mds', self.mds_id, command)
+        response_data = proc.stdout.getvalue()
+        log.info("mds_asok output: {0}".format(response_data))
+        if response_data.strip():
+            return json.loads(response_data)
+        else:
+            return None
+
+    def wait_for_state(self, goal_state, reject=None, timeout=None):
+        """
+        Block until the MDS reaches a particular state, or a failure condition
+        is met.
+
+        :param goal_state: Return once the MDS is in this state
+        :param reject: Fail if the MDS enters this state before the goal state
+        :param timeout: Fail if this many seconds pass before reaching goal
+        :return: number of seconds waited, rounded down to integer
+        """
+
+        elapsed = 0
+        while True:
+            # mds_info is None if no daemon currently claims this rank
+            mds_info = self.mds_manager.get_mds_status(self.mds_id)
+            current_state = mds_info['state'] if mds_info else None
+
+            if current_state == goal_state:
+                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
+                return elapsed
+            elif reject is not None and current_state == reject:
+                raise RuntimeError("MDS in reject state {0}".format(current_state))
+            elif timeout is not None and elapsed > timeout:
+                raise RuntimeError(
+                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
+                    elapsed, goal_state, current_state
+                ))
+            else:
+                time.sleep(1)
+                elapsed += 1
\ No newline at end of file
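
The helpers added here are meant to be used together when an MDS is bounced during a test: stop the daemon, tell the MDSMonitor it is gone, restart it, and then block until the replacement reaches the desired state. A minimal sketch of that pattern, assuming an already-constructed Filesystem instance `fs` and an arbitrary 60-second grace period:

    # Sketch only: bounce the MDS and wait for it to come back.
    fs.mds_stop()       # kill the daemon process; its rank will eventually go laggy
    fs.mds_fail()       # tell the MDSMonitor the rank is gone so it can be reassigned
    fs.mds_restart()    # start the daemon process again
    elapsed = fs.wait_for_state('up:active', timeout=60)
    print "MDS became active after %d seconds" % elapsed   # Python 2, as in this tree

    # The admin socket wrapper returns parsed JSON, e.g. the session list:
    sessions = fs.mds_asok(['session', 'ls'])
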
diff --git a/teuthology/task/cephfs/fuse_mount.py b/teuthology/task/cephfs/fuse_mount.py
index 67078c6b79c1c1218208167992ceb092b330721b..98980573bf293c79f46a44066ef65c75ae90351c 100644
@@ -1,11 +1,13 @@
 
 from StringIO import StringIO
+import json
 import time
 import os
 import logging
 
 from teuthology import misc
 from ...orchestra import run
+from teuthology.orchestra.run import CommandFailedError
 from teuthology.task.cephfs.mount import CephFSMount
 
 log = logging.getLogger(__name__)
@@ -15,7 +17,7 @@ class FuseMount(CephFSMount):
     def __init__(self, client_config, test_dir, client_id, client_remote):
         super(FuseMount, self).__init__(test_dir, client_id, client_remote)
 
-        self.client_config = client_config
+        self.client_config = client_config if client_config else {}
         self.fuse_daemon = None
 
     def mount(self):
@@ -109,17 +111,18 @@ class FuseMount(CephFSMount):
             args=['sudo', 'chmod', '1777', '{tdir}/mnt.{id}'.format(tdir=self.test_dir, id=self.client_id)], )
 
     def umount(self):
-        mnt = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
         try:
             self.client_remote.run(
                 args=[
                     'sudo',
                     'fusermount',
                     '-u',
-                    mnt,
+                    self.mountpoint,
                 ],
             )
         except run.CommandFailedError:
+            # FIXME: this will clobber all FUSE mounts, not just this one
+
             log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
             # abort the fuse mount, killing all hung processes
             self.client_remote.run(
@@ -140,21 +143,111 @@ class FuseMount(CephFSMount):
                     'umount',
                     '-l',
                     '-f',
-                    mnt,
+                    self.mountpoint,
                 ],
             )
 
+    def umount_wait(self, force=False):
+        """
+        :param force: Complete even if the MDS is offline
+        """
+        self.umount()
+        if force:
+            self.fuse_daemon.stdin.close()
+        try:
+            self.fuse_daemon.wait()
+        except CommandFailedError:
+            pass
+        self.cleanup()
+
     def cleanup(self):
         """
         Remove the mount point.
 
         Prerequisite: the client is not mounted.
         """
-        mnt = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
         self.client_remote.run(
             args=[
                 'rmdir',
                 '--',
-                mnt,
+                self.mountpoint,
             ],
         )
+
+    def kill(self):
+        """
+        Terminate the client without removing the mount point.
+        """
+        self.fuse_daemon.stdin.close()
+        try:
+            self.fuse_daemon.wait()
+        except CommandFailedError:
+            pass
+
+    def kill_cleanup(self):
+        """
+        Follow up ``kill`` to get to a clean unmounted state.
+        """
+        self.umount()
+        self.cleanup()
+
+    def teardown(self):
+        """
+        Whatever the state of the mount, get it gone.
+        """
+        super(FuseMount, self).teardown()
+
+        self.umount()
+        if not self.fuse_daemon.finished:
+            self.fuse_daemon.stdin.close()
+            try:
+                self.fuse_daemon.wait()
+            except CommandFailedError:
+                pass
+
+        # Indiscriminate, unlike the touchier cleanup()
+        self.client_remote.run(
+            args=[
+                'rm',
+                '-rf',
+                self.mountpoint,
+            ],
+        )
+
+    # FIXME: bad naming scheme to call this client_id and also have the
+    # 'client_id' attr which is something completely different.  This
+    # is what a MonSession calls global_id.
+    def get_client_id(self):
+        """
+        Look up the CephFS client ID for this mount
+        """
+
+        pyscript = """
+import glob
+import re
+import os
+import subprocess
+
+def find_socket(client_name):
+        files = glob.glob("/var/run/ceph/ceph-{{client_name}}.*.asok".format(client_name=client_name))
+        for f in files:
+                pid = re.match(".*\.(\d+)\.asok$", f).group(1)
+                if os.path.exists("/proc/{{0}}".format(pid)):
+                        return f
+        raise RuntimeError("Client socket {{0}} not found".format(client_name))
+
+print find_socket("{client_name}")
+""".format(client_name="client.{0}".format(self.client_id))
+
+        # Find the admin socket
+        p = self.client_remote.run(args=[
+            'python', '-c', pyscript
+        ], stdout=StringIO())
+        asok_path = p.stdout.getvalue().strip()
+        log.info("Found client admin socket at {0}".format(asok_path))
+
+        # Query client ID from admin socket
+        p = self.client_remote.run(
+            args=['sudo', 'ceph', '--admin-daemon', asok_path, 'mds_sessions'],
+            stdout=StringIO())
+        return json.loads(p.stdout.getvalue())['id']
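
The new FuseMount methods distinguish a clean unmount (umount_wait) from simulating a crashed client: kill() stops ceph-fuse while leaving the mount point and the MDS session behind, and kill_cleanup() tidies up afterwards. A rough sketch of the lifecycle the recovery tests below rely on, assuming `mount` is an already-mounted FuseMount:

    # Sketch only: simulate a client crash, then bring the client back.
    client_id = mount.get_client_id()   # the session's global_id, read via the client asok
    mount.kill()                        # ceph-fuse dies; the MDS still holds its session
    # ... exercise the MDS's handling of the dead session here ...
    mount.kill_cleanup()                # lazy-unmount and remove the mount point
    mount.mount()
    mount.wait_until_mounted()
    mount.umount_wait()                 # the normal, clean unmount path
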
diff --git a/teuthology/task/cephfs/mount.py b/teuthology/task/cephfs/mount.py
index 337b4511041adb9ca2d4312b101454107a235404..6a02a0ab205b25b1642fe2431de3a957a49a2955 100644
@@ -1,6 +1,9 @@
 
 import logging
+import datetime
 import os
+from teuthology.orchestra import run
+from teuthology.orchestra.run import CommandFailedError
 
 log = logging.getLogger(__name__)
 
@@ -20,25 +23,119 @@ class CephFSMount(object):
         self.mountpoint = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
         self.test_files = ['a', 'b', 'c']
 
-    @property
-    def _mount_path(self):
-        return os.path.join(self.test_dir, 'mnt.{0}'.format(self.client_id))
+        self.background_procs = []
+
+    def is_mounted(self):
+        raise NotImplementedError()
+
+    def mount(self):
+        raise NotImplementedError()
+
+    def umount(self):
+        raise NotImplementedError()
+
+    def umount_wait(self):
+        raise NotImplementedError()
+
+    def kill_cleanup(self):
+        raise NotImplementedError()
+
+    def kill(self):
+        raise NotImplementedError()
+
+    def cleanup(self):
+        raise NotImplementedError()
 
     def create_files(self):
+        assert(self.is_mounted())
+
         for suffix in self.test_files:
             log.info("Creating file {0}".format(suffix))
             self.client_remote.run(args=[
-                'sudo', 'touch', os.path.join(self._mount_path, suffix)
+                'sudo', 'touch', os.path.join(self.mountpoint, suffix)
             ])
 
     def check_files(self):
-        """
-        This will raise a CommandFailedException if expected files are not present
-        """
+        assert(self.is_mounted())
+
         for suffix in self.test_files:
             log.info("Checking file {0}".format(suffix))
             r = self.client_remote.run(args=[
-                'sudo', 'ls', os.path.join(self._mount_path, suffix)
+                'sudo', 'ls', os.path.join(self.mountpoint, suffix)
             ], check_status=False)
             if r.exitstatus != 0:
                 raise RuntimeError("Expected file {0} not found".format(suffix))
+
+    def create_destroy(self):
+        assert(self.is_mounted())
+
+        filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
+        log.debug("Creating test file {0}".format(filename))
+        self.client_remote.run(args=[
+            'sudo', 'touch', os.path.join(self.mountpoint, filename)
+        ])
+        log.debug("Deleting test file {0}".format(filename))
+        self.client_remote.run(args=[
+            'sudo', 'rm', '-f', os.path.join(self.mountpoint, filename)
+        ])
+
+    def _run_python(self, pyscript):
+        return self.client_remote.run(args=[
+            'sudo', 'daemon-helper', 'kill', 'python', '-c', pyscript
+        ], wait=False, stdin=run.PIPE)
+
+    def open_background(self, basename="background_file"):
+        """
+        Open a file for writing, then block such that the client
+        will hold a capability
+        """
+        assert(self.is_mounted())
+
+        path = os.path.join(self.mountpoint, basename)
+
+        pyscript = """
+import time
+
+f = open("{path}", 'w')
+f.write('content')
+f.flush()
+f.write('content2')
+while True:
+    time.sleep(1)
+""".format(path=path)
+
+        rproc = self._run_python(pyscript)
+        self.background_procs.append(rproc)
+        return rproc
+
+    def write_background(self, basename="background_file"):
+        """
+        Open a file for writing, write some content and close it again
+        as soon as possible, rather than holding the file open.
+
+        :param basename: name of the file to create under the mount point
+        :return: the remote process performing the write
+        """
+        assert(self.is_mounted())
+
+        path = os.path.join(self.mountpoint, basename)
+
+        pyscript = """
+import time
+
+f = open("{path}", 'w')
+f.write('content')
+f.close()
+""".format(path=path)
+
+        rproc = self._run_python(pyscript)
+        self.background_procs.append(rproc)
+        return rproc
+
+    def teardown(self):
+        for p in self.background_procs:
+            log.info("Terminating background process")
+            if p.stdin:
+                p.stdin.close()
+                try:
+                    p.wait()
+                except CommandFailedError:
+                    pass
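
open_background() and write_background() are the two halves of the capability tests below: one client holds a write capability indefinitely, and another client's short write can only complete once that capability is released. A small sketch mirroring test_stale_caps, with mount_a and mount_b being two mounted CephFSMount instances:

    # Sketch only: capability handover from a dead client to a live one.
    cap_holder = mount_a.open_background()   # opens the file and then sleeps forever
    mount_a.kill()                           # client dies while still holding the capability
    cap_waiter = mount_b.write_background()  # blocks until the stale caps are revoked
    cap_waiter.wait()                        # completes roughly one session timeout later
    mount_a.kill_cleanup()                   # unmount and remove the dead client's mount point
    mount_a.mount()                          # the client can now come back and be reused
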
diff --git a/teuthology/task/mds_client_recovery.py b/teuthology/task/mds_client_recovery.py
new file mode 100644
index 0000000..903a70a
--- /dev/null
+++ b/teuthology/task/mds_client_recovery.py
@@ -0,0 +1,352 @@
+
+"""
+Teuthology task for exercising CephFS client recovery
+"""
+
+import contextlib
+import logging
+import time
+import unittest
+
+from teuthology import misc
+from teuthology.orchestra.run import CommandFailedError
+from teuthology.task import interactive
+from teuthology.task.cephfs.filesystem import Filesystem
+from teuthology.task.ceph_fuse import get_client_configs, FuseMount
+
+
+log = logging.getLogger(__name__)
+
+
+# Arbitrary timeouts for operations involving restarting
+# an MDS or waiting for it to come up
+MDS_RESTART_GRACE = 60
+
+
+class TestClientRecovery(unittest.TestCase):
+    # Environment references
+    fs = None
+    mount_a = None
+    mount_b = None
+    mds_session_timeout = None
+    mds_reconnect_timeout = None
+
+    def setUp(self):
+        self.fs.mds_restart()
+        self.mount_a.mount()
+        self.mount_b.mount()
+        self.mount_a.wait_until_mounted()
+        self.mount_b.wait_until_mounted()
+
+    def tearDown(self):
+        self.mount_a.teardown()
+        self.mount_b.teardown()
+        # mount_a.umount()
+        # mount_b.umount()
+        # run.wait([mount_a.fuse_daemon, mount_b.fuse_daemon], timeout=600)
+        # mount_a.cleanup()
+        # mount_b.cleanup()
+
+    def test_basic(self):
+        # Check that two clients come up healthy and see each others' files
+        # =====================================================
+        self.mount_a.create_files()
+        self.mount_a.check_files()
+        self.mount_a.umount_wait()
+
+        self.mount_b.check_files()
+
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+
+        # Check that the admin socket interface is correctly reporting
+        # two sessions
+        # =====================================================
+        ls_data = self._session_list()
+        self.assert_session_count(2, ls_data)
+
+        self.assertSetEqual(
+            set([l['id'] for l in ls_data]),
+            {self.mount_a.get_client_id(), self.mount_b.get_client_id()}
+        )
+
+    def test_restart(self):
+        # Check that after an MDS restart both clients reconnect and continue
+        # to handle I/O
+        # =====================================================
+        self.fs.mds_stop()
+        self.fs.mds_fail()
+        self.fs.mds_restart()
+        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
+
+        self.mount_a.create_destroy()
+        self.mount_b.create_destroy()
+
+    def assert_session_count(self, expected, ls_data=None):
+        if ls_data is None:
+            ls_data = self.fs.mds_asok(['session', 'ls'])
+
+        self.assertEqual(expected, len(ls_data), "Expected {0} sessions, found {1}".format(
+            expected, len(ls_data)
+        ))
+
+    def _session_list(self):
+        ls_data = self.fs.mds_asok(['session', 'ls'])
+        ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
+        return ls_data
+
+    def _session_by_id(self, session_ls):
+        return dict([(s['id'], s) for s in session_ls])
+
+    def test_reconnect_timeout(self):
+        # Reconnect timeout
+        # =================
+        # Check that if I stop an MDS and a client goes away, the MDS waits
+        # for the reconnect period
+        self.fs.mds_stop()
+        self.fs.mds_fail()
+
+        mount_a_client_id = self.mount_a.get_client_id()
+        self.mount_a.umount_wait(force=True)
+
+        self.fs.mds_restart()
+
+        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
+
+        ls_data = self._session_list()
+        self.assert_session_count(2, ls_data)
+
+        # The session for the dead client should have the 'reconnect' flag set
+        self.assertTrue(self._session_by_id(ls_data)[mount_a_client_id]['reconnecting'])
+
+        # Wait for the reconnect state to clear, this should take the
+        # reconnect timeout period.
+        in_reconnect_for = self.fs.wait_for_state('up:active', timeout=self.mds_reconnect_timeout * 2)
+        # Check that the period we waited to enter active is within a factor
+        # of two of the reconnect timeout.
+        self.assertGreater(in_reconnect_for, self.mds_reconnect_timeout / 2,
+                           "Should have been in reconnect phase for {0} but only took {1}".format(
+                               self.mds_reconnect_timeout, in_reconnect_for
+                           ))
+
+        self.assert_session_count(1)
+
+        # Check that the client that timed out during reconnect can
+        # mount again and do I/O
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+        self.mount_a.create_destroy()
+
+        self.assert_session_count(2)
+
+    def test_reconnect_eviction(self):
+        # Eviction during reconnect
+        # =========================
+        self.fs.mds_stop()
+        self.fs.mds_fail()
+
+        mount_a_client_id = self.mount_a.get_client_id()
+        self.mount_a.umount_wait(force=True)
+
+        self.fs.mds_restart()
+
+        # Enter reconnect phase
+        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
+        self.assert_session_count(2)
+
+        # Evict the stuck client
+        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
+        self.assert_session_count(1)
+
+        # Observe that we proceed to active phase without waiting full reconnect timeout
+        evict_til_active = self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
+        # Once we evict the troublemaker, the reconnect phase should complete
+        # in well under the reconnect timeout.
+        self.assertLess(evict_til_active, self.mds_reconnect_timeout * 0.5,
+                        "reconnect did not complete soon enough after eviction, took {0}".format(
+                            evict_til_active
+                        ))
+
+        # Bring the client back
+        self.mount_a.mount()
+        self.mount_a.create_destroy()
+
+    def test_stale_caps(self):
+        # Capability release from stale session
+        # =====================================
+        cap_holder = self.mount_a.open_background()
+        self.mount_a.kill()
+
+        # Now, after mds_session_timeout seconds, the waiter should
+        # complete their operation when the MDS marks the holder's
+        # session stale.
+        cap_waiter = self.mount_b.write_background()
+        a = time.time()
+        cap_waiter.wait()
+        b = time.time()
+        cap_waited = b - a
+        log.info("cap_waiter waited {0}s".format(cap_waited))
+        self.assertTrue(self.mds_session_timeout / 2.0 <= cap_waited <= self.mds_session_timeout * 2.0,
+                        "Capability handover took {0}, expected approx {1}".format(
+                            cap_waited, self.mds_session_timeout
+                        ))
+
+        cap_holder.stdin.close()
+        try:
+            cap_holder.wait()
+        except CommandFailedError:
+            # We killed it, so it raises an error
+            pass
+
+        self.mount_a.kill_cleanup()
+
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+
+    def test_evicted_caps(self):
+        # Eviction while holding a capability
+        # ===================================
+
+        # Take out a write capability on a file on client A,
+        # and then immediately kill it.
+        cap_holder = self.mount_a.open_background()
+        mount_a_client_id = self.mount_a.get_client_id()
+        self.mount_a.kill()
+
+        # The waiter should get stuck waiting for the capability
+        # held on the MDS by the now-dead client A
+        cap_waiter = self.mount_b.write_background()
+        time.sleep(5)
+        self.assertFalse(cap_waiter.finished)
+
+        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
+        # Now, because I evicted the old holder of the capability, it should
+        # immediately get handed over to the waiter
+        a = time.time()
+        cap_waiter.wait()
+        b = time.time()
+        cap_waited = b - a
+        log.info("cap_waiter waited {0}s".format(cap_waited))
+        # This is the check that it happened 'now' rather than waiting
+        # for the session timeout
+        self.assertLess(cap_waited, self.mds_session_timeout / 2.0,
+                        "Capability handover took {0}, expected less than {1}".format(
+                            cap_waited, self.mds_session_timeout / 2.0
+                        ))
+
+        cap_holder.stdin.close()
+        try:
+            cap_holder.wait()
+        except CommandFailedError:
+            # We killed it, so it raises an error
+            pass
+
+        self.mount_a.kill_cleanup()
+
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+
+
+class LogStream(object):
+    def __init__(self):
+        self.buffer = ""
+
+    def write(self, data):
+        self.buffer += data
+        if "\n" in self.buffer:
+            lines = self.buffer.split("\n")
+            for line in lines[:-1]:
+                log.info(line)
+            self.buffer = lines[-1]
+
+    def flush(self):
+        pass
+
+
+class InteractiveFailureResult(unittest.TextTestResult):
+    """
+    Specialization that implements interactive-on-error style
+    behavior.
+    """
+    ctx = None
+
+    def addFailure(self, test, err):
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Failure in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=self.ctx, config=None)
+
+    def addError(self, test, err):
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Error in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=self.ctx, config=None)
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    fs = Filesystem(ctx, config)
+
+    # Pick out the clients we will use from the configuration
+    # =======================================================
+    client_list = list(misc.all_roles_of_type(ctx.cluster, 'client'))
+    if len(client_list) < 2:
+        raise RuntimeError("Need at least two clients")
+
+    client_a_id = client_list[0]
+    client_a_role = "client.{0}".format(client_a_id)
+    client_a_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_a_id)]))[0][1]
+
+    client_b_id = client_list[1]
+    client_b_role = "client.{0}".format(client_b_id)
+    client_b_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_b_id)]))[0][1]
+
+    test_dir = misc.get_testdir(ctx)
+
+    # TODO: enable switching FUSE to kclient here
+    # or perhaps just use external client tasks and consume ctx.mounts here?
+    client_configs = get_client_configs(ctx, config)
+    mount_a = FuseMount(client_configs.get(client_a_role, {}), test_dir, client_a_id, client_a_remote)
+    mount_b = FuseMount(client_configs.get(client_b_role, {}), test_dir, client_b_id, client_b_remote)
+
+    # Attach environment references to test case
+    # ==========================================
+    TestClientRecovery.mds_reconnect_timeout = int(fs.mds_asok(
+        ['config', 'get', 'mds_reconnect_timeout']
+    )['mds_reconnect_timeout'])
+    TestClientRecovery.mds_session_timeout = int(fs.mds_asok(
+        ['config', 'get', 'mds_session_timeout']
+    )['mds_session_timeout'])
+    TestClientRecovery.fs = fs
+    TestClientRecovery.mount_a = mount_a
+    TestClientRecovery.mount_b = mount_b
+
+    # Stash references on ctx so that we can easily debug in interactive mode
+    # =======================================================================
+    ctx.filesystem = fs
+    ctx.mount_a = mount_a
+    ctx.mount_b = mount_b
+
+    # Execute test suite
+    # ==================
+    suite = unittest.TestLoader().loadTestsFromTestCase(TestClientRecovery)
+    if ctx.config.get("interactive-on-error", False):
+        InteractiveFailureResult.ctx = ctx
+        result_class = InteractiveFailureResult
+    else:
+        result_class = unittest.TextTestResult
+    result = unittest.TextTestRunner(
+        stream=LogStream(),
+        resultclass=result_class,
+        verbosity=2,
+        failfast=True).run(suite)
+
+    if not result.wasSuccessful():
+        result.printErrors()  # duplicate output at end for convenience
+        raise RuntimeError("Test failure.")
+
+    # Continue to any downstream tasks
+    # ================================
+    yield
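
For reference, the session helpers in this test (_session_list, _session_by_id and the 'reconnecting' check) assume that the `session ls` admin socket command returns a list of dictionaries carrying at least 'id', 'state' and 'reconnecting' fields. An illustrative, made-up example of the shape being parsed:

    # Field values are invented; only the keys inspected by the test are shown.
    ls_data = [
        {"id": 4205, "state": "open", "reconnecting": False},   # healthy session
        {"id": 4208, "state": "open", "reconnecting": True},    # waiting for reconnect
    ]
    live = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
    by_id = dict([(s['id'], s) for s in live])
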