import rados
from threading import Event
from datetime import datetime, timedelta
-from typing import Optional
+from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union
TIME_FORMAT = '%Y%m%d-%H%M%S'
),
]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
# populate options (just until serve() runs)
self.event = Event()
self.has_device_pool = False
- def is_valid_daemon_name(self, who):
+ # for mypy which does not run the code
+ if TYPE_CHECKING:
+ self.enable_monitoring = True
+ self.scrape_frequency = 0.0
+ self.pool_name = ''
+ self.device_health_metrics = ''
+ self.retention_period = 0.0
+ self.mark_out_threshold = 0.0
+ self.warn_threshold = 0.0
+ self.self_heal = True
+ self.sleep_interval = 0.0
+
+ def is_valid_daemon_name(self, who: str) -> bool:
parts = who.split('.')
if len(parts) != 2:
return False
@CLICommand('device query-daemon-health-metrics',
perm='r')
- def do_query_daemon_health_metrics(self, who: str):
+ def do_query_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
'''
Get device health metrics for a given daemon
'''
@CLICommand('device scrape-daemon-health-metrics',
perm='r')
- def do_scrape_daemon_health_metrics(self, who: str):
+ def do_scrape_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
'''
Scrape and store device health metrics for a given daemon
'''
@CLICommand('device scrape-daemon-health-metrics',
perm='r')
- def do_scrape_health_metrics(self, devid: Optional[str] = None):
+ def do_scrape_health_metrics(self, devid: Optional[str] = None) -> Tuple[int, str, str]:
'''
Scrape and store device health metrics
'''
@CLICommand('device get-health-metrics',
perm='r')
- def do_get_health_metrics(self, devid: str, sample: Optional[str] = None):
+ def do_get_health_metrics(self, devid: str, sample: Optional[str] = None) -> Tuple[int, str, str]:
'''
Show stored device metrics for the device
'''
@CLICommand('device check-health',
perm='rw')
- def do_check_health(self):
+ def do_check_health(self) -> Tuple[int, str, str]:
'''
Check life expectancy of devices
'''
@CLICommand('device monitoring on',
perm='rw')
- def do_monitoring_on(self):
+ def do_monitoring_on(self) -> Tuple[int, str, str]:
'''
Enable device health monitoring
'''
@CLICommand('device monitoring off',
perm='rw')
- def do_monitoring_off(self):
+ def do_monitoring_off(self) -> Tuple[int, str, str]:
'''
Disable device health monitoring
'''
@CLICommand('device predict-life-expectancy',
perm='r')
- def do_predict_life_expectancy(self, devid: str):
+ def do_predict_life_expectancy(self, devid: str) -> Tuple[int, str, str]:
'''
Predict life expectancy with local predictor
'''
return self.predict_lift_expectancy(devid)
- def self_test(self):
+ def self_test(self) -> None:
self.config_notify()
osdmap = self.get('osd_map')
osd_id = osdmap['osds'][0]['osd']
assert r == 0
assert before != after
- def config_notify(self):
+ def config_notify(self) -> None:
for opt in self.MODULE_OPTIONS:
setattr(self,
opt['name'],
if osd["up"]:
up += 1
- need = int(self.get_ceph_option("osd_pool_default_size"))
+ need = cast(int, self.get_ceph_option("osd_pool_default_size"))
return up >= need
def maybe_create_device_pool(self) -> bool:
self.has_device_pool = True
return True
- def create_device_pool(self):
+ def create_device_pool(self) -> None:
self.log.debug('create %s pool' % self.pool_name)
# create pool
result = CommandResult('')
r, outb, outs = result.wait()
assert r == 0
- def serve(self):
+ def serve(self) -> None:
self.log.info("Starting")
self.config_notify()
ret = self.event.wait(sleep_interval)
self.event.clear()
- def shutdown(self):
+ def shutdown(self) -> None:
self.log.info('Stopping')
self.run = False
self.event.set()
ioctx = self.rados.open_ioctx(self.pool_name)
return ioctx
- def scrape_daemon(self, daemon_type, daemon_id):
+ def scrape_daemon(self, daemon_type: str, daemon_id: str) -> Tuple[int, str, str]:
ioctx = self.open_connection()
if not ioctx:
return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
ioctx.close()
return 0, "", ""
- def scrape_all(self):
+ def scrape_all(self) -> Tuple[int, str, str]:
osdmap = self.get("osd_map")
assert osdmap is not None
ioctx = self.open_connection()
ioctx.close()
return 0, "", ""
- def scrape_device(self, devid):
+ def scrape_device(self, devid: str) -> Tuple[int, str, str]:
r = self.get("device " + devid)
if not r or 'device' not in r.keys():
return -errno.ENOENT, '', 'device ' + devid + ' not found'
ioctx.close()
return 0, "", ""
- def do_scrape_daemon(self, daemon_type, daemon_id, devid=''):
+ def do_scrape_daemon(self,
+ daemon_type: str,
+ daemon_id: str,
+ devid: str = '') -> Optional[Dict[str, Any]]:
"""
:return: a dict, or None if the scrape failed.
"""
self.log.error(
"Fail to parse JSON result from daemon {0}.{1} ({2})".format(
daemon_type, daemon_id, outb))
+ return None
- def put_device_metrics(self, ioctx, devid, data):
+ def put_device_metrics(self, ioctx: rados.Ioctx, devid: str, data: Any) -> None:
assert devid
old_key = datetime.utcnow() - timedelta(
seconds=self.retention_period)
ioctx.remove_omap_keys(op, tuple(erase))
ioctx.operate_write_op(op, devid)
- def _get_device_metrics(self, devid, sample=None, min_sample=None):
+ def _get_device_metrics(self, devid: str,
+ sample: Optional[str] = None,
+ min_sample: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
res = {}
ioctx = self.open_connection(create_if_missing=False)
if not ioctx:
raise
return res
- def show_device_metrics(self, devid, sample):
+ def show_device_metrics(self, devid: str, sample: Optional[str]) -> Tuple[int, str, str]:
# verify device exists
r = self.get("device " + devid)
if not r or 'device' not in r.keys():
res = self._get_device_metrics(devid, sample=sample)
return 0, json.dumps(res, indent=4, sort_keys=True), ''
- def check_health(self):
+ def check_health(self) -> Tuple[int, str, str]:
self.log.info('Check health')
config = self.get('config')
min_in_ratio = float(config.get('mon_osd_min_in_ratio'))
mark_out_threshold_td = timedelta(seconds=self.mark_out_threshold)
warn_threshold_td = timedelta(seconds=self.warn_threshold)
- checks = {}
- health_warnings = {
+ checks: Dict[str, Dict[str, Union[int, str, Sequence[str]]]] = {}
+ health_warnings: Dict[str, List[str]] = {
DEVICE_HEALTH: [],
DEVICE_HEALTH_IN_USE: [],
}
self.set_health_checks(checks)
return 0, "", ""
- def is_osd_in(self, osdmap, osd_id):
+ def is_osd_in(self, osdmap: Dict[str, Any], osd_id: str) -> bool:
for osd in osdmap['osds']:
- if str(osd_id) == str(osd['osd']):
+ if osd_id == str(osd['osd']):
return bool(osd['in'])
return False
- def get_osd_num_pgs(self, osd_id):
+ def get_osd_num_pgs(self, osd_id: str) -> int:
stats = self.get('osd_stats')
assert stats is not None
for stat in stats['osd_stats']:
- if str(osd_id) == str(stat['osd']):
+ if osd_id == str(stat['osd']):
return stat['num_pgs']
return -1
- def mark_out_etc(self, osd_ids):
+ def mark_out_etc(self, osd_ids: List[str]) -> None:
self.log.info('Marking out OSDs: %s' % osd_ids)
result = CommandResult('')
self.send_command(result, 'mon', '', json.dumps({
'r: [%s], outb: [%s], outs: [%s]',
osd_id, r, outb, outs)
- def extract_smart_features(self, raw):
+ def extract_smart_features(self, raw: Any) -> Any:
# FIXME: extract and normalize raw smartctl --json output and
# generate a dict of the fields we care about.
return raw
- def predict_lift_expectancy(self, devid):
+ def predict_lift_expectancy(self, devid: str) -> Tuple[int, str, str]:
plugin_name = ''
model = self.get_ceph_option('device_failure_prediction_mode')
- if model and model.lower() == 'local':
+ if cast(str, model).lower() == 'local':
plugin_name = 'diskprediction_local'
else:
return -1, '', 'unable to enable any disk prediction model[local/cloud]'
except:
return -1, '', 'unable to invoke diskprediction local or remote plugin'
- def predict_all_devices(self):
+ def predict_all_devices(self) -> Tuple[int, str, str]:
plugin_name = ''
model = self.get_ceph_option('device_failure_prediction_mode')
- if model and model.lower() == 'local':
+ if cast(str, model).lower() == 'local':
plugin_name = 'diskprediction_local'
else:
return -1, '', 'unable to enable any disk prediction model[local/cloud]'
except:
return -1, '', 'unable to invoke diskprediction local or remote plugin'
- def get_recent_device_metrics(self, devid, min_sample):
+ def get_recent_device_metrics(self, devid: str, min_sample: str) -> Dict[str, Dict[str, Any]]:
return self._get_device_metrics(devid, min_sample=min_sample)
- def get_time_format(self):
+ def get_time_format(self) -> str:
return TIME_FORMAT