+import time
+import signal
import json
import logging
from unittest import case, SkipTest
self.fs.mon_manager.raw_cluster_cmd('fs', 'set', self.fs.name, 'standby_count_wanted', '0')
self.wait_for_health_clear(timeout=30)
def test_discontinuous_mdsmap(self):
    """
    That discontinuous mdsmap does not affect failover.
    See http://tracker.ceph.com/issues/24856.

    Scenario:
      1. Pin every MDS to a fixed rank and bring up two active ranks.
      2. Make mds_b drop one migrator message type, then pin a directory
         to rank 1 so that an export is started.
      3. Freeze mds_b (SIGSTOP) until the monitors report it as laggy.
      4. Restart mds_a and wait for rank 0 to reach up:resolve.
      5. Thaw mds_b (SIGCONT) after its monitor connection has timed out.
      6. Rank 0 must still reach up:active.
    """
    mds_ids = sorted(self.mds_cluster.mds_ids)
    # Needs at least two MDS daemons; the first two (sorted by id) play the
    # roles of the restarted daemon (mds_a) and the frozen daemon (mds_b).
    mds_a, mds_b = mds_ids[0:2]

    # Assign each MDS to a fixed rank, to prevent a standby MDS from
    # replacing the frozen MDS.
    for rank, mds_id in enumerate(mds_ids):
        self.set_conf("mds.{0}".format(mds_id), "mds_standby_for_rank", str(rank))
    self.mds_cluster.mds_restart()
    self.fs.wait_for_daemons()

    self.fs.set_max_mds(2)
    self.fs.wait_for_state('up:active', rank=1)

    # Drop the 'export prep' message so that the import stays in the
    # 'discovered' state.
    self.fs.mds_asok(['config', 'set', 'mds_inject_migrator_message_loss', '82'], mds_id=mds_b)

    self.mount_a.run_shell(["mkdir", "a"])
    self.mount_a.setfattr("a", "ceph.dir.pin", "1")
    self.mount_a.umount_wait()

    # Should be long enough for the export to start
    time.sleep(30)

    # Read both timeouts before freezing mds_b; they size the waits below.
    grace = float(self.fs.get_config("mds_beacon_grace", service_type="mon"))
    monc_timeout = float(self.fs.get_config("mon_client_ping_timeout", service_type="mds"))

    # Freeze mds_b and wait until its status carries "laggy_since"
    self.mds_cluster.mds_signal(mds_b, signal.SIGSTOP)
    self.wait_until_true(
        lambda: "laggy_since" in self.fs.mon_manager.get_mds_status(mds_b),
        timeout=grace * 2
    )

    self.mds_cluster.mds_restart(mds_a)
    self.fs.wait_for_state('up:resolve', rank=0, timeout=30)

    # Make sure mds_b's monitor connection gets reset
    time.sleep(monc_timeout * 2)

    # Unfreeze mds_b; it will get a discontinuous mdsmap
    self.mds_cluster.mds_signal(mds_b, signal.SIGCONT)
    self.wait_until_true(
        lambda: "laggy_since" not in self.fs.mon_manager.get_mds_status(mds_b),
        timeout=grace * 2
    )

    # Check that mds_b sends the 'resolve' message to mds_a. If it does not,
    # mds_a can't become active.
    self.fs.wait_for_state('up:active', rank=0, timeout=30)
class TestStandbyReplay(CephFSTestCase):
MDSS_REQUIRED = 4
self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
def signal(self, sig, silent=False):
    """
    Deliver signal `sig` to this daemon's process via os.kill().

    :param sig: signal number to send.
    :param silent: when true, do not log that the signal was sent.
    :raises RuntimeError: if running() reports the daemon is not running.
    """
    if self.running():
        pid = self._get_pid()
    else:
        raise RuntimeError("Can't send signal to non-running daemon")

    os.kill(pid, sig)

    if silent:
        return
    message = "Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id)
    log.info(message)
+
def safe_kill(pid):
"""
/* This function DOES put the passed message before returning*/
void Migrator::dispatch(Message *m)
{
+ if (unlikely(inject_message_loss)) {
+ if (inject_message_loss == m->get_type() - MDS_PORT_MIGRATOR) {
+ dout(0) << "inject message loss " << *m << dendl;
+ m->put();
+ return;
+ }
+ }
+
switch (m->get_type()) {
// import
case MSG_MDS_EXPORTDIRDISCOVER:
inject_session_race = conf->get_val<bool>("mds_inject_migrator_session_race");
dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
}
+
+ if (changed.count("mds_inject_migrator_message_loss")) {
+ inject_message_loss = g_conf->get_val<int64_t>("mds_inject_migrator_message_loss");
+ dout(0) << "mds_inject_migrator_message_loss is " << inject_message_loss << dendl;
+ }
}