]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: revamp ceph.conf distribution scheduling
authorSebastian Wagner <sebastian.wagner@suse.com>
Mon, 27 Jul 2020 12:27:12 +0000 (14:27 +0200)
committerSebastian Wagner <sebastian.wagner@suse.com>
Tue, 4 Aug 2020 14:21:44 +0000 (16:21 +0200)
Having an in-memeory list doesn't work properly: Especially
when loading the mgr module, we didn't knwo if we should
deploy confs or not.

Now we only distribute ceph.confs, if there is a new mon_map.
We also store that info now in the config store

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
(cherry picked from commit aa4c3b2a4e9a37f0f92f50d3bafbbd63dfb6fefa)

src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/fixtures.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/tests/__init__.py

index e6f01af92490dc137e019a0b063334fc648372b8..b955078e88b49af228bc5cd346192befa3244daa 100644 (file)
@@ -177,10 +177,12 @@ class HostCache():
         self.daemon_refresh_queue = [] # type: List[str]
         self.device_refresh_queue = [] # type: List[str]
         self.osdspec_previews_refresh_queue = [] # type: List[str]
+
+        # host -> daemon name -> dict
         self.daemon_config_deps = {}   # type: Dict[str, Dict[str, Dict[str,Any]]]
         self.last_host_check = {}      # type: Dict[str, datetime.datetime]
         self.loading_osdspec_preview = set()  # type: Set[str]
-        self.etc_ceph_ceph_conf_refresh_queue: Set[str] = set()
+        self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {}
         self.registry_login_queue: Set[str] = set()
 
     def load(self):
@@ -223,7 +225,9 @@ class HostCache():
                 if 'last_host_check' in j:
                     self.last_host_check[host] = datetime.datetime.strptime(
                         j['last_host_check'], DATEFMT)
-                self.etc_ceph_ceph_conf_refresh_queue.add(host)
+                if 'last_etc_ceph_ceph_conf' in j:
+                    self.last_etc_ceph_ceph_conf[host] = datetime.datetime.strptime(
+                        j['last_etc_ceph_ceph_conf'], DATEFMT)
                 self.registry_login_queue.add(host)
                 self.mgr.log.debug(
                     'HostCache.load: host %s has %d daemons, '
@@ -269,7 +273,6 @@ class HostCache():
         self.daemon_refresh_queue.append(host)
         self.device_refresh_queue.append(host)
         self.osdspec_previews_refresh_queue.append(host)
-        self.etc_ceph_ceph_conf_refresh_queue.add(host)
         self.registry_login_queue.add(host)
 
     def invalidate_host_daemons(self, host):
@@ -285,9 +288,6 @@ class HostCache():
         if host in self.last_device_update:
             del self.last_device_update[host]
         self.mgr.event.set()
-
-    def distribute_new_etc_ceph_ceph_conf(self):
-        self.etc_ceph_ceph_conf_refresh_queue = set(self.mgr.inventory.keys())
     
     def distribute_new_registry_login_info(self):
         self.registry_login_queue = set(self.mgr.inventory.keys())
@@ -319,6 +319,10 @@ class HostCache():
 
         if host in self.last_host_check:
             j['last_host_check'] = self.last_host_check[host].strftime(DATEFMT)
+
+        if host in self.last_etc_ceph_ceph_conf:
+            j['last_etc_ceph_ceph_conf'] = self.last_etc_ceph_ceph_conf[host].strftime(DATEFMT)
+
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
 
     def rm_host(self, host):
@@ -437,19 +441,27 @@ class HostCache():
             seconds=self.mgr.host_check_interval)
         return host not in self.last_host_check or self.last_host_check[host] < cutoff
 
-    def host_needs_new_etc_ceph_ceph_conf(self, host):
+    def host_needs_new_etc_ceph_ceph_conf(self, host: str):
         if not self.mgr.manage_etc_ceph_ceph_conf:
             return False
         if self.mgr.paused:
             return False
         if host in self.mgr.offline_hosts:
             return False
-        if host in self.etc_ceph_ceph_conf_refresh_queue:
-            # We're read-only here.
-            # self.etc_ceph_ceph_conf_refresh_queue.remove(host)
+        if not self.mgr.last_monmap:
+            return False
+        if host not in self.last_etc_ceph_ceph_conf:
+            return True
+        if self.mgr.last_monmap > self.last_etc_ceph_ceph_conf[host]:
             return True
+        # already up to date:
         return False
     
+    def update_last_etc_ceph_ceph_conf(self, host: str):
+        if not self.mgr.last_monmap:
+            return
+        self.last_etc_ceph_ceph_conf[host] = self.mgr.last_monmap
+
     def host_needs_registry_login(self, host):
         if host in self.mgr.offline_hosts:
             return False
