]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: move into DaemonCache class
authorSage Weil <sage@redhat.com>
Mon, 17 Feb 2020 13:37:58 +0000 (07:37 -0600)
committerSage Weil <sage@redhat.com>
Tue, 18 Feb 2020 21:50:01 +0000 (15:50 -0600)
Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_cephadm.py

index 0f52d4d90c9b074949161ebc80fb81f21a75b75f..7e1e99c77b97c29e0ba058a51a621867eb7dc252 100644 (file)
@@ -104,6 +104,122 @@ def assert_valid_host(name):
     except AssertionError as e:
         raise OrchestratorError(e)
 
+
+class DaemonCache():
+    def __init__(self, mgr):
+        # type: (CephadmOrchestrator) -> None
+        self.mgr = mgr
+        self.data = {}   # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
+        self.last_update = {}   # type: Dict[str, datetime.datetime]
+
+    def load(self):
+        # type: () -> None
+        for k, v in six.iteritems(self.mgr.get_store_prefix(DAEMON_CACHE_PREFIX)):
+            host = k[len(DAEMON_CACHE_PREFIX):]
+            if host not in self.mgr.inventory:
+                self.mgr.log.warning('removing stray DaemonCache host record %s' % (
+                    host))
+                self.mgr.set_store(k, None)
+            try:
+                j = json.loads(v)
+                # we do ignore the persisted last_update to trigger a new
+                # scrape on mgr restart
+                self.data[host] = {}
+                for name, d in j.get('daemons', {}).items():
+                    self.data[host][name] = \
+                        orchestrator.DaemonDescription.from_json(d)
+                self.mgr.log.debug('DaemonCache.load: host %s has %d daemons' % (
+                    host, len(self.data[host])))
+            except Exception as e:
+                self.mgr.log.warning('unable to load cached state for %s: %s' % (
+                    host, e))
+                pass
+
+    def update_host(self, host, dm):
+        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
+        self.data[host] = dm
+        self.last_update[host] = datetime.datetime.utcnow()
+
+    def prime_empty_host(self, host):
+        # type: (str) -> None
+        """
+        Install an empty entry for a host
+        """
+        self.data[host] = {}
+
+    def invalidate_host(self, host):
+        # type: (str) -> None
+        if host in self.last_update:
+            del self.last_update[host]
+
+    def save_host(self, host):
+        # type: (str) -> None
+        j = {   # type: ignore
+            'daemons': {},
+        }
+        if host in self.last_update:
+            j['last_update'] = self.last_update[host].strftime(DATEFMT) # type: ignore
+        for name, dd in self.data[host].items():
+            j['daemons'][name] = dd.to_json()
+        self.mgr.set_store(DAEMON_CACHE_PREFIX + host, json.dumps(j))
+
+    def rm_host(self, host):
+        # type: (str) -> None
+        if host in self.data:
+            del self.data[host]
+        if host in self.last_update:
+            del self.last_update[host]
+        self.mgr.set_store(DAEMON_CACHE_PREFIX + host, None)
+
+    def get_hosts(self):
+        # type: () -> List[str]
+        r = []
+        for host, di in self.data.items():
+            r.append(host)
+        return r
+
+    def get_daemons(self):
+        # type: () -> List[orchestrator.DaemonDescription]
+        r = []
+        for host, dm in self.data.items():
+            for name, dd in dm.items():
+                r.append(dd)
+        return r
+
+    def get_daemons_by_type(self, daemon_type):
+        # type: (str) -> List[orchestrator.DaemonDescription]
+        result = []   # type: List[orchestrator.DaemonDescription]
+        for host, dm in self.data.items():
+            for name, d in dm.items():
+                if name.startswith(daemon_type + '.'):
+                    result.append(d)
+        return result
+
+    def get_daemon_names(self):
+        # type: () -> List[str]
+        r = []
+        for host, dm in self.data.items():
+            for name, dd in dm.items():
+                r.append(name)
+        return r
+
+    def host_needs_refresh(self, host, cutoff):
+        # type: (str, datetime.datetime) -> bool
+        if host not in self.last_update or self.last_update[host] < cutoff:
+            return True
+        return False
+
+    def add_daemon(self, host, dd):
+        # type: (str, orchestrator.DaemonDescription) -> None
+        assert host in self.data
+        self.data[host][dd.name()] = dd
+
+    def rm_daemon(self, host, name):
+        if host in self.data:
+            if name in self.data[host]:
+                del self.data[host][name]
+
+
 class AsyncCompletion(orchestrator.Completion):
     def __init__(self,
                  _first_promise=None,  # type: Optional[orchestrator.Completion]
@@ -387,26 +503,23 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         self.inventory_cache = orchestrator.OutdatablePersistentDict(
             self, self._STORE_HOST_PREFIX + '.devices')
 
-        self.daemon_cache = {}  # type: ignore
-        self._daemon_cache_load()
+        self.daemon_cache = DaemonCache(self)
+        self.daemon_cache.load()
 
         # ensure the host lists are in sync
         for h in self.inventory.keys():
             if h not in self.inventory_cache:
                 self.log.debug('adding inventory item for %s' % h)
                 self.inventory_cache[h] = orchestrator.OutdatableData()
-            if h not in self.daemon_cache:
+            if h not in self.daemon_cache.data:
                 self.log.debug('adding service item for %s' % h)
-                self.daemon_cache[h] = {
-                    'last_update': None,
-                    'daemons': {},
-                }
+                self.daemon_cache.prime_empty_host(h)
         for h in self.inventory_cache:
             if h not in self.inventory:
                 del self.inventory_cache[h]
-        for h in self.daemon_cache:
+        for h in self.daemon_cache.get_hosts():
             if h not in self.inventory:
-                del self.daemon_cache[h]
+                self.daemon_cache.rm_host(h)
 
     def shutdown(self):
         self.log.info('shutdown')
@@ -499,10 +612,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             if opt['name'] == 'container_image':
                 image_settings[opt['section']] = opt['value']
 
-        daemons = []
-        for host, di in self.daemon_cache.items():
-            for name, dd in di['daemons'].items():
-                daemons.append(dd)
+        daemons = self.daemon_cache.get_daemons()
 
         for daemon_type in ['mgr', 'mon', 'osd', 'rgw', 'mds']:
             self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
@@ -698,10 +808,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                 del self.health_checks[k]
         if self.warn_on_stray_hosts or self.warn_on_stray_daemons:
             ls = self.list_servers()
-            managed = []
-            for host, di in self.daemon_cache.items():
-                for name, dd in di['daemons'].items():
-                    managed.append(name)
+            managed = self.daemon_cache.get_daemon_names()
             host_detail = []     # type: List[str]
             host_num_daemons = 0
             daemon_detail = []  # type: List[str]
@@ -755,11 +862,10 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             # refresh daemons
             cutoff = datetime.datetime.utcnow() - datetime.timedelta(
                 seconds=self.daemon_cache_timeout)
-            cutoffs = cutoff.strftime(DATEFMT)
-            self.log.debug('refreshing daemons, cutoff %s' % cutoffs)
+            self.log.debug('refreshing daemons, cutoff %s' % cutoff)
             failures = []
-            for host, di in self.daemon_cache.items():
-                if not di['last_update'] or di['last_update'] < cutoffs:
+            for host in self.daemon_cache.get_hosts():
+                if self.daemon_cache.host_needs_refresh(host, cutoff):
                     self.log.debug('refreshing %s' % host)
                     r = self._refresh_host_daemons(host)
                     if r:
@@ -1189,10 +1295,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         }
         self._save_inventory()
         self.inventory_cache[spec.hostname] = orchestrator.OutdatableData()
