class Ceph(object):
daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror',
- 'crash', 'cephfs-mirror')
+ 'crash', 'cephfs-mirror', 'ceph-exporter')
##################################
+class CephExporter(object):
+ """Defines a Ceph exporter container"""
+
+ daemon_type = 'ceph-exporter'
+ entrypoint = '/usr/bin/ceph-exporter'
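+ # default port for the exporter's Prometheus metrics endpoint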
+ DEFAULT_PORT = 9926
+ port_map = {
+ 'ceph-exporter': DEFAULT_PORT,
+ }
+
+ def __init__(self,
+ ctx: CephadmContext,
+ fsid: str, daemon_id: Union[int, str],
+ config_json: Dict[str, Any],
+ image: str = DEFAULT_IMAGE) -> None:
+ self.ctx = ctx
+ self.fsid = fsid
+ self.daemon_id = daemon_id
+ self.image = image
+
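+ # optional overrides from the daemon's config-json; each key falls
+ # back to a built-in default when absent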
+ self.sock_dir = config_json.get('sock-dir', '/var/run/ceph/')
+ self.addrs = config_json.get('addrs', socket.gethostbyname(socket.gethostname()))
+ self.port = config_json.get('port', self.DEFAULT_PORT)
+ self.prio_limit = config_json.get('prio-limit', 5)
+ self.stats_period = config_json.get('stats-period', 5)
+
+ self.validate()
+
+ @classmethod
+ def init(cls, ctx: CephadmContext, fsid: str,
+ daemon_id: Union[int, str]) -> 'CephExporter':
+ return cls(ctx, fsid, daemon_id,
+ get_parm(ctx.config_json), ctx.image)
+
+ @staticmethod
+ def get_container_mounts() -> Dict[str, str]:
+ mounts = dict()
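+ # the exporter reads the local daemons' admin sockets under /var/run/ceph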
+ mounts['/var/run/ceph'] = '/var/run/ceph:z'
+ return mounts
+
+ def get_daemon_args(self) -> List[str]:
+ args = [
+ f'--sock-dir={self.sock_dir}',
+ f'--addrs={self.addrs}',
+ f'--port={self.port}',
+ f'--prio-limit={self.prio_limit}',
+ f'--stats-period={self.stats_period}',
+ ]
+ return args
+
+ def validate(self) -> None:
+ if not os.path.isdir(self.sock_dir):
+ raise Error(f'Directory does not exist. Got: {self.sock_dir}')
+
+
+##################################
+
+
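For illustration, a rough sketch of how the class resolves its daemon args
(ctx, the fsid and the config-json values are placeholders, and validate()
expects the sock dir to exist on the host):

    exporter = CephExporter(ctx, '<fsid>', 'host1',
                            config_json={'port': 9927, 'prio-limit': 10})
    exporter.get_daemon_args()
    # ['--sock-dir=/var/run/ceph/', '--addrs=<resolved host ip>',
    #  '--port=9927', '--prio-limit=10', '--stats-period=5']
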
class HAproxy(object):
"""Defines an HAproxy container"""
daemon_type = 'haproxy'
# type: (CephadmContext, str, str, Union[int, str]) -> List[str]
r = list() # type: List[str]
- if daemon_type in Ceph.daemons and daemon_type != 'crash':
+ if daemon_type in Ceph.daemons and daemon_type not in ['crash', 'ceph-exporter']:
r += [
'--setuser', 'ceph',
'--setgroup', 'ceph',
elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
r += nfs_ganesha.get_daemon_args()
+ elif daemon_type == CephExporter.daemon_type:
+ ceph_exporter = CephExporter.init(ctx, fsid, daemon_id)
+ r.extend(ceph_exporter.get_daemon_args())
elif daemon_type == HAproxy.daemon_type:
haproxy = HAproxy.init(ctx, fsid, daemon_id)
r += haproxy.get_daemon_args()
mounts[data_dir] = cdata_dir + ':z'
if not no_config:
mounts[data_dir + '/config'] = '/etc/ceph/ceph.conf:z'
- if daemon_type in ['rbd-mirror', 'cephfs-mirror', 'crash']:
+ if daemon_type in ['rbd-mirror', 'cephfs-mirror', 'crash', 'ceph-exporter']:
# these do not search for their keyrings in a data directory
mounts[data_dir + '/keyring'] = '/etc/ceph/ceph.client.%s.%s.keyring' % (daemon_type, daemon_id)
entrypoint = NFSGanesha.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
envs.extend(NFSGanesha.get_container_envs())
+ elif daemon_type == CephExporter.daemon_type:
+ entrypoint = CephExporter.entrypoint
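+ # the exporter authenticates as a client entity, so it gets the full
+ # client.* name (cf. the get_auth_entity mapping)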
+ name = 'client.ceph-exporter.%s' % daemon_id
elif daemon_type == HAproxy.daemon_type:
name = '%s.%s' % (daemon_type, daemon_id)
container_args.extend(['--user=root']) # haproxy 2.4 defaults to a different user
cli(['orch', 'apply', 'crash'])
if not ctx.skip_monitoring_stack:
- for t in ['prometheus', 'grafana', 'node-exporter', 'alertmanager']:
+ for t in ['ceph-exporter', 'prometheus', 'grafana', 'node-exporter', 'alertmanager']:
logger.info('Deploying %s service with default placement...' % t)
cli(['orch', 'apply', t])
from ceph.utils import datetime_now
from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
-from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephExporterService
from cephadm.services.ingress import IngressSpec
from datetime import datetime, timedelta
return self.node_exporter_sd_config()
elif service == 'haproxy':
return self.haproxy_sd_config()
+ elif service == 'ceph-exporter':
+ return self.ceph_exporter_sd_config()
else:
return []
})
return srv_entries
+ def ceph_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]:
+ """Return <http_sd_config> compatible prometheus config for ceph-exporter service."""
+ srv_entries = []
+ for dd in self.mgr.cache.get_daemons_by_service('ceph-exporter'):
+ assert dd.hostname is not None
+ addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
+ port = dd.ports[0] if dd.ports else CephExporterService.DEFAULT_SERVICE_PORT
+ srv_entries.append({
+ 'targets': [build_url(host=addr, port=port).lstrip('/')],
+ 'labels': {'instance': dd.hostname}
+ })
+ return srv_entries
+
@cherrypy.expose(alias='prometheus/rules')
def get_prometheus_rules(self) -> str:
"""Return currently configured prometheus rules as Yaml."""
from . import ssh
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
- RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent
+ RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent, \
+ CephExporterService
from .services.ingress import IngressService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
IngressService, CustomContainerService, CephfsMirrorService,
- CephadmAgent, SNMPGatewayService
+ CephadmAgent, SNMPGatewayService, CephExporterService
]
# https://github.com/python/mypy/issues/8993
Generate a unique random service name
"""
suffix = daemon_type not in [
- 'mon', 'crash',
+ 'mon', 'crash', 'ceph-exporter',
'prometheus', 'node-exporter', 'grafana', 'alertmanager',
'container', 'agent', 'snmp-gateway', 'loki', 'promtail'
]
deps = [self.get_mgr_ip()]
else:
need = {
- 'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
+ 'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress', 'ceph-exporter'],
'grafana': ['prometheus', 'loki'],
'alertmanager': ['mgr', 'alertmanager', 'snmp-gateway'],
'promtail': ['loki'],
'alertmanager': PlacementSpec(count=1),
'prometheus': PlacementSpec(count=1),
'node-exporter': PlacementSpec(host_pattern='*'),
+ 'ceph-exporter': PlacementSpec(host_pattern='*'),
'loki': PlacementSpec(count=1),
'promtail': PlacementSpec(host_pattern='*'),
'crash': PlacementSpec(host_pattern='*'),
def apply_node_exporter(self, spec: ServiceSpec) -> str:
return self._apply(spec)
+ @handle_orch_error
+ def apply_ceph_exporter(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
@handle_orch_error
def apply_crash(self, spec: ServiceSpec) -> str:
return self._apply(spec)
from mgr_module import HandleCommandResult, MonCommandFailed
-from ceph.deployment.service_spec import ServiceSpec, RGWSpec
+from ceph.deployment.service_spec import ServiceSpec, RGWSpec, CephExporterSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
-from mgr_util import build_url
+from mgr_util import build_url, merge_dicts
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils
"""
# despite this method mapping entity names to daemons, self.TYPE within
# the CephService class refers to service types, not daemon types
- if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
+ if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress', 'ceph-exporter']:
return AuthEntity(f'client.{daemon_type}.{daemon_id}')
elif daemon_type in ['crash', 'agent']:
if host == "":
'prefix': 'auth get',
'entity': entity,
})
-
config = self.mgr.get_minimal_ceph_conf()
if extra_ceph_config:
return daemon_spec
+class CephExporterService(CephService):
+ TYPE = 'ceph-exporter'
+ DEFAULT_SERVICE_PORT = 9926
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ spec = cast(CephExporterSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_spec.daemon_id),
+ ['mon', 'profile ceph-exporter',
+ 'mon', 'allow r',
+ 'mgr', 'allow r',
+ 'osd', 'allow r'])
+ exporter_config = {}
+ if spec.sock_dir:
+ exporter_config.update({'sock-dir': spec.sock_dir})
+ if spec.port:
+ exporter_config.update({'port': f'{spec.port}'})
+ if spec.prio_limit is not None:
+ exporter_config.update({'prio-limit': f'{spec.prio_limit}'})
+ if spec.stats_period:
+ exporter_config.update({'stats-period': f'{spec.stats_period}'})
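+ # numeric spec values are stringified above so they can be merged
+ # into the daemon's config-json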
+
+ daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ daemon_spec.final_config = merge_dicts(daemon_spec.final_config, exporter_config)
+ return daemon_spec
+
+
class CephfsMirrorService(CephService):
TYPE = 'cephfs-mirror'
"service": dd.service_name(),
})
+ # scrape ceph-exporters
+ ceph_exporter_targets = []
+ for dd in self.mgr.cache.get_daemons_by_service('ceph-exporter'):
+ assert dd.hostname is not None
+ deps.append(dd.name())
+ addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
+ port = dd.ports[0] if dd.ports else 9926
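+ # build_url() without a scheme yields '//host:port'; lstrip('/') leaves
+ # the bare 'host:port' form prometheus expects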
+ ceph_exporter_targets.append({
+ 'url': build_url(host=addr, port=port).lstrip('/'),
+ 'hostname': dd.hostname
+ })
+
# generate the prometheus configuration
context = {
'alertmgr_targets': alertmgr_targets,
'mgr_scrape_list': mgr_scrape_list,
'haproxy_targets': haproxy_targets,
+ 'ceph_exporter_targets': ceph_exporter_targets,
'nodes': nodes,
}
r: Dict[str, Any] = {
instance: '{{ haproxy.service }}'
{% endfor %}
{% endif %}
+
+{% if ceph_exporter_targets %}
+ - job_name: 'ceph-exporter'
+ honor_labels: true
+ static_configs:
+{% for ceph_exporter in ceph_exporter_targets %}
+ - targets: ['{{ ceph_exporter.url }}']
+ labels:
+ instance: '{{ ceph_exporter.hostname }}'
+{% endfor %}
+{% endif %}
class FakeCache:
def get_daemons_by_service(self, service_type):
+ if service_type == 'ceph-exporter':
+ return [FakeDaemonDescription('1.2.3.4', [9926], 'node0'),
+ FakeDaemonDescription('1.2.3.5', [9926], 'node1')]
+
return [FakeDaemonDescription('1.2.3.4', [9100], 'node0'),
FakeDaemonDescription('1.2.3.5', [9200], 'node1')]
assert cfg[0]['targets'] == ['1.2.3.4:9049']
assert cfg[0]['labels'] == {'instance': 'ingress'}
+ def test_get_sd_config_ceph_exporter(self):
+ mgr = FakeMgr()
+ root = Root(mgr)
+ cfg = root.get_sd_config('ceph-exporter')
+
+ # check response structure
+ assert cfg
+ for entry in cfg:
+ assert 'labels' in entry
+ assert 'targets' in entry
+
+ # check content
+ assert cfg[0]['targets'] == ['1.2.3.4:9926']
+
def test_get_sd_config_invalid_service(self):
mgr = FakeMgr()
root = Root(mgr)
NodeExporterService, LokiService, PromtailService
from cephadm.module import CephadmOrchestrator
from ceph.deployment.service_spec import IscsiServiceSpec, MonitoringSpec, AlertManagerSpec, \
- ServiceSpec, RGWSpec, GrafanaSpec, SNMPGatewaySpec, IngressSpec, PlacementSpec, PrometheusSpec
+ ServiceSpec, RGWSpec, GrafanaSpec, SNMPGatewaySpec, IngressSpec, PlacementSpec, \
+ PrometheusSpec, CephExporterSpec
from cephadm.tests.fixtures import with_host, with_service, _run_cephadm, async_side_effect
from orchestrator import OrchestratorError
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, MonitoringSpec('node-exporter')) as _, \
+ with_service(cephadm_module, CephExporterSpec('ceph-exporter')) as _, \
with_service(cephadm_module, PrometheusSpec('prometheus')) as _:
y = dedent("""
labels:
instance: 'test'
+
+ - job_name: 'ceph-exporter'
+ honor_labels: true
+ static_configs:
+ - targets: ['[1::4]:9926']
+ labels:
+ instance: 'test'
""").lstrip()
_run_cephadm.assert_called_with(
# ceph daemon types that use the ceph container image.
# NOTE: order important here as these are used for upgrade order
-CEPH_TYPES = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror', 'cephfs-mirror']
+CEPH_TYPES = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw',
+ 'rbd-mirror', 'cephfs-mirror', 'ceph-exporter']
GATEWAY_TYPES = ['iscsi', 'nfs']
MONITORING_STACK_TYPES = ['node-exporter', 'prometheus',
'alertmanager', 'grafana', 'loki', 'promtail']
Map from daemon names to ceph entity names (as seen in config)
"""
daemon_type = name.split('.', 1)[0]
- if daemon_type in ['rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi']:
+ if daemon_type in ['rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi', 'ceph-exporter']:
return ConfEntity('client.' + name)
elif daemon_type in ['mon', 'osd', 'mds', 'mgr', 'client']:
return ConfEntity(name)
'mon': self.apply_mon,
'nfs': self.apply_nfs,
'node-exporter': self.apply_node_exporter,
+ 'ceph-exporter': self.apply_ceph_exporter,
'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore
'prometheus': self.apply_prometheus,
'loki': self.apply_loki,
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
+ def apply_ceph_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a ceph exporter daemon(s)"""
+ raise NotImplementedError()
+
def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update existing a Loki daemon(s)"""
raise NotImplementedError()
'alertmanager': 'alertmanager',
'prometheus': 'prometheus',
'node-exporter': 'node-exporter',
+ 'ceph-exporter': 'ceph-exporter',
'loki': 'loki',
'promtail': 'promtail',
'crash': 'crash',
'loki': ['loki'],
'promtail': ['promtail'],
'node-exporter': ['node-exporter'],
+ 'ceph-exporter': ['ceph-exporter'],
'crash': ['crash'],
'container': ['container'],
'agent': ['agent'],
alertmanager = 'alertmanager'
grafana = 'grafana'
node_exporter = 'node-exporter'
+ ceph_exporter = 'ceph-exporter'
prometheus = 'prometheus'
loki = 'loki'
promtail = 'promtail'
"""
KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi loki promtail mds mgr mon nfs ' \
'node-exporter osd prometheus rbd-mirror rgw agent ' \
- 'container ingress cephfs-mirror snmp-gateway'.split()
+ 'container ingress cephfs-mirror snmp-gateway ceph-exporter'.split()
REQUIRES_SERVICE_ID = 'iscsi mds nfs rgw container ingress '.split()
MANAGED_CONFIG_OPTIONS = [
'mds_join_fs',
'container': CustomContainerSpec,
'grafana': GrafanaSpec,
'node-exporter': MonitoringSpec,
+ 'ceph-exporter': CephExporterSpec,
'prometheus': PrometheusSpec,
'loki': MonitoringSpec,
'promtail': MonitoringSpec,
# for making deep copies so you can edit the settings in one without affecting the other
# mostly for testing purposes
return TunedProfileSpec(self.profile_name, self.placement, self.settings.copy())
+
+
+class CephExporterSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str = 'ceph-exporter',
+ sock_dir: Optional[str] = None,
+ addrs: str = '',
+ port: Optional[int] = None,
+ prio_limit: Optional[int] = 5,
+ stats_period: Optional[int] = 5,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ extra_container_args: Optional[List[str]] = None,
+ ):
+ assert service_type == 'ceph-exporter'
+
+ super(CephExporterSpec, self).__init__(
+ service_type,
+ placement=placement,
+ unmanaged=unmanaged,
+ preview_only=preview_only,
+ extra_container_args=extra_container_args)
+
+ self.service_type = service_type
+ self.sock_dir = sock_dir
+ self.addrs = addrs
+ self.port = port
+ self.prio_limit = prio_limit
+ self.stats_period = stats_period
+
+ def validate(self) -> None:
+ super(CephExporterSpec, self).validate()
+
+ if not isinstance(self.prio_limit, int):
+ raise SpecValidationError(
+ f'prio_limit must be an integer. Got {type(self.prio_limit)}')
+ if not isinstance(self.stats_period, int):
+ raise SpecValidationError(
+ f'stats_period must be an integer. Got {type(self.stats_period)}')
+
+
+yaml.add_representer(CephExporterSpec, ServiceSpec.yaml_representer)
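
A rough usage sketch for the new spec (values are illustrative; PlacementSpec
comes from this same module):

    spec = CephExporterSpec(
        sock_dir='/var/run/ceph/',
        port=9926,
        placement=PlacementSpec(host_pattern='*'),
    )
    spec.validate()         # SpecValidationError if prio_limit/stats_period are not ints
    print(yaml.dump(spec))  # uses the representer registered above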