self._last_failure_msg = None
self._all_completions = dict()
self._completion_lock = Lock()
-
+ self.inventory_cache = orchestrator.OutdatableDict(self, 'inventory_cache')
+ self.service_cache = orchestrator.OutdatableDict(self, 'service_cache')
def available(self):
if not self._config_valid():
return True, ""
-
def get_inventory(self, node_filter=None, refresh=False):
"""
Note that this will raise an exception (e.g. if the salt-api is down,
cli, for example, just prints the traceback in the console, so the
user at least sees the error.
"""
+ self.inventory_cache.remove_outdated()
+ if not self.inventory_cache.any_outdated() and not refresh:
+ if node_filter is None:
+ return orchestrator.TrivialReadCompletion(
+ orchestrator.InventoryNode.from_nested_items(self.inventory_cache.items()))
+ elif node_filter.labels is None:
+ return orchestrator.TrivialReadCompletion(
+ orchestrator.InventoryNode.from_nested_items(
+ self.inventory_cache.items_filtered(node_filter.nodes)))
def process_result(event_data):
result = []
if event_data['success']:
for node_name, node_devs in event_data["return"].items():
- devs = list(map(lambda di:
- orchestrator.InventoryDevice.from_ceph_volume_inventory(di),
- node_devs))
+ if node_filter is None:
+ self.inventory_cache[node_name] = orchestrator.OutdatableData(node_devs)
+ devs = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(node_devs)
result.append(orchestrator.InventoryNode(node_name, devs))
else:
self.log.error(event_data['return'])
return c
-
- def describe_service(self, service_type, service_id, node_name):
+ def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
# Note: describe_service() does *not* support OSDs. This is because
# DeepSea doesn't really record what OSDs are deployed where; Ceph is
assert service_type in ("mon", "mgr", "mds", "rgw", None), service_type + " unsupported"
+ self.service_cache.remove_outdated()
+ if not self.service_cache.any_outdated() and not refresh:
+ # Let's hope the services are complete.
+ services = [orchestrator.ServiceDescription.from_json(d[1]) for d in self.service_cache.items_filtered([node_name])]
+ services = [s for s in services if
+ (True if s.service_type is None else s.service_type == service_type) and
+ (True if s.service_id is None else s.service_id == service_id)]
+ return orchestrator.TrivialReadCompletion(services)
+
+
def process_result(event_data):
result = []
if event_data['success']:
for node_name, service_info in event_data["return"].items():
for service_type, service_instance in service_info.items():
- desc = orchestrator.ServiceDescription()
- desc.nodename = node_name
- desc.service_instance = service_instance
- desc.service_type = service_type
+ desc = orchestrator.ServiceDescription(nodename=node_name,
+ service_instance=service_instance,
+ service_type=service_type)
result.append(desc)
+ for service in result:
+ self.service_cache[service.nodename] = orchestrator.OutdatableData(service.to_json())
else:
self.log.error(event_data['return'])
return result
return c
-
def wait(self, completions):
incomplete = False
"Unknown configuration option '{0}'".format(cmd['key']))
self.set_module_option(cmd['key'], cmd['value'])
- self._event.set();
+ self._event.set()
return 0, "Configuration option '{0}' updated".format(cmd['key']), ''
return (-errno.EINVAL, '',
import ceph_module # noqa
+try:
+ from typing import Set, Tuple, Iterator, Any
+except ImportError:
+ # just for type checking
+ pass
import logging
import json
import six
:param int query_id: query ID
"""
return self._ceph_get_osd_perf_counters(query_id)
+
+
+class PersistentStoreDict(object):
+    """
+    Dict-like wrapper around the mgr key/value store.
+
+    All keys live under a shared ``<prefix>.`` namespace and all values are
+    stored JSON-serialized.  Nothing is cached on the instance: every lookup
+    and iteration goes straight to the store.
+    """
+    def __init__(self, mgr, prefix):
+        # type: (MgrModule, str) -> None
+        self.mgr = mgr
+        self.prefix = prefix + '.'
+
+    def _mk_store_key(self, key):
+        # type: (str) -> str
+        # Qualify a plain dict key with the namespace prefix.
+        return self.prefix + key
+
+    def __missing__(self, key):
+        # KeyError won't work for the `in` operator.
+        # https://docs.python.org/3/reference/expressions.html#membership-test-details
+        raise IndexError('PersistentStoreDict: "{}" not found'.format(key))
+
+    def clear(self):
+        # type: () -> None
+        """Remove every key under this prefix from the store."""
+        # Don't make any assumptions about the content of the values.
+        for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)):
+            k, _ = item
+            # Setting a store key to None deletes it.
+            self.mgr.set_store(k, None)
+
+    def __getitem__(self, item):
+        # type: (str) -> Any
+        key = self._mk_store_key(item)
+        try:
+            val = self.mgr.get_store(key)
+            if val is None:
+                self.__missing__(key)
+            return json.loads(val)
+        except (KeyError, AttributeError, IndexError, ValueError, TypeError):
+            # A value that cannot be deserialized is dropped from the store
+            # before the error is propagated to the caller.
+            logging.getLogger(__name__).exception('failed to deserialize')
+            self.mgr.set_store(key, None)
+            raise
+
+    def __setitem__(self, item, value):
+        # type: (str, Any) -> None
+        """
+        value=None is not allowed, as it will remove the key.
+        """
+        key = self._mk_store_key(item)
+        self.mgr.set_store(key, json.dumps(value) if value is not None else None)
+
+    def __delitem__(self, item):
+        # type: (str) -> None
+        # Delegates to __setitem__, which deletes on value=None.
+        self[item] = None
+
+    def __len__(self):
+        # NOTE: scans the whole prefix on every call.
+        return len(self.keys())
+
+    def items(self):
+        # type: () -> Iterator[Tuple[str, Any]]
+        """
+        Yield ``(key, value)`` pairs with the prefix stripped from keys.
+
+        If any stored value fails to deserialize, the whole namespace is
+        cleared and iteration stops; no exception escapes from here.
+        """
+        prefix_len = len(self.prefix)
+        try:
+            for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)):
+                k, v = item
+                yield k[prefix_len:], json.loads(v)
+        except (KeyError, AttributeError, IndexError, ValueError, TypeError):
+            logging.getLogger(__name__).exception('failed to deserialize')
+            self.clear()
+
+    def keys(self):
+        # type: () -> Set[str]
+        return {item[0] for item in self.items()}
+
+    def __iter__(self):
+        # Iterating the dict iterates its keys, like a real dict.
+        return iter(self.keys())
import time
import fnmatch
import uuid
+import datetime
import six
-from mgr_module import MgrModule
+from mgr_module import MgrModule, PersistentStoreDict
from mgr_util import format_bytes
try:
- from typing import TypeVar, Generic, List, Optional, Union, Tuple
+ from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator
+
T = TypeVar('T')
G = Generic[T]
except ImportError:
return not self.is_complete
+class TrivialReadCompletion(ReadCompletion):
+ """
+ This is the trivial completion simply wrapping a result.
+ """
+ def __init__(self, result):
+ super(TrivialReadCompletion, self).__init__()
+ self._result = result
+
+ @property
+ def result(self):
+ return self._result
+
+ @property
+ def is_complete(self):
+ return True
+
+
class WriteCompletion(_Completion):
"""
``Orchestrator`` implementations should inherit from this
raise NotImplementedError()
def describe_service(self, service_type=None, service_id=None, node_name=None):
- # type: (Optional[str], Optional[str], Optional[str]) -> ReadCompletion[List[ServiceDescription]]
+ # type: (str, str, str) -> ReadCompletion[List[ServiceDescription]]
"""
Describe a service (of any kind) that is already configured in
the orchestrator. For example, when viewing an OSD in the dashboard
is literally up this second, it's a description of where the orchestrator
has decided the service should run.
"""
- def __init__(self):
+
+ def __init__(self, nodename=None, container_id=None, service=None, service_instance=None,
+ service_type=None, version=None, rados_config_location=None,
+ service_url=None, status=None, status_desc=None):
# Node is at the same granularity as InventoryNode
- self.nodename = None
+ self.nodename = nodename
# Not everyone runs in containers, but enough people do to
# justify having this field here.
- self.container_id = None
+ self.container_id = container_id
# Some services can be deployed in groups. For example, mds's can
# have an active and standby daemons, and nfs-ganesha can run daemons
# Filesystem name in the FSMap).
#
# Single-instance services should leave this set to None
- self.service = None
+ self.service = service
# The orchestrator will have picked some names for daemons,
# typically either based on hostnames or on pod names.
# This is the <foo> in mds.<foo>, the ID that will appear
# in the FSMap/ServiceMap.
- self.service_instance = None
+ self.service_instance = service_instance
# The type of service (osd, mon, mgr, etc.)
- self.service_type = None
+ self.service_type = service_type
# Service version that was deployed
- self.version = None
+ self.version = version
# Location of the service configuration when stored in rados
# object. Format: "rados://<pool>/[<namespace/>]<object>"
- self.rados_config_location = None
+ self.rados_config_location = rados_config_location
# If the service exposes REST-like API, this attribute should hold
# the URL.
- self.service_url = None
+ self.service_url = service_url
# Service status: -1 error, 0 stopped, 1 running
- self.status = None
+ self.status = status
# Service status description when status == -1.
- self.status_desc = None
+ self.status_desc = status_desc
def to_json(self):
out = {
}
return {k: v for (k, v) in out.items() if v is not None}
+ @classmethod
+ def from_json(cls, data):
+ return cls(**data)
+
class DeviceSelection(object):
"""
dev.extended = data
return dev
+ @classmethod
+ def from_ceph_volume_inventory_list(cls, datas):
+ return [cls.from_ceph_volume_inventory(d) for d in datas]
+
def pretty_print(self, only_header=False):
"""Print a human friendly line with the information of the device
def to_json(self):
return {'name': self.name, 'devices': [d.to_json() for d in self.devices]}
+ @classmethod
+ def from_nested_items(cls, hosts):
+ devs = InventoryDevice.from_ceph_volume_inventory_list
+ return [cls(item[0], devs(item[1])) for item in hosts]
+
def _mk_orch_methods(cls):
# Needs to be defined outside of for.
break
for c in completions:
self._update_completion_progress(c)
+
+
+class OutdatableData(object):
+    """
+    A payload paired with the UTC time it was last refreshed, so callers
+    can decide whether a cached payload is still trustworthy.
+    """
+    DATEFMT = '%Y-%m-%d %H:%M:%S.%f'
+
+    def __init__(self, data=None, last_refresh=None):
+        # type: (Optional[dict], Optional[datetime.datetime]) -> None
+        self._data = data
+        # Fresh data without an explicit timestamp is stamped "now" (UTC).
+        if data is not None and last_refresh is None:
+            self.last_refresh = datetime.datetime.utcnow()
+        else:
+            self.last_refresh = last_refresh
+
+    def json(self):
+        """Return a JSON-serializable dict; inverse of from_json()."""
+        if self.last_refresh is not None:
+            timestr = self.last_refresh.strftime(self.DATEFMT)
+        else:
+            timestr = None
+
+        return {
+            "data": self.data,
+            "last_refresh": timestr,
+        }
+
+    @property
+    def data(self):
+        return self._data
+
+    # @data.setter
+    # No setter, as it doesn't work as expected: It's not saved in store automatically
+
+    @classmethod
+    def time_from_string(cls, timestr):
+        # type: (Optional[str]) -> Optional[datetime.datetime]
+        if timestr is None:
+            return None
+        # drop the 'Z' timezone indication, it's always UTC
+        timestr = timestr.rstrip('Z')
+        return datetime.datetime.strptime(timestr, cls.DATEFMT)
+
+    @classmethod
+    def from_json(cls, data):
+        """Rebuild an instance from the dict produced by json()."""
+        return cls(data['data'], cls.time_from_string(data['last_refresh']))
+
+    def outdated(self, timeout_min=None):
+        """
+        Return True if the data is older than ``timeout_min`` minutes
+        (default 10) or was never refreshed at all.
+        """
+        if timeout_min is None:
+            timeout_min = 10
+        if self.last_refresh is None:
+            return True
+        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+            minutes=timeout_min)
+        return self.last_refresh < cutoff
+
+    def __repr__(self):
+        return 'OutdatableData(data={}, last_refresh={})'.format(self.data, self.last_refresh)
+
+
+class OutdatableDict(PersistentStoreDict):
+ """
+ Toolbox for implementing a cache. As every orchestrator has
+ different needs, we cannot implement any logic here.
+ """
+
+ def __getitem__(self, item):
+ # type: (str) -> OutdatableData
+ return OutdatableData.from_json(super(OutdatableDict, self).__getitem__(item))
+
+ def __setitem__(self, key, value):
+ # type: (str, OutdatableData) -> None
+ val = None if value is None else value.json()
+ super(OutdatableDict, self).__setitem__(key, val)
+
+ def items(self):
+ # type: () -> Iterator[Tuple[str, OutdatableData]]
+ for item in super(OutdatableDict, self).items():
+ k, v = item
+ yield k, OutdatableData.from_json(v)
+
+ def items_filtered(self, keys=None):
+ if keys:
+ return [(host, self[keys]) for host in keys]
+ else:
+ return list(self.items())
+
+ def any_outdated(self, timeout=None):
+ items = self.items()
+ if not items:
+ return True
+ return any([i[1].outdated(timeout) for i in items])
+
+ def remove_outdated(self):
+ outdated = [item[0] for item in self.items() if item[1].outdated()]
+ for o in outdated:
+ del self[o]
return result
@deferred_read
- def describe_service(self, service_type=None, service_id=None, node_name=None):
+ def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
if service_type not in ("mds", "osd", "mgr", "mon", "nfs", None):
raise orchestrator.OrchestratorValidationError(service_type + " unsupported")
-from mgr_module import MgrModule, CommandResult
+from mgr_module import MgrModule, CommandResult, PersistentStoreDict
import threading
import random
import json
try:
r = self.remote(command['module'], "self_test")
except RuntimeError as e:
- return -1, '', "Test failed: {0}".format(e.message)
+ return -1, '', "Test failed: {0}".format(e)
else:
return 0, str(r), "Self-test OK"
elif command['prefix'] == 'mgr self-test health set':
try:
checks = json.loads(command["checks"])
except Exception as e:
- return -1, "", "Failed to decode JSON input: {}".format(e.message)
+ return -1, "", "Failed to decode JSON input: {}".format(e)
try:
for check, info in six.iteritems(checks):
"detail": [str(m) for m in info["detail"]]
}
except Exception as e:
- return -1, "", "Invalid health check format: {}".format(e.message)
+ return -1, "", "Invalid health check format: {}".format(e)
self.set_health_checks(self._health)
return 0, "", ""
def _insights_set_now_offset(self, inbuf, command):
try:
- hours = long(command["hours"])
+ hours = int(command["hours"])
except Exception as e:
- return -1, "", "Timestamp must be numeric: {}".format(e.message)
+ return -1, "", "Timestamp must be numeric: {}".format(e)
self.remote("insights", "testing_set_now_time_offset", hours)
return 0, "", ""
self._self_test_store()
self._self_test_misc()
self._self_test_perf_counters()
+ self._self_persistent_store_dict()
def _self_test_getters(self):
self.version
self.log.info("Finished self-test procedure.")
+    def _self_persistent_store_dict(self):
+        """Exercise PersistentStoreDict against the live module store."""
+        self.test_dict = PersistentStoreDict(self, 'test_dict')
+        # Round-trip a handful of entries.
+        for i in "abcde":
+            self.test_dict[i] = {i:1}
+        assert self.test_dict.keys() == set("abcde")
+        assert 'a' in self.test_dict
+        # Deletion removes the key from keys() and membership tests alike.
+        del self.test_dict['a']
+        assert self.test_dict.keys() == set("bcde"), self.test_dict.keys()
+        assert 'a' not in self.test_dict
+        # clear() leaves the dict empty (and thus falsy, via __len__).
+        self.test_dict.clear()
+        assert not self.test_dict, dict(self.test_dict.items())
+        # Corrupt data raises on direct access (and __getitem__ drops the
+        # broken key from the store before re-raising) ...
+        self.set_store('test_dict.a', 'invalid json')
+        try:
+            self.test_dict['a']
+            assert False
+        except ValueError:
+            pass
+        # ... so the dict is empty again afterwards.
+        assert not self.test_dict, dict(self.test_dict.items())
+
def _test_remote_calls(self):
# Test making valid call
self.remote("influx", "handle_command", "", {"prefix": "influx self-test"})
import json
import errno
+import logging
+from functools import wraps
+
import six
import os
-import datetime
import tempfile
import multiprocessing.pool
remoto = None
remoto_import_error = str(e)
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f%z'
+logger = logging.getLogger(__name__)
# high-level TODO:
# - bring over some of the protections from ceph-deploy that guard against
# multiple bootstrapping / initialization
-class SSHReadCompletion(orchestrator.ReadCompletion):
+class SSHCompletionmMixin(object):
def __init__(self, result):
if isinstance(result, multiprocessing.pool.AsyncResult):
self._result = [result]
def result(self):
return list(map(lambda r: r.get(), self._result))
+class SSHReadCompletion(SSHCompletionmMixin, orchestrator.ReadCompletion):
@property
def is_complete(self):
return all(map(lambda r: r.ready(), self._result))
-class SSHReadCompletionReady(SSHReadCompletion):
- def __init__(self, result):
- self._result = result
-
- @property
- def result(self):
- return self._result
-
- @property
- def is_complete(self):
- return True
-class SSHWriteCompletion(orchestrator.WriteCompletion):
- def __init__(self, result):
- super(SSHWriteCompletion, self).__init__()
- if isinstance(result, multiprocessing.pool.AsyncResult):
- self._result = [result]
- else:
- self._result = result
- assert isinstance(self._result, list)
-
- @property
- def result(self):
- return list(map(lambda r: r.get(), self._result))
+class SSHWriteCompletion(SSHCompletionmMixin, orchestrator.WriteCompletion):
@property
def is_persistent(self):
def __getattr__(self, name):
return getattr(self.conn, name)
+
+def log_exceptions(f):
+ if six.PY3:
+ return f
+ else:
+ # Python 2 does no exception chaining, thus the
+ # real exception is lost
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ try:
+ return f(*args, **kwargs)
+ except Exception:
+ logger.exception('something went wrong.')
+ raise
+ return wrapper
+
+
class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
_STORE_HOST_PREFIX = "host"
self._cluster_fsid = None
self._worker_pool = multiprocessing.pool.ThreadPool(1)
+ # the keys in inventory_cache are authoritative.
+ # You must not call remove_outdated()
+ # The values are cached by instance.
+ # cache is invalidated by
+ # 1. timeout
+ # 2. refresh parameter
+ self.inventory_cache = orchestrator.OutdatableDict(self, self._STORE_HOST_PREFIX)
+
def handle_command(self, inbuf, command):
if command["prefix"] == "ssh set-ssh-config":
return self._set_ssh_config(inbuf, command)
complete = True
for c in completions:
+ if c.is_complete:
+ continue
+
if not isinstance(c, SSHReadCompletion) and \
not isinstance(c, SSHWriteCompletion):
raise TypeError("unexpected completion: {}".format(c.__class__))
- if c.is_complete:
- continue
-
complete = False
return complete
- @staticmethod
- def time_from_string(timestr):
- return datetime.datetime.strptime(timestr, DATEFMT)
-
def _get_cluster_fsid(self):
"""
Fetch and cache the cluster fsid.
"""
if isinstance(hosts, six.string_types):
hosts = [hosts]
- unregistered_hosts = []
- for host in hosts:
- key = self._hostname_to_store_key(host)
- if not self.get_store(key):
- unregistered_hosts.append(host)
+ keys = self.inventory_cache.keys()
+ unregistered_hosts = set(hosts) - keys
if unregistered_hosts:
+ logger.warning('keys = {}'.format(keys))
raise RuntimeError("Host(s) {} not registered".format(
", ".join(map(lambda h: "'{}'".format(h),
unregistered_hosts))))
conn.remote_module.write_keyring(keyring_path, keyring)
return keyring_path
- def _hostname_to_store_key(self, host):
- return "{}.{}".format(self._STORE_HOST_PREFIX, host)
-
def _get_hosts(self, wanted=None):
- if wanted:
- hosts_info = []
- for host in wanted:
- key = self._hostname_to_store_key(host)
- info = self.get_store(key)
- if info:
- hosts_info.append((key, info))
- else:
- hosts_info = six.iteritems(self.get_store_prefix(self._STORE_HOST_PREFIX))
-
- return list(map(lambda kv: (kv[0], json.loads(kv[1])), hosts_info))
+ return self.inventory_cache.items_filtered(wanted)
def add_host(self, host):
"""
:param host: host name
"""
+ @log_exceptions
def run(host):
- key = self._hostname_to_store_key(host)
- self.set_store(key, json.dumps({
- "host": host,
- "inventory": None,
- "last_inventory_refresh": None
- }))
+ self.inventory_cache[host] = orchestrator.OutdatableData()
return "Added host '{}'".format(host)
return SSHWriteCompletion(
:param host: host name
"""
+ @log_exceptions
def run(host):
- key = self._hostname_to_store_key(host)
- self.set_store(key, None)
+ del self.inventory_cache[host]
return "Removed host '{}'".format(host)
return SSHWriteCompletion(
TODO:
- InventoryNode probably needs to be able to report labels
"""
- nodes = []
- for key, host_info in self._get_hosts():
- node = orchestrator.InventoryNode(host_info["host"], [])
- nodes.append(node)
- return SSHReadCompletionReady(nodes)
+ nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
+ return orchestrator.TrivialReadCompletion(nodes)
def _get_device_inventory(self, host):
"""
# this implies the returned hosts are registered
hosts = self._get_hosts()
- def run(key, host_info):
- updated = False
- host = host_info["host"]
-
- if not host_info["inventory"]:
- self.log.info("caching inventory for '{}'".format(host))
- host_info["inventory"] = self._get_device_inventory(host)
- updated = True
- else:
- timeout_min = int(self.get_module_option(
- "inventory_cache_timeout_min",
- self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
-
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
- minutes=timeout_min)
-
- last_update = self.time_from_string(host_info["last_inventory_refresh"])
+ @log_exceptions
+ def run(host, host_info):
+ # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode
- if last_update < cutoff or refresh:
- self.log.info("refresh stale inventory for '{}'".format(host))
- host_info["inventory"] = self._get_device_inventory(host)
- updated = True
- else:
- self.log.info("reading cached inventory for '{}'".format(host))
- pass
+ timeout_min = int(self.get_module_option(
+ "inventory_cache_timeout_min",
+ self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
- if updated:
- now = datetime.datetime.utcnow()
- now = now.strftime(DATEFMT)
- host_info["last_inventory_refresh"] = now
- self.set_store(key, json.dumps(host_info))
-
- devices = list(map(lambda di:
- orchestrator.InventoryDevice.from_ceph_volume_inventory(di),
- host_info["inventory"]))
+ if host_info.outdated(timeout_min) or refresh:
+ self.log.info("refresh stale inventory for '{}'".format(host))
+ data = self._get_device_inventory(host)
+ host_info = orchestrator.OutdatableData(data)
+ self.inventory_cache[host] = host_info
+ else:
+ self.log.debug("reading cached inventory for '{}'".format(host))
+ devices = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(host_info.data)
return orchestrator.InventoryNode(host, devices)
results = []
return SSHReadCompletion(results)
+ @log_exceptions
def _create_osd(self, host, drive_group):
conn = self._get_connection(host)
try:
raise NotImplementedError("Removing managers is not supported")
# check that all the hosts are registered
- hosts = list(set(hosts))
self._require_hosts(hosts)
# we assume explicit placement by which there are the same number of
import uuid
from subprocess import check_output, CalledProcessError
-from mgr_module import MgrModule
+from mgr_module import MgrModule, PersistentStoreDict
import orchestrator
raise Exception('c-v failed')
@deferred_read
- def describe_service(self, service_type=None, service_id=None, node_name=None):
+ def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
"""
There is no guarantee which daemons are returned by describe_service, except that
it returns the mgr we're running in.