import json
import time
import logging
from textwrap import dedent
import datetime
import gevent
-from teuthology.orchestra.run import CommandFailedError
+from teuthology.orchestra.run import CommandFailedError, Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
log = logging.getLogger(__name__)
    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]
- def _wait_for_counter(self, subsys, counter, expect_val, timeout=60):
+ def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
+ mds_id=None):
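+        # mds_id lets callers poll a specific daemon's counters rather
+        # than whichever daemon mds_asok picks by default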
self.wait_until_equal(
- lambda: self.get_stat(subsys, counter),
+ lambda: self.get_stat(subsys, counter, mds_id),
expect_val=expect_val, timeout=timeout,
reject_fn=lambda x: x > expect_val
)
self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")
self.assertEqual(self.get_mdc_stat("num_strays"), 0)
- def test_migration_on_shutdown(self):
- """
- That when an MDS rank is shut down, any not-yet-purging strays
- are migrated to another MDS's stray dir.
- """
-
+ def _setup_two_ranks(self):
# Set up two MDSs
self.fs.set_allow_multimds(True)
self.fs.set_max_mds(2)
        # Wait until both ranks are active
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        # Stop any unneeded daemons so that rank-to-daemon assignments
        # stay predictable across restarts
        for unneeded_mds in self.mds_cluster.mds_ids[2:]:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)
- # Set the purge file throttle to 0 on MDS rank 1
- self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
- self.fs.mds_fail_restart(rank_1_id)
- self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
- reject_fn=lambda v: v > 2 or v < 1)
+ return rank_0_id, rank_1_id
- # Create a file
- # Export dir on an empty dir doesn't work, so we create the file before
- # calling export dir in order to kick a dirfrag into existence
- size_mb = 8
- self.mount_a.run_shell(["mkdir", "ALPHA"])
- self.mount_a.write_n_mb("ALPHA/alpha_file", size_mb)
- ino = self.mount_a.path_to_ino("ALPHA/alpha_file")
-
- result = self.fs.mds_asok(["export", "dir", "/ALPHA", "1"], rank_0_id)
+ def _force_migrate(self, from_id, to_id, path, watch_ino):
+ """
+ :param from_id: MDS id currently containing metadata
+ :param to_id: MDS id to move it to
+ :param path: Filesystem path (string) to move
+ :param watch_ino: Inode number to look for at destination to confirm move
+ :return: None
+ """
+ result = self.fs.mds_asok(["export", "dir", path, "1"], from_id)
self.assertEqual(result["return_code"], 0)
# Poll the MDS cache dump to watch for the export completing
migrate_timeout = 60
migrate_elapsed = 0
        migrated = False
        while not migrated:
- data = self.fs.mds_asok(["dump", "cache"], rank_1_id)
+ data = self.fs.mds_asok(["dump", "cache"], to_id)
for inode_data in data:
- if inode_data['ino'] == ino:
+ if inode_data['ino'] == watch_ino:
log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
if inode_data['is_auth'] is True:
migrated = True
            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened "
                                       "after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)
- # Delete the file on rank 1
- self.mount_a.run_shell(["rm", "-f", "ALPHA/alpha_file"])
+ def _is_stopped(self, rank):
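+        # A rank is stopped once no daemon in the MDS map claims it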
+ mds_map = self.fs.get_mds_map()
+ return rank not in [i['rank'] for i in mds_map['info'].values()]
- # See the stray counter increment, but the purge counter doesn't
- # See that the file objects are still on disk
- self.wait_until_equal(
- lambda: self.get_mdc_stat("num_strays", rank_1_id),
- expect_val=1, timeout=60, reject_fn=lambda x: x > 1)
- self.assertEqual(self.get_mdc_stat("strays_created", rank_1_id), 1)
- time.sleep(60) # period that we want to see if it gets purged
- self.assertEqual(self.get_mdc_stat("strays_purged", rank_1_id), 0)
- self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
+ def test_purge_on_shutdown(self):
+ """
+ That when an MDS rank is shut down, its purge queue is
+ drained in the process.
+ """
+ rank_0_id, rank_1_id = self._setup_two_ranks()
+
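+        # Set a zero purge-file throttle so that strays can be enqueued
+        # but nothing actually gets purged yet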
+ self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
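+        # Restart the daemon so the new throttle setting takes effect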
+ self.mds_cluster.mds_fail_restart(rank_1_id)
+ self.fs.wait_for_daemons()
+
+ file_count = 5
+
+ self.mount_a.create_n_files("delete_me/file", file_count)
+
+ self._force_migrate(rank_0_id, rank_1_id, "/delete_me",
+ self.mount_a.path_to_ino("delete_me/file_0"))
+
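+        # Raw() stops teuthology from quoting the argument, so the glob
+        # is expanded by the remote shell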
+ self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
+ self.mount_a.umount_wait()
+
+ # See all the strays go into purge queue
+ self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
+ self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
+ self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)
+
+ # See nothing get purged from the purge queue (yet)
+ time.sleep(10)
+ self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
# Shut down rank 1
self.fs.set_max_mds(1)
self.fs.deactivate(1)
- # Wait til we get to a single active MDS mdsmap state
- def is_stopped():
- mds_map = self.fs.get_mds_map()
- return 1 not in [i['rank'] for i in mds_map['info'].values()]
+        # It shouldn't proceed past stopping because it's still not
+        # allowed to purge
+ time.sleep(10)
+ self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
+ self.assertFalse(self._is_stopped(1))
- self.wait_until_true(is_stopped, timeout=120)
+ # Permit the daemon to start purging again
+ self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
+ 'injectargs',
+ "--mds_max_purge_files 100")
- # See that the stray counter on rank 0 has incremented
- self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
+ # It should now proceed through shutdown
+ self.wait_until_true(
+ lambda: self._is_stopped(1),
+ timeout=60
+ )
- # Wait til the purge counter on rank 0 increments
- self.wait_until_equal(
- lambda: self.get_mdc_stat("strays_purged", rank_0_id),
- 1, timeout=60, reject_fn=lambda x: x > 1)
+ # ...and in the process purge all that data
+ self.await_data_pool_empty()
- # See that the file objects no longer exist
- self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))
+ def test_migration_on_shutdown(self):
+ """
+ That when an MDS rank is shut down, any non-purgeable strays
+ get migrated to another rank.
+ """
- self.await_data_pool_empty()
+ rank_0_id, rank_1_id = self._setup_two_ranks()
+
+ # Create a non-purgeable stray in a ~mds1 stray directory
+ # by doing a hard link and deleting the original file
+ self.mount_a.run_shell(["mkdir", "mydir"])
+ self.mount_a.run_shell(["touch", "mydir/original"])
+ self.mount_a.run_shell(["ln", "mydir/original", "mydir/linkto"])
+
+ self._force_migrate(rank_0_id, rank_1_id, "/mydir",
+ self.mount_a.path_to_ino("mydir/original"))
+
+ self.mount_a.run_shell(["rm", "-f", "mydir/original"])
+ self.mount_a.umount_wait()
+
+ self._wait_for_counter("mds_cache", "strays_created", 1,
+ mds_id=rank_1_id)
+
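+        # The stray is non-purgeable because "mydir/linkto" still links
+        # to the inode, so shutdown must migrate it rather than purge it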
+ # Shut down rank 1
+ self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
+ self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")
+
+        # Wait until we get to a single active MDS mdsmap state
+ self.wait_until_true(lambda: self._is_stopped(1), timeout=120)
+
+ # See that the stray counter on rank 0 has incremented
+ self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
def assert_backtrace(self, ino, expected_path):
"""