]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
qa/cephfs: add TestStrays.test_purge_on_shutdown
authorJohn Spray <john.spray@redhat.com>
Sat, 24 Dec 2016 20:11:40 +0000 (20:11 +0000)
committerJohn Spray <john.spray@redhat.com>
Wed, 8 Mar 2017 10:26:55 +0000 (10:26 +0000)
...and change test_migration_on_shutdown to
specifically target non-purgeable strays (i.e.
hardlink-ish things).

Signed-off-by: John Spray <john.spray@redhat.com>
qa/tasks/cephfs/test_strays.py

index d885b81ab849f5fbf94baf60b8a9f1dbf289ae88..24d8c9715e46af1e5bf437e8960afc2643f23463 100644 (file)
@@ -4,7 +4,7 @@ import logging
 from textwrap import dedent
 import datetime
 import gevent
-from teuthology.orchestra.run import CommandFailedError
+from teuthology.orchestra.run import CommandFailedError, Raw
 from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
 
 log = logging.getLogger(__name__)
@@ -268,9 +268,10 @@ class TestStrays(CephFSTestCase):
         return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                 mds_id=mds_id)[subsys][name]
 
-    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60):
+    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
+                          mds_id=None):
         self.wait_until_equal(
-            lambda: self.get_stat(subsys, counter),
+            lambda: self.get_stat(subsys, counter, mds_id),
             expect_val=expect_val, timeout=timeout,
             reject_fn=lambda x: x > expect_val
         )
@@ -453,12 +454,7 @@ class TestStrays(CephFSTestCase):
         self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")
         self.assertEqual(self.get_mdc_stat("num_strays"), 0)
 
-    def test_migration_on_shutdown(self):
-        """
-        That when an MDS rank is shut down, any not-yet-purging strays
-        are migrated to another MDS's stray dir.
-        """
-
+    def _setup_two_ranks(self):
         # Set up two MDSs
         self.fs.set_allow_multimds(True)
         self.fs.set_max_mds(2)
@@ -479,21 +475,17 @@ class TestStrays(CephFSTestCase):
             self.mds_cluster.mds_stop(unneeded_mds)
             self.mds_cluster.mds_fail(unneeded_mds)
 
-        # Set the purge file throttle to 0 on MDS rank 1
-        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
-        self.fs.mds_fail_restart(rank_1_id)
-        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
-                              reject_fn=lambda v: v > 2 or v < 1)
+        return rank_0_id, rank_1_id
 
-        # Create a file
-        # Export dir on an empty dir doesn't work, so we create the file before
-        # calling export dir in order to kick a dirfrag into existence
-        size_mb = 8
-        self.mount_a.run_shell(["mkdir", "ALPHA"])
-        self.mount_a.write_n_mb("ALPHA/alpha_file", size_mb)
-        ino = self.mount_a.path_to_ino("ALPHA/alpha_file")
-
-        result = self.fs.mds_asok(["export", "dir", "/ALPHA", "1"], rank_0_id)
+    def _force_migrate(self, from_id, to_id, path, watch_ino):
+        """
+        :param from_id: MDS id currently containing metadata
+        :param to_id: MDS id to move it to
+        :param path: Filesystem path (string) to move
+        :param watch_ino: Inode number to look for at destination to confirm move
+        :return: None
+        """
+        result = self.fs.mds_asok(["export", "dir", path, "1"], from_id)
         self.assertEqual(result["return_code"], 0)
 
         # Poll the MDS cache dump to watch for the export completing
@@ -501,9 +493,9 @@ class TestStrays(CephFSTestCase):
         migrate_timeout = 60
         migrate_elapsed = 0
         while not migrated:
-            data = self.fs.mds_asok(["dump", "cache"], rank_1_id)
+            data = self.fs.mds_asok(["dump", "cache"], to_id)
             for inode_data in data:
-                if inode_data['ino'] == ino:
+                if inode_data['ino'] == watch_ino:
                     log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                     if inode_data['is_auth'] is True:
                         migrated = True
@@ -516,42 +508,96 @@ class TestStrays(CephFSTestCase):
                     migrate_elapsed += 1
                     time.sleep(1)
 
