from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
-from mgr_module import MgrModule, HandleCommandResult, Option
+from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType
from mgr_util import create_self_signed_cert
import secrets
import orchestrator
self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
remoto.backends.LegacyModuleExecute]] = {}
- self.notify('mon_map', None)
+ self.notify(NotifyType.mon_map, None)
self.config_notify()
path = self.get_ceph_option('cephadm_path')
self.event.set()
- def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
- if notify_type == "mon_map":
+ def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
+ if notify_type == NotifyType.mon_map:
# get monmap mtime so we can refresh configs when mons change
monmap = self.get('mon_map')
self.last_monmap = str_to_datetime(monmap['modified'])
if getattr(self, 'manage_etc_ceph_ceph_conf', False):
# getattr, due to notify() being called before config_notify()
self._kick_serve_loop()
- if notify_type == "pg_summary":
+ if notify_type == NotifyType.pg_summary:
self._trigger_osd_removal()
def _trigger_osd_removal(self) -> None:
from typing_extensions import Literal
from mgr_module import CLIWriteCommand, HandleCommandResult, MgrModule, \
- MgrStandbyModule, Option, _get_localized_key
+ MgrStandbyModule, NotifyType, Option, _get_localized_key
from mgr_util import ServerConfigException, build_url, \
create_self_signed_cert, get_default_addr, verify_tls_files
return (-errno.EINVAL, '', 'Command not found \'{0}\''
.format(cmd['prefix']))
- def notify(self, notify_type, notify_id):
- NotificationQueue.new_notification(notify_type, notify_id)
+ def notify(self, notify_type: NotifyType, notify_id):
+ NotificationQueue.new_notification(str(notify_type), notify_id)
def get_updated_pool_stats(self):
df = self.get('df')
import threading
from mgr_module import CLICommand, CLIReadCommand, HandleCommandResult
-from mgr_module import MgrModule, CommandResult
+from mgr_module import MgrModule, CommandResult, NotifyType
from . import health as health_util
# hours of crash history to report
return { k: v for k, v in self._store.items() if k.startswith(prefix) }
- def notify(self, ttype, ident):
+ def notify(self, ttype: NotifyType, ident):
"""Queue updates for processing"""
- if ttype == "health":
+ if ttype == NotifyType.health:
self.log.info("Received health check update {} pending".format(
len(self._pending_health)))
health = json.loads(self.get("health")["json"])
from collections import OrderedDict
import rados
-from mgr_module import MgrModule
+from mgr_module import MgrModule, NotifyType
from mgr_util import verify_cacrt, ServerConfigException
try:
else:
self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message))
- def notify(self, notify_type, notify_id):
+ def notify(self, notify_type: NotifyType, notify_id):
"""
Called by the ceph-mgr service to notify the Python plugin
that new state is available.
"""
# only interested in cluster log (clog) messages for now
- if notify_type == 'clog':
+ if notify_type == NotifyType.clog:
self.log.debug("received a clog entry from mgr.notify")
if isinstance(notify_id, dict):
# create a log object to process
-from mgr_module import MgrModule, CommandResult
+from mgr_module import MgrModule, CommandResult, NotifyType
import json
import threading
self.serve_event = threading.Event()
def notify(self, notify_type, notify_id):
- if notify_type == 'osd_map':
+ if notify_type == NotifyType.osd_map:
self.handle_osd_map()
def handle_osd_map(self):
import logging
from typing import Optional, List, Set
-from mgr_module import MgrModule
+from mgr_module import MgrModule, NotifyType
from ceph.deployment.service_spec import ServiceSpec
import orchestrator
import copy
pass
def notify(self, notify_type, notify_id):
- if notify_type != 'fs_map':
+ if notify_type != NotifyType.fs_map:
return
fs_map = self.get('fs_map')
if not fs_map:
import subprocess
import threading
from collections import defaultdict
-from enum import IntEnum
+from enum import IntEnum, Enum
import rados
import re
import socket
NFS_POOL_NAME = '.nfs'
+class NotifyType(str, Enum):
+ mon_map = 'mon_map'
+ pg_summary = 'pg_summary'
+ health = 'health'
+ clog = 'clog'
+ osd_map = 'osd_map'
+ fs_map = 'fs_map'
+ command = 'command'
+
+ # these are disabled because there are no users.
+ # see Mgr.cc:
+ # service_map = 'service_map'
+ # mon_status = 'mon_status'
+ # see DaemonServer.cc:
+ # perf_schema_update = 'perf_schema_update'
+
+
class CommandResult(object):
"""
Use with MgrModule.send_command
"""
return self._ceph_get_context()
- def notify(self, notify_type: str, notify_id: str) -> None:
+ def notify(self, notify_type: NotifyType, notify_id: str) -> None:
"""
Called by the ceph-mgr service to notify the Python plugin
that new state is available.
from mgr_util import RTimer, CephfsClient, open_filesystem,\
CephfsConnectionException
+from mgr_module import NotifyType
from .blocklist import blocklist
from .notify import Notifier, InstanceWatcher
from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
self.refresh_pool_policy()
self.local_fs = CephfsClient(mgr)
- def notify(self, notify_type):
+ def notify(self, notify_type: NotifyType):
log.debug(f'got notify type {notify_type}')
- if notify_type == 'fs_map':
+ if notify_type == NotifyType.fs_map:
with self.lock:
self.fs_map = self.mgr.get('fs_map')
self.refresh_pool_policy_locked()
from typing import List, Optional
-from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option
+from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option, NotifyType
from .fs.snapshot_mirror import FSSnapshotMirror
super().__init__(*args, **kwargs)
self.fs_snapshot_mirror = FSSnapshotMirror(self)
- def notify(self, notify_type, notify_id):
+ def notify(self, notify_type: NotifyType, notify_id):
self.fs_snapshot_mirror.notify(notify_type)
@CLIWriteCommand('fs snapshot mirror enable')
from werkzeug.serving import make_server, make_ssl_devcert
from .hooks import ErrorHook
-from mgr_module import MgrModule, CommandResult
+from mgr_module import MgrModule, CommandResult, NotifyType
from mgr_util import build_url
self.log.error(str(traceback.format_exc()))
- def notify(self, notify_type, tag):
+ def notify(self, notify_type: NotifyType, tag: str):
try:
self._notify(notify_type, tag)
except:
self.log.error(str(traceback.format_exc()))
- def _notify(self, notify_type, tag):
- if notify_type != "command":
+ def _notify(self, notify_type: NotifyType, tag):
+ if notify_type != NotifyType.command:
self.log.debug("Unhandled notification type '%s'", notify_type)
return
# we can safely skip all the sequential commands
import json
from typing import List, Dict
-from mgr_module import MgrModule, Option
+from mgr_module import MgrModule, Option, NotifyType
from .fs.perf_stats import FSPerfStats
super(Module, self).__init__(*args, **kwargs)
self.fs_perf_stats = FSPerfStats(self)
- def notify(self, notify_type, notify_id):
- if notify_type == "command":
+ def notify(self, notify_type: NotifyType, notify_id):
+ if notify_type == NotifyType.command:
self.fs_perf_stats.notify(notify_id)
def handle_command(self, inbuf, cmd):