From bd3ae1eeba236868bf1f80f555cc94d0637c960b Mon Sep 17 00:00:00 2001
From: John Spray
Date: Wed, 21 Jan 2015 14:28:13 +0000
Subject: [PATCH] tasks/cephfs: add test_strays

This tests the new purge file/ops throttling in the MDS, via the new
perf counters for strays/purging.

Fixes: #10390
Signed-off-by: John Spray
---
 tasks/cephfs/filesystem.py  |  50 ++++
 tasks/cephfs/mount.py       |  25 +-
 tasks/cephfs/test_strays.py | 468 ++++++++++++++++++++++++++++++++++++
 3 files changed, 537 insertions(+), 6 deletions(-)
 create mode 100644 tasks/cephfs/test_strays.py

diff --git a/tasks/cephfs/filesystem.py b/tasks/cephfs/filesystem.py
index 40da6e11ab036..b7b4d606265af 100644
--- a/tasks/cephfs/filesystem.py
+++ b/tasks/cephfs/filesystem.py
@@ -491,6 +491,56 @@ class Filesystem(object):
 
         return json.loads(p.stdout.getvalue().strip())
 
+    def _enumerate_data_objects(self, ino, size):
+        """
+        Get the list of expected data objects for a range, and the list of objects
+        that really exist.
+
+        :return: a tuple of two lists of strings (expected, actual)
+        """
+        stripe_size = 1024 * 1024 * 4
+
+        size = max(stripe_size, size)
+
+        want_objects = [
+            "{0:x}.{1:08x}".format(ino, n)
+            for n in range(0, ((size - 1) / stripe_size) + 1)
+        ]
+
+        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
+
+        return want_objects, exist_objects
+
+    def data_objects_present(self, ino, size):
+        """
+        Check that *all* the expected data objects for an inode are present in the data pool
+        """
+
+        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
+        missing = set(want_objects) - set(exist_objects)
+
+        if missing:
+            log.info("Objects missing (ino {0}, size {1}): {2}".format(
+                ino, size, missing
+            ))
+            return False
+        else:
+            log.info("All objects for ino {0} size {1} found".format(ino, size))
+            return True
+
+    def data_objects_absent(self, ino, size):
+        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
+        present = set(want_objects) & set(exist_objects)
+
+        if present:
+            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
+                ino, size, present
+            ))
+            return False
+        else:
+            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
+            return True
+
     def rados(self, args, pool=None):
         """
         Call into the `rados` CLI from an MDS
diff --git a/tasks/cephfs/mount.py b/tasks/cephfs/mount.py
index 1413b818f3bb7..1d4212028cd14 100644
--- a/tasks/cephfs/mount.py
+++ b/tasks/cephfs/mount.py
@@ -310,12 +310,25 @@ class CephFSMount(object):
     def teardown(self):
         for p in self.background_procs:
             log.info("Terminating background process")
-            if p.stdin:
-                p.stdin.close()
-                try:
-                    p.wait()
-                except (CommandFailedError, ConnectionLostError):
-                    pass
+            self._kill_background(p)
+
+        self.background_procs = []
+
+    def _kill_background(self, p):
+        if p.stdin:
+            p.stdin.close()
+            try:
+                p.wait()
+            except (CommandFailedError, ConnectionLostError):
+                pass
+
+    def kill_background(self, p):
+        """
+        For a process that was returned by one of the _background member functions,
+        kill it hard.
+ """ + self._kill_background(p) + self.background_procs.remove(p) def spam_dir_background(self, path): """ diff --git a/tasks/cephfs/test_strays.py b/tasks/cephfs/test_strays.py new file mode 100644 index 0000000000000..df4da0ebb6578 --- /dev/null +++ b/tasks/cephfs/test_strays.py @@ -0,0 +1,468 @@ + +import logging +from textwrap import dedent +import time +import gevent +from tasks.cephfs.cephfs_test_case import CephFSTestCase + +log = logging.getLogger(__name__) + + +class TestStrays(CephFSTestCase): + MDSS_REQUIRED = 2 + + OPS_THROTTLE = 1 + FILES_THROTTLE = 2 + + # Range of different file sizes used in throttle test's workload + throttle_workload_size_range = 16 + + def test_ops_throttle(self): + self._test_throttling(self.OPS_THROTTLE) + + def test_files_throttle(self): + self._test_throttling(self.FILES_THROTTLE) + + def _test_throttling(self, throttle_type): + """ + That the mds_max_purge_ops setting is respected + """ + + def set_throttles(files, ops): + """ + Helper for updating ops/files limits, and calculating effective + ops_per_pg setting to give the same ops limit. + """ + self.set_conf('mds', 'mds_max_purge_files', "%d" % files) + self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops) + + pgs = self.fs.mon_manager.get_pool_property( + self.fs.get_data_pool_name(), + "pg_num" + ) + ops_per_pg = float(ops) / pgs + self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg) + + # Test conditions depend on what we're going to be exercising. + # * Lift the threshold on whatever throttle we are *not* testing, so + # that the throttle of interest is the one that will be the bottleneck + # * Create either many small files (test file count throttling) or fewer + # large files (test op throttling) + if throttle_type == self.OPS_THROTTLE: + set_throttles(files=100000000, ops=16) + size_unit = 1024 * 1024 # big files, generate lots of ops + file_multiplier = 100 + elif throttle_type == self.FILES_THROTTLE: + # The default value of file limit is pretty permissive, so to avoid + # the test running too fast, create lots of files and set the limit + # pretty low. + set_throttles(ops=100000000, files=6) + size_unit = 1024 # small, numerous files + file_multiplier = 200 + else: + raise NotImplemented(throttle_type) + + # Pick up config changes + self.fs.mds_fail_restart() + self.fs.wait_for_daemons() + + create_script = dedent(""" + import os + + mount_path = "{mount_path}" + subdir = "delete_me" + size_unit = {size_unit} + file_multiplier = {file_multiplier} + os.mkdir(os.path.join(mount_path, subdir)) + for i in xrange(0, file_multiplier): + for size in xrange(0, {size_range}*size_unit, size_unit): + filename = "{{0}}_{{1}}.bin".format(i, size / size_unit) + f = open(os.path.join(mount_path, subdir, filename), 'w') + f.write(size * 'x') + f.close() + """.format( + mount_path=self.mount_a.mountpoint, + size_unit=size_unit, + file_multiplier=file_multiplier, + size_range=self.throttle_workload_size_range + )) + + self.mount_a.run_python(create_script) + + # We will run the deletion in the background, to reduce the risk of it completing before + # we have started monitoring the stray statistics. 
+        def background():
+            self.mount_a.run_shell(["sudo", "rm", "-rf", "delete_me"])
+            self.fs.mds_asok(["flush", "journal"])
+
+        background_thread = gevent.spawn(background)
+
+        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
+        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
+        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))
+
+        # During this phase we look for the concurrent ops to exceed half
+        # the limit (a heuristic) and not exceed the limit (a correctness
+        # condition).
+        purge_timeout = 600
+        elapsed = 0
+        files_high_water = 0
+        ops_high_water = 0
+        while True:
+            mdc_stats = self.fs.mds_asok(['perf', 'dump', 'mds_cache'])['mds_cache']
+            if elapsed >= purge_timeout:
+                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats: {1}".format(total_inodes, mdc_stats))
+
+            num_strays = mdc_stats['num_strays']
+            num_strays_purging = mdc_stats['num_strays_purging']
+            num_purge_ops = mdc_stats['num_purge_ops']
+
+            files_high_water = max(files_high_water, num_strays_purging)
+            ops_high_water = max(ops_high_water, num_purge_ops)
+
+            total_strays_created = mdc_stats['strays_created']
+            total_strays_purged = mdc_stats['strays_purged']
+
+            if total_strays_purged == total_inodes:
+                log.info("Complete purge in {0} seconds".format(elapsed))
+                break
+            elif total_strays_purged > total_inodes:
+                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
+            else:
+                if throttle_type == self.OPS_THROTTLE:
+                    if num_purge_ops > mds_max_purge_ops:
+                        raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
+                            num_purge_ops, mds_max_purge_ops
+                        ))
+                elif throttle_type == self.FILES_THROTTLE:
+                    if num_strays_purging > mds_max_purge_files:
+                        raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
+                            num_strays_purging, mds_max_purge_files
+                        ))
+                else:
+                    raise NotImplementedError(throttle_type)
+
+                log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
+                    num_strays_purging, num_strays,
+                    total_strays_purged, total_strays_created
+                ))
+                time.sleep(1)
+                elapsed += 1
+
+        background_thread.join()
+
+        # Check that we got up to a respectable rate during the purge.  This is totally
+        # racy, but should be safe-ish unless the cluster is pathologically slow, or
+        # insanely fast such that the deletions all pass before we have polled the
+        # statistics.
+        if throttle_type == self.OPS_THROTTLE:
+            if ops_high_water < mds_max_purge_ops / 2:
+                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
+                    ops_high_water, mds_max_purge_ops
+                ))
+        elif throttle_type == self.FILES_THROTTLE:
+            if files_high_water < mds_max_purge_files / 2:
+                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
+                    files_high_water, mds_max_purge_files
+                ))
+
+        # Sanity check all MDC stray stats
+        mdc_stats = self.fs.mds_asok(['perf', 'dump', 'mds_cache'])['mds_cache']
+        self.assertEqual(mdc_stats['num_strays'], 0)
+        self.assertEqual(mdc_stats['num_strays_purging'], 0)
+        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
+        self.assertEqual(mdc_stats['num_purge_ops'], 0)
+        self.assertEqual(mdc_stats['strays_created'], total_inodes)
+        self.assertEqual(mdc_stats['strays_purged'], total_inodes)
+
+    def get_mdc_stat(self, name, mds_id=None):
+        return self.fs.mds_asok(['perf', 'dump', "mds_cache", name],
+                                mds_id=mds_id)['mds_cache'][name]
+
+    def test_open_inode(self):
+        """
+        That the case of a dentry unlinked while a client holds an
+        inode open is handled correctly.
+
+        The inode should be moved into a stray dentry, while the original
+        dentry and directory should be purged.
+
+        The inode's data should be purged when the client eventually closes
+        it.
+        """
+        mount_a_client_id = self.mount_a.get_global_id()
+
+        # Write some bytes to a file
+        size_mb = 8
+        self.mount_a.write_n_mb("open_file", size_mb)
+        open_file_ino = self.mount_a.path_to_ino("open_file")
+
+        # Hold the file open
+        p = self.mount_a.open_background("open_file")
+
+        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
+
+        # Unlink the dentry
+        self.mount_a.run_shell(["rm", "-f", "open_file"])
+
+        # Wait to see the stray count increment
+        self.wait_until_equal(
+            lambda: self.get_mdc_stat("num_strays"),
+            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)
+
+        # See that while the stray count has incremented, the purge count
+        # has not
+        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
+        self.assertEqual(self.get_mdc_stat("strays_purged"), 0)
+
+        # See that the client still holds 2 caps
+        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
+
+        # See that the data objects remain in the data pool
+        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))
+
+        # Now close the file
+        self.mount_a.kill_background(p)
+
+        # Wait to see the client cap count decrement
+        self.wait_until_equal(
+            lambda: self.get_session(mount_a_client_id)['num_caps'],
+            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
+        )
+        # Wait to see the purge counter increment, stray count go to zero
+        self.wait_until_equal(
+            lambda: self.get_mdc_stat("strays_purged"),
+            expect_val=1, timeout=60, reject_fn=lambda x: x > 1
+        )
+        self.wait_until_equal(
+            lambda: self.get_mdc_stat("num_strays"),
+            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
+        )
+
+        # See that the data objects no longer exist
+        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))
+
+    def test_hardlink_reintegration(self):
+        """
+        That removal of the primary dentry of a hardlinked inode results
+        in reintegration of the inode into the previously-remote dentry,
+        rather than lingering as a stray indefinitely.
+ """ + # Write some bytes to file_a + size_mb = 8 + self.mount_a.write_n_mb("file_a", size_mb) + ino = self.mount_a.path_to_ino("file_a") + + # Create a hardlink named file_b + self.mount_a.run_shell(["ln", "file_a", "file_b"]) + self.assertEqual(self.mount_a.path_to_ino("file_b"), ino) + + # Flush journal + self.fs.mds_asok(['flush', 'journal']) + + # See that backtrace for the file points to the file_a path + pre_unlink_bt = self.fs.read_backtrace(ino) + self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a") + + # Unlink file_a + self.mount_a.run_shell(["rm", "-f", "file_a"]) + + # See that a stray was created + self.assertEqual(self.get_mdc_stat("num_strays"), 1) + self.assertEqual(self.get_mdc_stat("strays_created"), 1) + + # Wait, see that data objects are still present (i.e. that the + # stray did not advance to purging given time) + time.sleep(30) + self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024)) + self.assertEqual(self.get_mdc_stat("strays_purged"), 0) + + # See that before reintegration, the inode's backtrace points to a stray dir + self.fs.mds_asok(['flush', 'journal']) + self.assertTrue(self.get_backtrace_path(ino).startswith("stray")) + + # Do a metadata operation on the remaining link (mv is heavy handed, but + # others like touch may be satisfied from caps without poking MDS) + self.mount_a.run_shell(["mv", "file_b", "file_c"]) + + # See the reintegration counter increment + # This should happen as a result of the eval_remote call on + # responding to a client request. + self.wait_until_equal( + lambda: self.get_mdc_stat("strays_reintegrated"), + expect_val=1, timeout=60, reject_fn=lambda x: x > 1 + ) + + # Flush the journal + self.fs.mds_asok(['flush', 'journal']) + + # See that the backtrace for the file points to the remaining link's path + post_reint_bt = self.fs.read_backtrace(ino) + self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c") + + # See that the number of strays in existence is zero + self.assertEqual(self.get_mdc_stat("num_strays"), 0) + + # Now really delete it + self.mount_a.run_shell(["rm", "-f", "file_c"]) + self.wait_until_equal( + lambda: self.get_mdc_stat("strays_purged"), + expect_val=1, timeout=60, reject_fn=lambda x: x > 1 + ) + self.assert_purge_idle() + self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024)) + + # We caused the inode to go stray twice + self.assertEqual(self.get_mdc_stat("strays_created"), 2) + # One time we reintegrated it + self.assertEqual(self.get_mdc_stat("strays_reintegrated"), 1) + # Then the second time we purged it + self.assertEqual(self.get_mdc_stat("strays_purged"), 1) + + def test_mv_hardlink_cleanup(self): + """ + That when doing a rename from A to B, and B has hardlinks, + then we make a stray for B which is then reintegrated + into one of his hardlinks. + """ + # Create file_a, file_b, and a hardlink to file_b + + # mv file_a file_b + + # Proceed as with test_hardlink_reintegration + + def test_migration_on_shutdown(self): + """ + That when an MDS rank is shut down, any not-yet-purging strays + are migrated to another MDS's stray dir. 
+ """ + + # Set up two MDSs + self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "2") + + # See that we have two active MDSs + self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30, + reject_fn=lambda v: v > 2 or v < 1) + + active_mds_names = self.fs.get_active_names() + rank_0_id = active_mds_names[0] + rank_1_id = active_mds_names[1] + log.info("Ranks 0 and 1 are {0} and {1}".format( + rank_0_id, rank_1_id)) + + # Set the purge file throttle to 0 on MDS rank 1 + self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0") + self.fs.mds_fail_restart(rank_1_id) + self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30, + reject_fn=lambda v: v > 2 or v < 1) + + # Create a file + # Export dir on an empty dir doesn't work, so we create the file before + # calling export dir in order to kick a dirfrag into existence + size_mb = 8 + self.mount_a.run_shell(["mkdir", "ALPHA"]) + self.mount_a.write_n_mb("ALPHA/alpha_file", size_mb) + ino = self.mount_a.path_to_ino("ALPHA/alpha_file") + + result = self.fs.mds_asok(["export", "dir", "/ALPHA", "1"], rank_0_id) + self.assertEqual(result["return_code"], 0) + + # Delete the file on rank 1 + self.mount_a.run_shell(["rm", "-f", "ALPHA/alpha_file"]) + + # See the stray counter increment, but the purge counter doesn't + # See that the file objects are still on disk + self.wait_until_equal( + lambda: self.get_mdc_stat("num_strays", rank_1_id), + expect_val=1, timeout=60, reject_fn=lambda x: x > 1) + self.assertEqual(self.get_mdc_stat("strays_created", rank_1_id), 1) + time.sleep(60) # period that we want to see if it gets purged + self.assertEqual(self.get_mdc_stat("strays_purged", rank_1_id), 0) + self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024)) + + # Shut down rank 1 + self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1") + self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1") + + # Wait til we get to a single active MDS mdsmap state + def is_stopped(): + mds_map = self.fs.get_mds_map() + return 1 not in [i['rank'] for i in mds_map['info'].values()] + + self.wait_until_true(is_stopped, timeout=120) + + # See that the stray counter on rank 0 has incremented + self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1) + + # Wait til the purge counter on rank 0 increments + self.wait_until_equal( + lambda: self.get_mdc_stat("strays_purged", rank_0_id), + 1, timeout=60, reject_fn=lambda x: x > 1) + + # See that the file objects no longer exist + self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024)) + + def assert_backtrace(self, ino, expected_path): + """ + Assert that the backtrace in the data pool for an inode matches + an expected /foo/bar path. + """ + expected_elements = expected_path.strip("/").split("/") + bt = self.fs.read_backtrace(ino) + actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']])) + self.assertListEqual(expected_elements, actual_elements) + + def get_backtrace_path(self, ino): + bt = self.fs.read_backtrace(ino) + elements = reversed([dn['dname'] for dn in bt['ancestors']]) + return "/".join(elements) + + def assert_purge_idle(self): + """ + Assert that the MDS perf counters indicate no strays exist and + no ongoing purge activity. Sanity check for when PurgeQueue should + be idle. 
+ """ + stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache'] + self.assertEqual(stats["num_strays"], 0) + self.assertEqual(stats["num_strays_purging"], 0) + self.assertEqual(stats["num_strays_delayed"], 0) + self.assertEqual(stats["num_purge_ops"], 0) + + def test_mv_cleanup(self): + """ + That when doing a rename from A to B, and B has no hardlinks, + then we make a stray for B and purge him. + """ + # Create file_a and file_b, write some to both + size_mb = 8 + self.mount_a.write_n_mb("file_a", size_mb) + file_a_ino = self.mount_a.path_to_ino("file_a") + self.mount_a.write_n_mb("file_b", size_mb) + file_b_ino = self.mount_a.path_to_ino("file_b") + + self.fs.mds_asok(['flush', 'journal']) + self.assert_backtrace(file_a_ino, "file_a") + self.assert_backtrace(file_b_ino, "file_b") + + # mv file_a file_b + self.mount_a.run_shell(['mv', 'file_a', 'file_b']) + + # See that stray counter increments + self.assertEqual(self.get_mdc_stat("strays_created"), 1) + # Wait for purge counter to increment + self.wait_until_equal( + lambda: self.get_mdc_stat("strays_purged"), + expect_val=1, timeout=60, reject_fn=lambda x: x > 1 + ) + self.assert_purge_idle() + + # file_b should have been purged + self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024)) + + # Backtrace should have updated from file_a to file_b + self.fs.mds_asok(['flush', 'journal']) + self.assert_backtrace(file_a_ino, "file_b") + + # file_a's data should still exist + self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024)) -- 2.39.5