return wrapper
-class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
+class SSHOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
_STORE_HOST_PREFIX = "host"
'default': 10 * 60,
'desc': 'seconds to cache device inventory',
},
+ {
+ 'name': 'service_cache_timeout',
+ 'type': 'seconds',
+ 'default': 60,
+ 'desc': 'seconds to cache service (daemon) inventory',
+ },
]
COMMANDS = [
self.inventory_cache = orchestrator.OutdatablePersistentDict(
self, self._STORE_HOST_PREFIX + '.devices')
- self.daemon_cache = orchestrator.OutdatablePersistentDict(
- self, self._STORE_HOST_PREFIX + '.daemons')
+ self.service_cache = orchestrator.OutdatablePersistentDict(
+ self, self._STORE_HOST_PREFIX + '.services')
def config_notify(self):
"""
@log_exceptions
def run(host):
self.inventory_cache[host] = orchestrator.OutdatableData()
+ self.service_cache[host] = orchestrator.OutdatableData()
return "Added host '{}'".format(host)
return SSHWriteCompletion(
@log_exceptions
def run(host):
del self.inventory_cache[host]
+ del self.service_cache[host]
return "Removed host '{}'".format(host)
return SSHWriteCompletion(
nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
return orchestrator.TrivialReadCompletion(nodes)
+ def _refresh_host_services(self, host):
+ out, code = self._run_ceph_daemon(
+ host, 'mon', 'ls', [], no_fsid=True)
+ data = json.loads(''.join(out))
+ self.log.debug('refreshed host %s services: %s' % (host, data))
+ self.service_cache[host] = orchestrator.OutdatableData(data)
+ return data
+
def _get_services(self,
service_type=None,
service_name=None,
service_id=None,
- node_name=None):
- daemons = {}
- for host, _ in self._get_hosts():
- self.log.info("refresh stale daemons for '{}'".format(host))
- out, code = self._run_ceph_daemon(
- host, 'mon', 'ls', [], no_fsid=True)
- daemons[host] = json.loads(''.join(out))
+ node_name=None,
+ refresh=False):
+ hosts = []
+ wait_for = []
+ for host, host_info in self.service_cache.items_filtered():
+ hosts.append(host)
+ if host_info.outdated(self.service_cache_timeout) or refresh:
+ self.log.info("refresing stale services for '{}'".format(host))
+ wait_for.append(
+ SSHReadCompletion(self._worker_pool.apply_async(
+ self._refresh_host_services, (host,))))
+ else:
+ self.log.debug('have recent services for %s: %s' % (
+ host, host_info.data))
+ wait_for.append(
+ orchestrator.TrivialReadCompletion([host_info.data]))
+ self._orchestrator_wait(wait_for)
+
+ services = {}
+ for host, c in zip(hosts, wait_for):
+ services[host] = c.result[0]
result = []
- for host, ls in daemons.items():
+ for host, ls in services.items():
for d in ls:
if not d['style'].startswith('ceph-daemon'):
self.log.debug('ignoring non-ceph-daemon on %s: %s' % (host, d))
service_type + " unsupported")
result = self._get_services(service_type,
service_id=service_id,
- node_name=node_name)
+ node_name=node_name,
+ refresh=refresh)
return orchestrator.TrivialReadCompletion(result)
def service_action(self, action, service_type,