@@ -458,9 +470,6 @@ class HostCache():
             return True
         return False
 
-    def remove_host_needs_new_etc_ceph_ceph_conf(self, host):
-        self.etc_ceph_ceph_conf_refresh_queue.remove(host)
-
     def add_daemon(self, host, dd):
         # type: (str, orchestrator.DaemonDescription) -> None
         assert host in self.daemons
index 1e202067fcfb07c096a1374b4c95ca9b1d9dfd0c..e64ae1c1c70d53a78f32505931d5b92275544a01 100644 (file)
@@ -254,6 +254,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def __init__(self, *args, **kwargs):
         super(CephadmOrchestrator, self).__init__(*args, **kwargs)
         self._cluster_fsid = self.get('mon_map')['fsid']
+        self.last_monmap: Optional[datetime.datetime] = None
 
         # for serve()
         self.run = True
@@ -290,6 +291,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         self._cons = {}  # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
 
+
+        self.notify('mon_map', None)
         self.config_notify()
 
         path = self.get_ceph_option('cephadm_path')
@@ -540,35 +543,28 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         TODO: this method should be moved into mgr_module.py
         """
-        module_options_changed: List[str] = []
         for opt in self.MODULE_OPTIONS:
-            old_val = getattr(self, opt['name'], None)
-            new_val = self.get_module_option(opt['name'])
             setattr(self,
                     opt['name'],  # type: ignore
-                    new_val)  # type: ignore
+                    self.get_module_option(opt['name']))  # type: ignore
             self.log.debug(' mgr option %s = %s',
                            opt['name'], getattr(self, opt['name']))  # type: ignore
-            if old_val != new_val:
-                module_options_changed.append(opt['name'])
         for opt in self.NATIVE_OPTIONS:
             setattr(self,
                     opt,  # type: ignore
                     self.get_ceph_option(opt))
             self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore
 
-        for what in module_options_changed:
-            self.config_notify_one(what)
-
         self.event.set()
 
-    def config_notify_one(self, what):
-        if what == 'manage_etc_ceph_ceph_conf' and self.manage_etc_ceph_ceph_conf:
-            self.cache.distribute_new_etc_ceph_ceph_conf()
-
     def notify(self, notify_type, notify_id):
         if notify_type == "mon_map":
-            self.cache.distribute_new_etc_ceph_ceph_conf()
+            # get monmap mtime so we can refresh configs when mons change
+            monmap = self.get('mon_map')
+            self.last_monmap = datetime.datetime.strptime(
+                monmap['modified'], CEPH_DATEFMT)
+            if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
+                self.last_monmap = None  # just in case clocks are skewed
 
     def pause(self):
         if not self.paused:
@@ -1437,7 +1433,8 @@ you may want to run:
                 )
                 if code:
                     return f'failed to create /etc/ceph/ceph.conf on {host}: {err}'
-                self.cache.remove_host_needs_new_etc_ceph_ceph_conf(host)
+                self.cache.update_last_etc_ceph_ceph_conf(host)
+                self.cache.save_host(host)
         except OrchestratorError as e:
             return f'failed to create /etc/ceph/ceph.conf on {host}: {str(e)}'
         return None
@@ -2070,12 +2067,6 @@ you may want to run:
                                     f'service {service_name}')
 
     def _check_daemons(self):
-        # get monmap mtime so we can refresh configs when mons change
-        monmap = self.get('mon_map')
-        last_monmap: Optional[datetime.datetime] = datetime.datetime.strptime(
-            monmap['modified'], CEPH_DATEFMT)
-        if last_monmap and last_monmap > datetime.datetime.utcnow():
-            last_monmap = None   # just in case clocks are skewed
 
         daemons = self.cache.get_daemons()
         daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
@@ -2112,8 +2103,8 @@ you may want to run:
                 self.log.info('Reconfiguring %s (dependencies changed)...' % (
                     dd.name()))
                 reconfig = True
-            elif last_monmap and \
-               last_monmap > last_config and \
+            elif self.last_monmap and \
+                    self.last_monmap > last_config and \
                dd.daemon_type in CEPH_TYPES:
                 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                 reconfig = True
index 354eeab2134a2624c06cbef632d7dffaa89cc17a..2eac632a959a187c26fa415ef59e759d43cbbdca 100644 (file)
@@ -1,7 +1,10 @@
+import datetime
 import time
 import fnmatch
 from contextlib import contextmanager
 
+from cephadm.module import CEPH_DATEFMT
+
 try:
     from typing import Any
 except ImportError:
@@ -34,15 +37,30 @@ def mon_command(*args, **kwargs):
     return 0, '', ''
 
 @contextmanager
-def with_cephadm_module(module_options):
+def with_cephadm_module(module_options=None, store=None):
+    """
+    :param module_options: Set opts as if they were set before module.__init__ is called
+    :param store: Set the store before module.__init__ is called
+    """
     with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
             mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
             mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \
             mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command):
 
         m = CephadmOrchestrator.__new__ (CephadmOrchestrator)
-        for k, v in module_options.items():
-            m._ceph_set_module_option('cephadm', k, v)
+        if module_options is not None:
+            for k, v in module_options.items():
+                m._ceph_set_module_option('cephadm', k, v)
+        if store is None:
+            store = {}
+        if '_ceph_get/mon_map' not in store:
+            store['_ceph_get/mon_map'] = {
+                'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+                'fsid': 'foobar',
+            }
+        for k, v in store.items():
+            m._ceph_set_store(k, v)
+
         m.__init__('cephadm', 0, 0)
         m._cluster_fsid = "fsid"
         yield m
index f33371ffa37815db4ac07b7314ac6addc0f49589..77068d2a910f5657fbb1c9d5d5440d53cac25bb9 100644 (file)
@@ -164,13 +164,13 @@ class TestCephadm(object):
             )
         ])
     ))
-    #@mock.patch("mgr_module.MgrModule._ceph_get")
-    @mock.patch("ceph_module.BaseMgrModule._ceph_get")
-    def test_daemon_action(self, _ceph_get, cephadm_module: CephadmOrchestrator):
+    def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
+
         cephadm_module.service_cache_timeout = 10
         with with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
             wait(cephadm_module, c)
+            assert len(c.result) == 1
             c = cephadm_module.daemon_action('redeploy', 'rgw', 'myrgw.foobar')
             assert wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"]
 
@@ -178,8 +178,12 @@ class TestCephadm(object):
                 c = cephadm_module.daemon_action(what, 'rgw', 'myrgw.foobar')
                 assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
 
-            now = datetime.datetime.utcnow().strftime(CEPH_DATEFMT)
-            _ceph_get.return_value = {'modified': now}
+            # Make sure, _check_daemons does a redeploy due to monmap change:
+            cephadm_module._store['_ceph_get/mon_map'] = {
+                'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+                'fsid': 'foobar',
+            }
+            cephadm_module.notify('mon_map', None)
 
             cephadm_module._check_daemons()
 
@@ -645,10 +649,12 @@ class TestCephadm(object):
 
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
     @mock.patch("remoto.process.check")
-    def test_etc_ceph(self, _check, _get_connection, cephadm_module: CephadmOrchestrator):
+    def test_etc_ceph(self, _check, _get_connection, cephadm_module):
         _get_connection.return_value = mock.Mock(), mock.Mock()
         _check.return_value = '{}', '', 0
 
+        assert cephadm_module.manage_etc_ceph_ceph_conf is False
+
         with with_host(cephadm_module, 'test'):
             assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
 
@@ -662,12 +668,26 @@ class TestCephadm(object):
 
             assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
 
+            cephadm_module.cache.last_etc_ceph_ceph_conf = {}
+            cephadm_module.cache.load()
+
+            assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
+            # Make sure, _check_daemons does a redeploy due to monmap change:
+            cephadm_module._store['_ceph_get/mon_map'] = {
+                'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+                'fsid': 'foobar',
+            }
             cephadm_module.notify('mon_map', mock.MagicMock())
             assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+            cephadm_module.cache.last_etc_ceph_ceph_conf = {}
+            cephadm_module.cache.load()
+            assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
 
     def test_etc_ceph_init(self):
         with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
-            assert m.manage_etc_ceph_ceph_conf == True
+            assert m.manage_etc_ceph_ceph_conf is True
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
     def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
index 4b024ce1c987a7440c4ad0a6b3343e13a836d12b..6ca9315150c87c28bd7044d94c494f1f027c5285 100644 (file)
@@ -19,6 +19,14 @@ if 'UNITTEST' in os.environ:
     M_classes = set()
 
     class M(object):
+        """
+        Note that:
+
+        * self.set_store() populates self._store
+        * self.set_module_option() populates self._store[module_name]
+        * self.get(thing) comes from self._store['_ceph_get' + thing]
+
+        """
         def _ceph_get_store(self, k):
             if not hasattr(self, '_store'):
                 self._store = {}
@@ -57,8 +65,10 @@ if 'UNITTEST' in os.environ:
         def _ceph_set_module_option(self, module, key, val):
             return self._ceph_set_store(f'{module}/{key}', val)
 
-        def _ceph_get(self, *args):
-            return mock.MagicMock()
+        def _ceph_get(self, data_name):
+            if not hasattr(self, '_store'):
+                self._store = {}
+            return self._store.get(f'_ceph_get/{data_name}', mock.MagicMock())
 
 
         def __init__(self, *args):