From f826c7e8aaf5e0f73158e517a4f4db1eec5b8d0f Mon Sep 17 00:00:00 2001
From: John Spray
Date: Sat, 24 Dec 2016 20:11:40 +0000
Subject: [PATCH] qa/cephfs: add TestStrays.test_purge_on_shutdown

...and change test_migration_on_shutdown to specifically target
non-purgeable strays (i.e. hardlink-ish things).

Signed-off-by: John Spray
---
 qa/tasks/cephfs/test_strays.py | 146 ++++++++++++++++++++++-----------
 1 file changed, 96 insertions(+), 50 deletions(-)

diff --git a/qa/tasks/cephfs/test_strays.py b/qa/tasks/cephfs/test_strays.py
index d885b81ab849f..24d8c9715e46a 100644
--- a/qa/tasks/cephfs/test_strays.py
+++ b/qa/tasks/cephfs/test_strays.py
@@ -4,7 +4,7 @@ import logging
 from textwrap import dedent
 import datetime
 import gevent
-from teuthology.orchestra.run import CommandFailedError
+from teuthology.orchestra.run import CommandFailedError, Raw
 from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
 
 log = logging.getLogger(__name__)
@@ -268,9 +268,10 @@ class TestStrays(CephFSTestCase):
         return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                 mds_id=mds_id)[subsys][name]
 
-    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60):
+    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
+                          mds_id=None):
         self.wait_until_equal(
-            lambda: self.get_stat(subsys, counter),
+            lambda: self.get_stat(subsys, counter, mds_id),
             expect_val=expect_val, timeout=timeout,
             reject_fn=lambda x: x > expect_val
         )
@@ -453,12 +454,7 @@ class TestStrays(CephFSTestCase):
         self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")
         self.assertEqual(self.get_mdc_stat("num_strays"), 0)
 