-        self.daemon_cache[spec.hostname] = {
-            'last_update': None,
-            'daemons': {},
-        }
+        self.daemon_cache.prime_empty_host(spec.hostname)
         self.event.set()  # refresh stray health check
         return "Added host '{}'".format(spec.hostname)
 
@@ -1206,9 +1309,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         del self.inventory[host]
         self._save_inventory()
         del self.inventory_cache[host]
-        del self.daemon_cache[host]
+        self.daemon_cache.rm_host(host)
         self._reset_con(host)
-        self._daemon_cache_rm_host(host)
         self.event.set()  # refresh stray health check
         return "Removed host '{}'".format(host)
 
@@ -1267,15 +1369,6 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         self._save_inventory()
         return 'Removed label %s from host %s' % (label, host)
 
-    def _get_daemons_by_type(self, daemon_type):
-        # type: (str) -> List[orchestrator.DaemonDescription]
-        result = []   # type: List[orchestrator.DaemonDescription]
-        for host, di in self.daemon_cache.items():
-            for name, d in di['daemons'].items():
-                if name.startswith(daemon_type + '.'):
-                    result.append(d)
-        return result
-
     def _refresh_host_daemons(self, host):
         try:
             out, err, code = self._run_cephadm(
@@ -1320,50 +1413,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                 sd.status = None
             dm[sd.name()] = sd
         self.log.debug('Refreshed host %s daemons: %s' % (host, dm))
-        self.daemon_cache[host] = {
-            'last_update': datetime.datetime.utcnow().strftime(DATEFMT),
-            'daemons': dm,
-        }
-        self._daemon_cache_save_host(host)
+        self.daemon_cache.update_host(host, dm)
         return None
 
-    def _daemon_cache_load(self):
-        for k, v in six.iteritems(self.get_store_prefix(DAEMON_CACHE_PREFIX)):
-            host = k[len(DAEMON_CACHE_PREFIX):]
-            if host not in self.inventory:
-                self.log.warning('removing stray daemon_cache host record %s' % (
-                    host))
-                self.set_store(k, None)
-            try:
-                j = json.loads(v)
-                self.daemon_cache[host] = {
-                    # we do ignore the persisted last_update to trigger a new
-                    # scrape on mgr restart
-                    'last_update': None,
-                    'daemons': {},
-                }
-                for name, d in j.get('daemons', {}).items():
-                    self.daemon_cache[host]['daemons'][name] = \
-                        orchestrator.DaemonDescription.from_json(d)
-                self.log.debug('_daemon_cache_load: host %s has %d daemons' % (
-                    host, len(self.daemon_cache[host]['daemons'])))
-            except Exception as e:
-                self.log.warning('unable to load cached state for %s: %s' % (
-                    host, e))
-                pass
-
-    def _daemon_cache_save_host(self, host):
-        di = self.daemon_cache[host]
-        j = {
-            'last_update': di['last_update'],
-            'daemons': {},
-        }
-        for name, dd in di['daemons'].items():
-            j['daemons'][name] = dd.to_json()
-        self.set_store(DAEMON_CACHE_PREFIX + host, json.dumps(j))
-
-    def _daemon_cache_rm_host(self, host):
-        self.set_store(DAEMON_CACHE_PREFIX + host, None)
 
 #    def describe_service(self, service_type=None, service_id=None,
 #                         node_name=None, refresh=False):
@@ -1386,15 +1438,16 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                 for host, hi in self.inventory.items():
                     self._refresh_host_daemons(host)
         result = []
-        for h, di in self.daemon_cache.items():
+        for h, dm in self.daemon_cache.data.items():
             if host and h != host:
                 continue
-            for name, dd in di['daemons'].items():
+            for name, dd in dm.items():
                 if daemon_type and daemon_type != dd.daemon_type:
                     continue
                 if daemon_id and daemon_id != dd.daemon_id:
                     continue
                 result.append(dd)
+        self.log.debug('list_daemons result %s' % result)
         return trivial_result(result)
 
     def service_action(self, action, service_type, service_name):
@@ -1405,8 +1458,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         else:
             prefix = ''
         args = []
-        for host, di in self.daemon_cache.items():
-            for name, d in di['daemons'].items():
+        for host, dm in self.daemon_cache.data.items():
+            for name, d in dm.items():
                 if d.daemon_type == service_type and d.daemon_id.startswith(prefix):
                     args.append((d.daemon_type, d.daemon_id,
                                  d.nodename, action))
@@ -1436,7 +1489,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                 host, name, 'unit',
                 ['--name', name, a],
                 error_ok=True)
