]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr: introduce NotifyType enum
authorSage Weil <sage@newdream.net>
Thu, 2 Dec 2021 15:22:48 +0000 (10:22 -0500)
committerSage Weil <sage@newdream.net>
Fri, 3 Dec 2021 02:15:47 +0000 (21:15 -0500)
Note that we don't annotate the dashboard NotificationQueue because it is
used internally by the dashboard with other events.

Signed-off-by: Sage Weil <sage@newdream.net>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/dashboard/module.py
src/pybind/mgr/insights/module.py
src/pybind/mgr/k8sevents/module.py
src/pybind/mgr/localpool/module.py
src/pybind/mgr/mds_autoscaler/module.py
src/pybind/mgr/mgr_module.py
src/pybind/mgr/mirroring/fs/snapshot_mirror.py
src/pybind/mgr/mirroring/module.py
src/pybind/mgr/restful/module.py
src/pybind/mgr/stats/module.py

index bbd37e80ab8ddaf0f8de5a10a2fca1340109622d..d0653c2a8eda906dd0ed0666b831e1445a620ac2 100644 (file)
@@ -31,7 +31,7 @@ from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 from cephadm.agent import CherryPyThread, CephadmAgentHelpers
 
 
-from mgr_module import MgrModule, HandleCommandResult, Option
+from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType
 import orchestrator
 from orchestrator.module import to_format, Format
 
@@ -434,7 +434,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self.apply_spec_fails: List[Tuple[str, str]] = []
             self.max_osd_draining_count = 10
 
-        self.notify('mon_map', None)
+        self.notify(NotifyType.mon_map, None)
         self.config_notify()
 
         path = self.get_ceph_option('cephadm_path')
@@ -580,8 +580,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.event.set()
 
-    def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
-        if notify_type == "mon_map":
+    def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
+        if notify_type == NotifyType.mon_map:
             # get monmap mtime so we can refresh configs when mons change
             monmap = self.get('mon_map')
             self.last_monmap = str_to_datetime(monmap['modified'])
@@ -590,7 +590,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                 # getattr, due to notify() being called before config_notify()
                 self._kick_serve_loop()
-        if notify_type == "pg_summary":
+        if notify_type == NotifyType.pg_summary:
             self._trigger_osd_removal()
 
     def _trigger_osd_removal(self) -> None:
index 9b9e028d6947c19ac5d5877e7ae85489cc509a95..2963af3a089045bed5fac2a2ab5e1640f91bb08c 100644 (file)
@@ -20,7 +20,7 @@ if TYPE_CHECKING:
         from typing_extensions import Literal
 
 from mgr_module import CLICommand, CLIWriteCommand, HandleCommandResult, \
-    MgrModule, MgrStandbyModule, Option, _get_localized_key
+    MgrModule, MgrStandbyModule, NotifyType, Option, _get_localized_key
 from mgr_util import ServerConfigException, build_url, \
     create_self_signed_cert, get_default_addr, verify_tls_files
 
@@ -478,8 +478,8 @@ class Module(MgrModule, CherryPyConfig):
         return (-errno.EINVAL, '', 'Command not found \'{0}\''
                 .format(cmd['prefix']))
 
-    def notify(self, notify_type, notify_id):
-        NotificationQueue.new_notification(notify_type, notify_id)
+    def notify(self, notify_type: NotifyType, notify_id):
+        NotificationQueue.new_notification(str(notify_type), notify_id)
 
     def get_updated_pool_stats(self):
         df = self.get('df')
index 4c7fd98cdaed1c56088dd03a9e458351a43043ee..c20ada657298a83b94eb9d14b9e785578eaf3bcf 100644 (file)
@@ -4,7 +4,7 @@ import re
 import threading
 
 from mgr_module import CLICommand, CLIReadCommand, HandleCommandResult
-from mgr_module import MgrModule, CommandResult
+from mgr_module import MgrModule, CommandResult, NotifyType
 from . import health as health_util
 
 # hours of crash history to report
@@ -50,9 +50,9 @@ class Module(MgrModule):
         return { k: v for k, v in self._store.items() if k.startswith(prefix) }
 
 
-    def notify(self, ttype, ident):
+    def notify(self, ttype: NotifyType, ident):
         """Queue updates for processing"""
