assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
# Be careful with backward compatibility when changing things here:
- assert json.loads(cephadm_module._store['inventory']) == \
+ assert json.loads(cephadm_module.get_store('inventory')) == \
{"test": {"hostname": "test", "addr": "test", "labels": [], "status": ""}}
with with_host(cephadm_module, 'second'):
_ceph_send_command.side_effect = Exception("myerror")
# Make sure, _check_daemons does a redeploy due to monmap change:
- cephadm_module._store['_ceph_get/mon_map'] = {
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
'fsid': 'foobar',
- }
+ })
cephadm_module.notify('mon_map', None)
cephadm_module._check_daemons()
with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'):
# Make sure, _check_daemons does a redeploy due to monmap change:
- cephadm_module._store['_ceph_get/mon_map'] = {
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
'fsid': 'foobar',
- }
+ })
cephadm_module.notify('mon_map', None)
- cephadm_module._store['_ceph_get/mgr_map'] = {
+ cephadm_module.mock_store_set('_ceph_get', 'mgr_map', {
'modules': ['dashboard']
- }
+ })
with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
# Make sure, _check_daemons does a redeploy due to monmap change:
- cephadm_module._store['_ceph_get/mon_map'] = {
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
'fsid': 'foobar',
- }
+ })
cephadm_module.notify('mon_map', mock.MagicMock())
assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
cephadm_module.cache.last_etc_ceph_ceph_conf = {}
# type: ignore
from __future__ import absolute_import
-
+import json
+import logging
import os
if 'UNITTEST' in os.environ:
* self.get(thing) comes from self._store['_ceph_get' + thing]
"""
- def _ceph_get_store(self, k):
+
+ def mock_store_get(self, kind, key, default):
if not hasattr(self, '_store'):
self._store = {}
- return self._store.get(k, None)
+ return self._store.get(f'mock_store/{kind}/{key}', default)
- def _ceph_set_store(self, k, v):
+ def mock_store_set(self, kind, key, value):
if not hasattr(self, '_store'):
self._store = {}
- if v is None:
+ k = f'mock_store/{kind}/{key}'
+ if value is None:
if k in self._store:
del self._store[k]
else:
- self._store[k] = v
+ self._store[k] = value
- def _ceph_get_store_prefix(self, prefix):
+ def mock_store_preifx(self, kind, prefix):
if not hasattr(self, '_store'):
self._store = {}
+ full_prefix = f'mock_store/{kind}/{prefix}'
+ kind_len = len(f'mock_store/{kind}/')
return {
- k: v for k, v in self._store.items()
- if k.startswith(prefix)
+ k[kind_len:]: v for k, v in self._store.items()
+ if k.startswith(full_prefix)
}
- def _ceph_get_module_option(self, module, key, localized_prefix: None):
- val = self._ceph_get_store(f'{module}/{key}')
+ def _ceph_get_store(self, k):
+ return self.mock_store_get('store', k, None)
+
+ def _ceph_set_store(self, k, v):
+ self.mock_store_set('store', k, v)
+
+ def _ceph_get_store_prefix(self, prefix):
+ return self.mock_store_preifx('store', prefix)
+
+ def _ceph_get_module_option(self, module, key, localized_prefix= None):
+ try:
+ _, val, _ = self.check_mon_command({
+ 'prefix': 'config get',
+ 'who': 'mgr',
+ 'key': f'mgr/{module}/{key}'
+ })
+ except FileNotFoundError:
+ val = None
mo = [o for o in self.MODULE_OPTIONS if o['name'] == key]
if len(mo) == 1 and val is not None:
cls = {
return val
def _ceph_set_module_option(self, module, key, val):
- return self._ceph_set_store(f'{module}/{key}', val)
+ _, _, _ = self.check_mon_command({
+ 'prefix': 'config set',
+ 'who': 'mgr',
+ 'name': f'mgr/{module}/{key}',
+ 'value': val
+ })
+ return val
def _ceph_get(self, data_name):
- if not hasattr(self, '_store'):
- self._store = {}
- return self._store.get(f'_ceph_get/{data_name}', mock.MagicMock())
-
- def _ceph_send_command(self, ev, *args):
- ev.complete(0, '', '')
+ return self.mock_store_get('_ceph_get', data_name, mock.MagicMock())
+
+ def _ceph_send_command(self, res, svc_type, svc_id, command, tag):
+ cmd = json.loads(command)
+
+ # Mocking the config store is handy sometimes:
+ def config_get():
+ who = cmd['who'].split('.')
+ whos = ['global'] + ['.'.join(who[:i+1]) for i in range(len(who))]
+ for attepmt in reversed(whos):
+ val = self.mock_store_get('config', f'{attepmt}/{cmd["key"]}', None)
+ if val is not None:
+ return val
+ return None
+
+ def config_set():
+ self.mock_store_set('config', f'{cmd["who"]}/{cmd["name"]}', cmd['value'])
+ return ''
+
+ def config_dump():
+ r = []
+ for prefix, value in self.mock_store_preifx('config', '').items():
+ section, name = prefix.split('/', 1)
+ r.append({
+ 'name': name,
+ 'section': section,
+ 'value': value
+ })
+ return json.dumps(r)
+
+ outb = ''
+ if cmd['prefix'] == 'config get':
+ outb = config_get()
+ elif cmd['prefix'] == 'config set':
+ outb = config_set()
+ elif cmd['prefix'] == 'config dump':
+ outb = config_dump()
+
+ res.complete(0, outb, '')
+
+ @property
+ def _logger(self):
+ return logging.getLogger(__name__)
+
+ @_logger.setter
+ def _logger(self, _):
+ pass
def __init__(self, *args):
if not hasattr(self, '_store'):