-            self.daemon_cache[host]['last_update'] = None
+            self.daemon_cache.invalidate_host(host)
             self.log.debug('_daemon_action code %s out %s' % (code, out))
         return "{} {} from host '{}'".format(action, name, host)
 
@@ -1445,8 +1498,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             action, daemon_type, daemon_id))
 
         args = []
-        for host, di in self.daemon_cache.items():
-            for name, d in di['daemons'].items():
+        for host, dm in self.daemon_cache.data.items():
+            for name, d in dm.items():
                 if d.daemon_type == daemon_type and d.daemon_id == daemon_id:
                     args.append((d.daemon_type, d.daemon_id,
                                  d.nodename, action))
@@ -1459,9 +1512,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
     def remove_daemons(self, names, force):
         # type: (List[str], bool) -> orchestrator.Completion
         args = []
-        for host, di in self.daemon_cache.items():
+        for host, dm in self.daemon_cache.data.items():
             for name in names:
-                if name in di['daemons']:
+                if name in dm:
                     args.append((name, host, force))
         if not args:
             raise OrchestratorError('Unable to find daemon(s) %s' % (names))
@@ -1473,8 +1526,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         else:
             prefix = ''
         args = []
-        for host, di in self.daemon_cache.items():
-            for name, d in di['daemons'].items():
+        for host, dm in self.daemon_cache.data.items():
+            for name, d in dm.items():
                 if d.daemon_type == service_type and \
                    d.daemon_id.startswith(prefix):
                     args.append(
@@ -1734,7 +1787,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             ] + extra_args,
             stdin=j)
         self.log.debug('create_daemon code %s out %s' % (code, out))
