From aa4c3b2a4e9a37f0f92f50d3bafbbd63dfb6fefa Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Mon, 27 Jul 2020 14:27:12 +0200 Subject: [PATCH] mgr/cephadm: revamp ceph.conf distribution scheduling Having an in-memory list doesn't work properly: especially when loading the mgr module, we didn't know whether we should deploy confs or not. Now we only distribute ceph.confs if there is a new mon_map. We also store that info in the config store now. Signed-off-by: Sebastian Wagner --- src/pybind/mgr/cephadm/inventory.py | 35 +++++++++++------- src/pybind/mgr/cephadm/module.py | 37 ++++++++------------ src/pybind/mgr/cephadm/tests/fixtures.py | 24 +++++++++++-- src/pybind/mgr/cephadm/tests/test_cephadm.py | 34 ++++++++++++++---- src/pybind/mgr/tests/__init__.py | 14 ++++++-- 5 files changed, 96 insertions(+), 48 deletions(-) diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py index d9d6d5825e58b..613b6066952b4 100644 --- a/src/pybind/mgr/cephadm/inventory.py +++ b/src/pybind/mgr/cephadm/inventory.py @@ -175,10 +175,12 @@ class HostCache(): self.daemon_refresh_queue = [] # type: List[str] self.device_refresh_queue = [] # type: List[str] self.osdspec_previews_refresh_queue = [] # type: List[str] + + # host -> daemon name -> dict self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]] self.last_host_check = {} # type: Dict[str, datetime.datetime] self.loading_osdspec_preview = set() # type: Set[str] - self.etc_ceph_ceph_conf_refresh_queue: Set[str] = set() + self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {} self.registry_login_queue: Set[str] = set() def load(self): @@ -221,7 +223,9 @@ class HostCache(): if 'last_host_check' in j: self.last_host_check[host] = datetime.datetime.strptime( j['last_host_check'], DATEFMT) - self.etc_ceph_ceph_conf_refresh_queue.add(host) + if 'last_etc_ceph_ceph_conf' in j: + self.last_etc_ceph_ceph_conf[host] = datetime.datetime.strptime( + j['last_etc_ceph_ceph_conf'], 
DATEFMT) self.registry_login_queue.add(host) self.mgr.log.debug( 'HostCache.load: host %s has %d daemons, ' @@ -267,7 +271,6 @@ class HostCache(): self.daemon_refresh_queue.append(host) self.device_refresh_queue.append(host) self.osdspec_previews_refresh_queue.append(host) - self.etc_ceph_ceph_conf_refresh_queue.add(host) self.registry_login_queue.add(host) def invalidate_host_daemons(self, host): @@ -283,9 +286,6 @@ class HostCache(): if host in self.last_device_update: del self.last_device_update[host] self.mgr.event.set() - - def distribute_new_etc_ceph_ceph_conf(self): - self.etc_ceph_ceph_conf_refresh_queue = set(self.mgr.inventory.keys()) def distribute_new_registry_login_info(self): self.registry_login_queue = set(self.mgr.inventory.keys()) @@ -317,6 +317,10 @@ class HostCache(): if host in self.last_host_check: j['last_host_check'] = self.last_host_check[host].strftime(DATEFMT) + + if host in self.last_etc_ceph_ceph_conf: + j['last_etc_ceph_ceph_conf'] = self.last_etc_ceph_ceph_conf[host].strftime(DATEFMT) + self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j)) def rm_host(self, host): @@ -435,19 +439,27 @@ class HostCache(): seconds=self.mgr.host_check_interval) return host not in self.last_host_check or self.last_host_check[host] < cutoff - def host_needs_new_etc_ceph_ceph_conf(self, host): + def host_needs_new_etc_ceph_ceph_conf(self, host: str): if not self.mgr.manage_etc_ceph_ceph_conf: return False if self.mgr.paused: return False if host in self.mgr.offline_hosts: return False - if host in self.etc_ceph_ceph_conf_refresh_queue: - # We're read-only here. 
- # self.etc_ceph_ceph_conf_refresh_queue.remove(host) + if not self.mgr.last_monmap: + return False + if host not in self.last_etc_ceph_ceph_conf: + return True + if self.mgr.last_monmap > self.last_etc_ceph_ceph_conf[host]: return True + # already up to date: return False + def update_last_etc_ceph_ceph_conf(self, host: str): + if not self.mgr.last_monmap: + return + self.last_etc_ceph_ceph_conf[host] = self.mgr.last_monmap + def host_needs_registry_login(self, host): if host in self.mgr.offline_hosts: return False @@ -456,9 +468,6 @@ class HostCache(): return True return False - def remove_host_needs_new_etc_ceph_ceph_conf(self, host): - self.etc_ceph_ceph_conf_refresh_queue.remove(host) - def add_daemon(self, host, dd): # type: (str, orchestrator.DaemonDescription) -> None assert host in self.daemons diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 68c648e7ef9e0..9d97554cf2ccf 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -253,6 +253,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, def __init__(self, *args, **kwargs): super(CephadmOrchestrator, self).__init__(*args, **kwargs) self._cluster_fsid = self.get('mon_map')['fsid'] + self.last_monmap: Optional[datetime.datetime] = None # for serve() self.run = True @@ -289,6 +290,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self._cons = {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]] + + self.notify('mon_map', None) self.config_notify() path = self.get_ceph_option('cephadm_path') @@ -539,35 +542,28 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, TODO: this method should be moved into mgr_module.py """ - module_options_changed: List[str] = [] for opt in self.MODULE_OPTIONS: - old_val = getattr(self, opt['name'], None) - new_val = self.get_module_option(opt['name']) setattr(self, opt['name'], # type: ignore - new_val) # type: ignore 
+ self.get_module_option(opt['name'])) # type: ignore self.log.debug(' mgr option %s = %s', opt['name'], getattr(self, opt['name'])) # type: ignore - if old_val != new_val: - module_options_changed.append(opt['name']) for opt in self.NATIVE_OPTIONS: setattr(self, opt, # type: ignore self.get_ceph_option(opt)) self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore - for what in module_options_changed: - self.config_notify_one(what) - self.event.set() - def config_notify_one(self, what): - if what == 'manage_etc_ceph_ceph_conf' and self.manage_etc_ceph_ceph_conf: - self.cache.distribute_new_etc_ceph_ceph_conf() - def notify(self, notify_type, notify_id): if notify_type == "mon_map": - self.cache.distribute_new_etc_ceph_ceph_conf() + # get monmap mtime so we can refresh configs when mons change + monmap = self.get('mon_map') + self.last_monmap = datetime.datetime.strptime( + monmap['modified'], CEPH_DATEFMT) + if self.last_monmap and self.last_monmap > datetime.datetime.utcnow(): + self.last_monmap = None # just in case clocks are skewed def pause(self): if not self.paused: @@ -1413,7 +1409,8 @@ you may want to run: ) if code: return f'failed to create /etc/ceph/ceph.conf on {host}: {err}' - self.cache.remove_host_needs_new_etc_ceph_ceph_conf(host) + self.cache.update_last_etc_ceph_ceph_conf(host) + self.cache.save_host(host) except OrchestratorError as e: return f'failed to create /etc/ceph/ceph.conf on {host}: {str(e)}' return None @@ -2036,12 +2033,6 @@ you may want to run: f'service {service_name}') def _check_daemons(self): - # get monmap mtime so we can refresh configs when mons change - monmap = self.get('mon_map') - last_monmap: Optional[datetime.datetime] = datetime.datetime.strptime( - monmap['modified'], CEPH_DATEFMT) - if last_monmap and last_monmap > datetime.datetime.utcnow(): - last_monmap = None # just in case clocks are skewed daemons = self.cache.get_daemons() daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = 
defaultdict(list) @@ -2078,8 +2069,8 @@ you may want to run: self.log.info('Reconfiguring %s (dependencies changed)...' % ( dd.name())) reconfig = True - elif last_monmap and \ - last_monmap > last_config and \ + elif self.last_monmap and \ + self.last_monmap > last_config and \ dd.daemon_type in CEPH_TYPES: self.log.info('Reconfiguring %s (monmap changed)...' % dd.name()) reconfig = True diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py index 354eeab2134a2..2eac632a959a1 100644 --- a/src/pybind/mgr/cephadm/tests/fixtures.py +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -1,7 +1,10 @@ +import datetime import time import fnmatch from contextlib import contextmanager +from cephadm.module import CEPH_DATEFMT + try: from typing import Any except ImportError: @@ -34,15 +37,30 @@ def mon_command(*args, **kwargs): return 0, '', '' @contextmanager -def with_cephadm_module(module_options): +def with_cephadm_module(module_options=None, store=None): + """ + :param module_options: Set opts as if they were set before module.__init__ is called + :param store: Set the store before module.__init__ is called + """ with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\ mock.patch("cephadm.module.CephadmOrchestrator.remote"),\ mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \ mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command): m = CephadmOrchestrator.__new__ (CephadmOrchestrator) - for k, v in module_options.items(): - m._ceph_set_module_option('cephadm', k, v) + if module_options is not None: + for k, v in module_options.items(): + m._ceph_set_module_option('cephadm', k, v) + if store is None: + store = {} + if '_ceph_get/mon_map' not in store: + store['_ceph_get/mon_map'] = { + 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT), + 'fsid': 'foobar', + } + for k, v in store.items(): + m._ceph_set_store(k, v) + m.__init__('cephadm', 0, 0) m._cluster_fsid 
= "fsid" yield m diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 4a551776f40a4..443e45395e5cb 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -165,13 +165,13 @@ class TestCephadm(object): ) ]) )) - #@mock.patch("mgr_module.MgrModule._ceph_get") - @mock.patch("ceph_module.BaseMgrModule._ceph_get") - def test_daemon_action(self, _ceph_get, cephadm_module: CephadmOrchestrator): + def test_daemon_action(self, cephadm_module: CephadmOrchestrator): + cephadm_module.service_cache_timeout = 10 with with_host(cephadm_module, 'test'): c = cephadm_module.list_daemons(refresh=True) wait(cephadm_module, c) + assert len(c.result) == 1 c = cephadm_module.daemon_action('redeploy', 'rgw', 'myrgw.foobar') assert wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"] @@ -179,8 +179,12 @@ class TestCephadm(object): c = cephadm_module.daemon_action(what, 'rgw', 'myrgw.foobar') assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"] - now = datetime.datetime.utcnow().strftime(CEPH_DATEFMT) - _ceph_get.return_value = {'modified': now} + # Make sure, _check_daemons does a redeploy due to monmap change: + cephadm_module._store['_ceph_get/mon_map'] = { + 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT), + 'fsid': 'foobar', + } + cephadm_module.notify('mon_map', None) cephadm_module._check_daemons() @@ -644,10 +648,12 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") @mock.patch("remoto.process.check") - def test_etc_ceph(self, _check, _get_connection, cephadm_module: CephadmOrchestrator): + def test_etc_ceph(self, _check, _get_connection, cephadm_module): _get_connection.return_value = mock.Mock(), mock.Mock() _check.return_value = '{}', '', 0 + assert cephadm_module.manage_etc_ceph_ceph_conf is False + with with_host(cephadm_module, 'test'): assert not 
cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test') @@ -661,12 +667,26 @@ class TestCephadm(object): assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test') + cephadm_module.cache.last_etc_ceph_ceph_conf = {} + cephadm_module.cache.load() + + assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test') + + # Make sure, _check_daemons does a redeploy due to monmap change: + cephadm_module._store['_ceph_get/mon_map'] = { + 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT), + 'fsid': 'foobar', + } cephadm_module.notify('mon_map', mock.MagicMock()) assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test') + cephadm_module.cache.last_etc_ceph_ceph_conf = {} + cephadm_module.cache.load() + assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test') + def test_etc_ceph_init(self): with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m: - assert m.manage_etc_ceph_ceph_conf == True + assert m.manage_etc_ceph_ceph_conf is True @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm") def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator): diff --git a/src/pybind/mgr/tests/__init__.py b/src/pybind/mgr/tests/__init__.py index 4b024ce1c987a..6ca9315150c87 100644 --- a/src/pybind/mgr/tests/__init__.py +++ b/src/pybind/mgr/tests/__init__.py @@ -19,6 +19,14 @@ if 'UNITTEST' in os.environ: M_classes = set() class M(object): + """ + Note that: + + * self.set_store() populates self._store + * self.set_module_option() populates self._store[module_name] + * self.get(thing) comes from self._store['_ceph_get' + thing] + + """ def _ceph_get_store(self, k): if not hasattr(self, '_store'): self._store = {} @@ -57,8 +65,10 @@ if 'UNITTEST' in os.environ: def _ceph_set_module_option(self, module, key, val): return self._ceph_set_store(f'{module}/{key}', val) - def _ceph_get(self, *args): - return mock.MagicMock() + def _ceph_get(self, data_name): + if not hasattr(self, 
'_store'): + self._store = {} + return self._store.get(f'_ceph_get/{data_name}', mock.MagicMock()) def __init__(self, *args): -- 2.39.5