-        # Delete the file on rank 1
-        self.mount_a.run_shell(["rm", "-f", "ALPHA/alpha_file"])
+    def _is_stopped(self, rank):
+        mds_map = self.fs.get_mds_map()
+        return rank not in [i['rank'] for i in mds_map['info'].values()]
 
-        # See the stray counter increment, but the purge counter doesn't
-        # See that the file objects are still on disk
-        self.wait_until_equal(
-            lambda: self.get_mdc_stat("num_strays", rank_1_id),
-            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)
-        self.assertEqual(self.get_mdc_stat("strays_created", rank_1_id), 1)
-        time.sleep(60)  # period that we want to see if it gets purged
-        self.assertEqual(self.get_mdc_stat("strays_purged", rank_1_id), 0)
-        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
+    def test_purge_on_shutdown(self):
+        """
+        That when an MDS rank is shut down, its purge queue is
+        drained in the process.
+        """
+        rank_0_id, rank_1_id = self._setup_two_ranks()
+
+        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
+        self.mds_cluster.mds_fail_restart(rank_1_id)
+        self.fs.wait_for_daemons()
+
+        file_count = 5
+
+        self.mount_a.create_n_files("delete_me/file", file_count)
+
+        self._force_migrate(rank_0_id, rank_1_id, "/delete_me",
+                            self.mount_a.path_to_ino("delete_me/file_0"))
+
+        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
+        self.mount_a.umount_wait()
+
+        # See all the strays go into purge queue
+        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
+        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
+        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)
+
+        # See nothing get purged from the purge queue (yet)
+        time.sleep(10)
+        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
 
         # Shut down rank 1
         self.fs.set_max_mds(1)
         self.fs.deactivate(1)
 
-        # Wait til we get to a single active MDS mdsmap state
-        def is_stopped():
-            mds_map = self.fs.get_mds_map()
-            return 1 not in [i['rank'] for i in mds_map['info'].values()]
+        # It shouldn't proceed past stopping because its still not allowed
+        # to purge
+        time.sleep(10)
+        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
+        self.assertFalse(self._is_stopped(1))
 
-        self.wait_until_true(is_stopped, timeout=120)
+        # Permit the daemon to start purging again
+        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
+                                            'injectargs',
+                                            "--mds_max_purge_files 100")
 
-        # See that the stray counter on rank 0 has incremented
-        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
+        # It should now proceed through shutdown
+        self.wait_until_true(
+            lambda: self._is_stopped(1),
+            timeout=60
+        )
 
-        # Wait til the purge counter on rank 0 increments
-        self.wait_until_equal(
-            lambda: self.get_mdc_stat("strays_purged", rank_0_id),
-            1, timeout=60, reject_fn=lambda x: x > 1)
+        # ...and in the process purge all that data
+        self.await_data_pool_empty()
 
-        # See that the file objects no longer exist
-        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))
+    def test_migration_on_shutdown(self):
+        """
+        That when an MDS rank is shut down, any non-purgeable strays
+        get migrated to another rank.
+        """
 
-        self.await_data_pool_empty()
+        rank_0_id, rank_1_id = self._setup_two_ranks()
+
+        # Create a non-purgeable stray in a ~mds1 stray directory
+        # by doing a hard link and deleting the original file
+        self.mount_a.run_shell(["mkdir", "mydir"])
+        self.mount_a.run_shell(["touch", "mydir/original"])
+        self.mount_a.run_shell(["ln", "mydir/original", "mydir/linkto"])
+
+        self._force_migrate(rank_0_id, rank_1_id, "/mydir",
+                            self.mount_a.path_to_ino("mydir/original"))
+
+        self.mount_a.run_shell(["rm", "-f", "mydir/original"])
+        self.mount_a.umount_wait()
+
+        self._wait_for_counter("mds_cache", "strays_created", 1,
+                               mds_id=rank_1_id)
+
+        # Shut down rank 1
+        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
+        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")
+
+        # Wait til we get to a single active MDS mdsmap state
+        self.wait_until_true(lambda: self._is_stopped(1), timeout=120)
+
+        # See that the stray counter on rank 0 has incremented
+        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
 
     def assert_backtrace(self, ino, expected_path):
         """