qa: update test_strays for purgequeue
author    John Spray <john.spray@redhat.com>
          Fri, 23 Dec 2016 18:59:39 +0000 (18:59 +0000)
committer John Spray <john.spray@redhat.com>
          Wed, 8 Mar 2017 10:20:59 +0000 (10:20 +0000)
Signed-off-by: John Spray <john.spray@redhat.com>
qa/tasks/cephfs/filesystem.py
qa/tasks/cephfs/test_strays.py

diff --git a/qa/tasks/cephfs/filesystem.py b/qa/tasks/cephfs/filesystem.py
index 95c5c41bb2da4d707694ce94678e937fc0a66ad6..9869bae244b90de7c46218b49ab4c66994832512 100644
@@ -973,6 +973,14 @@ class Filesystem(MDSCluster):
             log.info("All objects for ino {0} size {1} are absent".format(ino, size))
             return True
 
+    def dirfrag_exists(self, ino, frag):
+        try:
+            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
+        except CommandFailedError as e:
+            return False
+        else:
+            return True
+
     def rados(self, args, pool=None, namespace=None, stdin_data=None):
         """
         Call into the `rados` CLI from an MDS
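The new dirfrag_exists() helper probes RADOS for a directory's metadata object. Per the format string above, the object name is the inode number in lowercase hex, a dot, and the fragment id as eight hex digits. A worked example of the naming (the inode value is illustrative, not taken from this commit):

    # dirfrag 0 of a directory with inode 0x10000000000:
    "{0:x}.{1:08x}".format(0x10000000000, 0)  # -> '10000000000.00000000'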
diff --git a/qa/tasks/cephfs/test_strays.py b/qa/tasks/cephfs/test_strays.py
index b94d859ff9473350fa43e0638929691536bcf6fc..d885b81ab849f5fbf94baf60b8a9f1dbf289ae88 100644
@@ -2,6 +2,7 @@ import json
 import time
 import logging
 from textwrap import dedent
+import datetime
 import gevent
 from teuthology.orchestra.run import CommandFailedError
 from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
@@ -54,18 +55,51 @@ class TestStrays(CephFSTestCase):
         ))
 
         self.mount_a.run_python(create_script)
+
+        # See that the dirfrag object is created
+        self.fs.mds_asok(["flush", "journal"])
+        dir_ino = self.mount_a.path_to_ino("delete_me")
+        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))
+
+        # Remove everything
         self.mount_a.run_shell(["rm", "-rf", "delete_me"])
         self.fs.mds_asok(["flush", "journal"])
+
+        # See that all the removed files get created as strays
         strays = self.get_mdc_stat("strays_created")
         self.assertEqual(strays, file_count + 1)
+
+        # See that the strays all get enqueued for purge
         self.wait_until_equal(
-            lambda: self.get_mdc_stat("strays_purged"),
+            lambda: self.get_mdc_stat("strays_enqueued"),
             strays,
             timeout=600
 
         )
 
+        # See that all the purge operations execute
+        self.wait_until_equal(
+            lambda: self.get_stat("purge_queue", "pq_executed"),
+            strays,
+            timeout=600
+        )
+
+        # See that the directory's metadata object is finally gone
+        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))
+
+        # See that the data objects are all gone as well
+        self.await_data_pool_empty()
+
     def _test_throttling(self, throttle_type):
+        self.data_log = []
+        try:
+            return self._do_test_throttling(throttle_type)
+        except Exception:
+            for l in self.data_log:
+                log.info(",".join(str(l_) for l_ in l))
+            raise
+
+    def _do_test_throttling(self, throttle_type):
         """
         That the mds_max_purge_ops setting is respected
         """
@@ -150,20 +184,25 @@ class TestStrays(CephFSTestCase):
         elapsed = 0
         files_high_water = 0
         ops_high_water = 0
+
         while True:
-            mdc_stats = self.fs.mds_asok(['perf', 'dump', 'mds_cache'])['mds_cache']
+            stats = self.fs.mds_asok(['perf', 'dump'])
+            mdc_stats = stats['mds_cache']
+            pq_stats = stats['purge_queue']
             if elapsed >= purge_timeout:
                 raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))
 
             num_strays = mdc_stats['num_strays']
-            num_strays_purging = mdc_stats['num_strays_purging']
-            num_purge_ops = mdc_stats['num_purge_ops']
+            num_strays_purging = pq_stats['pq_executing']
+            num_purge_ops = pq_stats['pq_executing_ops']
+
+            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops])
 
             files_high_water = max(files_high_water, num_strays_purging)
             ops_high_water = max(ops_high_water, num_purge_ops)
 
             total_strays_created = mdc_stats['strays_created']
-            total_strays_purged = mdc_stats['strays_purged']
+            total_strays_purged = pq_stats['pq_executed']
 
             if total_strays_purged == total_inodes:
                 log.info("Complete purge in {0} seconds".format(elapsed))
@@ -172,7 +211,9 @@ class TestStrays(CephFSTestCase):
                 raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
             else:
                 if throttle_type == self.OPS_THROTTLE:
-                    if num_strays_purging > mds_max_purge_files:
+                    # The limit may be overshot by up to 11 ops: one file's
+                    # filer_max_purge_ops batch plus one op for the backtrace.
+                    if num_purge_ops > mds_max_purge_ops + 11:
                         raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                             num_purge_ops, mds_max_purge_ops
                         ))
@@ -209,17 +250,30 @@ class TestStrays(CephFSTestCase):
                 ))
 
         # Sanity check all MDC stray stats
-        mdc_stats = self.fs.mds_asok(['perf', 'dump', 'mds_cache'])['mds_cache']
+        stats = self.fs.mds_asok(['perf', 'dump'])
+        mdc_stats = stats['mds_cache']
+        pq_stats = stats['purge_queue']
         self.assertEqual(mdc_stats['num_strays'], 0)
-        self.assertEqual(mdc_stats['num_strays_purging'], 0)
         self.assertEqual(mdc_stats['num_strays_delayed'], 0)
-        self.assertEqual(mdc_stats['num_purge_ops'], 0)
+        self.assertEqual(pq_stats['pq_executing'], 0)
+        self.assertEqual(pq_stats['pq_executing_ops'], 0)
         self.assertEqual(mdc_stats['strays_created'], total_inodes)
-        self.assertEqual(mdc_stats['strays_purged'], total_inodes)
+        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
+        self.assertEqual(pq_stats['pq_executed'], total_inodes)
 
     def get_mdc_stat(self, name, mds_id=None):
-        return self.fs.mds_asok(['perf', 'dump', "mds_cache", name],
-                                mds_id=mds_id)['mds_cache'][name]
+        return self.get_stat("mds_cache", name, mds_id)
+
+    def get_stat(self, subsys, name, mds_id=None):
+        return self.fs.mds_asok(['perf', 'dump', subsys, name],
+                                mds_id=mds_id)[subsys][name]
+
+    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60):
+        self.wait_until_equal(
+            lambda: self.get_stat(subsys, counter),
+            expect_val=expect_val, timeout=timeout,
+            reject_fn=lambda x: x > expect_val
+        )
 
     def test_open_inode(self):
         """
@@ -252,10 +306,10 @@ class TestStrays(CephFSTestCase):
             lambda: self.get_mdc_stat("num_strays"),
             expect_val=1, timeout=60, reject_fn=lambda x: x > 1)
 
-        # See that while the stray count has incremented, the purge count
-        # has not
+        # See that while the stray count has incremented, none have passed
+        # on to the purge queue
         self.assertEqual(self.get_mdc_stat("strays_created"), 1)
-        self.assertEqual(self.get_mdc_stat("strays_purged"), 0)
+        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)
 
         # See that the client still holds 2 caps
         self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
@@ -272,14 +326,12 @@ class TestStrays(CephFSTestCase):
             expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
         )
         # Wait to see the purge counter increment, stray count go to zero
-        self.wait_until_equal(
-            lambda: self.get_mdc_stat("strays_purged"),
-            expect_val=1, timeout=60, reject_fn=lambda x: x > 1
-        )
+        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
         self.wait_until_equal(
             lambda: self.get_mdc_stat("num_strays"),
             expect_val=0, timeout=6, reject_fn=lambda x: x > 1
         )
+        self._wait_for_counter("purge_queue", "pq_executed", 1)
 
         # See that the data objects no longer exist
         self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))
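data_objects_absent() takes the file size in bytes, and the object names it checks follow the same ino-in-hex naming shown for dirfrag_exists() above. A hypothetical sketch of the enumeration, assuming the default layout of 4 MiB objects (this commit does not show the helper's body):

    object_size = 4 * 1024 * 1024  # assumed default CephFS object size
    size_bytes = size_mb * 1024 * 1024
    expected = ["{0:x}.{1:08x}".format(open_file_ino, n)
                for n in range(size_bytes // object_size)]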
@@ -319,7 +371,7 @@ class TestStrays(CephFSTestCase):
         # stray did not advance to purging given time)
         time.sleep(30)
         self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
-        self.assertEqual(self.get_mdc_stat("strays_purged"), 0)
+        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)
 
         # See that before reintegration, the inode's backtrace points to a stray dir
         self.fs.mds_asok(['flush', 'journal'])
@@ -332,10 +384,7 @@ class TestStrays(CephFSTestCase):
         # See the reintegration counter increment
         # This should happen as a result of the eval_remote call on
         # responding to a client request.
-        self.wait_until_equal(
-            lambda: self.get_mdc_stat("strays_reintegrated"),
-            expect_val=1, timeout=60, reject_fn=lambda x: x > 1
-        )
+        self._wait_for_counter("mds_cache", "strays_reintegrated", 1)
 
         # Flush the journal
         self.fs.mds_asok(['flush', 'journal'])
@@ -349,10 +398,9 @@ class TestStrays(CephFSTestCase):
 
         # Now really delete it
         self.mount_a.run_shell(["rm", "-f", "file_c"])
-        self.wait_until_equal(
-            lambda: self.get_mdc_stat("strays_purged"),
-            expect_val=1, timeout=60, reject_fn=lambda x: x > 1
-        )
+        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
+        self._wait_for_counter("purge_queue", "pq_executed", 1)
+
         self.assert_purge_idle()
         self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))
 
@@ -361,7 +409,7 @@ class TestStrays(CephFSTestCase):
         # One time we reintegrated it
         self.assertEqual(self.get_mdc_stat("strays_reintegrated"), 1)
         # Then the second time we purged it
-        self.assertEqual(self.get_mdc_stat("strays_purged"), 1)
+        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)
 
     def test_mv_hardlink_cleanup(self):
         """
@@ -397,10 +445,7 @@ class TestStrays(CephFSTestCase):
         # Trigger reintegration and wait for it to happen
         self.assertEqual(self.get_mdc_stat("strays_reintegrated"), 0)
         self.mount_a.run_shell(["mv", "linkto_b", "file_c"])
-        self.wait_until_equal(
-            lambda: self.get_mdc_stat("strays_reintegrated"),
-            expect_val=1, timeout=60, reject_fn=lambda x: x > 1
-        )
+        self._wait_for_counter("mds_cache", "strays_reintegrated", 1)
 
         self.fs.mds_asok(['flush', 'journal'])
 
@@ -529,11 +574,12 @@ class TestStrays(CephFSTestCase):
         no ongoing purge activity.  Sanity check for when PurgeQueue should
         be idle.
         """
-        stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
-        self.assertEqual(stats["num_strays"], 0)
-        self.assertEqual(stats["num_strays_purging"], 0)
-        self.assertEqual(stats["num_strays_delayed"], 0)
-        self.assertEqual(stats["num_purge_ops"], 0)
+        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
+        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
+        self.assertEqual(mdc_stats["num_strays"], 0)
+        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
+        self.assertEqual(pq_stats["pq_executing"], 0)
+        self.assertEqual(pq_stats["pq_executing_ops"], 0)
 
     def test_mv_cleanup(self):
         """
@@ -557,10 +603,9 @@ class TestStrays(CephFSTestCase):
         # See that stray counter increments
         self.assertEqual(self.get_mdc_stat("strays_created"), 1)
         # Wait for purge counter to increment
-        self.wait_until_equal(
-            lambda: self.get_mdc_stat("strays_purged"),
-            expect_val=1, timeout=60, reject_fn=lambda x: x > 1
-        )
+        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
+        self._wait_for_counter("purge_queue", "pq_executed", 1)
+
         self.assert_purge_idle()
 
         # file_b should have been purged
@@ -654,10 +699,8 @@ class TestStrays(CephFSTestCase):
         self.fs.mds_asok(["flush", "journal"])
 
         # See that a purge happens now
-        self.wait_until_equal(
-            lambda: self.get_mdc_stat("strays_purged"),
-            expect_val=2, timeout=60, reject_fn=lambda x: x > 1
-        )
+        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
+        self._wait_for_counter("purge_queue", "pq_executed", 2)
 
         self.assertTrue(self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024))
         self.await_data_pool_empty()
@@ -775,11 +818,8 @@ class TestStrays(CephFSTestCase):
         strays_after = self.get_mdc_stat("strays_created")
         self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT)
 
-        self.wait_until_equal(
-            lambda: self.get_mdc_stat("strays_purged"),
-            strays_after,
-            timeout=600
-        )
+        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
+        self._wait_for_counter("purge_queue", "pq_executed", strays_after)
 
         self.mount_a.run_python(dedent("""
             import os