import time
import logging
from textwrap import dedent
+import datetime
import gevent
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
))
self.mount_a.run_python(create_script)
+
+ # That the dirfrag object is created
+ self.fs.mds_asok(["flush", "journal"])
+ dir_ino = self.mount_a.path_to_ino("delete_me")
+ self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))
+
+ # Remove everything
self.mount_a.run_shell(["rm", "-rf", "delete_me"])
self.fs.mds_asok(["flush", "journal"])
+
+ # That all the removed files get created as strays
strays = self.get_mdc_stat("strays_created")
self.assertEqual(strays, file_count + 1)
+
+ # That the strays all get enqueued for purge
self.wait_until_equal(
- lambda: self.get_mdc_stat("strays_purged"),
+ lambda: self.get_mdc_stat("strays_enqueued"),
strays,
timeout=600
)
+ # That all the purge operations execute
+ self.wait_until_equal(
+ lambda: self.get_stat("purge_queue", "pq_executed"),
+ strays,
+ timeout=600
+ )
+
+ # That finally, the directory metadata object is gone
+ self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))
+
+ # That finally, the data objects are all gone
+ self.await_data_pool_empty()
+
def _test_throttling(self, throttle_type):
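+ # Record (timestamp, num_strays, pq_executing, pq_executing_ops) samples each
+ # iteration so they can be dumped to the log if the throttling check fails.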
+ self.data_log = []
+ try:
+ return self._do_test_throttling(throttle_type)
+ except:
+ for l in self.data_log:
+ log.info(",".join([l_.__str__() for l_ in l]))
+ raise
+
+ def _do_test_throttling(self, throttle_type):
"""
That the mds_max_purge_ops setting is respected
"""
elapsed = 0
files_high_water = 0
ops_high_water = 0
+
while True:
- mdc_stats = self.fs.mds_asok(['perf', 'dump', 'mds_cache'])['mds_cache']
+ stats = self.fs.mds_asok(['perf', 'dump'])
+ mdc_stats = stats['mds_cache']
+ pq_stats = stats['purge_queue']
if elapsed >= purge_timeout:
raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))
num_strays = mdc_stats['num_strays']
- num_strays_purging = mdc_stats['num_strays_purging']
- num_purge_ops = mdc_stats['num_purge_ops']
+ num_strays_purging = pq_stats['pq_executing']
+ num_purge_ops = pq_stats['pq_executing_ops']
+
+ self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops])
files_high_water = max(files_high_water, num_strays_purging)
ops_high_water = max(ops_high_water, num_purge_ops)
total_strays_created = mdc_stats['strays_created']
- total_strays_purged = mdc_stats['strays_purged']
+ total_strays_purged = pq_stats['pq_executed']
if total_strays_purged == total_inodes:
log.info("Complete purge in {0} seconds".format(elapsed))
break
elif total_strays_purged > total_inodes:
raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
else:
if throttle_type == self.OPS_THROTTLE:
- if num_purge_ops > mds_max_purge_ops:
+ # 11 is filer_max_purge_ops plus one for the backtrace:
+ # limit is allowed to be overshot by this much.
+ if num_purge_ops > mds_max_purge_ops + 11:
raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
num_purge_ops, mds_max_purge_ops
))
# Sanity check all MDC stray stats
- mdc_stats = self.fs.mds_asok(['perf', 'dump', 'mds_cache'])['mds_cache']
+ stats = self.fs.mds_asok(['perf', 'dump'])
+ mdc_stats = stats['mds_cache']
+ pq_stats = stats['purge_queue']
self.assertEqual(mdc_stats['num_strays'], 0)
- self.assertEqual(mdc_stats['num_strays_purging'], 0)
self.assertEqual(mdc_stats['num_strays_delayed'], 0)
- self.assertEqual(mdc_stats['num_purge_ops'], 0)
+ self.assertEqual(pq_stats['pq_executing'], 0)
+ self.assertEqual(pq_stats['pq_executing_ops'], 0)
self.assertEqual(mdc_stats['strays_created'], total_inodes)
- self.assertEqual(mdc_stats['strays_purged'], total_inodes)
+ self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
+ self.assertEqual(pq_stats['pq_executed'], total_inodes)
def get_mdc_stat(self, name, mds_id=None):
- return self.fs.mds_asok(['perf', 'dump', "mds_cache", name],
- mds_id=mds_id)['mds_cache'][name]
+ return self.get_stat("mds_cache", name, mds_id)
+
+ def get_stat(self, subsys, name, mds_id=None):
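+ """Read a single counter from the given perf subsystem via the MDS admin socket."""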
+ return self.fs.mds_asok(['perf', 'dump', subsys, name],
+ mds_id=mds_id)[subsys][name]
+
+ def _wait_for_counter(self, subsys, counter, expect_val, timeout=60):
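+ """Wait for a perf counter to reach expect_val; fail fast if it overshoots."""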
+ self.wait_until_equal(
+ lambda: self.get_stat(subsys, counter),
+ expect_val=expect_val, timeout=timeout,
+ reject_fn=lambda x: x > expect_val
+ )
def test_open_inode(self):
"""
lambda: self.get_mdc_stat("num_strays"),
expect_val=1, timeout=60, reject_fn=lambda x: x > 1)
- # See that while the stray count has incremented, the purge count
- # has not
+ # See that while the stray count has incremented, none have passed
+ # on to the purge queue
self.assertEqual(self.get_mdc_stat("strays_created"), 1)
- self.assertEqual(self.get_mdc_stat("strays_purged"), 0)
+ self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)
# See that the client still holds 2 caps
self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
)
# Wait to see the purge counter increment, stray count go to zero
- self.wait_until_equal(
- lambda: self.get_mdc_stat("strays_purged"),
- expect_val=1, timeout=60, reject_fn=lambda x: x > 1
- )
+ self._wait_for_counter("mds_cache", "strays_enqueued", 1)
self.wait_until_equal(
lambda: self.get_mdc_stat("num_strays"),
expect_val=0, timeout=6, reject_fn=lambda x: x > 1
)
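+ # ...then wait for the purge queue to actually execute the deletion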
+ self._wait_for_counter("purge_queue", "pq_executed", 1)
# See that the data objects no longer exist
self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))
# stray did not advance to purging given time)
time.sleep(30)
self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
- self.assertEqual(self.get_mdc_stat("strays_purged"), 0)
+ self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)
# See that before reintegration, the inode's backtrace points to a stray dir
self.fs.mds_asok(['flush', 'journal'])
# See the reintegration counter increment
# This should happen as a result of the eval_remote call on
# responding to a client request.
- self.wait_until_equal(
- lambda: self.get_mdc_stat("strays_reintegrated"),
- expect_val=1, timeout=60, reject_fn=lambda x: x > 1
- )
+ self._wait_for_counter("mds_cache", "strays_reintegrated", 1)
# Flush the journal
self.fs.mds_asok(['flush', 'journal'])
# Now really delete it
self.mount_a.run_shell(["rm", "-f", "file_c"])
- self.wait_until_equal(
- lambda: self.get_mdc_stat("strays_purged"),
- expect_val=1, timeout=60, reject_fn=lambda x: x > 1
- )
+ self._wait_for_counter("mds_cache", "strays_enqueued", 1)
+ self._wait_for_counter("purge_queue", "pq_executed", 1)
+
self.assert_purge_idle()
self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))
# One time we reintegrated it
self.assertEqual(self.get_mdc_stat("strays_reintegrated"), 1)
# Then the second time we purged it
- self.assertEqual(self.get_mdc_stat("strays_purged"), 1)
+ self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)
def test_mv_hardlink_cleanup(self):
"""
# Trigger reintegration and wait for it to happen
self.assertEqual(self.get_mdc_stat("strays_reintegrated"), 0)
self.mount_a.run_shell(["mv", "linkto_b", "file_c"])
- self.wait_until_equal(
- lambda: self.get_mdc_stat("strays_reintegrated"),
- expect_val=1, timeout=60, reject_fn=lambda x: x > 1
- )
+ self._wait_for_counter("mds_cache", "strays_reintegrated", 1)
self.fs.mds_asok(['flush', 'journal'])
no ongoing purge activity. Sanity check for when PurgeQueue should
be idle.
"""
- stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
- self.assertEqual(stats["num_strays"], 0)
- self.assertEqual(stats["num_strays_purging"], 0)
- self.assertEqual(stats["num_strays_delayed"], 0)
- self.assertEqual(stats["num_purge_ops"], 0)
+ mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
+ pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
+ self.assertEqual(mdc_stats["num_strays"], 0)
+ self.assertEqual(mdc_stats["num_strays_delayed"], 0)
+ self.assertEqual(pq_stats["pq_executing"], 0)
+ self.assertEqual(pq_stats["pq_executing_ops"], 0)
def test_mv_cleanup(self):
"""
# See that stray counter increments
self.assertEqual(self.get_mdc_stat("strays_created"), 1)
# Wait for purge counter to increment
- self.wait_until_equal(
- lambda: self.get_mdc_stat("strays_purged"),
- expect_val=1, timeout=60, reject_fn=lambda x: x > 1
- )
+ self._wait_for_counter("mds_cache", "strays_enqueued", 1)
+ self._wait_for_counter("purge_queue", "pq_executed", 1)
+
self.assert_purge_idle()
# file_b should have been purged
self.fs.mds_asok(["flush", "journal"])
# See that a purge happens now
- self.wait_until_equal(
- lambda: self.get_mdc_stat("strays_purged"),
- expect_val=2, timeout=60, reject_fn=lambda x: x > 1
- )
+ self._wait_for_counter("mds_cache", "strays_enqueued", 2)
+ self._wait_for_counter("purge_queue", "pq_executed", 2)
self.assertTrue(self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024))
self.await_data_pool_empty()
strays_after = self.get_mdc_stat("strays_created")
self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT)
- self.wait_until_equal(
- lambda: self.get_mdc_stat("strays_purged"),
- strays_after,
- timeout=600
- )
+ self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
+ self._wait_for_counter("purge_queue", "pq_executed", strays_after)
self.mount_a.run_python(dedent("""
import os