Collect statistics from Ceph cluster and every X seconds send data to a Zabbix
server using the zabbix_sender executable.
"""
+import logging
import json
import errno
import re
from subprocess import Popen, PIPE
from threading import Event
-from mgr_module import MgrModule, Option
+from mgr_module import MgrModule, Option, OptionValue
+from typing import cast, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union
-def avg(data):
+def avg(data: Sequence[Union[int, float]]) -> float:
if len(data):
return sum(data) / float(len(data))
else:
class ZabbixSender(object):
- def __init__(self, sender, host, port, log):
+ def __init__(self, sender: str, host: str, port: int, log: logging.Logger) -> None:
self.sender = sender
self.host = host
self.port = port
self.log = log
- def send(self, hostname, data):
+ def send(self, hostname: str, data: Mapping[str, Union[int, float, str]]) -> None:
if len(data) == 0:
return
proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
for key, value in data.items():
+ assert proc.stdin
proc.stdin.write('{0} ceph.{1} {2}\n'.format(hostname, key, value).encode('utf-8'))
stdout, stderr = proc.communicate()
class Module(MgrModule):
run = False
- config = dict()
+ config: Dict[str, OptionValue] = {}
ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
- _zabbix_hosts = list()
+ _zabbix_hosts: List[Dict[str, Union[str, int]]] = list()
@property
- def config_keys(self):
+ def config_keys(self) -> Dict[str, OptionValue]:
return dict((o['name'], o.get('default', None))
for o in self.MODULE_OPTIONS)
},
]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self.event = Event()
- def init_module_config(self):
+ def init_module_config(self) -> None:
self.fsid = self.get('mon_map')['fsid']
self.log.debug('Found Ceph fsid %s', self.fsid)
if self.config['zabbix_host']:
self._parse_zabbix_hosts()
- def set_config_option(self, option, value):
+ def set_config_option(self, option: str, value: OptionValue) -> bool:
if option not in self.config_keys.keys():
raise RuntimeError('{0} is a unknown configuration '
'option'.format(option))
if option in ['zabbix_port', 'interval', 'discovery_interval']:
try:
- value = int(value)
+ int_value = int(value) # type: ignore
except (ValueError, TypeError):
raise RuntimeError('invalid {0} configured. Please specify '
'a valid integer'.format(option))
- if option == 'interval' and value < 10:
+ if option == 'interval' and int_value < 10:
raise RuntimeError('interval should be set to at least 10 seconds')
- if option == 'discovery_interval' and value < 10:
+ if option == 'discovery_interval' and int_value < 10:
raise RuntimeError(
"discovery_interval should not be more frequent "
"than once in 10 regular data collection"
self.config[option] = value
return True
- def _parse_zabbix_hosts(self):
+ def _parse_zabbix_hosts(self) -> None:
self._zabbix_hosts = list()
- servers = self.config['zabbix_host'].split(",")
+ servers = cast(str, self.config['zabbix_host']).split(",")
for server in servers:
uri = re.match("(?:(?:\[?)([a-z0-9-\.]+|[a-f0-9:\.]+)(?:\]?))(?:((?::))([0-9]{1,5}))?$", server)
if uri:
self.log.error('Parsed Zabbix hosts: %s', self._zabbix_hosts)
- def get_pg_stats(self):
+ def get_pg_stats(self) -> Dict[str, int]:
stats = dict()
pg_states = ['active', 'peering', 'clean', 'scrubbing', 'undersized',
return stats
- def get_data(self):
+ def get_data(self) -> Dict[str, Union[int, float]]:
data = dict()
health = json.loads(self.get('health')['json'])
return data
- def send(self, data):
- identifier = self.config['identifier']
+ def send(self, data: Mapping[str, Union[int, float, str]]) -> bool:
+ identifier = cast(Optional[str], self.config['identifier'])
if identifier is None or len(identifier) == 0:
identifier = 'ceph-{0}'.format(self.fsid)
'detail': ['Configuration value zabbix_host not configured']
}
})
- return
+ return False
result = True
self.log.debug(data)
try:
- zabbix = ZabbixSender(self.config['zabbix_sender'],
- server['zabbix_host'],
- server['zabbix_port'], self.log)
+ zabbix = ZabbixSender(cast(str, self.config['zabbix_sender']),
+ cast(str, server['zabbix_host']),
+ cast(int, server['zabbix_port']), self.log)
zabbix.send(identifier, data)
except Exception as exc:
self.log.exception('Failed to send.')
self.set_health_checks(dict())
return result
- def discovery(self):
+ def discovery(self) -> bool:
osd_map = self.get('osd_map')
osd_map_crush = self.get('osd_map_crush')
}
return bool(self.send(data))
- def handle_command(self, inbuf, command):
+ def handle_command(self, inbuf: str, command: Dict[str, Any]) -> Tuple[int, str, str]:
if command['prefix'] == 'zabbix config-show':
return 0, json.dumps(self.config, indent=4, sort_keys=True), ''
elif command['prefix'] == 'zabbix config-set':
return (-errno.EINVAL, '',
"Command not found '{0}'".format(command['prefix']))
- def shutdown(self):
+ def shutdown(self) -> None:
self.log.info('Stopping zabbix')
self.run = False
self.event.set()
- def serve(self):
+ def serve(self) -> None:
self.log.info('Zabbix module starting up')
self.run = True
discovery_interval = self.config['discovery_interval']
# We are sending discovery once plugin is loaded
- discovery_counter = discovery_interval
+ discovery_counter = cast(int, discovery_interval)
while self.run:
self.log.debug('Waking up for new iteration')
# rather than dying completely.
self.log.exception("Unexpected error during send():")
- interval = self.config['interval']
+ interval = cast(float, self.config['interval'])
self.log.debug('Sleeping for %d seconds', interval)
discovery_counter += 1
self.event.wait(interval)
- def self_test(self):
+ def self_test(self) -> None:
data = self.get_data()
if data['overall_status'] not in self.ceph_health_mapping: