]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tasks: add mds_client_limits
authorJohn Spray <jspray@redhat.com>
Wed, 3 Sep 2014 13:15:54 +0000 (14:15 +0100)
committerJohn Spray <jspray@redhat.com>
Fri, 19 Sep 2014 13:15:41 +0000 (14:15 +0100)
New CephFS tests for the behaviour of the system while
enforcing its resource limits.

Signed-off-by: John Spray <john.spray@redhat.com>
tasks/cephfs/filesystem.py
tasks/cephfs/fuse_mount.py
tasks/cephfs/mount.py
tasks/mds_client_limits.py [new file with mode: 0644]

index df0d2b21a2ba27d6607dda30519f5d2709d33697..2db056d2bbd97ecbf4ef905506bfb35fb44c0d18 100644 (file)
@@ -50,14 +50,20 @@ class Filesystem(object):
 
         return list(result)
 
+    def get_config(self, key):
+        return self.mds_asok(['config', 'get', key])[key]
+
     def set_ceph_conf(self, subsys, key, value):
-        # Set config so that journal will be created in older format
-        if 'mds' not in self._ctx.ceph.conf:
-            self._ctx.ceph.conf['mds'] = {}
-        self._ctx.ceph.conf['mds'][key] = value
+        if subsys not in self._ctx.ceph.conf:
+            self._ctx.ceph.conf[subsys] = {}
+        self._ctx.ceph.conf[subsys][key] = value
         write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                          # used a different config path this won't work.
 
+    def clear_ceph_conf(self, subsys, key):
+        del self._ctx.ceph.conf[subsys][key]
+        write_conf(self._ctx)
+
     def are_daemons_healthy(self):
         """
         Return true if all daemons are in one of active, standby, standby-replay
@@ -159,6 +165,7 @@ class Filesystem(object):
         """
         temp_bin_path = '/tmp/out.bin'
 
+        # FIXME get the metadata pool name from mdsmap instead of hardcoding
         self.client_remote.run(args=[
             'sudo', 'rados', '-p', 'metadata', 'get', object_id, temp_bin_path
         ])
index 51ceccc496cfdead3a8a486aa9783291a461ec7e..37ef0789dfe784001feb710c22265988c3794ddf 100644 (file)
@@ -120,6 +120,7 @@ class FuseMount(CephFSMount):
 
     def umount(self):
         try:
