def _run_python(self, pyscript):
return self.client_remote.run(args=[
- 'sudo', 'daemon-helper', 'kill', 'python', '-c', pyscript
+ 'sudo', 'adjust-ulimits', 'daemon-helper', 'kill', 'python', '-c', pyscript
], wait=False, stdin=run.PIPE)
def run_shell(self, args):
self.background_procs.append(rproc)
return rproc
+    def open_n_background(self, fs_path, count):
+        """
+        Open N files for writing and hold them open in a background process.
+
+        :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
+        :param count: number of files to open
+        :return: a RemoteProcess
+        """
+ assert(self.is_mounted())
+
+ abs_path = os.path.join(self.mountpoint, fs_path)
+
+ pyscript = dedent("""
+ import sys
+ import time
+ import os
+
+ n = {count}
+ abs_path = "{abs_path}"
+
+ if not os.path.exists(os.path.dirname(abs_path)):
+ os.makedirs(os.path.dirname(abs_path))
+
+ handles = []
+ for i in range(0, n):
+ fname = "{{0}}_{{1}}".format(abs_path, i)
+ handles.append(open(fname, 'w'))
+
+ while True:
+ time.sleep(1)
+ """).format(abs_path=abs_path, count=count)
+
+ rproc = self._run_python(pyscript)
+ self.background_procs.append(rproc)
+ return rproc
+
def teardown(self):
for p in self.background_procs:
log.info("Terminating background process")
--- /dev/null
+
+"""
+Exercise the MDS's behaviour when clients and the MDCache reach or
+exceed the limits of how many caps/inodes they should hold.
+"""
+
+import contextlib
+import logging
+import time
+
+from teuthology.orchestra.run import CommandFailedError
+
+from tasks.cephfs.filesystem import Filesystem
+from tasks.cephfs.fuse_mount import FuseMount
+from tasks.cephfs.cephfs_test_case import CephFSTestCase, run_tests
+
+
+log = logging.getLogger(__name__)
+
+
+# Arbitrary timeouts for operations involving restarting
+# an MDS or waiting for it to come up
+MDS_RESTART_GRACE = 60
+
+# Hardcoded values from Server::recall_client_state
+CAP_RECALL_RATIO = 0.8
+CAP_RECALL_MIN = 100
+
+
+def wait_until_equal(get_fn, expect_val, timeout, reject_fn=None):
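+    """
+    Poll get_fn() until it returns expect_val, or raise if `timeout` seconds
+    elapse first. If reject_fn is given and returns True for an observed
+    value, fail immediately rather than waiting for the timeout.
+    """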
+ period = 5
+ elapsed = 0
+    while True:
+        val = get_fn()
+        if val == expect_val:
+            log.debug("wait_until_equal: success")
+            return
+        elif reject_fn and reject_fn(val):
+            raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
+        else:
+            if elapsed >= timeout:
+                raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
+                    elapsed, expect_val, val
+                ))
+            else:
+                log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
+                time.sleep(period)
+                elapsed += period
+
+
+def wait_until_true(condition, timeout):
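+    """
+    Poll condition() until it returns True, or raise if `timeout` seconds
+    elapse first.
+    """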
+ period = 5
+ elapsed = 0
+    while True:
+        if condition():
+            log.debug("wait_until_true: success")
+            return
+        else:
+            if elapsed >= timeout:
+                raise RuntimeError("Timed out after {0} seconds".format(elapsed))
+            else:
+                log.debug("wait_until_true: waiting...")
+                time.sleep(period)
+                elapsed += period
+
+
+class TestClientLimits(CephFSTestCase):
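+    """
+    Exercise the behaviour of clients and the MDS when the limits on how many
+    caps a client may hold are reached or exceeded.
+    """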
+ # Environment references
+ mount_a = None
+ mount_b = None
+ mds_session_timeout = None
+ mds_reconnect_timeout = None
+ ms_max_backoff = None
+
+ def __init__(self, *args, **kwargs):
+ super(TestClientLimits, self).__init__(*args, **kwargs)
+
+ self.configs_set = set()
+
+ def set_conf(self, subsys, key, value):
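+        """
+        Set a ceph.conf option, remembering it so that tearDown can clear it again.
+        """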
+ self.configs_set.add((subsys, key))
+ self.fs.set_ceph_conf(subsys, key, value)
+
+ def setUp(self):
+ self.fs.mds_restart()
+ self.mount_a.mount()
+ self.mount_a.wait_until_mounted()
+ self.mount_b.mount()
+ self.mount_b.wait_until_mounted()
+
+ def tearDown(self):
+ self.fs.clear_firewall()
+ self.mount_a.teardown()
+ self.mount_b.teardown()
+
+ for subsys, key in self.configs_set:
+ self.fs.clear_ceph_conf(subsys, key)
+
+ def wait_for_health(self, pattern, timeout):
+ """
+ Wait until 'ceph health' contains a single message matching the pattern
+ """
+ def seen_health_warning():
+ health = self.fs.mon_manager.get_mon_health()
+ summary_strings = [s['summary'] for s in health['summary']]
+            if len(summary_strings) == 0:
+                log.debug("No health summary strings yet")
+                return False
+ elif len(summary_strings) == 1 and pattern in summary_strings[0]:
+ return True
+ else:
+ raise RuntimeError("Unexpected health messages: {0}".format(summary_strings))
+
+ wait_until_true(seen_health_warning, timeout)
+
+ def _test_client_pin(self, use_subdir):
+ """
+ When a client pins an inode in its cache, for example because the file is held open,
+ it should reject requests from the MDS to trim these caps. The MDS should complain
+ to the user that it is unable to enforce its cache size limits because of this
+ objectionable client.
+
+ :param use_subdir: whether to put test files in a subdir or use root
+ """
+
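+        # Open more files than the MDS cache size so that the MDS is forced to
+        # ask the client to trim caps in order to stay within its limit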
+ cache_size = 200
+ open_files = 250
+
+ self.fs.set_ceph_conf('mds', 'mds cache size', cache_size)
+ self.fs.mds_restart()
+
+ mount_a_client_id = self.mount_a.get_client_id()
+ path = "subdir/mount_a" if use_subdir else "mount_a"
+ open_proc = self.mount_a.open_n_background(path, open_files)
+
+        # Client should now hold:
+        # `open_files` caps for the open files
+        # 1 cap for the root directory
+        # 1 cap for the subdir (only when use_subdir is True)
+ wait_until_equal(lambda: self.get_session(mount_a_client_id)['num_caps'],
+ open_files + (2 if use_subdir else 1),
+ timeout=600,
+ reject_fn=lambda x: x > open_files + 2)
+
+ # MDS should not be happy about that, as the client is failing to comply
+ # with the SESSION_RECALL messages it is being sent
+ mds_recall_state_timeout = int(self.fs.get_config("mds_recall_state_timeout"))
+ self.wait_for_health("failing to respond to cache pressure", mds_recall_state_timeout + 10)
+
+ # When the client closes the files, it should retain only as many caps as allowed
+ # under the SESSION_RECALL policy
+ log.info("Terminating process holding files open")
+ open_proc.stdin.close()
+ try:
+ open_proc.wait()
+ except CommandFailedError:
+ # We killed it, so it raises an error
+ pass
+
+ # The remaining caps should comply with the numbers sent from MDS in SESSION_RECALL message,
+ # which depend on the cache size and overall ratio
+        wait_until_equal(
+            lambda: self.get_session(mount_a_client_id)['num_caps'],
+            int(cache_size * CAP_RECALL_RATIO),
+            timeout=600,
+            reject_fn=lambda x: x < int(cache_size * CAP_RECALL_RATIO))
+
+ def test_client_pin_root(self):
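+        """
+        Cap pinning for files created directly in the filesystem root.
+        """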
+ self._test_client_pin(False)
+
+ def test_client_pin(self):
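+        """
+        Cap pinning for files created inside a subdirectory.
+        """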
+ self._test_client_pin(True)
+
+ def test_client_release_bug(self):
+ """
+ When a client has a bug (which we will simulate) preventing it from releasing caps,
+ the MDS should notice that releases are not being sent promptly, and generate a health
+ metric to that effect.
+ """
+
+ self.set_conf('client.{0}'.format(self.mount_a.client_id), 'client inject release failure', 'true')
+ self.mount_a.teardown()
+ self.mount_a.mount()
+ self.mount_a.wait_until_mounted()
+ mount_a_client_id = self.mount_a.get_client_id()
+
+        # Client A creates a file. It will hold the write caps on the file, and later
+        # (simulated bug) fail to comply with the MDS's request to release that cap
+ self.mount_a.run_shell(["touch", "file1"])
+
+        # Client B tries to write to the file that client A created; this will block
+        # until client A's caps are released
+ rproc = self.mount_b.write_background("file1")
+
+ # After mds_revoke_cap_timeout, we should see a health warning (extra lag from
+ # MDS beacon period)
+ mds_revoke_cap_timeout = int(self.fs.get_config("mds_revoke_cap_timeout"))
+ self.wait_for_health("failing to respond to capability release", mds_revoke_cap_timeout + 10)
+
+ # Client B should still be stuck
+ self.assertFalse(rproc.finished)
+
+ # Kill client A
+ self.mount_a.kill()
+ self.mount_a.kill_cleanup()
+
+        # Once client A's session is evicted, client B should be able to complete its write
+ self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
+ rproc.wait()
+
+
+@contextlib.contextmanager
+def task(ctx, config):
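+    """
+    Teuthology entry point: pick two FUSE clients from the context, run the
+    TestClientLimits cases against them, then yield to any downstream tasks.
+    """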
+ fs = Filesystem(ctx, config)
+
+ # Pick out the clients we will use from the configuration
+ # =======================================================
+ if len(ctx.mounts) < 2:
+ raise RuntimeError("Need at least two clients")
+ mount_a = ctx.mounts.values()[0]
+ mount_b = ctx.mounts.values()[1]
+
+ if not isinstance(mount_a, FuseMount):
+ # TODO: make kclient mount capable of all the same test tricks as ceph_fuse
+ raise RuntimeError("Require FUSE clients")
+
+ # Stash references on ctx so that we can easily debug in interactive mode
+ # =======================================================================
+ ctx.filesystem = fs
+ ctx.mount_a = mount_a
+ ctx.mount_b = mount_b
+
+ run_tests(ctx, config, TestClientLimits, {
+ 'fs': fs,
+ 'mount_a': mount_a,
+ 'mount_b': mount_b
+ })
+
+ # Continue to any downstream tasks
+ # ================================
+ yield