A simple cluster health alerting module.
"""
-from mgr_module import MgrModule, HandleCommandResult
+from mgr_module import CLIReadCommand, MgrModule, HandleCommandResult
from threading import Event
from typing import Any, Optional, Dict, List, TYPE_CHECKING, Union
import json
import smtplib
class Alerts(MgrModule):
- COMMANDS = [
- {
- "cmd": "alerts send",
- "desc": "(re)send alerts immediately",
- "perm": "r"
- },
- ]
-
# ยด# type: ignore` due to the introduction of the Option type.
MODULE_OPTIONS: List[Dict[str, Any]] = [ # type: ignore
{
self.get_ceph_option(opt))
self.log.debug(' native option %s = %s', opt, getattr(self, opt))
- def handle_command(self, inbuf: Optional[str], cmd: Dict[str, Any]) -> HandleCommandResult:
- ret = 0
- out = ''
- err = ''
- if cmd['prefix'] == 'alerts send':
- status = json.loads(self.get('health')['json'])
- self._send_alert(status, {})
- return HandleCommandResult(
- retval=ret, # exit code
- stdout=out, # stdout
- stderr=err)
+ @CLIReadCommand('alerts send')
+ def send(self) -> HandleCommandResult:
+ """
+ (re)send alerts immediately
+ """
+ status = json.loads(self.get('health')['json'])
+ self._send_alert(status, {})
+ return HandleCommandResult()
def _diff(self, last: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
d: Dict[str, Any] = {}