import json
import errno
import logging
+from collections import defaultdict
from threading import Event
from functools import wraps
from . import remotes
from . import utils
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
- RbdMirrorService, CrashService
+ RbdMirrorService, CrashService, CephadmService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDRemoval, OSDService
self.node_exporter_service = NodeExporterService(self)
self.crash_service = CrashService(self)
self.iscsi_service = IscsiService(self)
+ self.cephadm_services = {
+ 'mon': self.mon_service,
+ 'mgr': self.mgr_service,
+ 'osd': self.osd_service,
+ 'mds': self.mds_service,
+ 'rgw': self.rgw_service,
+ 'rbd-mirror': self.rbd_mirror_service,
+ 'nfs': self.nfs_service,
+ 'grafana': self.grafana_service,
+ 'alertmanager': self.alertmanager_service,
+ 'prometheus': self.prometheus_service,
+ 'node-exporter': self.node_exporter_service,
+ 'crash': self.crash_service,
+ 'iscsi': self.iscsi_service,
+ }
def shutdown(self):
self.log.debug('shutdown')
self.run = False
self.event.set()
+    def _get_cephadm_service(self, service_type: str) -> CephadmService:
+        """Look up the service handler registered for ``service_type``.
+
+        Asserts that the type is listed in ``ServiceSpec.KNOWN_SERVICE_TYPES``
+        and then indexes ``self.cephadm_services``; a type that is missing
+        from that mapping raises ``KeyError``.
+        """
+        known_types = ServiceSpec.KNOWN_SERVICE_TYPES
+        assert service_type in known_types
+        handlers = self.cephadm_services
+        return handlers[service_type]
+
def _kick_serve_loop(self):
self.log.debug('_kick_serve_loop')
self.event.set()
last_monmap = None # just in case clocks are skewed
daemons = self.cache.get_daemons()
- grafanas = [] # type: List[orchestrator.DaemonDescription]
- iscsi_daemons = []
+ daemons_post = defaultdict(list)
for dd in daemons:
# orphan?
spec = self.spec_store.specs.get(dd.service_name(), None)
if spec and spec.unmanaged:
continue
- # dependencies?
- if dd.daemon_type == 'grafana':
- # put running instances at the front of the list
- grafanas.insert(0, dd)
- elif dd.daemon_type == 'iscsi':
- iscsi_daemons.append(dd)
+ # These daemon types require additional configs after creation
+ if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
+ daemons_post[dd.daemon_type].append(dd)
+
deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
last_deps, last_config = self.cache.get_daemon_last_config_deps(
dd.hostname, dd.name())
self._create_daemon(dd.daemon_type, dd.daemon_id,
dd.hostname, reconfig=True)
- if grafanas:
- self.grafana_service.daemon_check_post(grafanas)
- if iscsi_daemons:
- self.iscsi_service.daemon_check_post(iscsi_daemons)
+ # do daemon post actions
+ for daemon_type, daemon_descs in daemons_post.items():
+ self._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
def _add_daemon(self, daemon_type, spec,
create_func: Callable[..., T], config_func=None) -> List[T]:
import logging
from typing import TYPE_CHECKING, List
+from mgr_module import MonCommandFailed
+
from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from orchestrator import OrchestratorError, DaemonDescription
from cephadm import utils
"""The post actions needed to be done after daemons are checked"""
raise NotImplementedError()
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ """Pick the active daemon out of ``daemon_descrs``; this base version always raises NotImplementedError."""
+ raise NotImplementedError()
+
+ def _inventory_get_addr(self, hostname: str):
+ """Return whatever ``self.mgr.inventory.get_addr`` reports for ``hostname``."""
+ return self.mgr.inventory.get_addr(hostname)
+
+    def _set_service_url_on_dashboard(self,
+                                      service_name: str,
+                                      get_mon_cmd: str,
+                                      set_mon_cmd: str,
+                                      service_url: str) -> None:
+        """Make the Dashboard's stored URL for a service match ``service_url``.
+
+        The current value is read with ``get_mon_cmd``; ``set_mon_cmd`` is
+        only issued when that value (with surrounding whitespace stripped)
+        differs from ``service_url``. A ``MonCommandFailed`` raised by either
+        command is logged as a warning and swallowed.
+
+        :param service_name: name used in the log messages only
+        :param get_mon_cmd: MON command prefix used to read the current URL
+        :param set_mon_cmd: MON command prefix used to store a new URL
+        :param service_url: the URL the Dashboard should end up with
+        """
+        try:
+            _, current_url, _ = self.mgr.check_mon_command({
+                'prefix': get_mon_cmd
+            })
+        except MonCommandFailed as e:
+            logger.warning('Failed to get service URL for %s: %s', service_name, e)
+            return
+
+        if current_url.strip() == service_url:
+            # Already up to date: do not issue a redundant set command.
+            return
+
+        logger.info(
+            'Setting service URL %s for %s in the Dashboard', service_url, service_name)
+        try:
+            # The command's output is not needed, only whether it failed.
+            self.mgr.check_mon_command({
+                'prefix': set_mon_cmd,
+                'value': service_url,
+            })
+        except MonCommandFailed as e:
+            logger.warning('Failed to set service URL %s for %s in the Dashboard: %s',
+                           service_url, service_name, e)
+
class MonService(CephadmService):
def create(self, name, host, network):
logger.warning(
'Unable to add iSCSI gateway to the Dashboard for %s: %s', dd, reason)
continue
- host = self.mgr.inventory.get_addr(dd.hostname)
+ host = self._inventory_get_addr(dd.hostname)
service_url = 'http://{}:{}@{}:{}'.format(
spec.api_user, spec.api_password, host, spec.api_port or '5000')
gw = gateways.get(dd.hostname)
logger = logging.getLogger(__name__)
class GrafanaService(CephadmService):
+ DEFAULT_SERVICE_PORT = 3000
+
def create(self, daemon_id, host):
# type: (str, str) -> str
return self.mgr._create_daemon('grafana', daemon_id, host)
protocol = https
cert_file = /etc/grafana/certs/cert_file
cert_key = /etc/grafana/certs/cert_key
- http_port = 3000
+ http_port = {}
[security]
admin_user = admin
admin_password = admin
allow_embedding = true
-""",
+""".format(self.DEFAULT_SERVICE_PORT),
'provisioning/datasources/ceph-dashboard.yml': generate_grafana_ds_config(prom_services),
'certs/cert_file': '# generated by cephadm\n%s' % cert,
'certs/cert_key': '# generated by cephadm\n%s' % pkey,
}
return config_file, sorted(deps)
- def daemon_check_post(self, daemon_descrs: List[DaemonDescription]):
- # make sure the dashboard [does not] references grafana
- try:
- current_url = self.mgr.get_module_option_ex('dashboard', 'GRAFANA_API_URL')
- host = daemon_descrs[0].hostname
- url = f'https://{self.mgr.inventory.get_addr(host)}:3000'
- if current_url != url:
- logger.info('Setting dashboard grafana config to %s' % url)
- self.mgr.set_module_option_ex('dashboard', 'GRAFANA_API_URL', url)
- # FIXME: is it a signed cert??
- except Exception as e:
- logger.debug('got exception fetching dashboard grafana state: %s', e)
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ # Treat the last entry of daemon_descrs as the active daemon.
+ # NOTE(review): this comment previously read "least-created"; presumably
+ # "last-created" was meant -- confirm the ordering the caller guarantees.
+ return daemon_descrs[-1]
+    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]):
+        """Point the Dashboard's Grafana API URL at the active Grafana daemon."""
+        # TODO: signed cert
+        active = self.get_active_daemon(daemon_descrs)
+        addr = self._inventory_get_addr(active.hostname)
+        port = self.DEFAULT_SERVICE_PORT
+        self._set_service_url_on_dashboard(
+            'Grafana',
+            'dashboard get-grafana-api-url',
+            'dashboard set-grafana-api-url',
+            f'https://{addr}:{port}',
+        )
class AlertmanagerService(CephadmService):
+ DEFAULT_SERVICE_PORT = 9093
+
def create(self, daemon_id, host) -> str:
return self.mgr._create_daemon('alertmanager', daemon_id, host)
"peers": peers
}, sorted(deps)
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ """Return the first entry of ``daemon_descrs`` as the active Alertmanager daemon."""
+ # TODO: if there are multiple daemons, who is the active one?
+ return daemon_descrs[0]
+
+    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]):
+        """Point the Dashboard's Alertmanager API host at the active daemon."""
+        active = self.get_active_daemon(daemon_descrs)
+        addr = self._inventory_get_addr(active.hostname)
+        port = self.DEFAULT_SERVICE_PORT
+        self._set_service_url_on_dashboard(
+            'AlertManager',
+            'dashboard get-alertmanager-api-host',
+            'dashboard set-alertmanager-api-host',
+            f'http://{addr}:{port}',
+        )
+
class PrometheusService(CephadmService):
+ DEFAULT_SERVICE_PORT = 9095
+
def create(self, daemon_id, host) -> str:
return self.mgr._create_daemon('prometheus', daemon_id, host)
return r, sorted(deps)
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ """Return the first entry of ``daemon_descrs`` as the active Prometheus daemon."""
+ # TODO: if there are multiple daemons, who is the active one?
+ return daemon_descrs[0]
+
+    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]):
+        """Point the Dashboard's Prometheus API host at the active daemon."""
+        active = self.get_active_daemon(daemon_descrs)
+        addr = self._inventory_get_addr(active.hostname)
+        port = self.DEFAULT_SERVICE_PORT
+        self._set_service_url_on_dashboard(
+            'Prometheus',
+            'dashboard get-prometheus-api-host',
+            'dashboard set-prometheus-api-host',
+            f'http://{addr}:{port}',
+        )
class NodeExporterService(CephadmService):
def create(self, daemon_id, host) -> str:
--- /dev/null
+from unittest.mock import MagicMock
+
+from cephadm.services.cephadmservice import CephadmService
+
+
+class FakeMgr:
+    """Minimal mgr stand-in that keeps one string value in ``config``.
+
+    ``check_mon_command`` is a ``MagicMock`` that delegates to
+    ``_check_mon_command`` so tests can inspect the calls that were made.
+    """
+
+    def __init__(self):
+        self.config = ''
+        self.check_mon_command = MagicMock(side_effect=self._check_mon_command)
+
+    def _check_mon_command(self, cmd_dict):
+        """Answer the ``get-cmd`` / ``set-cmd`` prefixes; anything else gets an error tuple."""
+        command = cmd_dict.get('prefix')
+        if command == 'set-cmd':
+            self.config = cmd_dict.get('value')
+            return 0, 'value set', ''
+        if command == 'get-cmd':
+            return 0, self.config, ''
+        return -1, '', 'error'
+
+
+class TestCephadmService:
+ def test_set_service_url_on_dashboard(self):
+ """The URL is written on the first call and left alone while it is unchanged."""
+ # pylint: disable=protected-access
+ mgr = FakeMgr()
+ service_url = 'http://svc:1000'
+ service = CephadmService(mgr)
+ # First call: FakeMgr.config starts out empty, so the set command has to run.
+ service._set_service_url_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url)
+ assert mgr.config == service_url
+
+ # set-cmd should not be called if value doesn't change
+ # (the mock is reset first so only the second call's commands are counted)
+ mgr.check_mon_command.reset_mock()
+ service._set_service_url_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url)
+ mgr.check_mon_command.assert_called_once_with({'prefix': 'get-cmd'})