-        if not code and host in self.daemon_cache:
+        if not code and host in self.daemon_cache.data:
             # prime cached service state with what we (should have)
             # just created
             sd = orchestrator.DaemonDescription()
@@ -1743,8 +1796,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             sd.nodename = host
             sd.status = 1
             sd.status_desc = 'starting'
-            self.daemon_cache[host]['daemons'][sd.name()] = sd
-        self.daemon_cache[host]['last_update'] = None
+            self.daemon_cache.add_daemon(host, sd)
+        self.daemon_cache.invalidate_host(host)
         self.event.set()
         return "{} {} on host '{}'".format(
             'Reconfigured' if reconfig else 'Deployed', name, host)
@@ -1761,15 +1814,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         out, err, code = self._run_cephadm(
             host, name, 'rm-daemon', args)
         self.log.debug('_remove_daemon code %s out %s' % (code, out))
-        if not code and host in self.daemon_cache:
+        if not code:
             # remove item from cache
-            if name in self.daemon_cache[host]['daemons']:
-                del self.daemon_cache[host]['daemons'][name]
-        self.daemon_cache[host]['last_update'] = None
+            self.daemon_cache.rm_daemon(host, name)
+        self.daemon_cache.invalidate_host(host)
         return "Removed {} from host '{}'".format(name, host)
 
     def _update_service(self, daemon_type, add_func, spec):
-        daemons = self._get_daemons_by_type(daemon_type)
+        daemons = self.daemon_cache.get_daemons_by_type(daemon_type)
         if len(daemons) > spec.count:
             # remove some
             to_remove = len(daemons) - spec.count
@@ -1787,7 +1839,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                         daemon_type: str,
                         spec: orchestrator.ServiceSpec,
                         create_func: Callable):
