return self.json_asok(command, 'mds', info['name'], timeout=timeout)
def rank_tell(self, command, rank=0, status=None):
    """
    Run a `ceph tell` against a single MDS rank of this filesystem and
    return the decoded JSON reply.

    :param command: command words as a list, e.g. ["dump", "tree", "/"]
    :param rank: MDS rank to address (default 0)
    :param status: unused here; kept for interface parity with ranks_tell
    :raises json.decoder.JSONDecodeError: if the daemon's reply is not
        valid JSON; the raw output is logged before re-raising so the
        failure is diagnosable from the test log.
    """
    out = self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command)
    # Keep the try minimal: only the decode can raise JSONDecodeError,
    # and `out` is always bound when the error handler logs it.
    try:
        return json.loads(out)
    except json.decoder.JSONDecodeError:
        log.error("could not decode: {}".format(out))
        raise
def ranks_tell(self, command, status=None):
if status is None:
# We purged it at the last
self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)
def test_reintegration_via_scrub(self):
    """
    That reintegration is triggered via recursive scrub.

    Hard-linked files whose remote link's parent is not in cache cannot
    be reintegrated at unlink time; a recursive scrub of / should bring
    the remote links back into cache and trigger the reintegration.
    """

    # 50 primary links under a/, each with a second (remote) hard link
    # under b/; sync so everything reaches the MDS before we restart it.
    self.mount_a.run_shell_payload("""
mkdir -p a b
for i in `seq 1 50`; do
    touch a/"$i"
    ln a/"$i" b/"$i"
done
sync -f .
""")

    # Restart the rank with a flushed journal so it comes back with a
    # cold cache: the b/ remote links will not be in cache afterwards.
    self.mount_a.remount() # drop caps/cache
    self.fs.rank_tell(["flush", "journal"])
    self.fs.rank_fail()
    self.fs.wait_for_daemons()

    # only / in cache, reintegration cannot happen
    # NOTE(review): expect_val=3 is the size of the dump for a minimal
    # cache — confirm which 3 entries "dump tree /" reports here.
    self.wait_until_equal(
        lambda: len(self.fs.rank_tell(["dump", "tree", "/"])),
        expect_val=3,
        timeout=60
    )

    last_reintegrated = self.get_mdc_stat("strays_reintegrated")
    # Unlink all primaries; the b/ links are not in cache, so the
    # inodes become strays rather than being reintegrated now.
    self.mount_a.run_shell_payload("""
rm a/*
sync -f .
""")
    # Cache should still be minimal after the unlinks.
    self.wait_until_equal(
        lambda: len(self.fs.rank_tell(["dump", "tree", "/"])),
        expect_val=3,
        timeout=60
    )
    # All 50 inodes are strays and no reintegration has happened yet.
    self.assertEqual(self.get_mdc_stat("num_strays"), 50)
    curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
    self.assertEqual(last_reintegrated, curr_reintegrated)

    # Recursive scrub walks b/, pulling the remote links into cache,
    # which should trigger reintegration of every stray.
    self.fs.rank_tell(["scrub", "start", "/", "recursive,force"])

    self.wait_until_equal(
        lambda: self.get_mdc_stat("num_strays"),
        expect_val=0,
        timeout=60
    )
    curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
    # N.B.: reintegrate (rename RPC) may be tried multiple times from different code paths
    self.assertGreaterEqual(curr_reintegrated, last_reintegrated+50)
+
def test_mv_hardlink_cleanup(self):
"""
That when doing a rename from A to B, and B has hardlinks,