import time
import signal
import logging
+import operator
from random import randint
from cephfs_test_case import CephFSTestCase
log = logging.getLogger(__name__)
+class TestClusterAffinity(CephFSTestCase):
+ CLIENTS_REQUIRED = 0
+ MDSS_REQUIRED = 4
+
+ def _verify_join_fs(self, target, status=None):
+ if status is None:
+ status = self.fs.wait_for_daemons(timeout=30)
+ log.debug("%s", status)
+ target = sorted(target, key=operator.itemgetter('name'))
+ log.info("target = %s", target)
+ current = list(status.get_all())
+ current = sorted(current, key=operator.itemgetter('name'))
+ log.info("current = %s", current)
+ self.assertEqual(len(current), len(target))
+ for i in xrange(len(current)):
+ for attr in target[i]:
+ self.assertIn(attr, current[i])
+ self.assertEqual(target[i][attr], current[i][attr])
+
+ def _change_target_state(self, state, name, changes):
+ for entity in state:
+ if entity['name'] == name:
+ for k, v in changes.items():
+ entity[k] = v
+ return
+ self.fail("no entity")
+
+ def _verify_init(self):
+ status = self.fs.status()
+ log.info("status = {0}".format(status))
+ target = [{'join_fscid': -1, 'name': info['name']} for info in status.get_all()]
+ self._verify_join_fs(target, status=status)
+ return (status, target)
+
+ def _reach_target(self, target):
+ def takeover():
+ try:
+ self._verify_join_fs(target)
+ return True
+ except AssertionError as e:
+ log.debug("%s", e)
+ return False
+ status = self.wait_until_true(takeover, 30)
+
+ def test_join_fs_runtime(self):
+ """
+ That setting mds_join_fs at runtime affects the cluster layout.
+ """
+ status, target = self._verify_init()
+ standbys = list(status.get_standbys())
+ self.config_set('mds.'+standbys[0]['name'], 'mds_join_fs', 'cephfs')
+ self._change_target_state(target, standbys[0]['name'], {'join_fscid': self.fs.id, 'state': 'up:active'})
+ self._reach_target(target)
+
+ def test_join_fs_unset(self):
+ """
+ That unsetting mds_join_fs will cause failover if another high-affinity standby exists.
+ """
+ status, target = self._verify_init()
+ standbys = list(status.get_standbys())
+ names = (standbys[0]['name'], standbys[1]['name'])
+ self.config_set('mds.'+names[0], 'mds_join_fs', 'cephfs')
+ self.config_set('mds.'+names[1], 'mds_join_fs', 'cephfs')
+ self._change_target_state(target, names[0], {'join_fscid': self.fs.id})
+ self._change_target_state(target, names[1], {'join_fscid': self.fs.id})
+ self._reach_target(target)
+ status = self.fs.status()
+ active = self.fs.get_active_names(status=status)[0]
+ self.assertIn(active, names)
+ self.config_rm('mds.'+active, 'mds_join_fs')
+ self._change_target_state(target, active, {'join_fscid': -1})
+ new_active = (set(names) - set((active,))).pop()
+ self._change_target_state(target, new_active, {'state': 'up:active'})
+ self._reach_target(target)
+
+ def test_join_fs_drop(self):
+ """
+ That unsetting mds_join_fs will not cause failover if no high-affinity standby exists.
+ """
+ status, target = self._verify_init()
+ standbys = list(status.get_standbys())
+ active = standbys[0]['name']
+ self.config_set('mds.'+active, 'mds_join_fs', 'cephfs')
+ self._change_target_state(target, active, {'join_fscid': self.fs.id, 'state': 'up:active'})
+ self._reach_target(target)
+ self.config_rm('mds.'+active, 'mds_join_fs')
+ self._change_target_state(target, active, {'join_fscid': -1})
+ self._reach_target(target)
+
+ def test_join_fs_vanilla(self):
+ """
+ That a vanilla standby is preferred over others with mds_join_fs set to another fs.
+ """
+ self.fs.set_allow_multifs()
+ fs2 = self.mds_cluster.newfs(name="cephfs2")
+ status, target = self._verify_init()
+ active = self.fs.get_active_names(status=status)[0]
+ standbys = [info['name'] for info in status.get_standbys()]
+ victim = standbys.pop()
+ # Set a bogus fs on the others
+ for mds in standbys:
+ self.config_set('mds.'+mds, 'mds_join_fs', 'cephfs2')
+ self._change_target_state(target, mds, {'join_fscid': fs2.id})
+ self.fs.rank_fail()
+ self._change_target_state(target, victim, {'state': 'up:active'})
+ self._reach_target(target)
+ status = self.fs.status()
+ active = self.fs.get_active_names(status=status)[0]
+ self.assertEqual(active, victim)
+
+ def test_join_fs_last_resort(self):
+ """
+ That a standby with mds_join_fs set to another fs is still used if necessary.
+ """
+ status, target = self._verify_init()
+ active = self.fs.get_active_names(status=status)[0]
+ standbys = [info['name'] for info in status.get_standbys()]
+ for mds in standbys:
+ self.config_set('mds.'+mds, 'mds_join_fs', 'cephfs2')
+ self.fs.set_allow_multifs()
+ fs2 = self.mds_cluster.newfs(name="cephfs2")
+ for mds in standbys:
+ self._change_target_state(target, mds, {'join_fscid': fs2.id})
+ self.fs.rank_fail()
+ status = self.fs.status()
+ ranks = list(self.fs.get_ranks(status=status))
+ self.assertEqual(len(ranks), 1)
+ self.assertIn(ranks[0]['name'], standbys)
+ # Note that we would expect the former active to reclaim its spot, but
+ # we're not testing that here.
+
+ def test_join_fs_steady(self):
+ """
+ That a sole MDS with mds_join_fs set will come back as active eventually even after failover.
+ """
+ status, target = self._verify_init()
+ active = self.fs.get_active_names(status=status)[0]
+ self.config_set('mds.'+active, 'mds_join_fs', 'cephfs')
+ self._change_target_state(target, active, {'join_fscid': self.fs.id})
+ self._reach_target(target)
+ self.fs.rank_fail()
+ self._reach_target(target)
+
+ def test_join_fs_standby_replay(self):
+ """
+ That a standby-replay daemon with weak affinity is replaced by a stronger one.
+ """
+ status, target = self._verify_init()
+ standbys = [info['name'] for info in status.get_standbys()]
+ self.config_set('mds.'+standbys[0], 'mds_join_fs', 'cephfs')
+ self._change_target_state(target, standbys[0], {'join_fscid': self.fs.id, 'state': 'up:active'})
+ self._reach_target(target)
+ self.fs.set_allow_standby_replay(True)
+ status = self.fs.status()
+ standbys = [info['name'] for info in status.get_standbys()]
+ self.config_set('mds.'+standbys[0], 'mds_join_fs', 'cephfs')
+ self._change_target_state(target, standbys[0], {'join_fscid': self.fs.id, 'state': 'up:standby-replay'})
+ self._reach_target(target)
class TestClusterResize(CephFSTestCase):
CLIENTS_REQUIRED = 1
"--yes-i-really-mean-it")
def _setup_two(self):
- fs_a = self.mds_cluster.newfs("alpha")
- fs_b = self.mds_cluster.newfs("bravo")
+ fs_a = self.mds_cluster.newfs(name="alpha")
+ fs_b = self.mds_cluster.newfs(name="bravo")
self.mds_cluster.mds_restart()