-        daemons = self._get_daemons_by_type(daemon_type)
+        daemons = self.daemon_cache.get_daemons_by_type(daemon_type)
         args = []
         num_added = 0
         assert spec.count is not None
@@ -1858,7 +1910,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             if not network:
                 raise RuntimeError("Host '{}' is missing a network spec".format(host))
 
-        daemons = self._get_daemons_by_type('mon')
+        daemons = self.daemon_cache.get_daemons_by_type('mon')
         for _, _, name in spec.placement.hosts:
             if name and len([d for d in daemons if d.daemon_id == name]):
                 raise RuntimeError('name %s already exists', name)
@@ -1907,7 +1959,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             if not network:
                 raise RuntimeError("Host '{}' is missing a network spec".format(host))
 
-        daemons = self._get_daemons_by_type('mon')
+        daemons = self.daemon_cache.get_daemons_by_type('mon')
         for _, _, name in spec.placement.hosts:
             if name and len([d for d in daemons if d.daemon_id == name]):
                 raise RuntimeError('name %s alrady exists', name)
@@ -1953,7 +2005,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         """
         spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load()
 
-        daemons = self._get_daemons_by_type('mgr')
+        daemons = self.daemon_cache.get_daemons_by_type('mgr')
         num_mgrs = len(daemons)
         if spec.count == num_mgrs:
             return orchestrator.Completion(value="The requested number of managers exist.")
@@ -2192,8 +2244,8 @@ scrape_configs:
             'needs_update': dict(),
             'up_to_date': list(),
         }
-        for host, di in self.daemon_cache.items():
-            for name, dd in di['daemons'].items():
+        for host, dm in self.daemon_cache.data.items():
+            for name, dd in dm.items():
                 if target_id == dd.container_image_id:
                     r['up_to_date'].append(dd.name())
                 else:
index afc54164d55084933eb077d42ff131efc5715194..31a87ac7e3b3c798081fbe078c46e896c1324f5f 100644 (file)
@@ -57,8 +57,8 @@ class TestCephadm(object):
 
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_host(self, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             assert wait(cephadm_module, cephadm_module.get_hosts()) == [InventoryNode('test')]
@@ -66,16 +66,16 @@ class TestCephadm(object):
         assert wait(cephadm_module, c) == []
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_service_ls(self, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
             assert wait(cephadm_module, c) == []
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_device_ls(self, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.get_inventory()
@@ -96,8 +96,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_daemon_action(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         cephadm_module.service_cache_timeout = 10
         with self._with_host(cephadm_module, 'test'):
@@ -115,8 +115,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_mon_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
@@ -127,8 +127,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_mgr_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
@@ -141,8 +141,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_create_osds(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec('test', data_devices=DeviceSelection(paths=['']))
@@ -161,8 +161,8 @@ class TestCephadm(object):
             )
         ])
     ))
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_remove_osds(self, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
@@ -175,8 +175,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_mds(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
@@ -188,8 +188,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_rgw(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
 
         with self._with_host(cephadm_module, 'test'):
@@ -203,8 +203,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_rgw_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
 
         with self._with_host(cephadm_module, 'host1'):
@@ -223,9 +223,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator.serve", None)
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_rgw_update_fail(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
 
         with self._with_host(cephadm_module, 'host1'):
@@ -262,8 +261,8 @@ class TestCephadm(object):
             )
         ])
     ))
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_remove_daemon(self, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
@@ -284,8 +283,8 @@ class TestCephadm(object):
             )
         ])
     ))
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_remove_service(self, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
@@ -298,8 +297,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_rbd_mirror(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
@@ -312,8 +311,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_prometheus(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
@@ -326,8 +325,8 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_save_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._daemon_cache_rm_host")
+    @mock.patch("cephadm.module.DaemonCache.save_host")
+    @mock.patch("cephadm.module.DaemonCache.rm_host")
     def test_blink_device_light(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.blink_device_light('ident', True, [('test', '', '')])