return daemon_spec
- def _check_safe_to_destroy(self, mon_id: str) -> None:
def config(self, spec: ServiceSpec) -> None:
    """Apply a mon service spec.

    Pushes the spec's CRUSH locations out to all mon daemons the
    orchestrator currently knows about.
    """
    assert self.TYPE == spec.service_type
    known_mons = self.mgr.cache.get_daemons_by_type('mon')
    self.set_crush_locations(known_mons, spec)
+ def _get_quorum_status(self) -> Dict[Any, Any]:
ret, out, err = self.mgr.check_mon_command({
'prefix': 'quorum_status',
})
try:
j = json.loads(out)
- except Exception:
- raise OrchestratorError('failed to parse quorum status')
+ except Exception as e:
+ raise OrchestratorError(f'failed to parse mon quorum status: {e}')
+ return j
- mons = [m['name'] for m in j['monmap']['mons']]
+ def _check_safe_to_destroy(self, mon_id: str) -> None:
+ quorum_status = self._get_quorum_status()
+ mons = [m['name'] for m in quorum_status['monmap']['mons']]
if mon_id not in mons:
logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
mon_id, mons))
return
new_mons = [m for m in mons if m != mon_id]
- new_quorum = [m for m in j['quorum_names'] if m != mon_id]
+ new_quorum = [m for m in quorum_status['quorum_names'] if m != mon_id]
if len(new_quorum) > len(new_mons) / 2:
logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
(mon_id, new_quorum, new_mons))
mon_spec = cast(MONSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
if mon_spec.crush_locations:
if daemon_spec.host in mon_spec.crush_locations:
- # the --crush-location flag only supports a single bucker=loc pair so
+ # the --crush-location flag only supports a single bucket=loc pair so
# others will have to be handled later. The idea is to set the flag
# for the first bucket=loc pair in the list in order to facilitate
# replacing a tiebreaker mon (https://docs.ceph.com/en/quincy/rados/operations/stretch-mode/#other-commands)
return daemon_spec.final_config, daemon_spec.deps
def set_crush_locations(self, daemon_descrs: List[DaemonDescription], spec: ServiceSpec) -> None:
    """Reconcile each mon's CRUSH location with the mon spec.

    For every mon daemon that both has an entry in the spec's
    ``crush_locations`` mapping and is present in the monmap, compare the
    monmap's reported ``crush_location`` against the spec's desired value
    and, if they differ, issue a ``mon set_location`` command.

    :param daemon_descrs: mon daemons to consider
    :param spec: the mon ServiceSpec (expected to be a MONSpec)
    """
    logger.debug('Setting mon crush locations from spec')
    if not daemon_descrs:
        return
    assert self.TYPE == spec.service_type
    mon_spec = cast(MONSpec, spec)

    # Nothing to reconcile if the spec defines no crush locations.
    if not mon_spec.crush_locations:
        return

    quorum_status = self._get_quorum_status()
    mons_in_monmap = [m['name'] for m in quorum_status['monmap']['mons']]
    for dd in daemon_descrs:
        assert dd.daemon_id is not None
        assert dd.hostname is not None
        if dd.hostname not in mon_spec.crush_locations:
            continue
        if dd.daemon_id not in mons_in_monmap:
            continue
        # expected format for crush_locations from the quorum status is
        # {bucket1=loc1,bucket2=loc2} etc. for the number of bucket=loc pairs
        try:
            current_crush_locs = [m['crush_location'] for m in quorum_status['monmap']['mons'] if m['name'] == dd.daemon_id][0]
        except (KeyError, IndexError) as e:
            logger.warning(f'Failed setting crush location for mon {dd.daemon_id}: {e}\n'
                           'Mon may not have a monmap entry yet. Try re-applying mon spec once mon is confirmed up.')
            # BUG FIX: skip this daemon. Without this continue,
            # current_crush_locs is unbound below and the comparison
            # raises NameError instead of moving on to the next mon.
            continue
        desired_crush_locs = '{' + ','.join(mon_spec.crush_locations[dd.hostname]) + '}'
        logger.debug(f'Found spec defined crush locations for mon on {dd.hostname}: {desired_crush_locs}')
        logger.debug(f'Current crush locations for mon on {dd.hostname}: {current_crush_locs}')
        if current_crush_locs != desired_crush_locs:
            logger.info(f'Setting crush location for mon {dd.daemon_id} to {desired_crush_locs}')
            try:
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'mon set_location',
                    'name': dd.daemon_id,
                    'args': mon_spec.crush_locations[dd.hostname]
                })
            except Exception as e:
                logger.error(f'Failed setting crush location for mon {dd.daemon_id}: {e}')