+            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
             self.client_remote.run(
                 args=[
                     'sudo',
index 1d34079ed9bf318aab722fa343f38db3c639214f..114c3f50dd222f13bee1af5bec24e16d8fddfa83 100644 (file)
@@ -99,7 +99,7 @@ class CephFSMount(object):
 
     def _run_python(self, pyscript):
         return self.client_remote.run(args=[
-            'sudo', 'daemon-helper', 'kill', 'python', '-c', pyscript
+            'sudo', 'adjust-ulimits', 'daemon-helper', 'kill', 'python', '-c', pyscript
         ], wait=False, stdin=run.PIPE)
 
     def run_shell(self, args):
@@ -169,6 +169,41 @@ class CephFSMount(object):
         self.background_procs.append(rproc)
         return rproc
 
+    def open_n_background(self, fs_path, count):
+        """
+        Open N files for writing, hold them open in a background process
+
+        :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
+        :return: a RemoteProcess
+        """
+        assert(self.is_mounted())
+
+        abs_path = os.path.join(self.mountpoint, fs_path)
+
+        pyscript = dedent("""
+            import sys
+            import time
+            import os
+
+            n = {count}
+            abs_path = "{abs_path}"
+
+            if not os.path.exists(os.path.dirname(abs_path)):
+                os.makedirs(os.path.dirname(abs_path))
+
+            handles = []
+            for i in range(0, n):
+                fname = "{{0}}_{{1}}".format(abs_path, i)
+                handles.append(open(fname, 'w'))
+
+            while True:
+                time.sleep(1)
+            """).format(abs_path=abs_path, count=count)
+
+        rproc = self._run_python(pyscript)
+        self.background_procs.append(rproc)
+        return rproc
+
     def teardown(self):
         for p in self.background_procs:
             log.info("Terminating background process")
diff --git a/tasks/mds_client_limits.py b/tasks/mds_client_limits.py
new file mode 100644 (file)
index 0000000..a4a25cc
--- /dev/null
@@ -0,0 +1,242 @@
+
+"""
+Exercise the MDS's behaviour when clients and the MDCache reach or
+exceed the limits of how many caps/inodes they should hold.
+"""
+
+import contextlib
+import logging
+import time
+
+from teuthology.orchestra.run import CommandFailedError
+
+from tasks.cephfs.filesystem import Filesystem
+from tasks.cephfs.fuse_mount import FuseMount
+from tasks.cephfs.cephfs_test_case import CephFSTestCase, run_tests
+
+
+log = logging.getLogger(__name__)
+
+
+# Arbitrary timeouts for operations involving restarting
+# an MDS or waiting for it to come up
+MDS_RESTART_GRACE = 60
+
+# Hardcoded values from Server::recall_client_state
+CAP_RECALL_RATIO = 0.8
+CAP_RECALL_MIN = 100
+
+
+def wait_until_equal(get_fn, expect_val, timeout, reject_fn=None):
+    period = 5
+    elapsed = 0
+    while True:
+        val = get_fn()
+        if val == expect_val:
+            return
+        elif reject_fn and reject_fn(val):
+            raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
+        else:
+            if elapsed >= timeout:
+                raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
+                    elapsed, expect_val, val
+                ))
+            else:
+                log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
+            time.sleep(period)
+            elapsed += period
+
+    log.debug("wait_until_equal: success")
+
+
+def wait_until_true(condition, timeout):
+    period = 5
+    elapsed = 0
+    while True:
+        if condition():
+            return
+        else:
+            if elapsed >= timeout:
+                raise RuntimeError("Timed out after {0} seconds".format(elapsed))
+            else:
+                log.debug("wait_until_equal: waiting...")
+            time.sleep(period)
+            elapsed += period
+
+    log.debug("wait_until_equal: success")
+
+
+class TestClientLimits(CephFSTestCase):
+    # Environment references
+    mount_a = None
+    mount_b = None
+    mds_session_timeout = None
+    mds_reconnect_timeout = None
+    ms_max_backoff = None
+
+    def __init__(self, *args, **kwargs):
+        super(TestClientLimits, self).__init__(*args, **kwargs)
+
+        self.configs_set = set()
+
+    def set_conf(self, subsys, key, value):
+        self.configs_set.add((subsys, key))
+        self.fs.set_ceph_conf(subsys, key, value)
+
+    def setUp(self):
+        self.fs.mds_restart()
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+        self.mount_b.mount()
+        self.mount_b.wait_until_mounted()
+
+    def tearDown(self):
+        self.fs.clear_firewall()
+        self.mount_a.teardown()
+        self.mount_b.teardown()
+
+        for subsys, key in self.configs_set:
+            self.fs.clear_ceph_conf(subsys, key)
+
+    def wait_for_health(self, pattern, timeout):
+        """
+        Wait until 'ceph health' contains a single message matching the pattern
+        """
+        def seen_health_warning():
+            health = self.fs.mon_manager.get_mon_health()
+            summary_strings = [s['summary'] for s in health['summary']]
+            if len(summary_strings) == 0:
+                log.debug("Not expected number of summary strings ({0})".format(summary_strings))
+                return False
+            elif len(summary_strings) == 1 and pattern in summary_strings[0]:
+                return True
+            else:
+                raise RuntimeError("Unexpected health messages: {0}".format(summary_strings))
+
+        wait_until_true(seen_health_warning, timeout)
+
+    def _test_client_pin(self, use_subdir):
+        """
+        When a client pins an inode in its cache, for example because the file is held open,
+        it should reject requests from the MDS to trim these caps.  The MDS should complain
+        to the user that it is unable to enforce its cache size limits because of this
+        objectionable client.
+
+        :param use_subdir: whether to put test files in a subdir or use root
+        """
+
+        cache_size = 200
+        open_files = 250
+
+        self.fs.set_ceph_conf('mds', 'mds cache size', cache_size)
+        self.fs.mds_restart()
+
+        mount_a_client_id = self.mount_a.get_client_id()
+        path = "subdir/mount_a" if use_subdir else "mount_a"
+        open_proc = self.mount_a.open_n_background(path, open_files)
+
+        # Client should now hold:
+        # `open_files` caps for the open files
+        # 1 cap for root
+        # 1 cap for subdir
+        wait_until_equal(lambda: self.get_session(mount_a_client_id)['num_caps'],
+                         open_files + (2 if use_subdir else 1),
+                         timeout=600,
+                         reject_fn=lambda x: x > open_files + 2)
+
+        # MDS should not be happy about that, as the client is failing to comply
+        # with the SESSION_RECALL messages it is being sent
+        mds_recall_state_timeout = int(self.fs.get_config("mds_recall_state_timeout"))
+        self.wait_for_health("failing to respond to cache pressure", mds_recall_state_timeout + 10)
+
+        # When the client closes the files, it should retain only as many caps as allowed
+        # under the SESSION_RECALL policy
+        log.info("Terminating process holding files open")
+        open_proc.stdin.close()
+        try:
+            open_proc.wait()
+        except CommandFailedError:
+            # We killed it, so it raises an error
+            pass
+
+        # The remaining caps should comply with the numbers sent from MDS in SESSION_RECALL message,
+        # which depend on the cache size and overall ratio
+        wait_until_equal(
+            lambda: self.get_session(mount_a_client_id)['num_caps'],
+            int(cache_size * 0.8),
+            timeout=600,
+            reject_fn=lambda x: x < int(cache_size*.8))
+
+    def test_client_pin_root(self):
+        self._test_client_pin(False)
+
+    def test_client_pin(self):
+        self._test_client_pin(True)
+
+    def test_client_release_bug(self):
+        """
+        When a client has a bug (which we will simulate) preventing it from releasing caps,
+        the MDS should notice that releases are not being sent promptly, and generate a health
+        metric to that effect.
+        """
+
+        self.set_conf('client.{0}'.format(self.mount_a.client_id), 'client inject release failure', 'true')
+        self.mount_a.teardown()
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+        mount_a_client_id = self.mount_a.get_client_id()
+
+        # Client A creates a file.  He will hold the write caps on the file, and later (simulated bug) fail
+        # to comply with the MDSs request to release that cap
+        self.mount_a.run_shell(["touch", "file1"])
+
+        # Client B tries to stat the file that client A created
+        rproc = self.mount_b.write_background("file1")
+
+        # After mds_revoke_cap_timeout, we should see a health warning (extra lag from
+        # MDS beacon period)
+        mds_revoke_cap_timeout = int(self.fs.get_config("mds_revoke_cap_timeout"))
+        self.wait_for_health("failing to respond to capability release", mds_revoke_cap_timeout + 10)
+
+        # Client B should still be stuck
+        self.assertFalse(rproc.finished)
+
+        # Kill client A
+        self.mount_a.kill()
+        self.mount_a.kill_cleanup()
+
+        # Client B should complete
+        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
+        rproc.wait()
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    fs = Filesystem(ctx, config)
+
+    # Pick out the clients we will use from the configuration
+    # =======================================================
+    if len(ctx.mounts) < 2:
+        raise RuntimeError("Need at least two clients")
+    mount_a = ctx.mounts.values()[0]
+    mount_b = ctx.mounts.values()[1]
+
+    if not isinstance(mount_a, FuseMount):
+        # TODO: make kclient mount capable of all the same test tricks as ceph_fuse
+        raise RuntimeError("Require FUSE clients")
+
+    # Stash references on ctx so that we can easily debug in interactive mode
+    # =======================================================================
+    ctx.filesystem = fs
+    ctx.mount_a = mount_a
+    ctx.mount_b = mount_b
+
+    run_tests(ctx, config, TestClientLimits, {
+        'fs': fs,
+        'mount_a': mount_a,
+        'mount_b': mount_b
+    })
+
+    # Continue to any downstream tasks
+    # ================================
+    yield