-        if ttype == "health":
+        if ttype == NotifyType.health:
             self.log.info("Received health check update {} pending".format(
                 len(self._pending_health)))
             health = json.loads(self.get("health")["json"])
index 1e12d1f27a25c07959ed0d01ca5c1709fc40a606..8c1521bfbda3fb22e71a286bcfc55fde0c2f739e 100644 (file)
@@ -40,7 +40,7 @@ from urllib3.exceptions import MaxRetryError,ProtocolError
 from collections import OrderedDict
 
 import rados
-from mgr_module import MgrModule
+from mgr_module import MgrModule, NotifyType
 from mgr_util import verify_cacrt, ServerConfigException
 
 try:
@@ -1138,7 +1138,7 @@ class Module(MgrModule):
         else:
             self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message))
 
-    def notify(self, notify_type, notify_id):
+    def notify(self, notify_type: NotifyType, notify_id):
         """
         Called by the ceph-mgr service to notify the Python plugin
         that new state is available.
@@ -1153,7 +1153,7 @@ class Module(MgrModule):
         """
         
         # only interested in cluster log (clog) messages for now
-        if notify_type == 'clog':
+        if notify_type == NotifyType.clog:
             self.log.debug("received a clog entry from mgr.notify")
             if isinstance(notify_id, dict):
                 # create a log object to process
index ce7cb1af2a6ea8c4cdaf05f55ebeedafb336e97b..907c7026b7d150b7b8137c34288346064661c86a 100644 (file)
@@ -1,4 +1,4 @@
-from mgr_module import MgrModule, CommandResult, Option
+from mgr_module import MgrModule, CommandResult, Option, NotifyType
 import json
 import threading
 from typing import cast, Any
@@ -51,8 +51,8 @@ class Module(MgrModule):
         super(Module, self).__init__(*args, **kwargs)
         self.serve_event = threading.Event()
 
-    def notify(self, notify_type: str, notify_id: str) -> None:
-        if notify_type == 'osd_map':
+    def notify(self, notify_type: NotifyType, notify_id: str) -> None:
+        if notify_type == NotifyType.osd_map:
             self.handle_osd_map()
 
     def handle_osd_map(self) -> None:
index 006d8547c95564b5a935232152bb3ef38b0aa7de..66b026bd706ddffc1e81ab85b847081af8678bc8 100644 (file)
@@ -4,7 +4,7 @@ Automatically scale MDSs based on status of the file-system using the FSMap
 
 import logging
 from typing import Any, Optional
-from mgr_module import MgrModule
+from mgr_module import MgrModule, NotifyType
 from ceph.deployment.service_spec import ServiceSpec
 import orchestrator
 import copy
@@ -84,8 +84,8 @@ class MDSAutoscaler(orchestrator.OrchestratorClientMixin, MgrModule):
             self.log.exception(f"fs {fs_name}: exception while updating service: {e}")
             pass
 
-    def notify(self, notify_type: str, notify_id: str) -> None:
-        if notify_type != 'fs_map':
+    def notify(self, notify_type: NotifyType, notify_id: str) -> None:
+        if notify_type != NotifyType.fs_map:
             return
         fs_map = self.get('fs_map')
         if not fs_map:
index 0f6a19263d22295137f2cbca065fb6c4f68aa384..66382fb3beaaf318649ad3f875d6bdcae82503b0 100644 (file)
@@ -17,7 +17,7 @@ import json
 import subprocess
 import threading
 from collections import defaultdict
-from enum import IntEnum
+from enum import IntEnum, Enum
 import rados
 import re
 import socket
@@ -84,6 +84,23 @@ NFS_GANESHA_SUPPORTED_FSALS = ['CEPH', 'RGW']
 NFS_POOL_NAME = '.nfs'
 
 
+class NotifyType(str, Enum):
+    mon_map = 'mon_map'
+    pg_summary = 'pg_summary'
+    health = 'health'
+    clog = 'clog'
+    osd_map = 'osd_map'
+    fs_map = 'fs_map'
+    command = 'command'
+
+    # these are disabled because there are no users.
+    #  see Mgr.cc:
+    # service_map = 'service_map'
+    # mon_status = 'mon_status'
+    #  see DaemonServer.cc:
+    # perf_schema_update = 'perf_schema_update'
+
+
 class CommandResult(object):
     """
     Use with MgrModule.send_command
@@ -1169,7 +1186,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_get_context()
 
-    def notify(self, notify_type: str, notify_id: str) -> None:
+    def notify(self, notify_type: NotifyType, notify_id: str) -> None:
         """
         Called by the ceph-mgr service to notify the Python plugin
         that new state is available.