-    def test_migration_on_shutdown(self):
-        """
-        That when an MDS rank is shut down, any not-yet-purging strays
-        are migrated to another MDS's stray dir.
- """ - + def _setup_two_ranks(self): # Set up two MDSs self.fs.set_allow_multimds(True) self.fs.set_max_mds(2) @@ -479,21 +475,17 @@ class TestStrays(CephFSTestCase): self.mds_cluster.mds_stop(unneeded_mds) self.mds_cluster.mds_fail(unneeded_mds) - # Set the purge file throttle to 0 on MDS rank 1 - self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0") - self.fs.mds_fail_restart(rank_1_id) - self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30, - reject_fn=lambda v: v > 2 or v < 1) + return rank_0_id, rank_1_id - # Create a file - # Export dir on an empty dir doesn't work, so we create the file before - # calling export dir in order to kick a dirfrag into existence - size_mb = 8 - self.mount_a.run_shell(["mkdir", "ALPHA"]) - self.mount_a.write_n_mb("ALPHA/alpha_file", size_mb) - ino = self.mount_a.path_to_ino("ALPHA/alpha_file") - - result = self.fs.mds_asok(["export", "dir", "/ALPHA", "1"], rank_0_id) + def _force_migrate(self, from_id, to_id, path, watch_ino): + """ + :param from_id: MDS id currently containing metadata + :param to_id: MDS id to move it to + :param path: Filesystem path (string) to move + :param watch_ino: Inode number to look for at destination to confirm move + :return: None + """ + result = self.fs.mds_asok(["export", "dir", path, "1"], from_id) self.assertEqual(result["return_code"], 0) # Poll the MDS cache dump to watch for the export completing @@ -501,9 +493,9 @@ class TestStrays(CephFSTestCase): migrate_timeout = 60 migrate_elapsed = 0 while not migrated: - data = self.fs.mds_asok(["dump", "cache"], rank_1_id) + data = self.fs.mds_asok(["dump", "cache"], to_id) for inode_data in data: - if inode_data['ino'] == ino: + if inode_data['ino'] == watch_ino: log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2))) if inode_data['is_auth'] is True: migrated = True @@ -516,42 +508,96 @@ class TestStrays(CephFSTestCase): migrate_elapsed += 1 time.sleep(1) - # Delete the file on rank 1 - self.mount_a.run_shell(["rm", "-f", "ALPHA/alpha_file"]) + def _is_stopped(self, rank): + mds_map = self.fs.get_mds_map() + return rank not in [i['rank'] for i in mds_map['info'].values()] - # See the stray counter increment, but the purge counter doesn't - # See that the file objects are still on disk - self.wait_until_equal( - lambda: self.get_mdc_stat("num_strays", rank_1_id), - expect_val=1, timeout=60, reject_fn=lambda x: x > 1) - self.assertEqual(self.get_mdc_stat("strays_created", rank_1_id), 1) - time.sleep(60) # period that we want to see if it gets purged - self.assertEqual(self.get_mdc_stat("strays_purged", rank_1_id), 0) - self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024)) + def test_purge_on_shutdown(self): + """ + That when an MDS rank is shut down, its purge queue is + drained in the process. 
+ """ + rank_0_id, rank_1_id = self._setup_two_ranks() + + self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0") + self.mds_cluster.mds_fail_restart(rank_1_id) + self.fs.wait_for_daemons() + + file_count = 5 + + self.mount_a.create_n_files("delete_me/file", file_count) + + self._force_migrate(rank_0_id, rank_1_id, "/delete_me", + self.mount_a.path_to_ino("delete_me/file_0")) + + self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")]) + self.mount_a.umount_wait() + + # See all the strays go into purge queue + self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id) + self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id) + self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0) + + # See nothing get purged from the purge queue (yet) + time.sleep(10) + self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0) # Shut down rank 1 self.fs.set_max_mds(1) self.fs.deactivate(1) - # Wait til we get to a single active MDS mdsmap state - def is_stopped(): - mds_map = self.fs.get_mds_map() - return 1 not in [i['rank'] for i in mds_map['info'].values()] + # It shouldn't proceed past stopping because its still not allowed + # to purge + time.sleep(10) + self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0) + self.assertFalse(self._is_stopped(1)) - self.wait_until_true(is_stopped, timeout=120) + # Permit the daemon to start purging again + self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id), + 'injectargs', + "--mds_max_purge_files 100") - # See that the stray counter on rank 0 has incremented - self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1) + # It should now proceed through shutdown + self.wait_until_true( + lambda: self._is_stopped(1), + timeout=60 + ) - # Wait til the purge counter on rank 0 increments - self.wait_until_equal( - lambda: self.get_mdc_stat("strays_purged", rank_0_id), - 1, timeout=60, reject_fn=lambda x: x > 1) + # ...and in the process purge all that data + self.await_data_pool_empty() - # See that the file objects no longer exist - self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024)) + def test_migration_on_shutdown(self): + """ + That when an MDS rank is shut down, any non-purgeable strays + get migrated to another rank. + """ - self.await_data_pool_empty() + rank_0_id, rank_1_id = self._setup_two_ranks() + + # Create a non-purgeable stray in a ~mds1 stray directory + # by doing a hard link and deleting the original file + self.mount_a.run_shell(["mkdir", "mydir"]) + self.mount_a.run_shell(["touch", "mydir/original"]) + self.mount_a.run_shell(["ln", "mydir/original", "mydir/linkto"]) + + self._force_migrate(rank_0_id, rank_1_id, "/mydir", + self.mount_a.path_to_ino("mydir/original")) + + self.mount_a.run_shell(["rm", "-f", "mydir/original"]) + self.mount_a.umount_wait() + + self._wait_for_counter("mds_cache", "strays_created", 1, + mds_id=rank_1_id) + + # Shut down rank 1 + self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1") + self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1") + + # Wait til we get to a single active MDS mdsmap state + self.wait_until_true(lambda: self._is_stopped(1), timeout=120) + + # See that the stray counter on rank 0 has incremented + self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1) def assert_backtrace(self, ino, expected_path): """ -- 2.39.5