import logging
+from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, Any
from mgr_module import MonCommandFailed
logger = logging.getLogger(__name__)
-class CephadmService:
+class CephadmService(metaclass=ABCMeta):
"""
Base class for service types. Often providing a create() and config() fn.
"""
+
+ @property
+ @abstractmethod
+ def TYPE(self):
+ pass
+
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
+
+ def ok_to_stop(self, daemon_ids: List[str]) -> bool:
+ names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
+
+ if self.TYPE not in ['mon', 'osd', 'mds']:
+ logger.info('Upgrade: It is presumed safe to stop %s' % names)
+ return True
+
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': f'{self.TYPE} ok-to-stop',
+ 'ids': daemon_ids,
+ })
+
+ if ret:
+ logger.info(f'It is NOT safe to stop {names}: {err}')
+ return False
+
+ return True
+
+
class MonService(CephadmService):
+ TYPE = 'mon'
+
def create(self, name, host, network):
"""
Create a new monitor on the given host.
class MgrService(CephadmService):
+ TYPE = 'mgr'
+
def create(self, mgr_id, host):
"""
Create a new manager instance on a host.
class MdsService(CephadmService):
+ TYPE = 'mds'
+
def config(self, spec: ServiceSpec):
# ensure mds_join_fs is set for these daemons
assert spec.service_id
class RgwService(CephadmService):
+ TYPE = 'rgw'
+
def config(self, spec: RGWSpec):
# ensure rgw_realm and rgw_zone is set for these daemons
ret, out, err = self.mgr.check_mon_command({
class RbdMirrorService(CephadmService):
+ TYPE = 'rbd-mirror'
+
def create(self, daemon_id, host) -> str:
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
class CrashService(CephadmService):
+ TYPE = 'crash'
+
def create(self, daemon_id, host) -> str:
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
class IscsiService(CephadmService):
+ TYPE = 'iscsi'
+
def config(self, spec: IscsiServiceSpec):
self.mgr._check_pool_exists(spec.pool, spec.service_name())
logger = logging.getLogger(__name__)
class GrafanaService(CephadmService):
+ TYPE = 'grafana'
DEFAULT_SERVICE_PORT = 3000
def create(self, daemon_id, host):
)
class AlertmanagerService(CephadmService):
+ TYPE = 'alertmanager'
DEFAULT_SERVICE_PORT = 9093
def create(self, daemon_id, host) -> str:
class PrometheusService(CephadmService):
+ TYPE = 'prometheus'
DEFAULT_SERVICE_PORT = 9095
def create(self, daemon_id, host) -> str:
)
class NodeExporterService(CephadmService):
+ TYPE = 'node-exporter'
+
def create(self, daemon_id, host) -> str:
return self.mgr._create_daemon('node-exporter', daemon_id, host)
class NFSService(CephadmService):
+ TYPE = 'nfs'
+
def _generate_nfs_config(self, daemon_type, daemon_id, host):
# type: (str, str, str) -> Tuple[Dict[str, Any], List[str]]
deps = [] # type: List[str]
class OSDService(CephadmService):
+ TYPE = 'osd'
+
def create(self, drive_group: DriveGroupSpec) -> str:
logger.debug(f"Processing DriveGroup {drive_group}")
ret = []
from unittest.mock import MagicMock
-from cephadm.services.cephadmservice import CephadmService
+from cephadm.services.monitoring import GrafanaService
class FakeMgr:
# pylint: disable=protected-access
mgr = FakeMgr()
service_url = 'http://svc:1000'
- service = CephadmService(mgr)
+ service = GrafanaService(mgr)
service._set_service_url_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url)
assert mgr.config == service_url
import orchestrator
from cephadm.utils import name_to_config_section
-from orchestrator import OrchestratorError
+from orchestrator import OrchestratorError, DaemonDescription
if TYPE_CHECKING:
from .module import CephadmOrchestrator
return True
return False
- def _wait_for_ok_to_stop(self, s) -> bool:
+ def _wait_for_ok_to_stop(self, s: DaemonDescription) -> bool:
# only wait a little bit; the service might go away for something
tries = 4
while tries > 0:
- if s.daemon_type not in ['mon', 'osd', 'mds']:
- logger.info('Upgrade: It is presumed safe to stop %s.%s' %
- (s.daemon_type, s.daemon_id))
- return True
- ret, out, err = self.mgr.mon_command({
- 'prefix': '%s ok-to-stop' % s.daemon_type,
- 'ids': [s.daemon_id],
- })
if not self.upgrade_state or self.upgrade_state.get('paused'):
return False
- if ret:
- logger.info('Upgrade: It is NOT safe to stop %s.%s' %
- (s.daemon_type, s.daemon_id))
- time.sleep(15)
- tries -= 1
- else:
- logger.info('Upgrade: It is safe to stop %s.%s' %
+
+ ok = self.mgr.cephadm_services[s.daemon_type].ok_to_stop([s.daemon_id])
+
+ if ok:
+ logger.info('Upgrade: It is presumed safe to stop %s.%s' %
(s.daemon_type, s.daemon_id))
return True
+ logger.info('Upgrade: It is NOT safe to stop %s.%s' %
+ (s.daemon_type, s.daemon_id))
+ time.sleep(15)
+ tries -= 1
return False
def _clear_upgrade_health_checks(self) -> None: