# Set max_mds to 2
self.fs.set_max_mds(2)
-
- # See that we have two active MDSs
- self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
- reject_fn=lambda v: v > 2 or v < 1)
- active_mds_names = self.fs.get_active_names()
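+ # wait_for_daemons() returns an FSStatus snapshot; reuse it below instead of re-querying the cluster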
+ status = self.fs.wait_for_daemons()
+ active_mds_names = self.fs.get_active_names(status=status)
# Switch off any unneeded MDS daemons
for unneeded_mds in set(self.mds_cluster.mds_ids) - set(active_mds_names):
self.mds_cluster.mds_fail(unneeded_mds)
# Create a dir on each rank
- self.mount_a.run_shell(["mkdir", "alpha"])
- self.mount_a.run_shell(["mkdir", "bravo"])
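+ # Create both pin roots (each with a file) in a single shell payload on the client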
+ self.mount_a.run_shell_payload("mkdir {alpha,bravo} && touch {alpha,bravo}/file")
self.mount_a.setfattr("alpha/", "ceph.dir.pin", "0")
self.mount_a.setfattr("bravo/", "ceph.dir.pin", "1")
- def subtrees_assigned():
- got_subtrees = self.fs.mds_asok(["get", "subtrees"], mds_id=active_mds_names[0])
-
- for s in got_subtrees:
- if s['dir']['path'] == '/bravo':
- if s['auth_first'] == 1:
- return True
- else:
- # Should not happen
- raise RuntimeError("/bravo is subtree but not rank 1!")
-
- return False
-
# Ensure the pinning has taken effect and the /bravo dir is now
# migrated to rank 1.
- self.wait_until_true(subtrees_assigned, 30)
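+ # _wait_subtrees polls rank 0's subtree map until /alpha and /bravo are auth on ranks 0 and 1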
+ self._wait_subtrees([('/bravo', 1), ('/alpha', 0)], rank=0, status=status)
# Do some IO (this should be split across ranks according to
# the rank-pinned dirs)
self.skipTest("Requires FUSE client to use is_blacklisted()")
self.fs.set_max_mds(2)
- self.fs.wait_for_daemons()
- status = self.fs.status()
+ status = self.fs.wait_for_daemons()
- self.mount_a.run_shell(["mkdir", "d0", "d1"])
+ self.mount_a.run_shell_payload("mkdir {d0,d1} && touch {d0,d1}/file")
self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
self._wait_subtrees([('/d0', 0), ('/d1', 1)], status=status)
# setup subtrees
self.mount_a.run_shell(["mkdir", "-p", "d1/dir"])
self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
- self.wait_until_true(lambda: self._check_subtree(1, '/d1', status=status), timeout=30)
+ self._wait_subtrees([("/d1", 1)], rank=1, path="/d1")
last_created = self._get_last_created_snap(rank=0,status=status)
self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
self.mount_a.setfattr("d0/d1", "ceph.dir.pin", "1")
self.mount_a.setfattr("d0/d2", "ceph.dir.pin", "2")
- self.wait_until_true(lambda: self._check_subtree(2, '/d0/d2', status=status), timeout=30)
- self.wait_until_true(lambda: self._check_subtree(1, '/d0/d1', status=status), timeout=5)
- self.wait_until_true(lambda: self._check_subtree(0, '/d0', status=status), timeout=5)
+ self._wait_subtrees([("/d0", 0), ("/d0/d1", 1), ("/d0/d2", 2)], rank="all", status=status, path="/d0")
def _check_snapclient_cache(snaps_dump, cache_dump=None, rank=0):
if cache_dump is None:
self.fs.set_max_mds(2)
status = self.fs.wait_for_daemons()
- self.mount_a.run_shell(["mkdir", "-p", "d0/d1"])
+ self.mount_a.run_shell(["mkdir", "-p", "d0/d1/empty"])
self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
self.mount_a.setfattr("d0/d1", "ceph.dir.pin", "1")
- self.wait_until_true(lambda: self._check_subtree(1, '/d0/d1', status=status), timeout=30)
- self.wait_until_true(lambda: self._check_subtree(0, '/d0', status=status), timeout=5)
+ self._wait_subtrees([("/d0", 0), ("/d0/d1", 1)], rank="all", status=status, path="/d0")
self.mount_a.write_test_pattern("d0/d1/file_a", 8 * 1024 * 1024)
self.mount_a.run_shell(["mkdir", "d0/.snap/s1"])
self.fs.set_max_mds(2)
status = self.fs.wait_for_daemons()
- self.mount_a.run_shell(["mkdir", "d0", "d1"])
+ self.mount_a.run_shell_payload("mkdir -p {d0,d1}/empty")
self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
- self.wait_until_true(lambda: self._check_subtree(1, '/d1', status=status), timeout=30)
- self.wait_until_true(lambda: self._check_subtree(0, '/d0', status=status), timeout=5)
+ self._wait_subtrees([("/d0", 0), ("/d1", 1)], rank=0, status=status)
self.mount_a.run_shell(["mkdir", "d0/d3"])
self.mount_a.run_shell(["mkdir", "d0/.snap/s1"])
self.fs.set_max_mds(2)
status = self.fs.wait_for_daemons()
- self.mount_a.run_shell(["mkdir", "d0", "d1"])
+ self.mount_a.run_shell_payload("mkdir -p {d0,d1}/empty")
self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
- self.wait_until_true(lambda: self._check_subtree(1, '/d1', status=status), timeout=30)
- self.wait_until_true(lambda: self._check_subtree(0, '/d0', status=status), timeout=5)
+ self._wait_subtrees([("/d0", 0), ("/d1", 1)], rank=0, status=status)
self.mount_a.run_python(dedent("""
import os
return rank_0_id, rank_1_id
- def _force_migrate(self, to_id, path, watch_ino):
+ def _force_migrate(self, path, rank=1):
"""
- :param to_id: MDS id to move it to
:param path: Filesystem path (string) to move
- :param watch_ino: Inode number to look for at destination to confirm move
+ :param rank: MDS rank to pin the path to; the subtree is migrated there
:return: None
"""
- self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])
-
- # Poll the MDS cache dump to watch for the export completing
- migrated = False
- migrate_timeout = 60
- migrate_elapsed = 0
- while not migrated:
- data = self.fs.mds_asok(["dump", "cache"], to_id)
- for inode_data in data:
- if inode_data['ino'] == watch_ino:
- log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
- if inode_data['is_auth'] is True:
- migrated = True
- break
-
- if not migrated:
- if migrate_elapsed > migrate_timeout:
- raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
- else:
- migrate_elapsed += 1
- time.sleep(1)
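+ # Pin the tree to the target rank, then wait until the subtree map shows it as auth there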
+ self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", str(rank), path])
+ rpath = "/" + path
+ self._wait_subtrees([(rpath, rank)], rank=rank, path=rpath)
def _is_stopped(self, rank):
mds_map = self.fs.get_mds_map()
self.mount_a.create_n_files("delete_me/file", file_count)
- self._force_migrate(rank_1_id, "delete_me",
- self.mount_a.path_to_ino("delete_me/file_0"))
+ self._force_migrate("delete_me")
self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
self.mount_a.umount_wait()
# Create a non-purgeable stray in a ~mds1 stray directory
# by doing a hard link and deleting the original file
- self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
- self.mount_a.run_shell(["touch", "dir_1/original"])
- self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])
+ self.mount_a.run_shell_payload("""
+mkdir dir_1 dir_2
+touch dir_1/original
+ln dir_1/original dir_2/linkto
+""")
- self._force_migrate(rank_1_id, "dir_1",
- self.mount_a.path_to_ino("dir_1/original"))
+ self._force_migrate("dir_1")
+ self._force_migrate("dir_2", rank=0)
# empty mds cache. otherwise mds reintegrates stray when unlink finishes
self.mount_a.umount_wait()
- self.fs.mds_asok(['flush', 'journal'], rank_0_id)
self.fs.mds_asok(['flush', 'journal'], rank_1_id)
- self.fs.mds_fail_restart()
- self.fs.wait_for_daemons()
-
- active_mds_names = self.fs.get_active_names()
- rank_0_id = active_mds_names[0]
- rank_1_id = active_mds_names[1]
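+ # Flush and drop rank 1's cache rather than restarting every MDS daemon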
+ self.fs.mds_asok(['cache', 'drop'], rank_1_id)
self.mount_a.mount_wait()
-
self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
self.mount_a.umount_wait()
# Shut down rank 1
self.fs.set_max_mds(1)
- self.fs.wait_for_daemons(timeout=120)
+ status = self.fs.wait_for_daemons(timeout=120)
# See that the stray counter on rank 0 has incremented
self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
self.mount_a.create_n_files("delete_me/file", file_count)
- self._force_migrate(rank_1_id, "delete_me",
- self.mount_a.path_to_ino("delete_me/file_0"))
+ self._force_migrate("delete_me")
begin = datetime.datetime.now()
self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
duration = (end - begin).total_seconds()
self.assertLess(duration, (file_count * tick_period) * 0.25)
-