:param arguments: dict of key/value arguments to test
"""
return self._ceph_is_authorized(arguments)
-
-
-class PersistentStoreDict(object):
- def __init__(self, mgr, prefix):
- # type: (MgrModule, str) -> None
- self.mgr = mgr
- self.prefix = prefix + '.'
-
- def _mk_store_key(self, key):
- return self.prefix + key
-
- def __missing__(self, key):
- # KeyError won't work for the `in` operator.
- # https://docs.python.org/3/reference/expressions.html#membership-test-details
- raise IndexError('PersistentStoreDict: "{}" not found'.format(key))
-
- def clear(self):
- # Don't make any assumptions about the content of the values.
- for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)):
- k, _ = item
- self.mgr.set_store(k, None)
-
- def __getitem__(self, item):
- # type: (str) -> Any
- key = self._mk_store_key(item)
- try:
- val = self.mgr.get_store(key)
- if val is None:
- self.__missing__(key)
- return json.loads(val)
- except (KeyError, AttributeError, IndexError, ValueError, TypeError):
- logging.getLogger(__name__).debug('failed to deserialize item in store')
- self.mgr.set_store(key, None)
- raise
-
- def __setitem__(self, item, value):
- # type: (str, Any) -> None
- """
- value=None is not allowed, as it will remove the key.
- """
- key = self._mk_store_key(item)
- self.mgr.set_store(key, json.dumps(value) if value is not None else None)
-
- def __delitem__(self, item):
- self[item] = None
-
- def __len__(self):
- return len(self.keys())
-
- def items(self):
- # type: () -> Iterator[Tuple[str, Any]]
- prefix_len = len(self.prefix)
- try:
- for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)):
- k, v = item
- yield k[prefix_len:], json.loads(v)
- except (KeyError, AttributeError, IndexError, ValueError, TypeError):
- logging.getLogger(__name__).exception('failed to deserialize')
- self.clear()
-
- def keys(self):
- # type: () -> Set[str]
- return {item[0] for item in self.items()}
-
- def __iter__(self):
- return iter(self.keys())
ServiceDescription, InventoryFilter, HostSpec, \
DaemonDescription, \
InventoryHost, DeviceLightLoc, \
- OutdatableData, OutdatablePersistentDict, \
UpgradeStatusSpec
ServiceSpecValidationError
from ceph.deployment.drive_group import DriveGroupSpec
-from mgr_module import MgrModule, PersistentStoreDict, CLICommand, HandleCommandResult
+from mgr_module import MgrModule, CLICommand, HandleCommandResult
try:
from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
time.sleep(1)
else:
break
-
-
-class OutdatableData(object):
- DATEFMT = '%Y-%m-%d %H:%M:%S.%f'
-
- def __init__(self, data=None, last_refresh=None):
- # type: (Optional[dict], Optional[datetime.datetime]) -> None
- self._data = data
- if data is not None and last_refresh is None:
- self.last_refresh = datetime.datetime.utcnow() # type: Optional[datetime.datetime]
- else:
- self.last_refresh = last_refresh
-
- def json(self):
- if self.last_refresh is not None:
- timestr = self.last_refresh.strftime(self.DATEFMT) # type: Optional[str]
- else:
- timestr = None
-
- return {
- "data": self._data,
- "last_refresh": timestr,
- }
-
- @property
- def data(self):
- return self._data
-
- # @data.setter
- # No setter, as it doesn't work as expected: It's not saved in store automatically
-
- @classmethod
- def time_from_string(cls, timestr):
- if timestr is None:
- return None
- # drop the 'Z' timezone indication, it's always UTC
- timestr = timestr.rstrip('Z')
- return datetime.datetime.strptime(timestr, cls.DATEFMT)
-
- @classmethod
- def from_json(cls, data):
- return cls(data['data'], cls.time_from_string(data['last_refresh']))
-
- def outdated(self, timeout=None):
- if timeout is None:
- timeout = 600
- if self.last_refresh is None:
- return True
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
- seconds=timeout)
- return self.last_refresh < cutoff
-
- def __repr__(self):
- return 'OutdatableData(data={}, last_refresh={})'.format(self._data, self.last_refresh)
-
-
-class OutdatableDictMixin(object):
- """
- Toolbox for implementing a cache. As every orchestrator has
- different needs, we cannot implement any logic here.
- """
-
- def __getitem__(self, item):
- # type: (str) -> OutdatableData
- return OutdatableData.from_json(super(OutdatableDictMixin, self).__getitem__(item)) # type: ignore
-
- def __setitem__(self, key, value):
- # type: (str, OutdatableData) -> None
- val = None if value is None else value.json()
- super(OutdatableDictMixin, self).__setitem__(key, val) # type: ignore
-
- def items(self):
- ## type: () -> Iterator[Tuple[str, OutdatableData]]
- for item in super(OutdatableDictMixin, self).items(): # type: ignore
- k, v = item
- yield k, OutdatableData.from_json(v)
-
- def items_filtered(self, keys=None):
- if keys:
- return [(host, self[host]) for host in keys]
- else:
- return list(self.items())
-
- def any_outdated(self, timeout=None):
- items = self.items()
- if not list(items):
- return True
- return any([i[1].outdated(timeout) for i in items])
-
- def remove_outdated(self):
- outdated = [item[0] for item in self.items() if item[1].outdated()]
- for o in outdated:
- del self[o] # type: ignore
-
- def invalidate(self, key):
- self[key] = OutdatableData(self[key].data,
- datetime.datetime.fromtimestamp(0))
-
-
-class OutdatablePersistentDict(OutdatableDictMixin, PersistentStoreDict):
- pass
-
-
-class OutdatableDict(OutdatableDictMixin, dict):
- pass
-from mgr_module import MgrModule, CommandResult, PersistentStoreDict
+from mgr_module import MgrModule, CommandResult
import threading
import random
import json
self._self_test_store()
self._self_test_misc()
self._self_test_perf_counters()
- self._self_persistent_store_dict()
def _self_test_getters(self):
self.version
self.log.info("Finished self-test procedure.")
- def _self_persistent_store_dict(self):
- self.test_dict = PersistentStoreDict(self, 'test_dict')
- for i in "abcde":
- self.test_dict[i] = {i:1}
- assert self.test_dict.keys() == set("abcde")
- assert 'a' in self.test_dict
- del self.test_dict['a']
- assert self.test_dict.keys() == set("bcde"), self.test_dict.keys()
- assert 'a' not in self.test_dict
- self.test_dict.clear()
- assert not self.test_dict, dict(self.test_dict.items())
- self.set_store('test_dict.a', 'invalid json')
- try:
- self.test_dict['a']
- assert False
- except ValueError:
- pass
- assert not self.test_dict, dict(self.test_dict.items())
-
def _test_remote_calls(self):
# Test making valid call
self.remote("influx", "handle_command", "", {"prefix": "influx self-test"})