+
class MgrService(CephService):
TYPE = 'mgr'
from orchestrator import OrchestratorError
from orchestrator._interface import DaemonDescription
+from typing import Dict, List
+
class FakeInventory:
def get_addr(self, name: str) -> str:
class FakeMgr:
def __init__(self):
self.config = ''
+ self.set_mon_crush_locations: Dict[str, List[str]] = {}
self.check_mon_command = MagicMock(side_effect=self._check_mon_command)
self.mon_command = MagicMock(side_effect=self._check_mon_command)
self.template = MagicMock()
return 0, 'value set', ''
if prefix in ['auth get']:
return 0, '[foo]\nkeyring = asdf\n', ''
+ if prefix == 'quorum_status':
+ # actual quorum status output from testing
+ # note in this output all of the mons have blank crush locations
+ return 0, """{"election_epoch": 14, "quorum": [0, 1, 2], "quorum_names": ["vm-00", "vm-01", "vm-02"], "quorum_leader_name": "vm-00", "quorum_age": 101, "features": {"quorum_con": "4540138322906710015", "quorum_mon": ["kraken", "luminous", "mimic", "osdmap-prune", "nautilus", "octopus", "pacific", "elector-pinging", "quincy", "reef"]}, "monmap": {"epoch": 3, "fsid": "9863e1b8-6f24-11ed-8ad8-525400c13ad2", "modified": "2022-11-28T14:00:29.972488Z", "created": "2022-11-28T13:57:55.847497Z", "min_mon_release": 18, "min_mon_release_name": "reef", "election_strategy": 1, "disallowed_leaders: ": "", "stretch_mode": false, "tiebreaker_mon": "", "features": {"persistent": ["kraken", "luminous", "mimic", "osdmap-prune", "nautilus", "octopus", "pacific", "elector-pinging", "quincy", "reef"], "optional": []}, "mons": [{"rank": 0, "name": "vm-00", "public_addrs": {"addrvec": [{"type": "v2", "addr": "192.168.122.61:3300", "nonce": 0}, {"type": "v1", "addr": "192.168.122.61:6789", "nonce": 0}]}, "addr": "192.168.122.61:6789/0", "public_addr": "192.168.122.61:6789/0", "priority": 0, "weight": 0, "crush_location": "{}"}, {"rank": 1, "name": "vm-01", "public_addrs": {"addrvec": [{"type": "v2", "addr": "192.168.122.63:3300", "nonce": 0}, {"type": "v1", "addr": "192.168.122.63:6789", "nonce": 0}]}, "addr": "192.168.122.63:6789/0", "public_addr": "192.168.122.63:6789/0", "priority": 0, "weight": 0, "crush_location": "{}"}, {"rank": 2, "name": "vm-02", "public_addrs": {"addrvec": [{"type": "v2", "addr": "192.168.122.82:3300", "nonce": 0}, {"type": "v1", "addr": "192.168.122.82:6789", "nonce": 0}]}, "addr": "192.168.122.82:6789/0", "public_addr": "192.168.122.82:6789/0", "priority": 0, "weight": 0, "crush_location": "{}"}]}}""", ''
+ if prefix == 'mon set_location':
+ self.set_mon_crush_locations[cmd_dict.get('name')] = cmd_dict.get('args')
+ return 0, '', ''
return -1, '', 'error'
def get_minimal_ceph_conf(self) -> str:
assert f == expected
class TestMonService:

    def test_set_crush_locations(self, cephadm_module: CephadmOrchestrator):
        """set_crush_locations should issue 'mon set_location' for every mon
        whose spec-defined crush location differs from the (blank) monmap one."""
        mgr = FakeMgr()
        mon_service = MonService(mgr)
        expected_locs = {
            'vm-00': ['datacenter=a', 'rack=1'],
            'vm-01': ['datacenter=a'],
            'vm-02': ['datacenter=b', 'rack=3'],
        }
        mon_spec = ServiceSpec(service_type='mon', crush_locations=expected_locs)

        mon_daemons = [
            DaemonDescription(daemon_type='mon', daemon_id=host, hostname=host)
            for host in expected_locs
        ]
        mon_service.set_crush_locations(mon_daemons, mon_spec)
        for host, locs in expected_locs.items():
            assert host in mgr.set_mon_crush_locations
            assert mgr.set_mon_crush_locations[host] == locs
+
+
class TestSNMPGateway:
@patch("cephadm.serve.CephadmServe._run_cephadm")