index ad9e550435d801eb36eaa6317a695061d1b76436..6fa8d0c4c53382b635c27d1fff0516d2354dc1c5 100644 (file)
@@ -15,6 +15,7 @@ import rados
 
 from mgr_util import RTimer, CephfsClient, open_filesystem,\
     CephfsConnectionException
+from mgr_module import NotifyType
 from .blocklist import blocklist
 from .notify import Notifier, InstanceWatcher
 from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
@@ -288,9 +289,9 @@ class FSSnapshotMirror:
         self.refresh_pool_policy()
         self.local_fs = CephfsClient(mgr)
 
-    def notify(self, notify_type):
+    def notify(self, notify_type: NotifyType):
         log.debug(f'got notify type {notify_type}')
-        if notify_type == 'fs_map':
+        if notify_type == NotifyType.fs_map:
             with self.lock:
                 self.fs_map = self.mgr.get('fs_map')
                 self.refresh_pool_policy_locked()
index b9223111ae902507aa3f52ea374d1adce0ae0cf7..196cb8e6c099aa26ca7c1af7a9b6539728e4e4b2 100644 (file)
@@ -1,6 +1,6 @@
 from typing import List, Optional
 
-from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option
+from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option, NotifyType
 
 from .fs.snapshot_mirror import FSSnapshotMirror
 
@@ -11,7 +11,7 @@ class Module(MgrModule):
         super().__init__(*args, **kwargs)
         self.fs_snapshot_mirror = FSSnapshotMirror(self)
 
-    def notify(self, notify_type, notify_id):
+    def notify(self, notify_type: NotifyType, notify_id):
         self.fs_snapshot_mirror.notify(notify_type)
 
     @CLIWriteCommand('fs snapshot mirror enable')
index b76464e76fa8e3d6ce93f306573dc6b95b163aff..97f99059931421ba55ff36220c2ef82e3e761be9 100644 (file)
@@ -23,7 +23,7 @@ from pecan.rest import RestController
 from werkzeug.serving import make_server, make_ssl_devcert
 
 from .hooks import ErrorHook
-from mgr_module import MgrModule, CommandResult
+from mgr_module import MgrModule, CommandResult, NotifyType
 from mgr_util import build_url
 
 
@@ -360,15 +360,15 @@ class Module(MgrModule):
             self.log.error(str(traceback.format_exc()))
 
 
-    def notify(self, notify_type, tag):
+    def notify(self, notify_type: NotifyType, tag: str):
         try:
             self._notify(notify_type, tag)
         except:
             self.log.error(str(traceback.format_exc()))
 
 
-    def _notify(self, notify_type, tag):
-        if notify_type != "command":
+    def _notify(self, notify_type: NotifyType, tag):
+        if notify_type != NotifyType.command:
             self.log.debug("Unhandled notification type '%s'", notify_type)
             return
         # we can safely skip all the sequential commands
index a4bc44630d637168eb8a6f6d94f926520bde7a43..5d74aeb1d81ee684b56b912181d4e95b23bdbcbe 100644 (file)
@@ -5,7 +5,7 @@ performance stats for ceph filesystem (for now...)
 import json
 from typing import List, Dict
 
-from mgr_module import MgrModule, Option
+from mgr_module import MgrModule, Option, NotifyType
 
 from .fs.perf_stats import FSPerfStats
 
@@ -26,8 +26,8 @@ class Module(MgrModule):
         super(Module, self).__init__(*args, **kwargs)
         self.fs_perf_stats = FSPerfStats(self)
 
-    def notify(self, notify_type, notify_id):
-        if notify_type == "command":
+    def notify(self, notify_type: NotifyType, notify_id):
+        if notify_type == NotifyType.command:
             self.fs_perf_stats.notify(notify_id)
 
     def handle_command(self, inbuf, cmd):