From: Sebastian Wagner Date: Wed, 22 May 2019 13:33:24 +0000 (+0200) Subject: mgr/orchestrator: Add cache for Inventory and Services X-Git-Tag: v15.1.0~2201^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f49a8751851328c94ad5acaab8fd70ddda56f9d3;p=ceph.git mgr/orchestrator: Add cache for Inventory and Services mgr/mgr_module: Added persistent dict, basically a pythonic interface for a k-v store mgr/orchestrator: Added common code to implement a cache mgr/ssh: The cache works by manually adding and removing hosts. Only the data is invalidated mgr/deepsea: The cache invalidates all at once and only valid objects are cached Fixes: https://tracker.ceph.com/issues/39990 Signed-off-by: Sebastian Wagner --- diff --git a/src/pybind/mgr/deepsea/module.py b/src/pybind/mgr/deepsea/module.py index 2cbf25c14499..826ef6aa602c 100644 --- a/src/pybind/mgr/deepsea/module.py +++ b/src/pybind/mgr/deepsea/module.py @@ -108,7 +108,8 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): self._last_failure_msg = None self._all_completions = dict() self._completion_lock = Lock() - + self.inventory_cache = orchestrator.OutdatableDict(self, 'inventory_cache') + self.service_cache = orchestrator.OutdatableDict(self, 'service_cache') def available(self): if not self._config_valid(): @@ -119,7 +120,6 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): return True, "" - def get_inventory(self, node_filter=None, refresh=False): """ Note that this will raise an exception (e.g. if the salt-api is down, @@ -128,14 +128,23 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): cli, for example, just prints the traceback in the console, so the user at least sees the error. 
""" + self.inventory_cache.remove_outdated() + if not self.inventory_cache.any_outdated() and not refresh: + if node_filter is None: + return orchestrator.TrivialReadCompletion( + orchestrator.InventoryNode.from_nested_items(self.inventory_cache.items())) + elif node_filter.labels is None: + return orchestrator.TrivialReadCompletion( + orchestrator.InventoryNode.from_nested_items( + self.inventory_cache.items_filtered(node_filter.nodes))) def process_result(event_data): result = [] if event_data['success']: for node_name, node_devs in event_data["return"].items(): - devs = list(map(lambda di: - orchestrator.InventoryDevice.from_ceph_volume_inventory(di), - node_devs)) + if node_filter is None: + self.inventory_cache[node_name] = orchestrator.OutdatableData(node_devs) + devs = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(node_devs) result.append(orchestrator.InventoryNode(node_name, devs)) else: self.log.error(event_data['return']) @@ -163,8 +172,7 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): return c - - def describe_service(self, service_type, service_id, node_name): + def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False): # Note: describe_service() does *not* support OSDs. This is because # DeepSea doesn't really record what OSDs are deployed where; Ceph is @@ -174,16 +182,27 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): assert service_type in ("mon", "mgr", "mds", "rgw", None), service_type + " unsupported" + self.service_cache.remove_outdated() + if not self.service_cache.any_outdated() and not refresh: + # Let's hope the services are complete. 
+ services = [orchestrator.ServiceDescription.from_json(d[1]) for d in self.service_cache.items_filtered([node_name])] + services = [s for s in services if + (True if s.service_type is None else s.service_type == service_type) and + (True if s.service_id is None else s.service_id == service_id)] + return orchestrator.TrivialReadCompletion(services) + + def process_result(event_data): result = [] if event_data['success']: for node_name, service_info in event_data["return"].items(): for service_type, service_instance in service_info.items(): - desc = orchestrator.ServiceDescription() - desc.nodename = node_name - desc.service_instance = service_instance - desc.service_type = service_type + desc = orchestrator.ServiceDescription(nodename=node_name, + service_instance=service_instance, + service_type=service_type) result.append(desc) + for service in result: + self.service_cache[service.nodename] = orchestrator.OutdatableData(service.to_json()) else: self.log.error(event_data['return']) return result @@ -202,7 +221,6 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): return c - def wait(self, completions): incomplete = False @@ -238,7 +256,7 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): "Unknown configuration option '{0}'".format(cmd['key'])) self.set_module_option(cmd['key'], cmd['value']) - self._event.set(); + self._event.set() return 0, "Configuration option '{0}' updated".format(cmd['key']), '' return (-errno.EINVAL, '', diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index bdd0b26cc390..ba36be96e5d2 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -1,5 +1,10 @@ import ceph_module # noqa +try: + from typing import Set, Tuple, Iterator, Any +except ImportError: + # just for type checking + pass import logging import json import six @@ -1311,3 +1316,69 @@ class MgrModule(ceph_module.BaseMgrModule): :param int query_id: query ID """ return 
self._ceph_get_osd_perf_counters(query_id) + + +class PersistentStoreDict(object): + def __init__(self, mgr, prefix): + # type: (MgrModule, str) -> None + self.mgr = mgr + self.prefix = prefix + '.' + + def _mk_store_key(self, key): + return self.prefix + key + + def __missing__(self, key): + # KeyError won't work for the `in` operator. + # https://docs.python.org/3/reference/expressions.html#membership-test-details + raise IndexError('PersistentStoreDict: "{}" not found'.format(key)) + + def clear(self): + # Don't make any assumptions about the content of the values. + for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)): + k, _ = item + self.mgr.set_store(k, None) + + def __getitem__(self, item): + # type: (str) -> Any + key = self._mk_store_key(item) + try: + val = self.mgr.get_store(key) + if val is None: + self.__missing__(key) + return json.loads(val) + except (KeyError, AttributeError, IndexError, ValueError, TypeError): + logging.getLogger(__name__).exception('failed to deserialize') + self.mgr.set_store(key, None) + raise + + def __setitem__(self, item, value): + # type: (str, Any) -> None + """ + value=None is not allowed, as it will remove the key. 
+ """ + key = self._mk_store_key(item) + self.mgr.set_store(key, json.dumps(value) if value is not None else None) + + def __delitem__(self, item): + self[item] = None + + def __len__(self): + return len(self.keys()) + + def items(self): + # type: () -> Iterator[Tuple[str, Any]] + prefix_len = len(self.prefix) + try: + for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)): + k, v = item + yield k[prefix_len:], json.loads(v) + except (KeyError, AttributeError, IndexError, ValueError, TypeError): + logging.getLogger(__name__).exception('failed to deserialize') + self.clear() + + def keys(self): + # type: () -> Set[str] + return {item[0] for item in self.items()} + + def __iter__(self): + return iter(self.keys()) diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py index d0b0f5839ab1..ec0e8a480408 100644 --- a/src/pybind/mgr/orchestrator.py +++ b/src/pybind/mgr/orchestrator.py @@ -8,14 +8,16 @@ import sys import time import fnmatch import uuid +import datetime import six -from mgr_module import MgrModule +from mgr_module import MgrModule, PersistentStoreDict from mgr_util import format_bytes try: - from typing import TypeVar, Generic, List, Optional, Union, Tuple + from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator + T = TypeVar('T') G = Generic[T] except ImportError: @@ -144,6 +146,23 @@ class ReadCompletion(_Completion): return not self.is_complete +class TrivialReadCompletion(ReadCompletion): + """ + This is the trivial completion simply wrapping a result. 
+ """ + def __init__(self, result): + super(TrivialReadCompletion, self).__init__() + self._result = result + + @property + def result(self): + return self._result + + @property + def is_complete(self): + return True + + class WriteCompletion(_Completion): """ ``Orchestrator`` implementations should inherit from this @@ -309,7 +328,7 @@ class Orchestrator(object): raise NotImplementedError() def describe_service(self, service_type=None, service_id=None, node_name=None): - # type: (Optional[str], Optional[str], Optional[str]) -> ReadCompletion[List[ServiceDescription]] + # type: (str, str, str) -> ReadCompletion[List[ServiceDescription]] """ Describe a service (of any kind) that is already configured in the orchestrator. For example, when viewing an OSD in the dashboard @@ -488,13 +507,16 @@ class ServiceDescription(object): is literally up this second, it's a description of where the orchestrator has decided the service should run. """ - def __init__(self): + + def __init__(self, nodename=None, container_id=None, service=None, service_instance=None, + service_type=None, version=None, rados_config_location=None, + service_url=None, status=None, status_desc=None): # Node is at the same granularity as InventoryNode - self.nodename = None + self.nodename = nodename # Not everyone runs in containers, but enough people do to # justify having this field here. - self.container_id = None + self.container_id = container_id # Some services can be deployed in groups. For example, mds's can # have an active and standby daemons, and nfs-ganesha can run daemons @@ -505,33 +527,33 @@ class ServiceDescription(object): # Filesystem name in the FSMap). # # Single-instance services should leave this set to None - self.service = None + self.service = service # The orchestrator will have picked some names for daemons, # typically either based on hostnames or on pod names. # This is the in mds., the ID that will appear # in the FSMap/ServiceMap. 
- self.service_instance = None + self.service_instance = service_instance # The type of service (osd, mon, mgr, etc.) - self.service_type = None + self.service_type = service_type # Service version that was deployed - self.version = None + self.version = version # Location of the service configuration when stored in rados # object. Format: "rados:///[]" - self.rados_config_location = None + self.rados_config_location = rados_config_location # If the service exposes REST-like API, this attribute should hold # the URL. - self.service_url = None + self.service_url = service_url # Service status: -1 error, 0 stopped, 1 running - self.status = None + self.status = status # Service status description when status == -1. - self.status_desc = None + self.status_desc = status_desc def to_json(self): out = { @@ -548,6 +570,10 @@ class ServiceDescription(object): } return {k: v for (k, v) in out.items() if v is not None} + @classmethod + def from_json(cls, data): + return cls(**data) + class DeviceSelection(object): """ @@ -807,6 +833,10 @@ class InventoryDevice(object): dev.extended = data return dev + @classmethod + def from_ceph_volume_inventory_list(cls, datas): + return [cls.from_ceph_volume_inventory(d) for d in datas] + def pretty_print(self, only_header=False): """Print a human friendly line with the information of the device @@ -844,6 +874,11 @@ class InventoryNode(object): def to_json(self): return {'name': self.name, 'devices': [d.to_json() for d in self.devices]} + @classmethod + def from_nested_items(cls, hosts): + devs = InventoryDevice.from_ceph_volume_inventory_list + return [cls(item[0], devs(item[1])) for item in hosts] + def _mk_orch_methods(cls): # Needs to be defined outside of for. 
@@ -947,3 +982,97 @@ class OrchestratorClientMixin(Orchestrator): break for c in completions: self._update_completion_progress(c) + + +class OutdatableData(object): + DATEFMT = '%Y-%m-%d %H:%M:%S.%f' + + def __init__(self, data=None, last_refresh=None): + # type: (Optional[dict], Optional[datetime.datetime]) -> None + self._data = data + if data is not None and last_refresh is None: + self.last_refresh = datetime.datetime.utcnow() + else: + self.last_refresh = last_refresh + + def json(self): + if self.last_refresh is not None: + timestr = self.last_refresh.strftime(self.DATEFMT) + else: + timestr = None + + return { + "data": self.data, + "last_refresh": timestr, + } + + @property + def data(self): + return self._data + + # @data.setter + # No setter, as it doesn't work as expected: It's not saved in store automatically + + @classmethod + def time_from_string(cls, timestr): + if timestr is None: + return None + # drop the 'Z' timezone indication, it's always UTC + timestr = timestr.rstrip('Z') + return datetime.datetime.strptime(timestr, cls.DATEFMT) + + + @classmethod + def from_json(cls, data): + return cls(data['data'], cls.time_from_string(data['last_refresh'])) + + def outdated(self, timeout_min=None): + if timeout_min is None: + timeout_min = 10 + if self.last_refresh is None: + return True + cutoff = datetime.datetime.utcnow() - datetime.timedelta( + minutes=timeout_min) + return self.last_refresh < cutoff + + def __repr__(self): + return 'OutdatableData(data={}, last_refresh={})'.format(self.data, self.last_refresh) + + +class OutdatableDict(PersistentStoreDict): + """ + Toolbox for implementing a cache. As every orchestrator has + different needs, we cannot implement any logic here. 
+ """ + + def __getitem__(self, item): + # type: (str) -> OutdatableData + return OutdatableData.from_json(super(OutdatableDict, self).__getitem__(item)) + + def __setitem__(self, key, value): + # type: (str, OutdatableData) -> None + val = None if value is None else value.json() + super(OutdatableDict, self).__setitem__(key, val) + + def items(self): + # type: () -> Iterator[Tuple[str, OutdatableData]] + for item in super(OutdatableDict, self).items(): + k, v = item + yield k, OutdatableData.from_json(v) + + def items_filtered(self, keys=None): + if keys: + return [(host, self[keys]) for host in keys] + else: + return list(self.items()) + + def any_outdated(self, timeout=None): + items = self.items() + if not items: + return True + return any([i[1].outdated(timeout) for i in items]) + + def remove_outdated(self): + outdated = [item[0] for item in self.items() if item[1].outdated()] + for o in outdated: + del self[o] diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py index a2653512099e..4b49d68100e9 100644 --- a/src/pybind/mgr/rook/module.py +++ b/src/pybind/mgr/rook/module.py @@ -330,7 +330,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): return result @deferred_read - def describe_service(self, service_type=None, service_id=None, node_name=None): + def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False): if service_type not in ("mds", "osd", "mgr", "mon", "nfs", None): raise orchestrator.OrchestratorValidationError(service_type + " unsupported") diff --git a/src/pybind/mgr/selftest/module.py b/src/pybind/mgr/selftest/module.py index 71913d31d7bf..a5a2cc10273b 100644 --- a/src/pybind/mgr/selftest/module.py +++ b/src/pybind/mgr/selftest/module.py @@ -1,5 +1,5 @@ -from mgr_module import MgrModule, CommandResult +from mgr_module import MgrModule, CommandResult, PersistentStoreDict import threading import random import json @@ -142,7 +142,7 @@ class Module(MgrModule): try: r = 
self.remote(command['module'], "self_test") except RuntimeError as e: - return -1, '', "Test failed: {0}".format(e.message) + return -1, '', "Test failed: {0}".format(e) else: return 0, str(r), "Self-test OK" elif command['prefix'] == 'mgr self-test health set': @@ -170,7 +170,7 @@ class Module(MgrModule): try: checks = json.loads(command["checks"]) except Exception as e: - return -1, "", "Failed to decode JSON input: {}".format(e.message) + return -1, "", "Failed to decode JSON input: {}".format(e) try: for check, info in six.iteritems(checks): @@ -180,7 +180,7 @@ class Module(MgrModule): "detail": [str(m) for m in info["detail"]] } except Exception as e: - return -1, "", "Invalid health check format: {}".format(e.message) + return -1, "", "Invalid health check format: {}".format(e) self.set_health_checks(self._health) return 0, "", "" @@ -198,9 +198,9 @@ class Module(MgrModule): def _insights_set_now_offset(self, inbuf, command): try: - hours = long(command["hours"]) + hours = int(command["hours"]) except Exception as e: - return -1, "", "Timestamp must be numeric: {}".format(e.message) + return -1, "", "Timestamp must be numeric: {}".format(e) self.remote("insights", "testing_set_now_time_offset", hours) return 0, "", "" @@ -214,6 +214,7 @@ class Module(MgrModule): self._self_test_store() self._self_test_misc() self._self_test_perf_counters() + self._self_persistent_store_dict() def _self_test_getters(self): self.version @@ -386,6 +387,25 @@ class Module(MgrModule): self.log.info("Finished self-test procedure.") + def _self_persistent_store_dict(self): + self.test_dict = PersistentStoreDict(self, 'test_dict') + for i in "abcde": + self.test_dict[i] = {i:1} + assert self.test_dict.keys() == set("abcde") + assert 'a' in self.test_dict + del self.test_dict['a'] + assert self.test_dict.keys() == set("bcde"), self.test_dict.keys() + assert 'a' not in self.test_dict + self.test_dict.clear() + assert not self.test_dict, dict(self.test_dict.items()) + 
self.set_store('test_dict.a', 'invalid json') + try: + self.test_dict['a'] + assert False + except ValueError: + pass + assert not self.test_dict, dict(self.test_dict.items()) + def _test_remote_calls(self): # Test making valid call self.remote("influx", "handle_command", "", {"prefix": "influx self-test"}) diff --git a/src/pybind/mgr/ssh/module.py b/src/pybind/mgr/ssh/module.py index 1fef4c8b1fd8..43072c4d382d 100644 --- a/src/pybind/mgr/ssh/module.py +++ b/src/pybind/mgr/ssh/module.py @@ -1,8 +1,10 @@ import json import errno +import logging +from functools import wraps + import six import os -import datetime import tempfile import multiprocessing.pool @@ -18,13 +20,13 @@ except ImportError as e: remoto = None remoto_import_error = str(e) -DATEFMT = '%Y-%m-%dT%H:%M:%S.%f%z' +logger = logging.getLogger(__name__) # high-level TODO: # - bring over some of the protections from ceph-deploy that guard against # multiple bootstrapping / initialization -class SSHReadCompletion(orchestrator.ReadCompletion): +class SSHCompletionmMixin(object): def __init__(self, result): if isinstance(result, multiprocessing.pool.AsyncResult): self._result = [result] @@ -36,34 +38,13 @@ class SSHReadCompletion(orchestrator.ReadCompletion): def result(self): return list(map(lambda r: r.get(), self._result)) +class SSHReadCompletion(SSHCompletionmMixin, orchestrator.ReadCompletion): @property def is_complete(self): return all(map(lambda r: r.ready(), self._result)) -class SSHReadCompletionReady(SSHReadCompletion): - def __init__(self, result): - self._result = result - - @property - def result(self): - return self._result - - @property - def is_complete(self): - return True -class SSHWriteCompletion(orchestrator.WriteCompletion): - def __init__(self, result): - super(SSHWriteCompletion, self).__init__() - if isinstance(result, multiprocessing.pool.AsyncResult): - self._result = [result] - else: - self._result = result - assert isinstance(self._result, list) - - @property - def result(self): 
- return list(map(lambda r: r.get(), self._result)) +class SSHWriteCompletion(SSHCompletionmMixin, orchestrator.WriteCompletion): @property def is_persistent(self): @@ -115,6 +96,23 @@ class SSHConnection(object): def __getattr__(self, name): return getattr(self.conn, name) + +def log_exceptions(f): + if six.PY3: + return f + else: + # Python 2 does no exception chaining, thus the + # real exception is lost + @wraps(f) + def wrapper(*args, **kwargs): + try: + return f(*args, **kwargs) + except Exception: + logger.exception('something went wrong.') + raise + return wrapper + + class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): _STORE_HOST_PREFIX = "host" @@ -143,6 +141,14 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): self._cluster_fsid = None self._worker_pool = multiprocessing.pool.ThreadPool(1) + # the keys in inventory_cache are authoritative. + # You must not call remove_outdated() + # The values are cached by instance. + # cache is invalidated by + # 1. timeout + # 2. refresh parameter + self.inventory_cache = orchestrator.OutdatableDict(self, self._STORE_HOST_PREFIX) + def handle_command(self, inbuf, command): if command["prefix"] == "ssh set-ssh-config": return self._set_ssh_config(inbuf, command) @@ -170,21 +176,17 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): complete = True for c in completions: + if c.is_complete: + continue + if not isinstance(c, SSHReadCompletion) and \ not isinstance(c, SSHWriteCompletion): raise TypeError("unexpected completion: {}".format(c.__class__)) - if c.is_complete: - continue - complete = False return complete - @staticmethod - def time_from_string(timestr): - return datetime.datetime.strptime(timestr, DATEFMT) - def _get_cluster_fsid(self): """ Fetch and cache the cluster fsid. 
@@ -200,12 +202,10 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): """ if isinstance(hosts, six.string_types): hosts = [hosts] - unregistered_hosts = [] - for host in hosts: - key = self._hostname_to_store_key(host) - if not self.get_store(key): - unregistered_hosts.append(host) + keys = self.inventory_cache.keys() + unregistered_hosts = set(hosts) - keys if unregistered_hosts: + logger.warning('keys = {}'.format(keys)) raise RuntimeError("Host(s) {} not registered".format( ", ".join(map(lambda h: "'{}'".format(h), unregistered_hosts)))) @@ -365,21 +365,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): conn.remote_module.write_keyring(keyring_path, keyring) return keyring_path - def _hostname_to_store_key(self, host): - return "{}.{}".format(self._STORE_HOST_PREFIX, host) - def _get_hosts(self, wanted=None): - if wanted: - hosts_info = [] - for host in wanted: - key = self._hostname_to_store_key(host) - info = self.get_store(key) - if info: - hosts_info.append((key, info)) - else: - hosts_info = six.iteritems(self.get_store_prefix(self._STORE_HOST_PREFIX)) - - return list(map(lambda kv: (kv[0], json.loads(kv[1])), hosts_info)) + return self.inventory_cache.items_filtered(wanted) def add_host(self, host): """ @@ -387,13 +374,9 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): :param host: host name """ + @log_exceptions def run(host): - key = self._hostname_to_store_key(host) - self.set_store(key, json.dumps({ - "host": host, - "inventory": None, - "last_inventory_refresh": None - })) + self.inventory_cache[host] = orchestrator.OutdatableData() return "Added host '{}'".format(host) return SSHWriteCompletion( @@ -405,9 +388,9 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): :param host: host name """ + @log_exceptions def run(host): - key = self._hostname_to_store_key(host) - self.set_store(key, None) + del self.inventory_cache[host] return "Removed host '{}'".format(host) return SSHWriteCompletion( @@ 
-423,11 +406,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): TODO: - InventoryNode probably needs to be able to report labels """ - nodes = [] - for key, host_info in self._get_hosts(): - node = orchestrator.InventoryNode(host_info["host"], []) - nodes.append(node) - return SSHReadCompletionReady(nodes) + nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache] + return orchestrator.TrivialReadCompletion(nodes) def _get_device_inventory(self, host): """ @@ -473,42 +453,23 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): # this implies the returned hosts are registered hosts = self._get_hosts() - def run(key, host_info): - updated = False - host = host_info["host"] - - if not host_info["inventory"]: - self.log.info("caching inventory for '{}'".format(host)) - host_info["inventory"] = self._get_device_inventory(host) - updated = True - else: - timeout_min = int(self.get_module_option( - "inventory_cache_timeout_min", - self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN)) - - cutoff = datetime.datetime.utcnow() - datetime.timedelta( - minutes=timeout_min) - - last_update = self.time_from_string(host_info["last_inventory_refresh"]) + @log_exceptions + def run(host, host_info): + # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode - if last_update < cutoff or refresh: - self.log.info("refresh stale inventory for '{}'".format(host)) - host_info["inventory"] = self._get_device_inventory(host) - updated = True - else: - self.log.info("reading cached inventory for '{}'".format(host)) - pass + timeout_min = int(self.get_module_option( + "inventory_cache_timeout_min", + self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN)) - if updated: - now = datetime.datetime.utcnow() - now = now.strftime(DATEFMT) - host_info["last_inventory_refresh"] = now - self.set_store(key, json.dumps(host_info)) - - devices = list(map(lambda di: - orchestrator.InventoryDevice.from_ceph_volume_inventory(di), - host_info["inventory"])) 
+ if host_info.outdated(timeout_min) or refresh: + self.log.info("refresh stale inventory for '{}'".format(host)) + data = self._get_device_inventory(host) + host_info = orchestrator.OutdatableData(data) + self.inventory_cache[host] = host_info + else: + self.log.debug("reading cached inventory for '{}'".format(host)) + devices = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(host_info.data) return orchestrator.InventoryNode(host, devices) results = [] @@ -518,6 +479,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): return SSHReadCompletion(results) + @log_exceptions def _create_osd(self, host, drive_group): conn = self._get_connection(host) try: @@ -758,7 +720,6 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): raise NotImplementedError("Removing managers is not supported") # check that all the hosts are registered - hosts = list(set(hosts)) self._require_hosts(hosts) # we assume explicit placement by which there are the same number of diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py index 236207358c8a..f6ed7b20b0ce 100644 --- a/src/pybind/mgr/test_orchestrator/module.py +++ b/src/pybind/mgr/test_orchestrator/module.py @@ -6,7 +6,7 @@ import functools import uuid from subprocess import check_output, CalledProcessError -from mgr_module import MgrModule +from mgr_module import MgrModule, PersistentStoreDict import orchestrator @@ -181,7 +181,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): raise Exception('c-v failed') @deferred_read - def describe_service(self, service_type=None, service_id=None, node_name=None): + def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False): """ There is no guarantee which daemons are returned by describe_service, except that it returns the mgr we're running in.