out_json = self.fs.rank_tell(["scrub", "status"])
if out_json['status'] == "no active scrubs running":
break;
+
+    def _wait_distributed_subtrees(self, status, rank, count):
+        timeout = 30
+        pause = 2
+        for _ in range(timeout // pause):
+            subtrees = self.fs.mds_asok(["get", "subtrees"], mds_id=status.get_rank(self.fs.id, rank)['name'])
+            subtrees = filter(lambda s: s['dir']['path'].startswith('/'), subtrees)
+            subtrees = list(filter(lambda s: s['distributed_ephemeral_pin'] == 1, subtrees))
+            if len(subtrees) == count:
+                return subtrees
+            time.sleep(pause)
+        raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank))
+
+    def get_auth_subtrees(self, status, rank):
+        subtrees = self.fs.mds_asok(["get", "subtrees"], mds_id=status.get_rank(self.fs.id, rank)['name'])
+        subtrees = filter(lambda s: s['dir']['path'].startswith('/'), subtrees)
+        subtrees = filter(lambda s: s['auth_first'] == rank, subtrees)
+
+        return list(subtrees)
+
+    def get_ephemerally_pinned_auth_subtrees(self, status, rank):
+        subtrees = self.fs.mds_asok(["get", "subtrees"], mds_id=status.get_rank(self.fs.id, rank)['name'])
+        subtrees = filter(lambda s: s['dir']['path'].startswith('/'), subtrees)
+        subtrees = filter(lambda s: (s['distributed_ephemeral_pin'] == 1 or s['random_ephemeral_pin'] == 1) and (s['auth_first'] == rank), subtrees)
+
+        return list(subtrees)
+
+    def get_distributed_auth_subtrees(self, status, rank):
+        subtrees = self.fs.mds_asok(["get", "subtrees"], mds_id=status.get_rank(self.fs.id, rank)['name'])
+        subtrees = filter(lambda s: s['dir']['path'].startswith('/'), subtrees)
+        subtrees = filter(lambda s: (s['distributed_ephemeral_pin'] == 1) and (s['auth_first'] == rank), subtrees)
+
+        return list(subtrees)
+
+    def get_random_auth_subtrees(self, status, rank):
+        subtrees = self.fs.mds_asok(["get", "subtrees"], mds_id=status.get_rank(self.fs.id, rank)['name'])
+        subtrees = filter(lambda s: s['dir']['path'].startswith('/'), subtrees)
+        subtrees = filter(lambda s: (s['random_ephemeral_pin'] == 1) and (s['auth_first'] == rank), subtrees)
+
+        return list(subtrees)
# Check if rank1 changed (standby tookover?)
new_rank1 = self.fs.get_rank(rank=1)
self.assertEqual(rank1['gid'], new_rank1['gid'])
+
+    def test_ephemeral_pin_distribution(self):
+
+        # Check if the subtree distribution under ephemeral distributed pin is fairly uniform
+
+        self.fs.set_max_mds(3)
+        self.fs.wait_for_daemons()
+
+        status = self.fs.status()
+
+        self.mount_a.run_shell(["mkdir", "-p", "a"])
+        self._wait_subtrees(status, 0, [])
+
+        for i in range(0, 100):
+            self.mount_a.run_shell(["mkdir", "-p", "a/" + str(i) + "/d"])
+
+        self._wait_subtrees(status, 0, [])
+
+        self.mount_b.setfattr("a", "ceph.dir.pin.distributed", "1")
+
+        self._wait_distributed_subtrees(status, 0, 100)
+
+        # Check if distribution is uniform
+        rank0_distributed_subtree_ratio = len(self.get_distributed_auth_subtrees(status, 0))/len(self.get_auth_subtrees(status, 0))
+        self.assertGreaterEqual(rank0_distributed_subtree_ratio, 0.2)
+
+        rank1_distributed_subtree_ratio = len(self.get_distributed_auth_subtrees(status, 1))/len(self.get_auth_subtrees(status, 1))
+        self.assertGreaterEqual(rank1_distributed_subtree_ratio, 0.2)
+
+        rank2_distributed_subtree_ratio = len(self.get_distributed_auth_subtrees(status, 2))/len(self.get_auth_subtrees(status, 2))
+        self.assertGreaterEqual(rank2_distributed_subtree_ratio, 0.2)
+
+    def test_ephemeral_random(self):
+
+        # Check if export ephemeral random is applied hierarchically
+
+        self.fs.set_max_mds(3)
+        self.fs.wait_for_daemons()
+
+        status = self.fs.status()
+
+        tmp_dir = ""
+        for i in range(0, 100):
+            tmp_dir = tmp_dir + str(i) + "/"
+            self.mount_a.run_shell(["mkdir", "-p", tmp_dir])
+            self.mount_b.setfattr(tmp_dir, "ceph.dir.pin.random", "1")
+
+        count = len(self.get_random_auth_subtrees(status, 0))
+        self.assertEqual(count, 100)
+
+    def test_ephemeral_pin_grow_mds(self):
+
+        # Increase the number of MDSs and verify that the number of subtrees that
+        # migrate is less than 1/3 of the total number of ephemerally pinned subtrees
+
+        self.fs.set_max_mds(3)
+        self.fs.wait_for_daemons()
+
+        status = self.fs.status()
+
+        for i in range(0, 100):
+            self.mount_a.run_shell(["mkdir", "-p", "a/" + str(i) + "/d"])
+        self._wait_subtrees(status, 0, [])
+        self.mount_b.setfattr("a", "ceph.dir.pin.distributed", "1")
+        self._wait_distributed_subtrees(status, 0, 100)
+
+        subtrees_old = self.get_ephemerally_pinned_auth_subtrees(status, 0) + self.get_ephemerally_pinned_auth_subtrees(status, 1) + self.get_ephemerally_pinned_auth_subtrees(status, 2)
+        self.fs.set_max_mds(4)
+        self.fs.wait_for_daemons()
+        # Sleeping for a while to allow the ephemeral pin migrations to complete
+        time.sleep(15)
+        subtrees_new = self.get_ephemerally_pinned_auth_subtrees(status, 0) + self.get_ephemerally_pinned_auth_subtrees(status, 1) + self.get_ephemerally_pinned_auth_subtrees(status, 2)
+
+        # Count the subtrees that moved to a different auth rank
+        count = 0
+        for old_subtree in subtrees_old:
+            for new_subtree in subtrees_new:
+                if (old_subtree['dir']['path'] == new_subtree['dir']['path']) and (old_subtree['auth_first'] != new_subtree['auth_first']):
+                    count = count + 1
+                    break
+
+        self.assertLessEqual((count / len(subtrees_old)), 0.33)
+
+    def test_ephemeral_pin_shrink_mds(self):
+
+        # Shrink the number of MDSs and verify that only a small fraction of the
+        # ephemerally pinned subtrees are migrated
+
+        self.fs.set_max_mds(3)
+        self.fs.wait_for_daemons()
+
+        status = self.fs.status()
+
+        for i in range(0, 100):
+            self.mount_a.run_shell(["mkdir", "-p", "a/" + str(i) + "/d"])
+        self._wait_subtrees(status, 0, [])
+        self.mount_b.setfattr("a", "ceph.dir.pin.distributed", "1")
+        self._wait_distributed_subtrees(status, 0, 100)
+
+        subtrees_old = self.get_ephemerally_pinned_auth_subtrees(status, 0) + self.get_ephemerally_pinned_auth_subtrees(status, 1) + self.get_ephemerally_pinned_auth_subtrees(status, 2)
+        self.fs.set_max_mds(2)
+        self.fs.wait_for_daemons()
+        time.sleep(15)
+
+        # Rank 2 no longer exists after shrinking, so only ranks 0 and 1 can hold subtrees
+        subtrees_new = self.get_ephemerally_pinned_auth_subtrees(status, 0) + self.get_ephemerally_pinned_auth_subtrees(status, 1)
+
+        # Count the subtrees that moved to a different auth rank
+        count = 0
+        for old_subtree in subtrees_old:
+            for new_subtree in subtrees_new:
+                if (old_subtree['dir']['path'] == new_subtree['dir']['path']) and (old_subtree['auth_first'] != new_subtree['auth_first']):
+                    count = count + 1
+                    break
+
+        self.assertLessEqual((count / len(subtrees_old)), 0.33)
+
+    def test_ephemeral_pin_unset_config(self):
+
+        # Check if unsetting the distributed pin config results in every distributed pin being unset
+
+        self.fs.set_max_mds(3)
+        self.fs.wait_for_daemons()
+
+        status = self.fs.status()
+
+        for i in range(0, 10):
+            self.mount_a.run_shell(["mkdir", "-p", str(i) + "/dummy_dir"])
+            self.mount_b.setfattr(str(i), "ceph.dir.pin.distributed", "1")
+
+        self._wait_distributed_subtrees(status, 0, 10)
+
+        self.fs.mds_asok(["config", "set", "mds_export_ephemeral_distributed_config", "false"])
+        # Sleep for a while to facilitate unsetting of the pins
+        time.sleep(15)
+
+        for i in range(0, 10):
+            self.assertEqual(self.mount_a.getfattr(str(i), "ceph.dir.pin.distributed"), "0")
+
+    def test_ephemeral_distributed_pin_unset(self):
+
+        # Test that unsetting the distributed ephemeral pin on a parent directory
+        # also unpins its child directories
+
+        self.fs.set_max_mds(3)
+        self.fs.wait_for_daemons()
+
+        status = self.fs.status()
+
+        for i in range(0, 10):
+            self.mount_a.run_shell(["mkdir", "-p", str(i) + "/a/b"])
+            self.mount_b.setfattr(str(i), "ceph.dir.pin.distributed", "1")
+
+        self._wait_distributed_subtrees(status, 0, 10)
+
+        for i in range(0, 10):
+            self.mount_b.setfattr(str(i), "ceph.dir.pin.distributed", "0")
+
+        time.sleep(15)
+
+        subtree_count = len(self.get_distributed_auth_subtrees(status, 0))
+        self.assertEqual(subtree_count, 0)
+
+    def test_ephemeral_standby(self):
+
+        # Test that the distribution is unaltered when a standby MDS takes over a failed rank
+
+        # Need all my standbys up as well as the active daemons
+        self.wait_for_daemon_start()
+        status = self.fs.status()
+
+        for i in range(0, 10):
+            self.mount_a.run_shell(["mkdir", "-p", str(i) + "/a/b"])
+            self.mount_b.setfattr(str(i), "ceph.dir.pin.distributed", "1")
+
+        self._wait_distributed_subtrees(status, 0, 10)
+
+        original_subtrees = self.get_ephemerally_pinned_auth_subtrees(status, 0)
+
+        # Flush the journal for rank 0
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
+
+        (original_active, ) = self.fs.get_active_names()
+        original_standbys = self.mds_cluster.get_standby_daemons()
+
+        # Kill the rank 0 daemon's physical process
+        self.fs.mds_stop(original_active)
+
+        grace = float(self.fs.get_config("mds_beacon_grace", service_type="mon"))
+
+        # Wait until the monitor promotes its replacement
+        def promoted():
+            active = self.fs.get_active_names()
+            return active and active[0] in original_standbys
+
+        log.info("Waiting for promotion of one of the original standbys {0}".format(
+            original_standbys))
+        self.wait_until_true(
+            promoted,
+            timeout=grace*2)
+
+        self.fs.wait_for_state('up:active', rank=0, timeout=MDS_RESTART_GRACE)
+
+        # Refresh the status so the new rank 0 daemon is the one queried
+        status = self.fs.status()
+        new_subtrees = self.get_ephemerally_pinned_auth_subtrees(status, 0)
+
+        self.assertEqual(original_subtrees, new_subtrees)