+++ /dev/null
-================
-cephadm Exporter
-================
-
-There are a number of long-running tasks that the cephadm 'binary' performs, which can take several seconds
-to complete. This latency represents a scalability challenge to the Ceph orchestrator management plane.
-
-To address this, cephadm needs to be able to run some of these longer-running tasks asynchronously - this
-frees up processing on the mgr by offloading tasks to each host, reduces latency and improves scalability.
-
-This document describes the implementation requirements and design for an 'exporter' feature.
-
-
-Requirements
-============
-The exporter should address these functional and non-functional requirements:
-
-* run as a normal systemd unit
-* utilise the same filesystem schema as other services deployed with cephadm
-* require only python3 standard library modules (no external dependencies)
-* use encryption to protect the data flowing from a host to the Ceph mgr
-* execute data gathering tasks as background threads
-* be easily extended to include more data gathering tasks
-* monitor itself for the health of the data gathering threads
-* cache metadata to respond to queries quickly
-* respond to a metadata query in <30ms to support large Ceph clusters (1000's of nodes)
-* provide CLI interaction to enable the exporter to be deployed either at bootstrap time, or once the
- cluster has been deployed.
-* be deployed as a normal orchestrator service (similar to the node-exporter)
-
-High Level Design
-=================
-
-This section will focus on the exporter logic **only**.
-
-.. code::
-
- Establish a metadata cache object (tasks will be represented by separate attributes)
- Create a thread for each data gathering task; host, ceph-volume and list_daemons
-      each thread updates its own attribute within the cache object
- Start a server instance passing requests to a specific request handler
- the request handler only interacts with the cache object
- the request handler passes metadata back to the caller
- Main Loop
- Leave the loop if a 'stop' request is received
- check thread health
-        if a thread that was active is now inactive
- update the cache marking the task as inactive
- update the cache with an error message for that task
- wait for n secs
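-
-To make the flow concrete, here is a minimal, illustrative python3 sketch of the same design. It is **not** the
-actual implementation - the HTTP server is omitted, and the task name, intervals and loop counts are placeholders:
-
-.. code-block:: python
-
-    import time
-    from threading import Thread, RLock
-
-    class Cache:
-        """Minimal stand-in for the metadata cache object."""
-        def __init__(self):
-            self.lock = RLock()
-            self.tasks = {'host': 'inactive'}   # health state per task
-            self.host = {}                      # metadata from the 'host' task
-
-        def update(self, task, content):
-            with self.lock:
-                setattr(self, task, content)
-
-    def scrape_host(cache, interval=2):
-        """Placeholder gather task - the real daemon invokes cephadm commands."""
-        cache.tasks['host'] = 'active'
-        for _ in range(3):                      # the real threads loop until told to stop
-            cache.update('host', {'scrape_timestamp': time.time()})
-            time.sleep(interval)
-
-    cache = Cache()
-    worker = Thread(target=scrape_host, args=(cache,), daemon=True, name='host')
-    worker.start()
-
-    for _ in range(5):                          # main loop: check thread health
-        if not worker.is_alive() and cache.tasks['host'] == 'active':
-            cache.tasks['host'] = 'inactive'    # record the failure for callers
-        time.sleep(2)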
-
-
-In the initial implementation, the exporter exposes this metadata through a RESTful API.
-
-
-Security
-========
-
-The cephadm 'binary' only supports standard python3 features, so the RESTful API has been
-developed using the http module, which itself is not intended for production use. However, the implementation
-is not complex (based only on HTTPServer and BaseHTTPRequestHandler) and only supports the GET method - so the
-security risk is perceived as low.
-
-Current mgr to host interactions occur within an ssh connection, so the goal of the exporter is to adopt a similar
-security model.
-
-The initial REST API is implemented with the following features:
-
-* generic self-signed, or user provided SSL crt/key to encrypt traffic between the mgr and the host
-* 'token' based authentication of the request
-
-All exporter instances will use the **same** crt/key to secure the link from the mgr to the host(s), in the same way
-that the ssh access uses the same public key and port for each host connection.
-
-.. note:: Since the same SSL configuration is used on every exporter, when you supply your own settings you must
- ensure that the CN or SAN components of the distinguished name are either **not** used or created using wildcard naming.
-
-The crt, key and token files are all created with restrictive permissions (600), to help mitigate the risk of exposure
-to any other user on the Ceph cluster node(s).
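-
-As an illustration, files with these permissions can be created atomically by passing a mode to ``os.open`` - the
-same pattern the daemon deployment code uses. The directory and file contents below are placeholders:
-
-.. code-block:: python
-
-    import os
-
-    def write_secret(path: str, content: str) -> None:
-        """Create *path* with mode 0600, so it is never group/world readable."""
-        with open(os.open(path, os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
-            f.write(content)
-
-    daemon_dir = '/tmp/cephadm-exporter-demo'   # placeholder directory
-    os.makedirs(daemon_dir, exist_ok=True)
-    for name, data in {'crt': '<PEM cert>', 'key': '<PEM key>', 'token': '<token>'}.items():
-        write_secret(os.path.join(daemon_dir, name), data)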
-
-Administrator Interaction
-=========================
-Several new commands are required to configure the exporter, and additional parameters should be added to the bootstrap
-process to allow the exporter to be deployed automatically for new clusters.
-
-
-Enhancements to the 'bootstrap' process
----------------------------------------
-bootstrap should support additional parameters to automatically configure exporter daemons across hosts.
-
-``--with-exporter``
-
-By using this flag, you tell the bootstrap process to include the cephadm-exporter service within the
-cluster. If you do not provide a specific configuration (SSL, token, port) to use, defaults are applied.
-
-``--exporter-config``
-
-With the --exporter-config option, you may pass your own SSL, token and port information. The file must be in
-JSON format and contain the following fields: crt, key, token and port. The JSON content is validated, and any
-errors detected are passed back to the user during the argument parsing phase (before any changes are made).
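-
-A sketch of how such a file could be assembled with the standard library is shown below. The input file names are
-hypothetical; the field names (crt, key, token and port) are those the parser requires:
-
-.. code-block:: python
-
-    import json
-    import secrets
-    from pathlib import Path
-
-    cfg = {
-        'crt': Path('exporter.crt').read_text(),    # hypothetical PEM certificate file
-        'key': Path('exporter.key').read_text(),    # hypothetical PEM private key file
-        'token': secrets.token_hex(32),             # mgr/cephadm generates tokens the same way
-        'port': '9443',                             # default REST API port
-    }
-    Path('config.json').write_text(json.dumps(cfg, indent=2))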
-
-
-Additional ceph commands
-------------------------
-::
-
-# ceph cephadm generate-exporter-config
-
-This command generates a default configuration consisting of: a self-signed certificate, a randomly generated
-32-byte (64 character) token and the default port of 9443 for the REST API.
-::
-
-# ceph cephadm set-exporter-config -i <config.json>
-
-Use a JSON file to define the crt, key, token and port for the REST API. The crt, key and token are validated by
-the mgr/cephadm module prior to storing the values in the KV store. Invalid or missing entries are reported to the
-user.
-::
-
-# ceph cephadm clear-exporter-config
-
-Clear the current configuration (removes the associated keys from the KV store).
-::
-
-# ceph cephadm get-exporter-config
-
-Show the current exporter configuration, in JSON format.
-
-
-.. note:: If the service is already deployed any attempt to change or clear the configuration will
- be denied. In order to change settings you must remove the service, apply the required configuration
- and re-apply (``ceph orch apply cephadm-exporter``)
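-
-For example, a typical reconfiguration sequence would be:
-
-.. code::
-
-    # ceph orch rm cephadm-exporter
-    # ceph cephadm set-exporter-config -i <config.json>
-    # ceph orch apply cephadm-exporter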
-
-
-
-New Ceph Configuration Keys
-===========================
-The exporter configuration is persisted to the monitor's KV store, with the following keys:
-
-| mgr/cephadm/exporter_config
-| mgr/cephadm/exporter_enabled
-
-
-
-RESTful API
-===========
-The primary goal of the exporter is the provision of metadata from the host to the mgr. This interaction takes
-place over a simple GET interface. Although only the GET method is supported, the API provides multiple URLs to
-provide different views on the metadata that has been gathered.
-
-.. csv-table:: Supported URL endpoints
- :header: "URL", "Purpose"
-
- "/v1/metadata", "show all metadata including health of all threads"
- "/v1/metadata/health", "only report on the health of the data gathering threads"
- "/v1/metadata/disks", "show the disk output (ceph-volume inventory data)"
- "/v1/metadata/host", "show host related metadata from the gather-facts command"
-    "/v1/metadata/daemons", "show the status of all ceph cluster related daemons on the host"
-
-Return Codes
-------------
-The following HTTP return codes are generated by the API:
-
-.. csv-table:: Supported HTTP Responses
- :header: "Status Code", "Meaning"
-
- "200", "OK"
- "204", "the thread associated with this request is no longer active, no data is returned"
- "206", "some threads have stopped, so some content is missing"
- "401", "request is not authorised - check your token is correct"
- "404", "URL is malformed, not found"
- "500", "all threads have stopped - unable to provide any metadata for the host"
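-
-Note that with python3's urllib, the 4xx/5xx codes above surface as ``HTTPError`` exceptions, while the 2xx codes
-are read from the response object. A sketch (the host name and token are placeholders):
-
-.. code-block:: python
-
-    from urllib.error import HTTPError
-    from urllib.request import Request, urlopen
-
-    req = Request('https://myhost:9443/v1/metadata',               # placeholder host
-                  headers={'Authorization': 'Bearer MyToken'})     # placeholder token
-    try:
-        r = urlopen(req)   # pass context=<ssl context> to trust the exporter's certificate
-    except HTTPError as e:
-        print(f'request failed: {e.code}')         # 401, 404 or 500
-    else:
-        if r.status == 206:
-            print('partial metadata: one or more threads have stopped')
-        elif r.status == 204:
-            print('thread inactive: no data returned')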
-
-
-Deployment
-==========
-During the initial phases of the exporter implementation, deployment is regarded as optional but is available
-to new clusters and existing clusters that have the feature (Pacific and above).
-
-* new clusters : use the ``--with-exporter`` option
-* existing clusters : you'll need to set the configuration and deploy the service manually
-
-.. code::
-
- # ceph cephadm generate-exporter-config
- # ceph orch apply cephadm-exporter
-
-If you choose to remove the cephadm-exporter service, you can simply run
-
-.. code::
-
- # ceph orch rm cephadm-exporter
-
-This will remove the daemons and the exporter-related settings stored in the KV store.
-
-
-Management
-==========
-Once the exporter is deployed, you can use the following snippet to extract the host's metadata.
-
-.. code-block:: python
-
-    import json
-    import ssl
-    import sys
-    import tempfile
-    import time
-    from urllib.request import Request, urlopen
-
-    # CHANGE THIS to the host running the exporter
-    hostname = "rh8-1.storage.lab"
-
-    print("Reading config.json")
-    try:
-        with open('./config.json', 'r') as f:
-            raw = f.read()
-    except FileNotFoundError:
-        print("You must first create a config.json file using the cephadm get-exporter-config command")
-        sys.exit(1)
-
-    cfg = json.loads(raw)
-    with tempfile.NamedTemporaryFile(buffering=0) as t:
-        print("creating a temporary local crt file from the json")
-        t.write(cfg['crt'].encode('utf-8'))
-
-        # trust the exporter's certificate, but skip hostname checks since the
-        # same generic crt/key pair is shared by every exporter instance
-        ctx = ssl.create_default_context()
-        ctx.check_hostname = False
-        ctx.load_verify_locations(t.name)
-        hdrs = {"Authorization": f"Bearer {cfg['token']}"}
-        print("Issuing call to gather metadata")
-        req = Request(f"https://{hostname}:9443/v1/metadata", headers=hdrs)
-        s_time = time.time()
-        r = urlopen(req, context=ctx)
-        print(r.status)
-        print("call complete")
-        if r.status in [200, 206]:
-            raw = r.read()   # bytes
-            js = json.loads(raw.decode())
-            print(json.dumps(js, indent=2))
-        elapsed = time.time() - s_time
-        print(f"Elapsed secs : {elapsed}")
-
-
-.. note:: the above example uses python3, and assumes that you've extracted the config using the ``get-exporter-config`` command.
-
-
-Implementation Specific Details
-===============================
-
-In the same way as a typical container-based deployment, the exporter is deployed to a directory under ``/var/lib/ceph/<fsid>``. The
-cephadm binary is stored in this cluster folder, and the daemon's configuration and systemd settings are stored
-under ``/var/lib/ceph/<fsid>/cephadm-exporter.<id>/``.
-
-.. code::
-
- [root@rh8-1 cephadm-exporter.rh8-1]# pwd
- /var/lib/ceph/cb576f70-2f72-11eb-b141-525400da3eb7/cephadm-exporter.rh8-1
- [root@rh8-1 cephadm-exporter.rh8-1]# ls -al
- total 24
- drwx------. 2 root root 100 Nov 25 18:10 .
- drwx------. 8 root root 160 Nov 25 23:19 ..
- -rw-------. 1 root root 1046 Nov 25 18:10 crt
- -rw-------. 1 root root 1704 Nov 25 18:10 key
- -rw-------. 1 root root 64 Nov 25 18:10 token
- -rw-------. 1 root root 38 Nov 25 18:10 unit.configured
- -rw-------. 1 root root 48 Nov 25 18:10 unit.created
- -rw-r--r--. 1 root root 157 Nov 25 18:10 unit.run
-
-
-In order to respond to requests quickly, the CephadmDaemon uses a cache object (CephadmCache) to hold the results
-of the cephadm commands.
-
-The exporter doesn't introduce any new data gathering capability - instead it merely calls the existing cephadm commands.
-
-The CephadmDaemon class creates a local HTTP server (using ThreadingMixIn), secured with TLS, and uses the CephadmDaemonHandler
-to handle the requests. The request handler inspects the request header and looks for a valid Bearer token - if this is invalid
-or missing, the caller receives a 401 Unauthorized error.
-
-The 'run' method of the CephadmDaemon class places the scrape_* methods into separate threads, with each thread supporting
-a different refresh interval. Each thread then periodically issues its cephadm command and places the output
-in the cache object.
-
-In addition to the command output, each thread also maintains its own timestamp record in the cache, so the caller can
-very easily determine the age of the data it has received.
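-
-For example, the age of any section of the response can be derived directly from that timestamp. A sketch, where
-``js`` is the decoded JSON from a ``/v1/metadata`` request (as in the management snippet above):
-
-.. code-block:: python
-
-    import time
-
-    def metadata_age(js: dict, section: str = 'host') -> float:
-        """Age, in seconds, of one section of a /v1/metadata response."""
-        return time.time() - js[section]['scrape_timestamp']
-
-    # e.g. print(f"host facts are {metadata_age(js):.1f}s old")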
-
-If the underlying cephadm command execution hits an exception, the thread passes control to a _handle_thread_exception method.
-Here the exception is logged to the daemon's log file and the exception details are added to the cache, providing visibility
-of the problem to the caller.
-
-Although each thread is effectively given its own URL endpoint (host, disks, daemons), the recommended way to gather data from
-the host is to simply use the ``/v1/metadata`` endpoint. This will provide all of the data, and indicate whether any of the
-threads have failed.
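-
-For example, a caller can detect failed threads from the health section of the full response (again assuming ``js``
-holds a decoded ``/v1/metadata`` payload):
-
-.. code-block:: python
-
-    def failed_tasks(js: dict) -> list:
-        """Names of the data gathering threads that are no longer running."""
-        return [name for name, state in js['health']['tasks'].items()
-                if name != 'http_server' and state == 'inactive']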
-
-The run method uses "signal" to establish a reload hook, but in the initial implementation this doesn't take any action and simply
-logs that a reload was received.
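-
-The hook itself is just a standard signal handler; conceptually:
-
-.. code-block:: python
-
-    import signal
-
-    def reload(signum, frame):
-        # placeholder only - a future config file reload could hook in here
-        print('reload request received - no action needed')
-
-    signal.signal(signal.SIGHUP, reload)   # 'systemctl reload' delivers SIGHUP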
-
-
-Future Work
-===========
-
-#. Consider the potential of adding a restart policy for threads
-#. Once the exporter is fully integrated into mgr/cephadm, the goal would be to make the exporter the
- default means of data gathering. However, until then the exporter will remain as an opt-in 'feature
- preview'.
developing-cephadm
host-maintenance
compliance-check
- cephadm-exporter
Storage devices and OSDs management <./design/storage_devices_and_osds>
scalability-notes
--output-pub-ssh-key $TMPDIR/ceph.pub \
--allow-overwrite \
--skip-mon-network \
- --skip-monitoring-stack \
- --with-exporter
+ --skip-monitoring-stack
test -e $CONFIG
test -e $KEYRING
rm -f $ORIG_CONFIG
cond="curl 'http://localhost:9093' | grep -q 'Alertmanager'"
is_available "alertmanager" "$cond" 10
-# Fetch the token we need to access the exporter API
-token=$($CEPHADM shell --fsid $FSID --config $CONFIG --keyring $KEYRING ceph cephadm get-exporter-config | jq -r '.token')
-[[ ! -z "$token" ]]
-
-# check all exporter threads active
-cond="curl -k -s -H \"Authorization: Bearer $token\" \
- https://localhost:9443/v1/metadata/health | \
- jq -r '.tasks | select(.disks == \"active\" and .daemons == \"active\" and .host == \"active\")'"
-is_available "exporter_threads_active" "$cond" 3
-
-# check we deployed for all hosts
-$CEPHADM shell --fsid $FSID --config $CONFIG --keyring $KEYRING ceph orch ls --service-type cephadm-exporter --format json
-host_pattern=$($CEPHADM shell --fsid $FSID --config $CONFIG --keyring $KEYRING ceph orch ls --service-type cephadm-exporter --format json | jq -r '.[0].placement.host_pattern')
-[[ "$host_pattern" = "*" ]]
-
## run
# WRITE ME
import datetime
import fcntl
import ipaddress
+import io
import json
import logging
from logging.config import dictConfig
import time
import errno
import struct
-from socketserver import ThreadingMixIn
-from http.server import BaseHTTPRequestHandler, HTTPServer
-import signal
-import io
-from contextlib import redirect_stdout
import ssl
from enum import Enum
import uuid
from configparser import ConfigParser
+from contextlib import redirect_stdout
from functools import wraps
from glob import glob
from io import StringIO
-from threading import Thread, RLock, Event
+from threading import Thread, Event
from urllib.error import HTTPError
from urllib.request import urlopen, Request
from pathlib import Path
supported_daemons.append(NFSGanesha.daemon_type)
supported_daemons.append(CephIscsi.daemon_type)
supported_daemons.append(CustomContainer.daemon_type)
- supported_daemons.append(CephadmDaemon.daemon_type)
supported_daemons.append(HAproxy.daemon_type)
supported_daemons.append(Keepalived.daemon_type)
supported_daemons.append(CephadmAgent.daemon_type)
def get_unit_name(fsid, daemon_type, daemon_id=None):
# type: (str, str, Optional[Union[int, str]]) -> str
# accept either name or type + id
- if daemon_type == CephadmDaemon.daemon_type and daemon_id is not None:
- return 'ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id)
- elif daemon_id is not None:
+ if daemon_id is not None:
return 'ceph-%s@%s.%s' % (fsid, daemon_type, daemon_id)
else:
return 'ceph-%s@%s' % (fsid, daemon_type)
config, keyring)
if not reconfig:
- if daemon_type == CephadmDaemon.daemon_type:
- port = next(iter(ports), None) # get first tcp port provided or None
-
- if ctx.config_json == '-':
- config_js = get_parm('-')
- else:
- config_js = get_parm(ctx.config_json)
- assert isinstance(config_js, dict)
- assert isinstance(daemon_id, str)
-
- cephadm_exporter = CephadmDaemon(ctx, fsid, daemon_id, port)
- cephadm_exporter.deploy_daemon_unit(config_js)
- elif daemon_type == CephadmAgent.daemon_type:
+ if daemon_type == CephadmAgent.daemon_type:
if ctx.config_json == '-':
config_js = get_parm('-')
else:
cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(ctx.container_init), '--force'])
- if ctx.with_exporter:
- cli(['config-key', 'set', 'mgr/cephadm/exporter_enabled', 'true'])
- if ctx.exporter_config:
- logger.info('Applying custom cephadm exporter settings')
- # validated within the parser, so we can just apply to the store
- with tempfile.NamedTemporaryFile(buffering=0) as tmp:
- tmp.write(json.dumps(ctx.exporter_config).encode('utf-8'))
- mounts = {
- tmp.name: '/tmp/exporter-config.json:z'
- }
- cli(['cephadm', 'set-exporter-config', '-i', '/tmp/exporter-config.json'], extra_mounts=mounts)
- logger.info('-> Use ceph orch apply cephadm-exporter to deploy')
- else:
- # generate a default SSL configuration for the exporter(s)
- logger.info('Generating a default cephadm exporter configuration (self-signed)')
- cli(['cephadm', 'generate-exporter-config'])
-        #
-        # deploy the service
- logger.info('Deploying cephadm exporter service with default placement...')
- cli(['orch', 'apply', 'cephadm-exporter'])
-
if not ctx.skip_dashboard:
prepare_dashboard(ctx, uid, gid, cli, wait_for_mgr_restart)
keyring=None, reconfig=ctx.reconfig,
ports=daemon_ports)
- elif daemon_type == CephadmDaemon.daemon_type:
- # get current user gid and uid
- uid = os.getuid()
- gid = os.getgid()
- config_js = get_parm(ctx.config_json) # type: Dict[str, str]
- if not daemon_ports:
- logger.info('cephadm-exporter will use default port ({})'.format(CephadmDaemon.default_port))
- daemon_ports = [CephadmDaemon.default_port]
-
- CephadmDaemon.validate_config(config_js)
-
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None,
- uid, gid, ports=daemon_ports)
-
elif daemon_type == CephadmAgent.daemon_type:
# get current user gid and uid
uid = os.getuid()
os.rename(data_dir,
os.path.join(backup_dir, dirname))
else:
- if daemon_type == CephadmDaemon.daemon_type:
- CephadmDaemon.uninstall(ctx, ctx.fsid, daemon_type, daemon_id)
call_throws(ctx, ['rm', '-rf', data_dir])
##################################
if self.dest == 'name':
self._check_name(values)
setattr(namespace, self.dest, values)
- elif self.dest == 'exporter_config':
- cfg = get_parm(values)
- # run the class' validate method, and convert to an argparse error
- # if problems are found
- try:
- CephadmDaemon.validate_config(cfg)
- except Error as e:
- raise argparse.ArgumentError(self,
- str(e))
- setattr(namespace, self.dest, cfg)
##################################
##################################
-class CephadmCache:
- task_types = ['disks', 'daemons', 'host', 'http_server']
-
- def __init__(self) -> None:
- self.started_epoch_secs = time.time()
- self.tasks = {
- 'daemons': 'inactive',
- 'disks': 'inactive',
- 'host': 'inactive',
- 'http_server': 'inactive',
- }
- self.errors: list = []
- self.disks: dict = {}
- self.daemons: dict = {}
- self.host: dict = {}
- self.lock = RLock()
-
- @property
- def health(self) -> dict:
- return {
- 'started_epoch_secs': self.started_epoch_secs,
- 'tasks': self.tasks,
- 'errors': self.errors,
- }
-
- def to_json(self) -> dict:
- return {
- 'health': self.health,
- 'host': self.host,
- 'daemons': self.daemons,
- 'disks': self.disks,
- }
-
- def update_health(self, task_type: str, task_status: str, error_msg: Optional[str] = None) -> None:
- assert task_type in CephadmCache.task_types
- with self.lock:
- self.tasks[task_type] = task_status
- if error_msg:
- self.errors.append(error_msg)
-
- def update_task(self, task_type: str, content: dict) -> None:
- assert task_type in CephadmCache.task_types
- assert isinstance(content, dict)
- with self.lock:
- current = getattr(self, task_type)
- for k in content:
- current[k] = content[k]
-
- setattr(self, task_type, current)
-
-
-class CephadmHTTPServer(ThreadingMixIn, HTTPServer):
- allow_reuse_address = True
- daemon_threads = True
- cephadm_cache: CephadmCache
- token: str
-
-
-class CephadmDaemonHandler(BaseHTTPRequestHandler):
- server: CephadmHTTPServer
- api_version = 'v1'
- valid_routes = [
- f'/{api_version}/metadata',
- f'/{api_version}/metadata/health',
- f'/{api_version}/metadata/disks',
- f'/{api_version}/metadata/daemons',
- f'/{api_version}/metadata/host',
- ]
-
- class Decorators:
- @classmethod
- def authorize(cls, f: Any) -> Any:
- """Implement a basic token check.
-
- The token is installed at deployment time and must be provided to
- ensure we only respond to callers who know our token i.e. mgr
- """
-
- def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
- auth = self.headers.get('Authorization', None)
- if auth != 'Bearer ' + self.server.token:
- self.send_error(401)
- return
- f(self, *args, **kwargs)
-
- return wrapper
-
- def _help_page(self) -> str:
- return """<!DOCTYPE html>
-<html>
-<head><title>cephadm metadata exporter</title></head>
-<style>
-body {{
- font-family: sans-serif;
- font-size: 0.8em;
-}}
-table {{
- border-width: 0px;
- border-spacing: 0px;
- margin-left:20px;
-}}
-tr:hover {{
- background: PowderBlue;
-}}
-td,th {{
- padding: 5px;
-}}
-</style>
-<body>
- <h1>cephadm metadata exporter {api_version}</h1>
- <table>
- <thead>
- <tr><th>Endpoint</th><th>Methods</th><th>Response</th><th>Description</th></tr>
- </thead>
- <tr><td><a href='{api_version}/metadata'>{api_version}/metadata</a></td><td>GET</td><td>JSON</td><td>Return <b>all</b> metadata for the host</td></tr>
- <tr><td><a href='{api_version}/metadata/daemons'>{api_version}/metadata/daemons</a></td><td>GET</td><td>JSON</td><td>Return daemon and systemd states for ceph daemons (ls)</td></tr>
- <tr><td><a href='{api_version}/metadata/disks'>{api_version}/metadata/disks</a></td><td>GET</td><td>JSON</td><td>show disk inventory (ceph-volume)</td></tr>
- <tr><td><a href='{api_version}/metadata/health'>{api_version}/metadata/health</a></td><td>GET</td><td>JSON</td><td>Show current health of the exporter sub-tasks</td></tr>
- <tr><td><a href='{api_version}/metadata/host'>{api_version}/metadata/host</a></td><td>GET</td><td>JSON</td><td>Show host metadata (gather-facts)</td></tr>
- </table>
-</body>
-</html>""".format(api_version=CephadmDaemonHandler.api_version)
-
- def _fetch_root(self) -> None:
- self.send_response(200)
- self.send_header('Content-type', 'text/html; charset=utf-8')
- self.end_headers()
- self.wfile.write(self._help_page().encode('utf-8'))
-
- @Decorators.authorize
- def do_GET(self) -> None:
- """Handle *all* GET requests"""
-
- if self.path == '/':
- # provide a html response if someone hits the root url, to document the
- # available api endpoints
- return self._fetch_root()
- elif self.path in CephadmDaemonHandler.valid_routes:
- u = self.path.split('/')[-1]
- data = json.dumps({})
- status_code = 200
-
- tasks = self.server.cephadm_cache.health.get('tasks', {})
- assert tasks
-
- # We're using the http status code to help indicate thread health
- # - 200 (OK): request successful
- # - 204 (No Content): access to a cache relating to a dead thread
-            # - 206 (Partial content): one or more threads are inactive
- # - 500 (Server Error): all threads inactive
- if u == 'metadata':
- data = json.dumps(self.server.cephadm_cache.to_json())
- if all([tasks[task_name] == 'inactive' for task_name in tasks if task_name != 'http_server']):
- # All the subtasks are dead!
- status_code = 500
- elif any([tasks[task_name] == 'inactive' for task_name in tasks if task_name != 'http_server']):
- status_code = 206
-
-            # Individual GETs against a task's endpoint will also return a 204 if the corresponding thread is inactive
- elif u == 'daemons':
- data = json.dumps(self.server.cephadm_cache.daemons)
- if tasks['daemons'] == 'inactive':
- status_code = 204
- elif u == 'disks':
- data = json.dumps(self.server.cephadm_cache.disks)
- if tasks['disks'] == 'inactive':
- status_code = 204
- elif u == 'host':
- data = json.dumps(self.server.cephadm_cache.host)
- if tasks['host'] == 'inactive':
- status_code = 204
-
- # a GET against health will always return a 200, since the op is always successful
- elif u == 'health':
- data = json.dumps(self.server.cephadm_cache.health)
-
- self.send_response(status_code)
- self.send_header('Content-type', 'application/json')
- self.end_headers()
- self.wfile.write(data.encode('utf-8'))
- else:
- # Invalid GET URL
- bad_request_msg = 'Valid URLs are: {}'.format(', '.join(CephadmDaemonHandler.valid_routes))
- self.send_response(404, message=bad_request_msg) # reason
- self.send_header('Content-type', 'application/json')
- self.end_headers()
- self.wfile.write(json.dumps({'message': bad_request_msg}).encode('utf-8'))
-
- def log_message(self, format: str, *args: Any) -> None:
- rqst = ' '.join(str(a) for a in args)
- logger.info(f'client:{self.address_string()} [{self.log_date_time_string()}] {rqst}')
-
-
-class CephadmDaemon():
-
- daemon_type = 'cephadm-exporter'
- default_port = 9443
- key_name = 'key'
- crt_name = 'crt'
- token_name = 'token'
- config_requirements = [
- key_name,
- crt_name,
- token_name,
- ]
- loop_delay = 1
- thread_check_interval = 5
-
- def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Optional[str] = None, port: Optional[int] = None) -> None:
- self.ctx = ctx
- self.fsid = fsid
- self.daemon_id = daemon_id
- if not port:
- self.port = CephadmDaemon.default_port
- else:
- self.port = port
- self.workers: List[Thread] = []
- self.http_server: CephadmHTTPServer
- self.stop = False
- self.cephadm_cache = CephadmCache()
- self.errors: List[str] = []
- self.token = read_file([os.path.join(self.daemon_path, CephadmDaemon.token_name)])
-
- @classmethod
- def validate_config(cls, config: dict) -> None:
- reqs = ', '.join(CephadmDaemon.config_requirements)
- errors = []
-
- if not config or not all([k_name in config for k_name in CephadmDaemon.config_requirements]):
- raise Error(f'config must contain the following fields : {reqs}')
-
- if not all([isinstance(config[k_name], str) for k_name in CephadmDaemon.config_requirements]):
- errors.append(f'the following fields must be strings: {reqs}')
-
- crt = config[CephadmDaemon.crt_name]
- key = config[CephadmDaemon.key_name]
- token = config[CephadmDaemon.token_name]
-
- if not crt.startswith('-----BEGIN CERTIFICATE-----') or not crt.endswith('-----END CERTIFICATE-----\n'):
- errors.append('crt field is not a valid SSL certificate')
- if not key.startswith('-----BEGIN PRIVATE KEY-----') or not key.endswith('-----END PRIVATE KEY-----\n'):
- errors.append('key is not a valid SSL private key')
- if len(token) < 8:
-            errors.append("'token' must be at least 8 characters long")
-
- if 'port' in config:
- try:
- p = int(config['port'])
- if p <= 1024:
- raise ValueError
- except (TypeError, ValueError):
- errors.append('port must be an integer > 1024')
-
- if errors:
- raise Error('Parameter errors : {}'.format(', '.join(errors)))
-
- @property
- def port_active(self) -> bool:
- return port_in_use(self.ctx, self.port)
-
- @property
- def can_run(self) -> bool:
- # if port is in use
- if self.port_active:
- self.errors.append(f'TCP port {self.port} already in use, unable to bind')
- if not os.path.exists(os.path.join(self.daemon_path, CephadmDaemon.key_name)):
- self.errors.append(f"Key file '{CephadmDaemon.key_name}' is missing from {self.daemon_path}")
- if not os.path.exists(os.path.join(self.daemon_path, CephadmDaemon.crt_name)):
- self.errors.append(f"Certificate file '{CephadmDaemon.crt_name}' is missing from {self.daemon_path}")
- if self.token == 'Unknown':
- self.errors.append(f"Authentication token '{CephadmDaemon.token_name}' is missing from {self.daemon_path}")
- return len(self.errors) == 0
-
- @staticmethod
- def _unit_name(fsid: str, daemon_id: str) -> str:
- return '{}.service'.format(get_unit_name(fsid, CephadmDaemon.daemon_type, daemon_id))
-
- @property
- def unit_name(self) -> str:
- assert self.daemon_id is not None
- return CephadmDaemon._unit_name(self.fsid, self.daemon_id)
-
- @property
- def daemon_path(self) -> str:
- return os.path.join(
- self.ctx.data_dir,
- self.fsid,
- f'{self.daemon_type}.{self.daemon_id}'
- )
-
- @property
- def binary_path(self) -> str:
- path = os.path.realpath(__file__)
- assert os.path.isfile(path)
- return path
-
- def _handle_thread_exception(self, exc: Exception, thread_type: str) -> None:
- e_msg = f'{exc.__class__.__name__} exception: {str(exc)}'
- thread_info = getattr(self.cephadm_cache, thread_type)
- errors = thread_info.get('scrape_errors', [])
- errors.append(e_msg)
- logger.error(e_msg)
- logger.exception(exc)
- self.cephadm_cache.update_task(
- thread_type,
- {
- 'scrape_errors': errors,
- 'data': None,
- }
- )
-
- def _scrape_host_facts(self, refresh_interval: int = 10) -> None:
- ctr = 0
- exception_encountered = False
-
- while True:
-
- if self.stop or exception_encountered:
- break
-
- if ctr >= refresh_interval:
- ctr = 0
- logger.debug('executing host-facts scrape')
- errors = []
- s_time = time.time()
-
- try:
- facts = HostFacts(self.ctx)
- except Exception as e:
- self._handle_thread_exception(e, 'host')
- exception_encountered = True
- else:
- elapsed = time.time() - s_time
- try:
- data = json.loads(facts.dump())
- except json.decoder.JSONDecodeError:
- errors.append('host-facts provided invalid JSON')
- logger.warning(errors[-1])
- data = {}
- self.cephadm_cache.update_task(
- 'host',
- {
- 'scrape_timestamp': s_time,
- 'scrape_duration_secs': elapsed,
- 'scrape_errors': errors,
- 'data': data,
- }
- )
- logger.debug(f'completed host-facts scrape - {elapsed}s')
-
- time.sleep(CephadmDaemon.loop_delay)
- ctr += CephadmDaemon.loop_delay
- logger.info('host-facts thread stopped')
-
- def _scrape_ceph_volume(self, refresh_interval: int = 15) -> None:
- # we're invoking the ceph_volume command, so we need to set the args that it
- # expects to use
- self.ctx.command = 'inventory --format=json'.split()
- self.ctx.fsid = self.fsid
-
- ctr = 0
- exception_encountered = False
-
- while True:
- if self.stop or exception_encountered:
- break
-
- if ctr >= refresh_interval:
- ctr = 0
- logger.debug('executing ceph-volume scrape')
- errors = []
- s_time = time.time()
- stream = io.StringIO()
- try:
- with redirect_stdout(stream):
- command_ceph_volume(self.ctx)
- except Exception as e:
- self._handle_thread_exception(e, 'disks')
- exception_encountered = True
- else:
- elapsed = time.time() - s_time
-
-                    # if the call to ceph-volume returns junk mixed in with
-                    # the json, it won't parse
- stdout = stream.getvalue()
-
- data = []
- if stdout:
- try:
- data = json.loads(stdout)
- except json.decoder.JSONDecodeError:
- errors.append('ceph-volume thread provided bad json data')
- logger.warning(errors[-1])
- else:
- errors.append('ceph-volume did not return any data')
- logger.warning(errors[-1])
-
- self.cephadm_cache.update_task(
- 'disks',
- {
- 'scrape_timestamp': s_time,
- 'scrape_duration_secs': elapsed,
- 'scrape_errors': errors,
- 'data': data,
- }
- )
-
- logger.debug(f'completed ceph-volume scrape - {elapsed}s')
- time.sleep(CephadmDaemon.loop_delay)
- ctr += CephadmDaemon.loop_delay
-
- logger.info('ceph-volume thread stopped')
-
- def _scrape_list_daemons(self, refresh_interval: int = 20) -> None:
- ctr = 0
- exception_encountered = False
- while True:
- if self.stop or exception_encountered:
- break
-
- if ctr >= refresh_interval:
- ctr = 0
- logger.debug('executing list-daemons scrape')
- errors = []
- s_time = time.time()
-
- try:
- # list daemons should ideally be invoked with a fsid
- data = list_daemons(self.ctx)
- except Exception as e:
- self._handle_thread_exception(e, 'daemons')
- exception_encountered = True
- else:
- if not isinstance(data, list):
- errors.append('list-daemons did not supply a list?')
- logger.warning(errors[-1])
- data = []
- elapsed = time.time() - s_time
- self.cephadm_cache.update_task(
- 'daemons',
- {
- 'scrape_timestamp': s_time,
- 'scrape_duration_secs': elapsed,
- 'scrape_errors': errors,
- 'data': data,
- }
- )
- logger.debug(f'completed list-daemons scrape - {elapsed}s')
-
- time.sleep(CephadmDaemon.loop_delay)
- ctr += CephadmDaemon.loop_delay
- logger.info('list-daemons thread stopped')
-
- def _create_thread(self, target: Any, name: str, refresh_interval: Optional[int] = None) -> Thread:
- if refresh_interval:
- t = Thread(target=target, args=(refresh_interval,))
- else:
- t = Thread(target=target)
- t.daemon = True
- t.name = name
- self.cephadm_cache.update_health(name, 'active')
- t.start()
-
- start_msg = f'Started {name} thread'
- if refresh_interval:
- logger.info(f'{start_msg}, with a refresh interval of {refresh_interval}s')
- else:
- logger.info(f'{start_msg}')
- return t
-
- def reload(self, *args: Any) -> None:
- """reload -HUP received
-
- This is a placeholder function only, and serves to provide the hook that could
- be exploited later if the exporter evolves to incorporate a config file
- """
- logger.info('Reload request received - ignoring, no action needed')
-
- def shutdown(self, *args: Any) -> None:
- logger.info('Shutdown request received')
- self.stop = True
- self.http_server.shutdown()
-
- def run(self) -> None:
- logger.info(f"cephadm exporter starting for FSID '{self.fsid}'")
- if not self.can_run:
- logger.error('Unable to start the exporter daemon')
- for e in self.errors:
- logger.error(e)
- return
-
- # register signal handlers for running under systemd control
- signal.signal(signal.SIGTERM, self.shutdown)
- signal.signal(signal.SIGINT, self.shutdown)
- signal.signal(signal.SIGHUP, self.reload)
- logger.debug('Signal handlers attached')
-
- host_facts = self._create_thread(self._scrape_host_facts, 'host', 5)
- self.workers.append(host_facts)
-
- daemons = self._create_thread(self._scrape_list_daemons, 'daemons', 20)
- self.workers.append(daemons)
-
- disks = self._create_thread(self._scrape_ceph_volume, 'disks', 20)
- self.workers.append(disks)
-
- self.http_server = CephadmHTTPServer(('0.0.0.0', self.port), CephadmDaemonHandler) # IPv4 only
- self.http_server.socket = ssl.wrap_socket(self.http_server.socket,
- keyfile=os.path.join(self.daemon_path, CephadmDaemon.key_name),
- certfile=os.path.join(self.daemon_path, CephadmDaemon.crt_name),
- server_side=True)
-
- self.http_server.cephadm_cache = self.cephadm_cache
- self.http_server.token = self.token
- server_thread = self._create_thread(self.http_server.serve_forever, 'http_server')
- logger.info(f'https server listening on {self.http_server.server_address[0]}:{self.http_server.server_port}')
-
- ctr = 0
- while server_thread.is_alive():
- if self.stop:
- break
-
- if ctr >= CephadmDaemon.thread_check_interval:
- ctr = 0
- for worker in self.workers:
- if self.cephadm_cache.tasks[worker.name] == 'inactive':
- continue
- if not worker.is_alive():
- logger.warning(f'{worker.name} thread not running')
- stop_time = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
- self.cephadm_cache.update_health(worker.name, 'inactive', f'{worker.name} stopped at {stop_time}')
-
- time.sleep(CephadmDaemon.loop_delay)
- ctr += CephadmDaemon.loop_delay
-
- logger.info('Main http server thread stopped')
-
- @property
- def unit_run(self) -> str:
-
- return """set -e
-{py3} {bin_path} exporter --fsid {fsid} --id {daemon_id} --port {port} &""".format(
- py3=shutil.which('python3'),
- bin_path=self.binary_path,
- fsid=self.fsid,
- daemon_id=self.daemon_id,
- port=self.port
- )
-
- @property
- def unit_file(self) -> str:
- docker = isinstance(self.ctx.container_engine, Docker)
- return """#generated by cephadm
-[Unit]
-Description=cephadm exporter service for cluster {fsid}
-After=network-online.target{docker_after}
-Wants=network-online.target
-{docker_requires}
-
-PartOf=ceph-{fsid}.target
-Before=ceph-{fsid}.target
-
-[Service]
-Type=forking
-ExecStart=/bin/bash {daemon_path}/unit.run
-ExecReload=/bin/kill -HUP $MAINPID
-Restart=on-failure
-RestartSec=10s
-
-[Install]
-WantedBy=ceph-{fsid}.target
-""".format(fsid=self.fsid,
- daemon_path=self.daemon_path,
- # if docker, we depend on docker.service
- docker_after=' docker.service' if docker else '',
- docker_requires='Requires=docker.service\n' if docker else '')
-
- def deploy_daemon_unit(self, config: Optional[dict] = None) -> None:
- """deploy a specific unit file for cephadm
-
- The normal deploy_daemon_units doesn't apply for this
- daemon since it's not a container, so we just create a
- simple service definition and add it to the fsid's target
- """
- if not config:
- raise Error('Attempting to deploy cephadm daemon without a config')
- assert isinstance(config, dict)
-
- # Create the required config files in the daemons dir, with restricted permissions
- for filename in config:
- with open(os.open(os.path.join(self.daemon_path, filename), os.O_CREAT | os.O_WRONLY, mode=0o600), 'w') as f:
- f.write(config[filename])
-
- # When __file__ is <stdin> we're being invoked over asyncssh via the orchestrator, so
- # we pick up the file from where the orchestrator placed it - otherwise we'll
- # copy it to the binary location for this cluster
- if not __file__ == '<stdin>':
- try:
- shutil.copy(__file__,
- self.binary_path)
- except shutil.SameFileError:
- pass
-
- with open(os.path.join(self.daemon_path, 'unit.run'), 'w') as f:
- f.write(self.unit_run)
-
- with open(
- os.path.join(self.ctx.unit_dir,
- f'{self.unit_name}.new'),
- 'w'
- ) as f:
- f.write(self.unit_file)
- os.rename(
- os.path.join(self.ctx.unit_dir, f'{self.unit_name}.new'),
- os.path.join(self.ctx.unit_dir, self.unit_name))
-
- call_throws(self.ctx, ['systemctl', 'daemon-reload'])
- call(self.ctx, ['systemctl', 'stop', self.unit_name],
- verbosity=CallVerbosity.DEBUG)
- call(self.ctx, ['systemctl', 'reset-failed', self.unit_name],
- verbosity=CallVerbosity.DEBUG)
- call_throws(self.ctx, ['systemctl', 'enable', '--now', self.unit_name])
-
- @classmethod
- def uninstall(cls, ctx: CephadmContext, fsid: str, daemon_type: str, daemon_id: str) -> None:
- unit_name = CephadmDaemon._unit_name(fsid, daemon_id)
- unit_path = os.path.join(ctx.unit_dir, unit_name)
- unit_run = os.path.join(ctx.data_dir, fsid, f'{daemon_type}.{daemon_id}', 'unit.run')
- port = None
- try:
- with open(unit_run, 'r') as u:
- contents = u.read().strip(' &')
- except OSError:
- logger.warning(f'Unable to access the unit.run file @ {unit_run}')
- return
-
- for line in contents.split('\n'):
- if '--port ' in line:
- try:
- port = int(line.split('--port ')[-1])
- except ValueError:
- logger.warning('Unexpected format in unit.run file: port is not numeric')
- logger.warning('Unable to remove the systemd file and close the port')
- return
- break
-
- if port:
- fw = Firewalld(ctx)
- try:
- fw.close_ports([port])
- except RuntimeError:
- logger.error(f'Unable to close port {port}')
-
- stdout, stderr, rc = call(ctx, ['rm', '-f', unit_path])
- if rc:
- logger.error(f'Unable to remove the systemd file @ {unit_path}')
- else:
- logger.info(f'removed systemd unit file @ {unit_path}')
- stdout, stderr, rc = call(ctx, ['systemctl', 'daemon-reload'])
-
-
-def command_exporter(ctx: CephadmContext) -> None:
- exporter = CephadmDaemon(ctx, ctx.fsid, daemon_id=ctx.id, port=ctx.port)
-
- if ctx.fsid not in os.listdir(ctx.data_dir):
- raise Error(f"cluster fsid '{ctx.fsid}' not found in '{ctx.data_dir}'")
-
- exporter.run()
-
-##################################
-
-
def systemd_target_state(ctx: CephadmContext, target_name: str, subsystem: str = 'ceph') -> bool:
# TODO: UNITTEST
return os.path.exists(
action='store_true',
default=CONTAINER_INIT,
help=argparse.SUPPRESS)
- parser_bootstrap.add_argument(
- '--with-exporter',
- action='store_true',
- help='Automatically deploy cephadm metadata exporter to each node')
- parser_bootstrap.add_argument(
- '--exporter-config',
- action=CustomValidation,
- help=f'Exporter configuration information in JSON format (providing: {", ".join(CephadmDaemon.config_requirements)}, port information)')
parser_bootstrap.add_argument(
'--cluster-network',
help='subnet to use for cluster replication, recovery and heartbeats (in CIDR notation network/mask)')
'gather-facts', help='gather and return host related information (JSON format)')
parser_gather_facts.set_defaults(func=command_gather_facts)
- parser_exporter = subparsers.add_parser(
- 'exporter', help='Start cephadm in exporter mode (web service), providing host/daemon/disk metadata')
- parser_exporter.add_argument(
- '--fsid',
- required=True,
- type=str,
- help='fsid of the cephadm exporter to run against')
- parser_exporter.add_argument(
- '--port',
- type=int,
- default=int(CephadmDaemon.default_port),
- help='port number for the cephadm exporter service')
- parser_exporter.add_argument(
- '--id',
- type=str,
- default=get_hostname().split('.')[0],
-        help='daemon identifier for the exporter')
- parser_exporter.set_defaults(func=command_exporter)
-
parser_maintenance = subparsers.add_parser(
'host-maintenance', help='Manage the maintenance state of a host')
parser_maintenance.add_argument(
time.sleep(1)
if not t.is_alive():
obj.cephadm_cache.update_health('host', "inactive", "host thread stopped")
-
-
-@pytest.fixture
-def exporter():
- with mock.patch('cephadm.CephadmDaemon.daemon_path', _daemon_path()), \
- mock.patch('cephadm.CephadmDaemon.can_run', return_value=True), \
- mock.patch('cephadm.CephadmDaemon.run', _mock_run), \
- mock.patch('cephadm.CephadmDaemon._scrape_host_facts', _mock_scrape_host):
-
- ctx = cd.CephadmContext()
- exporter = cd.CephadmDaemon(ctx, fsid='foobar', daemon_id='test')
- assert exporter.token == 'MyAccessToken'
- yield exporter
@pytest.fixture()
import threading
import unittest
-from http.server import HTTPServer
from textwrap import dedent
-from urllib.request import Request, urlopen
-from urllib.error import HTTPError
from typing import List, Optional
from .fixtures import (
cephadm_fs,
- exporter,
mock_docker,
mock_podman,
with_cephadm_ctx,
])
-class TestCephadmExporter(object):
- exporter: cd.CephadmDaemon
- files_created: List[str] = []
- crt = """-----BEGIN CERTIFICATE-----
-MIIC1zCCAb8CEFHoZE2MfUVzo53fzzBKAT0wDQYJKoZIhvcNAQENBQAwKjENMAsG
-A1UECgwEQ2VwaDEZMBcGA1UECwwQY2VwaGFkbS1leHBvcnRlcjAeFw0yMDExMjUy
-MzEwNTVaFw0zMDExMjMyMzEwNTVaMCoxDTALBgNVBAoMBENlcGgxGTAXBgNVBAsM
-EGNlcGhhZG0tZXhwb3J0ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB
-AQCsTfcJcXbREqfx1zTUuEmK+lJn9WWjk0URRF1Z+QgPkascNdkX16PnvhbGwXmF
-BTdAcNl7V0U+z4EsGJ7hJsB7qTq6Rb6wNl7r0OxjeWOmB9xbF4Q/KR5yrbM1DA9A
-B5fNswrUXViku5Y2jlOAz+ZMBhYxMx0edqhxSn297j04Z6RF4Mvkc43v0FH7Ju7k
-O5+0VbdzcOdu37DFpoE4Ll2MZ/GuAHcJ8SD06sEdzFEjRCraav976743XcUlhZGX
-ZTTG/Zf/a+wuCjtMG3od7vRFfuRrM5oTE133DuQ5deR7ybcZNDyopDjHF8xB1bAk
-IOz4SbP6Q25K99Czm1K+3kMLAgMBAAEwDQYJKoZIhvcNAQENBQADggEBACmtvZb8
-dJGHx/WC0/JHxnEJCJM2qnn87ELzbbIQL1w1Yb/I6JQYPgq+WiQPaHaLL9eYsm0l
-dFwvrh+WC0JpXDfADnUnkTSB/WpZ2nC+2JxBptrQEuIcqNXpcJd0bKDiHunv04JI
-uEVpTAK05dBV38qNmIlu4HyB4OEnuQpyOr9xpIhdxuJ95O9K0j5BIw98ZaEwYNUP
-Rm3YlQwfS6R5xaBvL9kyfxyAD2joNj44q6w/5zj4egXVIA5VpkQm8DmMtu0Pd2NG
-dzfYRmqrDolh+rty8HiyIxzeDJQ5bj6LKbUkmABvX50nDySVyMfHmt461/n7W65R
-CHFLoOmfJJik+Uc=\n-----END CERTIFICATE-----
-"""
- key = """-----BEGIN PRIVATE KEY-----
-MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCsTfcJcXbREqfx
-1zTUuEmK+lJn9WWjk0URRF1Z+QgPkascNdkX16PnvhbGwXmFBTdAcNl7V0U+z4Es
-GJ7hJsB7qTq6Rb6wNl7r0OxjeWOmB9xbF4Q/KR5yrbM1DA9AB5fNswrUXViku5Y2
-jlOAz+ZMBhYxMx0edqhxSn297j04Z6RF4Mvkc43v0FH7Ju7kO5+0VbdzcOdu37DF
-poE4Ll2MZ/GuAHcJ8SD06sEdzFEjRCraav976743XcUlhZGXZTTG/Zf/a+wuCjtM
-G3od7vRFfuRrM5oTE133DuQ5deR7ybcZNDyopDjHF8xB1bAkIOz4SbP6Q25K99Cz
-m1K+3kMLAgMBAAECggEASnAwToMXWsGdjqxzpYasNv9oBIOO0nk4OHp5ffpJUjiT
-XM+ip1tA80g7HMjPD/mt4gge3NtaDgWlf4Bve0O7mnEE7x5cgFIs9eG/jkYOF9eD
-ilMBjivcfJywNDWujPH60iIMhqyBNEHaZl1ck+S9UJC8m6rCZLvMj40n/5riFfBy
-1sjf2uOwcfWrjSj9Ju4wlMI6khSSz2aYC7glQQ/fo2+YArbEUcy60iloPQ6wEgZK
-okoVWZA9AehwLcnRjkwd9EVmMMtRGPE/AcP4s/kKA0tRDRicPLN727Ke/yxv+Ppo
-hbIZIcOn7soOFAENcodJ4YRSCd++QfCNaVAi7vwWWQKBgQDeBY4vvr+H0brbSjQg
-O7Fpqub/fxZY3UoHWDqWs2X4o3qhDqaTQODpuYtCm8YQE//55JoLWKAD0evq5dLS
-YLrtC1Vyxf+TA7opCUjWBe+liyndbJdB5q0zF7qdWUtQKGVSWyUWhK8gHa6M64fP
-oi83DD7F0OGusTWGtfbceErk/wKBgQDGrJLRo/5xnAH5VmPfNu+S6h0M2qM6CYwe
-Y5wHFG2uQQct73adf53SkhvZVmOzJsWQbVnlDOKMhqazcs+7VWRgO5X3naWVcctE
-Hggw9MgpbXAWFOI5sNYsCYE58E+fTHjE6O4A3MhMCsze+CIC3sKuPQBBiL9bWSOX
-8POswqfl9QKBgDe/nVxPwTgRaaH2l/AgDQRDbY1qE+psZlJBzTRaB5jPM9ONIjaH
-a/JELLuk8a7H1tagmC2RK1zKMTriSnWY5FbxKZuQLAR2QyBavHdBNlOTBggbZD+f
-9I2Hv8wSx95wxkBPsphc6Lxft5ya55czWjewU3LIaGK9DHuu5TWm3udxAoGBAJGP
-PsJ59KIoOwoDUYjpJv3sqPwR9CVBeXeKY3aMcQ+KdUgiejVKmsb8ZYsG0GUhsv3u
-ID7BAfsTbG9tXuVR2wjmnymcRwUHKnXtyvKTZVN06vpCsryx4zjAff2FI9ECpjke
-r8HSAK41+4QhKEoSC3C9IMLi/dBfrsRTtTSOKZVBAoGBAI2dl5HEIFpufaI4toWM
-LO5HFrlXgRDGoc/+Byr5/8ZZpYpU115Ol/q6M+l0koV2ygJ9jeJJEllFWykIDS6F
-XxazFI74swAqobHb2ZS/SLhoVxE82DdSeXrjkTvUjNtrW5zs1gIMKBR4nD6H8AqL
-iMN28C2bKGao5UHvdER1rGy7
------END PRIVATE KEY-----
-"""
- token = "MyAccessToken"
-
- @classmethod
- def setup_class(cls):
- # create the ssl files
- fname = os.path.join(os.getcwd(), 'crt')
- with open(fname, 'w') as crt:
- crt.write(cls.crt)
- cls.files_created.append(fname)
- fname = os.path.join(os.getcwd(), 'key')
- with open(fname, 'w') as crt:
- crt.write(cls.key)
- cls.files_created.append(fname)
- fname = os.path.join(os.getcwd(), 'token')
- with open(fname, 'w') as crt:
- crt.write(cls.token)
- cls.files_created.append(fname)
- # start a simple http instance to test the requesthandler
- cls.server = HTTPServer(('0.0.0.0', 9443), cd.CephadmDaemonHandler)
- cls.server.cephadm_cache = cd.CephadmCache()
- cls.server.token = cls.token
- t = threading.Thread(target=cls.server.serve_forever)
- t.daemon = True
- t.start()
-
- @classmethod
- def teardown_class(cls):
- cls.server.shutdown()
- assert len(cls.files_created) > 0
- for f in cls.files_created:
- os.remove(f)
-
- def setup_method(self):
- # re-init the cache for every test
- TestCephadmExporter.server.cephadm_cache = cd.CephadmCache()
-
- def teardown_method(self):
- pass
-
- def test_files_ready(self):
- assert os.path.exists(os.path.join(os.getcwd(), 'crt'))
- assert os.path.exists(os.path.join(os.getcwd(), 'key'))
- assert os.path.exists(os.path.join(os.getcwd(), 'token'))
-
- def test_can_run(self, exporter):
- assert exporter.can_run
-
- def test_token_valid(self, exporter):
- assert exporter.token == self.token
-
-    def test_unit_name(self, exporter):
- assert exporter.unit_name
- assert exporter.unit_name == "ceph-foobar-cephadm-exporter.test.service"
-
-    def test_unit_run(self, exporter):
- assert exporter.unit_run
- lines = exporter.unit_run.split('\n')
- assert len(lines) == 2
- assert "cephadm exporter --fsid foobar --id test --port 9443 &" in lines[1]
-
- def test_binary_path(self, exporter):
- assert os.path.isfile(exporter.binary_path)
-
- def test_systemd_unit(self, exporter):
- assert exporter.unit_file
-
- def test_validate_passes(self, exporter):
- config = {
- "crt": self.crt,
- "key": self.key,
- "token": self.token,
- }
- cd.CephadmDaemon.validate_config(config)
-
- def test_validate_fails(self, exporter):
- config = {
- "key": self.key,
- "token": self.token,
- }
- with pytest.raises(cd.Error):
- cd.CephadmDaemon.validate_config(config)
-
- def test_port_active(self, exporter):
-        assert exporter.port_active
-
- def test_rqst_health_200(self):
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/health",headers=hdrs)
- r = urlopen(req)
- assert r.status == 200
-
-    def test_rqst_all_inactive_500(self):
-        hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
-        req=Request("http://localhost:9443/v1/metadata",headers=hdrs)
-        with pytest.raises(HTTPError, match=r"HTTP Error 500: .*"):
-            urlopen(req)
-
-    def test_rqst_no_auth_401(self):
-        req=Request("http://localhost:9443/v1/metadata")
-        with pytest.raises(HTTPError, match=r"HTTP Error 401: .*"):
-            urlopen(req)
-
-    def test_rqst_bad_auth_401(self):
-        hdrs={"Authorization":"Bearer BogusAuthToken"}
-        req=Request("http://localhost:9443/v1/metadata",headers=hdrs)
-        with pytest.raises(HTTPError, match=r"HTTP Error 401: .*"):
-            urlopen(req)
-
-    def test_rqst_badURL_404(self):
-        hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
-        req=Request("http://localhost:9443/v1/metazoic",headers=hdrs)
-        with pytest.raises(HTTPError, match=r"HTTP Error 404: .*"):
-            urlopen(req)
-
- def test_rqst_inactive_task_204(self):
- # all tasks initialise as inactive, and then 'go' active as their thread starts
- # so we can pick any task to check for an inactive response (no content)
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/disks",headers=hdrs)
- r = urlopen(req)
- assert r.status == 204
-
- def test_rqst_active_task_200(self):
- TestCephadmExporter.server.cephadm_cache.tasks['host'] = 'active'
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/host",headers=hdrs)
- r = urlopen(req)
- assert r.status == 200
-
- def test_rqst_all_206(self):
- TestCephadmExporter.server.cephadm_cache.tasks['disks'] = 'active'
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata",headers=hdrs)
- r = urlopen(req)
- assert r.status == 206
-
- def test_rqst_disks_200(self):
- TestCephadmExporter.server.cephadm_cache.tasks['disks'] = 'active'
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/disks",headers=hdrs)
- r = urlopen(req)
- assert r.status == 200
-
- def test_thread_exception(self, exporter):
- # run is patched to invoke a mocked scrape_host thread that will raise so
- # we check here that the exception handler updates the cache object as we'd
- # expect with the error
- exporter.run()
- assert exporter.cephadm_cache.host['scrape_errors']
- assert exporter.cephadm_cache.host['scrape_errors'] == ['ValueError exception: wah']
- assert exporter.cephadm_cache.errors == ['host thread stopped']
-
- # Test the requesthandler does the right thing with invalid methods...
- # ie. return a "501" - Not Implemented / Unsupported Method
- def test_invalid_method_HEAD(self):
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/health",headers=hdrs, method="HEAD")
- with pytest.raises(HTTPError, match=r"HTTP Error 501: .*") as e:
- urlopen(req)
-
- def test_invalid_method_DELETE(self):
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/health",headers=hdrs, method="DELETE")
- with pytest.raises(HTTPError, match=r"HTTP Error 501: .*") as e:
- urlopen(req)
-
- def test_invalid_method_POST(self):
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/health",headers=hdrs, method="POST")
- with pytest.raises(HTTPError, match=r"HTTP Error 501: .*") as e:
- urlopen(req)
-
- def test_invalid_method_PUT(self):
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/health",headers=hdrs, method="PUT")
- with pytest.raises(HTTPError, match=r"HTTP Error 501: .*") as e:
- urlopen(req)
-
- def test_invalid_method_CONNECT(self):
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/health",headers=hdrs, method="CONNECT")
- with pytest.raises(HTTPError, match=r"HTTP Error 501: .*") as e:
- urlopen(req)
-
- def test_invalid_method_TRACE(self):
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/health",headers=hdrs, method="TRACE")
- with pytest.raises(HTTPError, match=r"HTTP Error 501: .*") as e:
- urlopen(req)
-
- def test_invalid_method_OPTIONS(self):
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/health",headers=hdrs, method="OPTIONS")
- with pytest.raises(HTTPError, match=r"HTTP Error 501: .*") as e:
- urlopen(req)
-
- def test_invalid_method_PATCH(self):
- hdrs={"Authorization":f"Bearer {TestCephadmExporter.token}"}
- req=Request("http://localhost:9443/v1/metadata/health",headers=hdrs, method="PATCH")
- with pytest.raises(HTTPError, match=r"HTTP Error 501: .*") as e:
- urlopen(req)
-
- def test_ipv4_subnet(self):
- rc, v, msg = cd.check_subnet('192.168.1.0/24')
- assert rc == 0 and v[0] == 4
-
- def test_ipv4_subnet_list(self):
- rc, v, msg = cd.check_subnet('192.168.1.0/24,10.90.90.0/24')
- assert rc == 0 and not msg
-
- def test_ipv4_subnet_badlist(self):
- rc, v, msg = cd.check_subnet('192.168.1.0/24,192.168.1.1')
- assert rc == 1 and msg
-
- def test_ipv4_subnet_mixed(self):
- rc, v, msg = cd.check_subnet('192.168.100.0/24,fe80::/64')
- assert rc == 0 and v == [4,6]
-
- def test_ipv6_subnet(self):
- rc, v, msg = cd.check_subnet('fe80::/64')
- assert rc == 0 and v[0] == 6
-
- def test_subnet_mask_missing(self):
- rc, v, msg = cd.check_subnet('192.168.1.58')
- assert rc == 1 and msg
-
- def test_subnet_mask_junk(self):
- rc, v, msg = cd.check_subnet('wah')
- assert rc == 1 and msg
-
-
class TestMaintenance:
systemd_target = "ceph.00000000-0000-0000-0000-000000c0ffee.target"
fsid = '0ea8cdd0-1bbf-11ec-a9c7-5254002763fa'
from mgr_module import MgrModule, HandleCommandResult, Option
-from mgr_util import create_self_signed_cert
-import secrets
import orchestrator
from orchestrator.module import to_format, Format
from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
-from .services.exporter import CephadmExporter, CephadmExporterConfig
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec
from .upgrade import CephadmUpgrade
# ------------------------------------------------------------------------------
-def service_inactive(spec_name: str) -> Callable:
- def inner(func: Callable) -> Callable:
- @wraps(func)
- def wrapper(*args: Any, **kwargs: Any) -> Any:
- obj = args[0]
- if obj.get_store(f"spec.{spec_name}") is not None:
- return 1, "", f"Unable to change configuration of an active service {spec_name}"
- return func(*args, **kwargs)
- return wrapper
- return inner
-
-
def host_exists(hostname_position: int = 1) -> Callable:
"""Check that a hostname exists in the inventory"""
def inner(func: Callable) -> Callable:
OSDService, NFSService, MonService, MgrService, MdsService,
RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
PrometheusService, NodeExporterService, CrashService, IscsiService,
- IngressService, CustomContainerService, CephadmExporter, CephfsMirrorService,
+ IngressService, CustomContainerService, CephfsMirrorService,
CephadmAgent
]
suffix = daemon_type not in [
'mon', 'crash',
'prometheus', 'node-exporter', 'grafana', 'alertmanager',
- 'container', 'cephadm-exporter', 'agent'
+ 'container', 'agent'
]
if forcename:
if len([d for d in existing if d.daemon_id == forcename]):
"""
return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
- def _set_exporter_config(self, config: Dict[str, str]) -> None:
- self.set_store('exporter_config', json.dumps(config))
-
- def _get_exporter_config(self) -> Dict[str, str]:
- cfg_str = self.get_store('exporter_config')
- return json.loads(cfg_str) if cfg_str else {}
-
- def _set_exporter_option(self, option: str, value: Optional[str] = None) -> None:
- kv_option = f'exporter_{option}'
- self.set_store(kv_option, value)
-
- def _get_exporter_option(self, option: str) -> Optional[str]:
- kv_option = f'exporter_{option}'
- return self.get_store(kv_option)
-
- @orchestrator._cli_write_command(
- prefix='cephadm generate-exporter-config')
- @service_inactive('cephadm-exporter')
- def _generate_exporter_config(self) -> Tuple[int, str, str]:
- """
- Generate default SSL crt/key and token for cephadm exporter daemons
- """
- self._set_exporter_defaults()
- self.log.info('Default settings created for cephadm exporter(s)')
- return 0, "", ""
-
- def _set_exporter_defaults(self) -> None:
- crt, key = self._generate_exporter_ssl()
- token = self._generate_exporter_token()
- self._set_exporter_config({
- "crt": crt,
- "key": key,
- "token": token,
- "port": CephadmExporterConfig.DEFAULT_PORT
- })
- self._set_exporter_option('enabled', 'true')
-
- def _generate_exporter_ssl(self) -> Tuple[str, str]:
- return create_self_signed_cert(dname={"O": "Ceph", "OU": "cephadm-exporter"})
-
- def _generate_exporter_token(self) -> str:
- return secrets.token_hex(32)
-
- @orchestrator._cli_write_command(
- prefix='cephadm clear-exporter-config')
- @service_inactive('cephadm-exporter')
- def _clear_exporter_config(self) -> Tuple[int, str, str]:
- """
- Clear the SSL configuration used by cephadm exporter daemons
- """
- self._clear_exporter_config_settings()
- self.log.info('Cleared cephadm exporter configuration')
- return 0, "", ""
-
- def _clear_exporter_config_settings(self) -> None:
- self.set_store('exporter_config', None)
- self._set_exporter_option('enabled', None)
-
- @orchestrator._cli_write_command(
- prefix='cephadm set-exporter-config')
- @service_inactive('cephadm-exporter')
- def _store_exporter_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
- """
- Set custom cephadm-exporter configuration from a json file (-i <file>). JSON must contain crt, key, token and port
- """
- if not inbuf:
- return 1, "", "JSON configuration has not been provided (-i <filename>)"
-
- cfg = CephadmExporterConfig(self)
- rc, reason = cfg.load_from_json(inbuf)
- if rc:
- return 1, "", reason
-
- rc, reason = cfg.validate_config()
- if rc:
- return 1, "", reason
-
- self._set_exporter_config({
- "crt": cfg.crt,
- "key": cfg.key,
- "token": cfg.token,
- "port": cfg.port
- })
- self.log.info("Loaded and verified the TLS configuration")
- return 0, "", ""
-
- @orchestrator._cli_read_command(
- 'cephadm get-exporter-config')
- def _show_exporter_config(self) -> Tuple[int, str, str]:
- """
- Show the current cephadm-exporter configuraion (JSON)'
- """
- cfg = self._get_exporter_config()
- return 0, json.dumps(cfg, indent=2), ""
-
@orchestrator._cli_read_command('cephadm config-check ls')
def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
"""List the available configuration checks and their current state"""
'node-exporter': PlacementSpec(host_pattern='*'),
'crash': PlacementSpec(host_pattern='*'),
'container': PlacementSpec(count=1),
- 'cephadm-exporter': PlacementSpec(host_pattern='*'),
}
spec.placement = defaults[spec.service_type]
elif spec.service_type in ['mon', 'mgr'] and \
def apply_container(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @handle_orch_error
- def apply_cephadm_exporter(self, spec: ServiceSpec) -> str:
- return self._apply(spec)
-
@handle_orch_error
def upgrade_check(self, image: str, version: str) -> str:
if self.inventory.get_host_with_state("maintenance"):
if spec.ports:
ports.extend(spec.ports)
- if daemon_spec.daemon_type == 'cephadm-exporter':
- if not reconfig:
- assert daemon_spec.host
- self._deploy_cephadm_binary(daemon_spec.host)
-
# TCP port to open in the host firewall
if len(ports) > 0:
daemon_spec.extra_args.extend([
self.log.debug(f"_run_cephadm : command = {command}")
self.log.debug(f"_run_cephadm : args = {args}")
- bypass_image = ('cephadm-exporter', 'agent')
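+ # keep the trailing comma below: ('agent',) is a one-element tuple, while
+ # ('agent') without it is just the plain string 'agent'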
+ bypass_image = ('agent',)
assert image or entity
# Skip the image check for daemons deployed that are not ceph containers
+++ /dev/null
-import json
-import logging
-from typing import TYPE_CHECKING, List, Dict, Any, Tuple
-
-from orchestrator import OrchestratorError
-from mgr_util import ServerConfigException, verify_tls
-
-from .cephadmservice import CephadmService, CephadmDaemonDeploySpec
-
-if TYPE_CHECKING:
- from cephadm.module import CephadmOrchestrator
-
-logger = logging.getLogger(__name__)
-
-
-class CephadmExporterConfig:
- required_keys = ['crt', 'key', 'token', 'port']
- DEFAULT_PORT = '9443'
-
- def __init__(self, mgr, crt="", key="", token="", port=""):
- # type: (CephadmOrchestrator, str, str, str, str) -> None
- self.mgr = mgr
- self.crt = crt
- self.key = key
- self.token = token
- self.port = port
-
- @property
- def ready(self) -> bool:
- return all([self.crt, self.key, self.token, self.port])
-
- def load_from_store(self) -> None:
- cfg = self.mgr._get_exporter_config()
-
- assert isinstance(cfg, dict)
- self.crt = cfg.get('crt', "")
- self.key = cfg.get('key', "")
- self.token = cfg.get('token', "")
- self.port = cfg.get('port', "")
-
- def load_from_json(self, json_str: str) -> Tuple[int, str]:
- try:
- cfg = json.loads(json_str)
- except ValueError:
- return 1, "Invalid JSON provided - unable to load"
-
- if not all([k in cfg for k in CephadmExporterConfig.required_keys]):
- return 1, "JSON file must contain crt, key, token and port"
-
- self.crt = cfg.get('crt')
- self.key = cfg.get('key')
- self.token = cfg.get('token')
- self.port = cfg.get('port')
-
- return 0, ""
-
- def validate_config(self) -> Tuple[int, str]:
- if not self.ready:
- return 1, "Incomplete configuration. cephadm-exporter needs crt, key, token and port to be set"
-
- for check in [self._validate_tls, self._validate_token, self._validate_port]:
- rc, reason = check()
- if rc:
- return 1, reason
-
- return 0, ""
-
- def _validate_tls(self) -> Tuple[int, str]:
-
- try:
- verify_tls(self.crt, self.key)
- except ServerConfigException as e:
- return 1, str(e)
-
- return 0, ""
-
- def _validate_token(self) -> Tuple[int, str]:
- if not isinstance(self.token, str):
- return 1, "token must be a string"
- if len(self.token) < 8:
- return 1, "Token must be a string of at least 8 chars in length"
-
- return 0, ""
-
- def _validate_port(self) -> Tuple[int, str]:
- try:
- p = int(str(self.port))
- if p <= 1024:
- raise ValueError
- except ValueError:
- return 1, "Port must be a integer (>1024)"
-
- return 0, ""
-
-
-class CephadmExporter(CephadmService):
- TYPE = 'cephadm-exporter'
-
- def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
- assert self.TYPE == daemon_spec.daemon_type
-
- cfg = CephadmExporterConfig(self.mgr)
- cfg.load_from_store()
-
- if cfg.ready:
- rc, reason = cfg.validate_config()
- if rc:
- raise OrchestratorError(reason)
- else:
- logger.info(
- "Incomplete/Missing configuration, applying defaults")
- self.mgr._set_exporter_defaults()
- cfg.load_from_store()
-
- if not daemon_spec.ports:
- daemon_spec.ports = [int(cfg.port)]
-
- daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
-
- return daemon_spec
-
- def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
- assert self.TYPE == daemon_spec.daemon_type
- deps: List[str] = []
-
- cfg = CephadmExporterConfig(self.mgr)
- cfg.load_from_store()
-
- if cfg.ready:
- rc, reason = cfg.validate_config()
- if rc:
- raise OrchestratorError(reason)
- else:
- logger.info("Using default configuration for cephadm-exporter")
- self.mgr._set_exporter_defaults()
- cfg.load_from_store()
-
- config = {
- "crt": cfg.crt,
- "key": cfg.key,
- "token": cfg.token
- }
- return config, deps
-
- def purge(self, service_name: str) -> None:
- logger.info("Purging cephadm-exporter settings from mon K/V store")
- self.mgr._clear_exporter_config_settings()
ServiceSpec('mds', service_id='fsname'),
RGWSpec(rgw_realm='realm', rgw_zone='zone'),
RGWSpec(service_id="foo"),
- ServiceSpec('cephadm-exporter'),
]
)
@mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
envs=['SECRET=password'],
ports=[8080, 8443]
), CephadmOrchestrator.apply_container),
- (ServiceSpec('cephadm-exporter'), CephadmOrchestrator.apply_cephadm_exporter),
]
)
@mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
from cephadm.services.osd import OSDService
from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
-from cephadm.services.exporter import CephadmExporter
from ceph.deployment.service_spec import IscsiServiceSpec
from orchestrator import OrchestratorError
node_exporter_service = NodeExporterService(mgr)
crash_service = CrashService(mgr)
iscsi_service = IscsiService(mgr)
- cephadm_exporter_service = CephadmExporter(mgr)
cephadm_services = {
'mon': mon_service,
'mgr': mgr_service,
'node-exporter': node_exporter_service,
'crash': crash_service,
'iscsi': iscsi_service,
- 'cephadm-exporter': cephadm_exporter_service,
}
return cephadm_services
# services based on CephadmService shouldn't have get_auth_entity
with pytest.raises(AttributeError):
- for daemon_type in ['grafana', 'alertmanager', 'prometheus', 'node-exporter', 'cephadm-exporter']:
+ for daemon_type in ['grafana', 'alertmanager', 'prometheus', 'node-exporter']:
cephadm_services[daemon_type].get_auth_entity("id1", "host")
cephadm_services[daemon_type].get_auth_entity("id1", "")
cephadm_services[daemon_type].get_auth_entity("id1")
True
),
- (
- # daemon_id only contains hostname
- ServiceSpec(
- service_type='cephadm-exporter',
- ),
- DaemonDescription(
- daemon_type='cephadm-exporter',
- daemon_id="testhost",
- hostname="testhost",
- ),
- True
- ),
])
def test_daemon_description_service_name(spec: ServiceSpec,
dd: DaemonDescription,
'rgw': self.apply_rgw,
'ingress': self.apply_ingress,
'host': self.add_host,
- 'cephadm-exporter': self.apply_cephadm_exporter,
}
def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741
"""Update an existing AlertManager daemon(s)"""
raise NotImplementedError()
- def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
- """Update an existing cephadm exporter daemon"""
- raise NotImplementedError()
-
def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
'crash': 'crash',
'crashcollector': 'crash', # Specific Rook Daemon
'container': 'container',
- 'cephadm-exporter': 'cephadm-exporter',
'agent': 'agent'
}
return mapping[dtype]
'node-exporter': ['node-exporter'],
'crash': ['crash'],
'container': ['container'],
- 'cephadm-exporter': ['cephadm-exporter'],
'agent': ['agent']
}
return mapping[stype]
rgw = 'rgw'
nfs = 'nfs'
iscsi = 'iscsi'
- cephadm_exporter = 'cephadm-exporter'
class ServiceAction(enum.Enum):
"""
KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi mds mgr mon nfs ' \
'node-exporter osd prometheus rbd-mirror rgw agent ' \
- 'container cephadm-exporter ingress cephfs-mirror'.split()
+ 'container ingress cephfs-mirror'.split()
REQUIRES_SERVICE_ID = 'iscsi mds nfs osd rgw container ingress '.split()
MANAGED_CONFIG_OPTIONS = [
'mds_join_fs',