mgr/orchestrator: Add cache for Inventory and Services
author    Sebastian Wagner <sebastian.wagner@suse.com>
Wed, 22 May 2019 13:33:24 +0000 (15:33 +0200)
committer Sebastian Wagner <sebastian.wagner@suse.com>
Fri, 12 Jul 2019 07:31:40 +0000 (09:31 +0200)
mgr/mgr_module: Added PersistentStoreDict,
  basically a Pythonic interface to the mgr key-value store
mgr/orchestrator: Added common code to implement a cache
mgr/ssh: The cache works by manually adding and removing hosts;
  only the data is invalidated
mgr/deepsea: The cache is invalidated all at once, and only
  valid objects are cached
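
For illustration, a minimal sketch of how a backend is expected to use the
new helpers, modeled on the deepsea changes below (the ExampleOrchestrator
class and its wiring are illustrative only, not part of this patch):

    # PersistentStoreDict behaves like a dict whose values are persisted as
    # JSON in the mgr key-value store under a common prefix, e.g. inside a
    # module:
    #
    #     d = PersistentStoreDict(self, 'test_dict')
    #     d['a'] = {'a': 1}   # stored under the store key "test_dict.a"
    #     del d['a']          # removes the key from the store
    #
    # OutdatableDict additionally timestamps every value, so a backend can
    # decide when cached data has become stale:

    import orchestrator
    from mgr_module import MgrModule

    class ExampleOrchestrator(MgrModule, orchestrator.Orchestrator):
        # illustrative only; the real backends are deepsea, ssh, rook, ...
        def __init__(self, *args, **kwargs):
            super(ExampleOrchestrator, self).__init__(*args, **kwargs)
            self.inventory_cache = orchestrator.OutdatableDict(self, 'inventory_cache')

        def get_inventory(self, node_filter=None, refresh=False):
            self.inventory_cache.remove_outdated()
            if not refresh and not self.inventory_cache.any_outdated():
                # serve the whole inventory straight from the cache
                return orchestrator.TrivialReadCompletion(
                    orchestrator.InventoryNode.from_nested_items(
                        self.inventory_cache.items()))
            # otherwise gather fresh data and repopulate the cache, roughly:
            #   self.inventory_cache[node_name] = orchestrator.OutdatableData(node_devs)
            raise NotImplementedError()  # fetching inventory is backend specific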

Fixes: https://tracker.ceph.com/issues/39990

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/deepsea/module.py
src/pybind/mgr/mgr_module.py
src/pybind/mgr/orchestrator.py
src/pybind/mgr/rook/module.py
src/pybind/mgr/selftest/module.py
src/pybind/mgr/ssh/module.py
src/pybind/mgr/test_orchestrator/module.py

diff --git a/src/pybind/mgr/deepsea/module.py b/src/pybind/mgr/deepsea/module.py
index 2cbf25c14499959d2f30fc4d898d39962ab390e4..826ef6aa602c61a9abe4a29cdd9d6b59cd609b63 100644
--- a/src/pybind/mgr/deepsea/module.py
+++ b/src/pybind/mgr/deepsea/module.py
@@ -108,7 +108,8 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._last_failure_msg = None
         self._all_completions = dict()
         self._completion_lock = Lock()
-
+        self.inventory_cache = orchestrator.OutdatableDict(self, 'inventory_cache')
+        self.service_cache = orchestrator.OutdatableDict(self, 'service_cache')
 
     def available(self):
         if not self._config_valid():
@@ -119,7 +120,6 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return True, ""
 
-
     def get_inventory(self, node_filter=None, refresh=False):
         """
         Note that this will raise an exception (e.g. if the salt-api is down,
@@ -128,14 +128,23 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
         cli, for example, just prints the traceback in the console, so the
         user at least sees the error.
         """
+        self.inventory_cache.remove_outdated()
+        if not self.inventory_cache.any_outdated() and not refresh:
+            if node_filter is None:
+                return orchestrator.TrivialReadCompletion(
+                    orchestrator.InventoryNode.from_nested_items(self.inventory_cache.items()))
+            elif node_filter.labels is None:
+                return orchestrator.TrivialReadCompletion(
+                    orchestrator.InventoryNode.from_nested_items(
+                        self.inventory_cache.items_filtered(node_filter.nodes)))
 
         def process_result(event_data):
             result = []
             if event_data['success']:
                 for node_name, node_devs in event_data["return"].items():
-                    devs = list(map(lambda di:
-                        orchestrator.InventoryDevice.from_ceph_volume_inventory(di),
-                        node_devs))
+                    if node_filter is None:
+                        self.inventory_cache[node_name] = orchestrator.OutdatableData(node_devs)
+                    devs = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(node_devs)
                     result.append(orchestrator.InventoryNode(node_name, devs))
             else:
                 self.log.error(event_data['return'])
@@ -163,8 +172,7 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
 
             return c
 
-
-    def describe_service(self, service_type, service_id, node_name):
+    def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
 
         # Note: describe_service() does *not* support OSDs.  This is because
         # DeepSea doesn't really record what OSDs are deployed where; Ceph is
@@ -174,16 +182,27 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         assert service_type in ("mon", "mgr", "mds", "rgw", None), service_type + " unsupported"
 
+        self.service_cache.remove_outdated()
+        if not self.service_cache.any_outdated() and not refresh:
+            # Let's hope the services are complete.
+            services = [orchestrator.ServiceDescription.from_json(d[1]) for d in self.service_cache.items_filtered([node_name])]
+            services = [s for s in services if
+                        (True if s.service_type is None else s.service_type == service_type) and
+                        (True if s.service_id is None else s.service_id == service_id)]
+            return orchestrator.TrivialReadCompletion(services)
+
+
         def process_result(event_data):
             result = []
             if event_data['success']:
                 for node_name, service_info in event_data["return"].items():
                     for service_type, service_instance in service_info.items():
-                        desc = orchestrator.ServiceDescription()
-                        desc.nodename = node_name
-                        desc.service_instance = service_instance
-                        desc.service_type = service_type
+                        desc = orchestrator.ServiceDescription(nodename=node_name,
+                                                               service_instance=service_instance,
+                                                               service_type=service_type)
                         result.append(desc)
+                for service in result:
+                    self.service_cache[service.nodename] = orchestrator.OutdatableData(service.to_json())
             else:
                 self.log.error(event_data['return'])
             return result
@@ -202,7 +221,6 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
 
             return c
 
-
     def wait(self, completions):
         incomplete = False
 
@@ -238,7 +256,7 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
                         "Unknown configuration option '{0}'".format(cmd['key']))
 
             self.set_module_option(cmd['key'], cmd['value'])
-            self._event.set();
+            self._event.set()
             return 0, "Configuration option '{0}' updated".format(cmd['key']), ''
 
         return (-errno.EINVAL, '',
diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py
index bdd0b26cc390898dea88a62ae67720b16ef4995d..ba36be96e5d2a10875722540869fe27f6d75f739 100644
--- a/src/pybind/mgr/mgr_module.py
+++ b/src/pybind/mgr/mgr_module.py
@@ -1,5 +1,10 @@
 import ceph_module  # noqa
 
+try:
+    from typing import Set, Tuple, Iterator, Any
+except ImportError:
+    # just for type checking
+    pass
 import logging
 import json
 import six
@@ -1311,3 +1316,69 @@ class MgrModule(ceph_module.BaseMgrModule):
         :param int query_id: query ID
         """
         return self._ceph_get_osd_perf_counters(query_id)
+
+
+class PersistentStoreDict(object):
+    def __init__(self, mgr, prefix):
+        # type: (MgrModule, str) -> None
+        self.mgr = mgr
+        self.prefix = prefix + '.'
+
+    def _mk_store_key(self, key):
+        return self.prefix + key
+
+    def __missing__(self, key):
+        # KeyError won't work for the `in` operator.
+        # https://docs.python.org/3/reference/expressions.html#membership-test-details
+        raise IndexError('PersistentStoreDict: "{}" not found'.format(key))
+
+    def clear(self):
+        # Don't make any assumptions about the content of the values.
+        for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)):
+            k, _ = item
+            self.mgr.set_store(k, None)
+
+    def __getitem__(self, item):
+        # type: (str) -> Any
+        key = self._mk_store_key(item)
+        try:
+            val = self.mgr.get_store(key)
+            if val is None:
+                self.__missing__(key)
+            return json.loads(val)
+        except (KeyError, AttributeError, IndexError, ValueError, TypeError):
+            logging.getLogger(__name__).exception('failed to deserialize')
+            self.mgr.set_store(key, None)
+            raise
+
+    def __setitem__(self, item, value):
+        # type: (str, Any) -> None
+        """
+        value=None is not allowed, as it will remove the key.
+        """
+        key = self._mk_store_key(item)
+        self.mgr.set_store(key, json.dumps(value) if value is not None else None)
+
+    def __delitem__(self, item):
+        self[item] = None
+
+    def __len__(self):
+        return len(self.keys())
+
+    def items(self):
+        # type: () -> Iterator[Tuple[str, Any]]
+        prefix_len = len(self.prefix)
+        try:
+            for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)):
+                k, v = item
+                yield k[prefix_len:], json.loads(v)
+        except (KeyError, AttributeError, IndexError, ValueError, TypeError):
+            logging.getLogger(__name__).exception('failed to deserialize')
+            self.clear()
+
+    def keys(self):
+        # type: () -> Set[str]
+        return {item[0] for item in self.items()}
+
+    def __iter__(self):
+        return iter(self.keys())
diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py
index d0b0f5839ab18005ddb9772da846d6ef6c905a19..ec0e8a480408b73f2f0cd48f95d48198ea3fcc5a 100644
--- a/src/pybind/mgr/orchestrator.py
+++ b/src/pybind/mgr/orchestrator.py
@@ -8,14 +8,16 @@ import sys
 import time
 import fnmatch
 import uuid
+import datetime
 
 import six
 
-from mgr_module import MgrModule
+from mgr_module import MgrModule, PersistentStoreDict
 from mgr_util import format_bytes
 
 try:
-    from typing import TypeVar, Generic, List, Optional, Union, Tuple
+    from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator
+
     T = TypeVar('T')
     G = Generic[T]
 except ImportError:
@@ -144,6 +146,23 @@ class ReadCompletion(_Completion):
         return not self.is_complete
 
 
+class TrivialReadCompletion(ReadCompletion):
+    """
+    This is the trivial completion simply wrapping a result.
+    """
+    def __init__(self, result):
+        super(TrivialReadCompletion, self).__init__()
+        self._result = result
+
+    @property
+    def result(self):
+        return self._result
+
+    @property
+    def is_complete(self):
+        return True
+
+
 class WriteCompletion(_Completion):
     """
     ``Orchestrator`` implementations should inherit from this
@@ -309,7 +328,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def describe_service(self, service_type=None, service_id=None, node_name=None):
-        # type: (Optional[str], Optional[str], Optional[str]) -> ReadCompletion[List[ServiceDescription]]
+        # type: (str, str, str) -> ReadCompletion[List[ServiceDescription]]
         """
         Describe a service (of any kind) that is already configured in
         the orchestrator.  For example, when viewing an OSD in the dashboard
@@ -488,13 +507,16 @@ class ServiceDescription(object):
     is literally up this second, it's a description of where the orchestrator
     has decided the service should run.
     """
-    def __init__(self):
+
+    def __init__(self, nodename=None, container_id=None, service=None, service_instance=None,
+                 service_type=None, version=None, rados_config_location=None,
+                 service_url=None, status=None, status_desc=None):
         # Node is at the same granularity as InventoryNode
-        self.nodename = None
+        self.nodename = nodename
 
         # Not everyone runs in containers, but enough people do to
         # justify having this field here.
-        self.container_id = None
+        self.container_id = container_id
 
         # Some services can be deployed in groups. For example, mds's can
         # have an active and standby daemons, and nfs-ganesha can run daemons
@@ -505,33 +527,33 @@ class ServiceDescription(object):
         # Filesystem name in the FSMap).
         #
         # Single-instance services should leave this set to None
-        self.service = None
+        self.service = service
 
         # The orchestrator will have picked some names for daemons,
         # typically either based on hostnames or on pod names.
         # This is the <foo> in mds.<foo>, the ID that will appear
         # in the FSMap/ServiceMap.
-        self.service_instance = None
+        self.service_instance = service_instance
 
         # The type of service (osd, mon, mgr, etc.)
-        self.service_type = None
+        self.service_type = service_type
 
         # Service version that was deployed
-        self.version = None
+        self.version = version
 
         # Location of the service configuration when stored in rados
         # object. Format: "rados://<pool>/[<namespace/>]<object>"
-        self.rados_config_location = None
+        self.rados_config_location = rados_config_location
 
         # If the service exposes REST-like API, this attribute should hold
         # the URL.
-        self.service_url = None
+        self.service_url = service_url
 
         # Service status: -1 error, 0 stopped, 1 running
-        self.status = None
+        self.status = status
 
         # Service status description when status == -1.
-        self.status_desc = None
+        self.status_desc = status_desc
 
     def to_json(self):
         out = {
@@ -548,6 +570,10 @@ class ServiceDescription(object):
         }
         return {k: v for (k, v) in out.items() if v is not None}
 
+    @classmethod
+    def from_json(cls, data):
+        return cls(**data)
+
 
 class DeviceSelection(object):
     """
@@ -807,6 +833,10 @@ class InventoryDevice(object):
         dev.extended = data
         return dev
 
+    @classmethod
+    def from_ceph_volume_inventory_list(cls, datas):
+        return [cls.from_ceph_volume_inventory(d) for d in datas]
+
     def pretty_print(self, only_header=False):
         """Print a human friendly line with the information of the device
 
@@ -844,6 +874,11 @@ class InventoryNode(object):
     def to_json(self):
         return {'name': self.name, 'devices': [d.to_json() for d in self.devices]}
 
+    @classmethod
+    def from_nested_items(cls, hosts):
+        devs = InventoryDevice.from_ceph_volume_inventory_list
+        return [cls(item[0], devs(item[1])) for item in hosts]
+
 
 def _mk_orch_methods(cls):
     # Needs to be defined outside of for.
@@ -947,3 +982,97 @@ class OrchestratorClientMixin(Orchestrator):
                 break
         for c in completions:
             self._update_completion_progress(c)
+
+
+class OutdatableData(object):
+    DATEFMT = '%Y-%m-%d %H:%M:%S.%f'
+
+    def __init__(self, data=None, last_refresh=None):
+        # type: (Optional[dict], Optional[datetime.datetime]) -> None
+        self._data = data
+        if data is not None and last_refresh is None:
+            self.last_refresh = datetime.datetime.utcnow()
+        else:
+            self.last_refresh = last_refresh
+
+    def json(self):
+        if self.last_refresh is not None:
+            timestr = self.last_refresh.strftime(self.DATEFMT)
+        else:
+            timestr = None
+
+        return {
+            "data": self.data,
+            "last_refresh": timestr,
+        }
+
+    @property
+    def data(self):
+        return self._data
+
+    # @data.setter
+    # No setter: assigning to .data would not automatically persist the change in the store.
+
+    @classmethod
+    def time_from_string(cls, timestr):
+        if timestr is None:
+            return None
+        # drop the 'Z' timezone indication, it's always UTC
+        timestr = timestr.rstrip('Z')
+        return datetime.datetime.strptime(timestr, cls.DATEFMT)
+
+
+    @classmethod
+    def from_json(cls, data):
+        return cls(data['data'], cls.time_from_string(data['last_refresh']))
+
+    def outdated(self, timeout_min=None):
+        if timeout_min is None:
+            timeout_min = 10
+        if self.last_refresh is None:
+            return True
+        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+            minutes=timeout_min)
+        return self.last_refresh < cutoff
+
+    def __repr__(self):
+        return 'OutdatableData(data={}, last_refresh={})'.format(self.data, self.last_refresh)
+
+
+class OutdatableDict(PersistentStoreDict):
+    """
+    Toolbox for implementing a cache. As every orchestrator has
+    different needs, we cannot implement any logic here.
+    """
+
+    def __getitem__(self, item):
+        # type: (str) -> OutdatableData
+        return OutdatableData.from_json(super(OutdatableDict, self).__getitem__(item))
+
+    def __setitem__(self, key, value):
+        # type: (str, OutdatableData) -> None
+        val = None if value is None else value.json()
+        super(OutdatableDict, self).__setitem__(key, val)
+
+    def items(self):
+        # type: () -> Iterator[Tuple[str, OutdatableData]]
+        for item in super(OutdatableDict, self).items():
+            k, v = item
+            yield k, OutdatableData.from_json(v)
+
+    def items_filtered(self, keys=None):
+        if keys:
+            return [(host, self[host]) for host in keys]
+        else:
+            return list(self.items())
+
+    def any_outdated(self, timeout=None):
+        items = list(self.items())  # items() is a generator; materialize it
+        if not items:
+            return True
+        return any([i[1].outdated(timeout) for i in items])
+
+    def remove_outdated(self):
+        outdated = [item[0] for item in self.items() if item[1].outdated()]
+        for o in outdated:
+            del self[o]
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index a2653512099e7ce9a8f548dbdecbd2cd4744e9bb..4b49d68100e96906c0b7fe1565b0c2591566f8a8 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -330,7 +330,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         return result
 
     @deferred_read
-    def describe_service(self, service_type=None, service_id=None, node_name=None):
+    def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
 
         if service_type not in ("mds", "osd", "mgr", "mon", "nfs", None):
             raise orchestrator.OrchestratorValidationError(service_type + " unsupported")
diff --git a/src/pybind/mgr/selftest/module.py b/src/pybind/mgr/selftest/module.py
index 71913d31d7bf24262937108ea0ebfd3dd8c422c0..a5a2cc10273b167f9ff810676e4acc898462e919 100644
--- a/src/pybind/mgr/selftest/module.py
+++ b/src/pybind/mgr/selftest/module.py
@@ -1,5 +1,5 @@
 
-from mgr_module import MgrModule, CommandResult
+from mgr_module import MgrModule, CommandResult, PersistentStoreDict
 import threading
 import random
 import json
@@ -142,7 +142,7 @@ class Module(MgrModule):
             try:
                 r = self.remote(command['module'], "self_test")
             except RuntimeError as e:
-                return -1, '', "Test failed: {0}".format(e.message)
+                return -1, '', "Test failed: {0}".format(e)
             else:
                 return 0, str(r), "Self-test OK"
         elif command['prefix'] == 'mgr self-test health set':
@@ -170,7 +170,7 @@ class Module(MgrModule):
         try:
             checks = json.loads(command["checks"])
         except Exception as e:
-            return -1, "", "Failed to decode JSON input: {}".format(e.message)
+            return -1, "", "Failed to decode JSON input: {}".format(e)
 
         try:
             for check, info in six.iteritems(checks):
@@ -180,7 +180,7 @@ class Module(MgrModule):
                     "detail": [str(m) for m in info["detail"]]
                 }
         except Exception as e:
-            return -1, "", "Invalid health check format: {}".format(e.message)
+            return -1, "", "Invalid health check format: {}".format(e)
 
         self.set_health_checks(self._health)
         return 0, "", ""
@@ -198,9 +198,9 @@ class Module(MgrModule):
 
     def _insights_set_now_offset(self, inbuf, command):
         try:
-            hours = long(command["hours"])
+            hours = int(command["hours"])
         except Exception as e:
-            return -1, "", "Timestamp must be numeric: {}".format(e.message)
+            return -1, "", "Timestamp must be numeric: {}".format(e)
 
         self.remote("insights", "testing_set_now_time_offset", hours)
         return 0, "", ""
@@ -214,6 +214,7 @@ class Module(MgrModule):
         self._self_test_store()
         self._self_test_misc()
         self._self_test_perf_counters()
+        self._self_persistent_store_dict()
 
     def _self_test_getters(self):
         self.version
@@ -386,6 +387,25 @@ class Module(MgrModule):
 
         self.log.info("Finished self-test procedure.")
 
+    def _self_persistent_store_dict(self):
+        self.test_dict = PersistentStoreDict(self, 'test_dict')
+        for i in "abcde":
+            self.test_dict[i] = {i:1}
+        assert self.test_dict.keys() == set("abcde")
+        assert 'a' in self.test_dict
+        del self.test_dict['a']
+        assert self.test_dict.keys() == set("bcde"), self.test_dict.keys()
+        assert 'a' not in self.test_dict
+        self.test_dict.clear()
+        assert not self.test_dict, dict(self.test_dict.items())
+        self.set_store('test_dict.a', 'invalid json')
+        try:
+            self.test_dict['a']
+            assert False
+        except ValueError:
+            pass
+        assert not self.test_dict, dict(self.test_dict.items())
+
     def _test_remote_calls(self):
         # Test making valid call
         self.remote("influx", "handle_command", "", {"prefix": "influx self-test"})
diff --git a/src/pybind/mgr/ssh/module.py b/src/pybind/mgr/ssh/module.py
index 1fef4c8b1fd8d7dd32c0a12becbcacab480334c0..43072c4d382d34bdcea2706d581685406e2d18c0 100644
--- a/src/pybind/mgr/ssh/module.py
+++ b/src/pybind/mgr/ssh/module.py
@@ -1,8 +1,10 @@
 import json
 import errno
+import logging
+from functools import wraps
+
 import six
 import os
-import datetime
 import tempfile
 import multiprocessing.pool
 
@@ -18,13 +20,13 @@ except ImportError as e:
     remoto = None
     remoto_import_error = str(e)
 
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f%z'
+logger = logging.getLogger(__name__)
 
 # high-level TODO:
 #  - bring over some of the protections from ceph-deploy that guard against
 #    multiple bootstrapping / initialization
 
-class SSHReadCompletion(orchestrator.ReadCompletion):
+class SSHCompletionmMixin(object):
     def __init__(self, result):
         if isinstance(result, multiprocessing.pool.AsyncResult):
             self._result = [result]
@@ -36,34 +38,13 @@ class SSHReadCompletion(orchestrator.ReadCompletion):
     def result(self):
         return list(map(lambda r: r.get(), self._result))
 
+class SSHReadCompletion(SSHCompletionmMixin, orchestrator.ReadCompletion):
     @property
     def is_complete(self):
         return all(map(lambda r: r.ready(), self._result))
 
-class SSHReadCompletionReady(SSHReadCompletion):
-    def __init__(self, result):
-        self._result = result
-
-    @property
-    def result(self):
-        return self._result
-
-    @property
-    def is_complete(self):
-        return True
 
-class SSHWriteCompletion(orchestrator.WriteCompletion):
-    def __init__(self, result):
-        super(SSHWriteCompletion, self).__init__()
-        if isinstance(result, multiprocessing.pool.AsyncResult):
-            self._result = [result]
-        else:
-            self._result = result
-        assert isinstance(self._result, list)
-
-    @property
-    def result(self):
-        return list(map(lambda r: r.get(), self._result))
+class SSHWriteCompletion(SSHCompletionmMixin, orchestrator.WriteCompletion):
 
     @property
     def is_persistent(self):
@@ -115,6 +96,23 @@ class SSHConnection(object):
     def __getattr__(self, name):
         return getattr(self.conn, name)
 
+
+def log_exceptions(f):
+    if six.PY3:
+        return f
+    else:
+        # Python 2 does no exception chaining, thus the
+        # real exception is lost
+        @wraps(f)
+        def wrapper(*args, **kwargs):
+            try:
+                return f(*args, **kwargs)
+            except Exception:
+                logger.exception('something went wrong.')
+                raise
+        return wrapper
+
+
 class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
 
     _STORE_HOST_PREFIX = "host"
@@ -143,6 +141,14 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._cluster_fsid = None
         self._worker_pool = multiprocessing.pool.ThreadPool(1)
 
+        # The keys in inventory_cache are authoritative:
+        #   you must not call remove_outdated().
+        # Only the values are treated as a cache.
+        # A cached value is invalidated by
+        # 1. the timeout
+        # 2. the refresh parameter
+        self.inventory_cache = orchestrator.OutdatableDict(self, self._STORE_HOST_PREFIX)
+
     def handle_command(self, inbuf, command):
         if command["prefix"] == "ssh set-ssh-config":
             return self._set_ssh_config(inbuf, command)
@@ -170,21 +176,17 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         complete = True
         for c in completions:
+            if c.is_complete:
+                continue
+
             if not isinstance(c, SSHReadCompletion) and \
                     not isinstance(c, SSHWriteCompletion):
                 raise TypeError("unexpected completion: {}".format(c.__class__))
 
-            if c.is_complete:
-                continue
-
             complete = False
 
         return complete
 
-    @staticmethod
-    def time_from_string(timestr):
-        return datetime.datetime.strptime(timestr, DATEFMT)
-
     def _get_cluster_fsid(self):
         """
         Fetch and cache the cluster fsid.
@@ -200,12 +202,10 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         """
         if isinstance(hosts, six.string_types):
             hosts = [hosts]
-        unregistered_hosts = []
-        for host in hosts:
-            key = self._hostname_to_store_key(host)
-            if not self.get_store(key):
-                unregistered_hosts.append(host)
+        keys = self.inventory_cache.keys()
+        unregistered_hosts = set(hosts) - keys
         if unregistered_hosts:
+            logger.warning('keys = {}'.format(keys))
             raise RuntimeError("Host(s) {} not registered".format(
                 ", ".join(map(lambda h: "'{}'".format(h),
                     unregistered_hosts))))
@@ -365,21 +365,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         conn.remote_module.write_keyring(keyring_path, keyring)
         return keyring_path
 
-    def _hostname_to_store_key(self, host):
-        return "{}.{}".format(self._STORE_HOST_PREFIX, host)
-
     def _get_hosts(self, wanted=None):
-        if wanted:
-            hosts_info = []
-            for host in wanted:
-                key = self._hostname_to_store_key(host)
-                info = self.get_store(key)
-                if info:
-                    hosts_info.append((key, info))
-        else:
-            hosts_info = six.iteritems(self.get_store_prefix(self._STORE_HOST_PREFIX))
-
-        return list(map(lambda kv: (kv[0], json.loads(kv[1])), hosts_info))
+        return self.inventory_cache.items_filtered(wanted)
 
     def add_host(self, host):
         """
@@ -387,13 +374,9 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         :param host: host name
         """
+        @log_exceptions
         def run(host):
-            key = self._hostname_to_store_key(host)
-            self.set_store(key, json.dumps({
-                "host": host,
-                "inventory": None,
-                "last_inventory_refresh": None
-            }))
+            self.inventory_cache[host] = orchestrator.OutdatableData()
             return "Added host '{}'".format(host)
 
         return SSHWriteCompletion(
@@ -405,9 +388,9 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         :param host: host name
         """
+        @log_exceptions
         def run(host):
-            key = self._hostname_to_store_key(host)
-            self.set_store(key, None)
+            del self.inventory_cache[host]
             return "Removed host '{}'".format(host)
 
         return SSHWriteCompletion(
@@ -423,11 +406,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         TODO:
           - InventoryNode probably needs to be able to report labels
         """
-        nodes = []
-        for key, host_info in self._get_hosts():
-            node = orchestrator.InventoryNode(host_info["host"], [])
-            nodes.append(node)
-        return SSHReadCompletionReady(nodes)
+        nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
+        return orchestrator.TrivialReadCompletion(nodes)
 
     def _get_device_inventory(self, host):
         """
@@ -473,42 +453,23 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
             # this implies the returned hosts are registered
             hosts = self._get_hosts()
 
-        def run(key, host_info):
-            updated = False
-            host = host_info["host"]
-
-            if not host_info["inventory"]:
-                self.log.info("caching inventory for '{}'".format(host))
-                host_info["inventory"] = self._get_device_inventory(host)
-                updated = True
-            else:
-                timeout_min = int(self.get_module_option(
-                    "inventory_cache_timeout_min",
-                    self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
-
-                cutoff = datetime.datetime.utcnow() - datetime.timedelta(
-                        minutes=timeout_min)
-
-                last_update = self.time_from_string(host_info["last_inventory_refresh"])
+        @log_exceptions
+        def run(host, host_info):
+            # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode
 
-                if last_update < cutoff or refresh:
-                    self.log.info("refresh stale inventory for '{}'".format(host))
-                    host_info["inventory"] = self._get_device_inventory(host)
-                    updated = True
-                else:
-                    self.log.info("reading cached inventory for '{}'".format(host))
-                    pass
+            timeout_min = int(self.get_module_option(
+                "inventory_cache_timeout_min",
+                self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
 
-            if updated:
-                now = datetime.datetime.utcnow()
-                now = now.strftime(DATEFMT)
-                host_info["last_inventory_refresh"] = now
-                self.set_store(key, json.dumps(host_info))
-
-            devices = list(map(lambda di:
-                orchestrator.InventoryDevice.from_ceph_volume_inventory(di),
-                host_info["inventory"]))
+            if host_info.outdated(timeout_min) or refresh:
+                self.log.info("refresh stale inventory for '{}'".format(host))
+                data = self._get_device_inventory(host)
+                host_info = orchestrator.OutdatableData(data)
+                self.inventory_cache[host] = host_info
+            else:
+                self.log.debug("reading cached inventory for '{}'".format(host))
 
+            devices = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(host_info.data)
             return orchestrator.InventoryNode(host, devices)
 
         results = []
@@ -518,6 +479,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return SSHReadCompletion(results)
 
+    @log_exceptions
     def _create_osd(self, host, drive_group):
         conn = self._get_connection(host)
         try:
@@ -758,7 +720,6 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
             raise NotImplementedError("Removing managers is not supported")
 
         # check that all the hosts are registered
-        hosts = list(set(hosts))
         self._require_hosts(hosts)
 
         # we assume explicit placement by which there are the same number of
diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py
index 236207358c8ab0e751da4f67c14f0cbb458fd344..f6ed7b20b0ce0e8b05f09f2b4c83e45e75caaf84 100644
--- a/src/pybind/mgr/test_orchestrator/module.py
+++ b/src/pybind/mgr/test_orchestrator/module.py
@@ -6,7 +6,7 @@ import functools
 import uuid
 from subprocess import check_output, CalledProcessError
 
-from mgr_module import MgrModule
+from mgr_module import MgrModule, PersistentStoreDict
 
 import orchestrator
 
@@ -181,7 +181,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
         raise Exception('c-v failed')
 
     @deferred_read
-    def describe_service(self, service_type=None, service_id=None, node_name=None):
+    def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
         """
         There is no guarantee which daemons are returned by describe_service, except that
         it returns the mgr we're running in.