self.mds_cluster.mds_fail()
self.fs.rados(["rm", "500.00000000"])
self.mds_cluster.mds_restart()
- self.fs.wait_for_daemons()
\ No newline at end of file
+ self.fs.wait_for_daemons()
+
+    def test_purge_queue_op_rate(self):
+        """
+        Check that purge queue metadata ops are aggregated under load.
+
+        A busy purge queue is meant to aggregate operations sufficiently
+        that our RADOS ops to the metadata pool are not O(files).  Delete
+        two batches of files, the second twice the size of the first, and
+        check that the number of overhead ops does not grow in proportion.
+        Afterwards restart the MDS and check that none of the purge work
+        is executed again.
+        """
+
+        # For low rates of deletion, the rate of metadata ops actually
+        # will be O(files), so to see the desired behaviour we have to give
+        # the system a significant quantity, i.e. an order of magnitude
+        # more than the number of files it will purge at one time.
+
+        max_purge_files = 2
+
+        self.set_conf('mds', 'mds_max_purge_files', "%d" % max_purge_files)
+        self.fs.mds_fail_restart()
+        self.fs.wait_for_daemons()
+
+        phase_1_files = 256
+        phase_2_files = 512
+
+        self.mount_a.run_shell(["mkdir", "phase1"])
+        self.mount_a.create_n_files("phase1/file", phase_1_files)
+
+        self.mount_a.run_shell(["mkdir", "phase2"])
+        self.mount_a.create_n_files("phase2/file", phase_2_files)
+
+        def unlink_and_count_ops(path, expected_deletions):
+            """
+            Remove `path` recursively, wait until the purge queue's
+            `pq_executed` counter has advanced by `expected_deletions`,
+            and return the number of objecter ops issued in the meantime
+            beyond one op per expected deletion.
+            """
+            initial_ops = self.get_stat("objecter", "op")
+            initial_pq_executed = self.get_stat("purge_queue", "pq_executed")
+
+            self.mount_a.run_shell(["rm", "-rf", path])
+
+            self._wait_for_counter(
+                "purge_queue", "pq_executed",
+                initial_pq_executed + expected_deletions
+            )
+
+            final_ops = self.get_stat("objecter", "op")
+
+            # Calculation of the *overhead* operations, i.e. do not include
+            # the operations where we actually delete files.
+            return final_ops - initial_ops - expected_deletions
+
+        # NOTE(review): the "+ 1" presumably accounts for the directory
+        # itself being purged along with its files -- confirm.
+        self.fs.mds_asok(['flush', 'journal'])
+        phase1_ops = unlink_and_count_ops("phase1/", phase_1_files + 1)
+
+        self.fs.mds_asok(['flush', 'journal'])
+        phase2_ops = unlink_and_count_ops("phase2/", phase_2_files + 1)
+
+        log.info("Phase 1: {0}".format(phase1_ops))
+        log.info("Phase 2: {0}".format(phase2_ops))
+
+        # The success criterion is that deleting double the number
+        # of files doesn't generate double the number of overhead ops
+        # -- this comparison is a rough approximation of that rule.
+        # assertLess (rather than assertTrue on a comparison) reports both
+        # operands when the check fails.
+        self.assertLess(
+            phase2_ops, phase1_ops * 1.25,
+            "Overhead ops scaled with file count: "
+            "phase 1 = {0}, phase 2 = {1}".format(phase1_ops, phase2_ops)
+        )
+
+        # Finally, check that our activity did include properly quiescing
+        # the queue (i.e. call to Journaler::write_head in the right place),
+        # by restarting the MDS and checking that it doesn't try re-executing
+        # any of the work we did.
+        self.fs.mds_asok(['flush', 'journal'])  # flush to ensure no strays
+                                                # hanging around
+        self.fs.mds_fail_restart()
+        self.fs.wait_for_daemons()
+        # Leave some time for any unwanted re-execution to show up in the
+        # counter before sampling it.
+        time.sleep(10)
+        self.assertEqual(self.get_stat("purge_queue", "pq_executed"), 0)