self.daemon_refresh_queue = [] # type: List[str]
self.device_refresh_queue = [] # type: List[str]
self.osdspec_previews_refresh_queue = [] # type: List[str]
+
+ # host -> daemon name -> dict
self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]]
self.last_host_check = {} # type: Dict[str, datetime.datetime]
self.loading_osdspec_preview = set() # type: Set[str]
- self.etc_ceph_ceph_conf_refresh_queue: Set[str] = set()
+ self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {}
self.registry_login_queue: Set[str] = set()
def load(self):
if 'last_host_check' in j:
self.last_host_check[host] = datetime.datetime.strptime(
j['last_host_check'], DATEFMT)
- self.etc_ceph_ceph_conf_refresh_queue.add(host)
+ if 'last_etc_ceph_ceph_conf' in j:
+ self.last_etc_ceph_ceph_conf[host] = datetime.datetime.strptime(
+ j['last_etc_ceph_ceph_conf'], DATEFMT)
self.registry_login_queue.add(host)
self.mgr.log.debug(
'HostCache.load: host %s has %d daemons, '
self.daemon_refresh_queue.append(host)
self.device_refresh_queue.append(host)
self.osdspec_previews_refresh_queue.append(host)
- self.etc_ceph_ceph_conf_refresh_queue.add(host)
self.registry_login_queue.add(host)
def invalidate_host_daemons(self, host):
if host in self.last_device_update:
del self.last_device_update[host]
self.mgr.event.set()
-
- def distribute_new_etc_ceph_ceph_conf(self):
- self.etc_ceph_ceph_conf_refresh_queue = set(self.mgr.inventory.keys())
def distribute_new_registry_login_info(self):
self.registry_login_queue = set(self.mgr.inventory.keys())
if host in self.last_host_check:
j['last_host_check'] = self.last_host_check[host].strftime(DATEFMT)
+
+ if host in self.last_etc_ceph_ceph_conf:
+ j['last_etc_ceph_ceph_conf'] = self.last_etc_ceph_ceph_conf[host].strftime(DATEFMT)
+
self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
def rm_host(self, host):
seconds=self.mgr.host_check_interval)
return host not in self.last_host_check or self.last_host_check[host] < cutoff
- def host_needs_new_etc_ceph_ceph_conf(self, host):
+ def host_needs_new_etc_ceph_ceph_conf(self, host: str):
if not self.mgr.manage_etc_ceph_ceph_conf:
return False
if self.mgr.paused:
return False
if host in self.mgr.offline_hosts:
return False
- if host in self.etc_ceph_ceph_conf_refresh_queue:
- # We're read-only here.
- # self.etc_ceph_ceph_conf_refresh_queue.remove(host)
+ if not self.mgr.last_monmap:
+ return False
+ if host not in self.last_etc_ceph_ceph_conf:
+ return True
+ if self.mgr.last_monmap > self.last_etc_ceph_ceph_conf[host]:
return True
+ # already up to date:
return False
+ def update_last_etc_ceph_ceph_conf(self, host: str):
+ if not self.mgr.last_monmap:
+ return
+ self.last_etc_ceph_ceph_conf[host] = self.mgr.last_monmap
+
def host_needs_registry_login(self, host):
if host in self.mgr.offline_hosts:
return False
return True
return False
- def remove_host_needs_new_etc_ceph_ceph_conf(self, host):
- self.etc_ceph_ceph_conf_refresh_queue.remove(host)
-
def add_daemon(self, host, dd):
# type: (str, orchestrator.DaemonDescription) -> None
assert host in self.daemons
def __init__(self, *args, **kwargs):
super(CephadmOrchestrator, self).__init__(*args, **kwargs)
self._cluster_fsid = self.get('mon_map')['fsid']
+ self.last_monmap: Optional[datetime.datetime] = None
# for serve()
self.run = True
self._cons = {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
+
+ self.notify('mon_map', None)
self.config_notify()
path = self.get_ceph_option('cephadm_path')
TODO: this method should be moved into mgr_module.py
"""
- module_options_changed: List[str] = []
for opt in self.MODULE_OPTIONS:
- old_val = getattr(self, opt['name'], None)
- new_val = self.get_module_option(opt['name'])
setattr(self,
opt['name'], # type: ignore
- new_val) # type: ignore
+ self.get_module_option(opt['name'])) # type: ignore
self.log.debug(' mgr option %s = %s',
opt['name'], getattr(self, opt['name'])) # type: ignore
- if old_val != new_val:
- module_options_changed.append(opt['name'])
for opt in self.NATIVE_OPTIONS:
setattr(self,
opt, # type: ignore
self.get_ceph_option(opt))
self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
- for what in module_options_changed:
- self.config_notify_one(what)
-
self.event.set()
- def config_notify_one(self, what):
- if what == 'manage_etc_ceph_ceph_conf' and self.manage_etc_ceph_ceph_conf:
- self.cache.distribute_new_etc_ceph_ceph_conf()
-
def notify(self, notify_type, notify_id):
if notify_type == "mon_map":
- self.cache.distribute_new_etc_ceph_ceph_conf()
+ # get monmap mtime so we can refresh configs when mons change
+ monmap = self.get('mon_map')
+ self.last_monmap = datetime.datetime.strptime(
+ monmap['modified'], CEPH_DATEFMT)
+ if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
+ self.last_monmap = None # just in case clocks are skewed
def pause(self):
if not self.paused:
)
if code:
return f'failed to create /etc/ceph/ceph.conf on {host}: {err}'
- self.cache.remove_host_needs_new_etc_ceph_ceph_conf(host)
+ self.cache.update_last_etc_ceph_ceph_conf(host)
+ self.cache.save_host(host)
except OrchestratorError as e:
return f'failed to create /etc/ceph/ceph.conf on {host}: {str(e)}'
return None
f'service {service_name}')
def _check_daemons(self):
- # get monmap mtime so we can refresh configs when mons change
- monmap = self.get('mon_map')
- last_monmap: Optional[datetime.datetime] = datetime.datetime.strptime(
- monmap['modified'], CEPH_DATEFMT)
- if last_monmap and last_monmap > datetime.datetime.utcnow():
- last_monmap = None # just in case clocks are skewed
daemons = self.cache.get_daemons()
daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
self.log.info('Reconfiguring %s (dependencies changed)...' % (
dd.name()))
reconfig = True
- elif last_monmap and \
- last_monmap > last_config and \
+ elif self.last_monmap and \
+ self.last_monmap > last_config and \
dd.daemon_type in CEPH_TYPES:
self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
reconfig = True
+import datetime
import time
import fnmatch
from contextlib import contextmanager
+from cephadm.module import CEPH_DATEFMT
+
try:
from typing import Any
except ImportError:
return 0, '', ''
@contextmanager
-def with_cephadm_module(module_options):
+def with_cephadm_module(module_options=None, store=None):
+ """
+ :param module_options: Set opts as if they were set before module.__init__ is called
+ :param store: Set the store before module.__init__ is called
+ """
with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \
mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command):
m = CephadmOrchestrator.__new__ (CephadmOrchestrator)
- for k, v in module_options.items():
- m._ceph_set_module_option('cephadm', k, v)
+ if module_options is not None:
+ for k, v in module_options.items():
+ m._ceph_set_module_option('cephadm', k, v)
+ if store is None:
+ store = {}
+ if '_ceph_get/mon_map' not in store:
+ store['_ceph_get/mon_map'] = {
+ 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'fsid': 'foobar',
+ }
+ for k, v in store.items():
+ m._ceph_set_store(k, v)
+
m.__init__('cephadm', 0, 0)
m._cluster_fsid = "fsid"
yield m
)
])
))
- #@mock.patch("mgr_module.MgrModule._ceph_get")
- @mock.patch("ceph_module.BaseMgrModule._ceph_get")
- def test_daemon_action(self, _ceph_get, cephadm_module: CephadmOrchestrator):
+ def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
+
cephadm_module.service_cache_timeout = 10
with with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
+ assert len(c.result) == 1
c = cephadm_module.daemon_action('redeploy', 'rgw', 'myrgw.foobar')
assert wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"]
c = cephadm_module.daemon_action(what, 'rgw', 'myrgw.foobar')
assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
- now = datetime.datetime.utcnow().strftime(CEPH_DATEFMT)
- _ceph_get.return_value = {'modified': now}
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module._store['_ceph_get/mon_map'] = {
+ 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'fsid': 'foobar',
+ }
+ cephadm_module.notify('mon_map', None)
cephadm_module._check_daemons()
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("remoto.process.check")
- def test_etc_ceph(self, _check, _get_connection, cephadm_module: CephadmOrchestrator):
+ def test_etc_ceph(self, _check, _get_connection, cephadm_module):
_get_connection.return_value = mock.Mock(), mock.Mock()
_check.return_value = '{}', '', 0
+ assert cephadm_module.manage_etc_ceph_ceph_conf is False
+
with with_host(cephadm_module, 'test'):
assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+ cephadm_module.cache.last_etc_ceph_ceph_conf = {}
+ cephadm_module.cache.load()
+
+ assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module._store['_ceph_get/mon_map'] = {
+ 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'fsid': 'foobar',
+ }
cephadm_module.notify('mon_map', mock.MagicMock())
assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+ cephadm_module.cache.last_etc_ceph_ceph_conf = {}
+ cephadm_module.cache.load()
+ assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
def test_etc_ceph_init(self):
with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
- assert m.manage_etc_ceph_ceph_conf == True
+ assert m.manage_etc_ceph_ceph_conf is True
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
M_classes = set()
class M(object):
+ """
+ Note that:
+
+ * self.set_store() populates self._store
+ * self.set_module_option() populates self._store[module_name]
+ * self.get(thing) comes from self._store['_ceph_get' + thing]
+
+ """
def _ceph_get_store(self, k):
if not hasattr(self, '_store'):
self._store = {}
def _ceph_set_module_option(self, module, key, val):
return self._ceph_set_store(f'{module}/{key}', val)
- def _ceph_get(self, *args):
- return mock.MagicMock()
+ def _ceph_get(self, data_name):
+ if not hasattr(self, '_store'):
+ self._store = {}
+ return self._store.get(f'_ceph_get/{data_name}', mock.MagicMock())
def __init__(self, *args):