import datetime
from threading import Event
import time
-
+from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING
from mgr_module import CommandResult, MgrModule, Option
-
# Importing scipy early appears to avoid a future deadlock when
# we try to do
#
#
# in a command thread. See https://tracker.ceph.com/issues/42764
import scipy # noqa: ignore=F401
-from .predictor import Predictor, get_diskfailurepredictor_path
+from .predictor import DevSmartT, Predictor, get_diskfailurepredictor_path
TIME_FORMAT = '%Y%m%d-%H%M%S'
default='prophetstor')
]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
# options
for opt in self.MODULE_OPTIONS:
# other
self._run = True
self._event = Event()
+ # for mypy which does not run the code
+ if TYPE_CHECKING:
+ self.sleep_interval = 0
+ self.predict_interval = 0
+ self.predictor_model = ''
- def config_notify(self):
+ def config_notify(self) -> None:
for opt in self.MODULE_OPTIONS:
setattr(self,
opt['name'],
if self.get_ceph_option('device_failure_prediction_mode') == 'local':
self._event.set()
- def refresh_config(self):
+ def refresh_config(self) -> None:
for opt in self.MODULE_OPTIONS:
setattr(self,
opt['name'],
self.get_module_option(opt['name']))
self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
- def self_test(self):
+ def self_test(self) -> None:
self.log.debug('self_test enter')
ret, out, err = self.predict_all_devices()
assert ret == 0
- return 0, 'self test succeed', ''
- def serve(self):
+ def serve(self) -> None:
self.log.info('Starting diskprediction local module')
self.config_notify()
last_predicted = None
self._event.wait(sleep_interval)
self._event.clear()
- def shutdown(self):
+ def shutdown(self) -> None:
self.log.info('Stopping')
self._run = False
self._event.set()
@staticmethod
- def _convert_timestamp(predicted_timestamp, life_expectancy_day):
+ def _convert_timestamp(predicted_timestamp: int, life_expectancy_day: int) -> str:
"""
:param predicted_timestamp: unit is nanoseconds
:param life_expectancy_day: unit is seconds
return datetime.datetime.fromtimestamp(
predicted_timestamp / (1000 ** 3) + life_expectancy_day).strftime('%Y-%m-%d')
- def _predict_life_expentancy(self, devid):
+ def _predict_life_expentancy(self, devid: str) -> str:
predicted_result = ''
- health_data = {}
- predict_datas = []
+ health_data: Dict[str, Dict[str, Any]] = {}
+ predict_datas: List[DevSmartT] = []
try:
r, outb, outs = self.remote('devicehealth', 'show_device_metrics', devid=devid, sample='')
if r != 0:
dev_smart['smart_%s_normalized' % attr.get('id')] = \
attr.get('value')
# add power on hours manually if not available in smart attributes
- if s_val.get('power_on_time', {}).get('hours') is not None:
- dev_smart['smart_9_raw'] = int(s_val['power_on_time']['hours'])
+ power_on_time = s_val.get('power_on_time', {}).get('hours')
+ if power_on_time is not None:
+ dev_smart['smart_9_raw'] = int(power_on_time)
# add device capacity
- if s_val.get('user_capacity') is not None:
- if s_val.get('user_capacity').get('bytes') is not None:
- dev_smart['user_capacity'] = s_val.get('user_capacity').get('bytes')
- else:
- self.log.debug('user_capacity not found in smart attributes list')
+ user_capacity = s_val.get('user_capacity', {}).get('bytes')
+ if user_capacity is not None:
+ dev_smart['user_capacity'] = user_capacity
+ else:
+ self.log.debug('user_capacity not found in smart attributes list')
# add device model
- if s_val.get('model_name') is not None:
- dev_smart['model_name'] = s_val.get('model_name')
+ model_name = s_val.get('model_name')
+ if model_name is not None:
+ dev_smart['model_name'] = model_name
# add vendor
- if s_val.get('vendor') is not None:
- dev_smart['vendor'] = s_val.get('vendor')
+ vendor = s_val.get('vendor')
+ if vendor is not None:
+ dev_smart['vendor'] = vendor
# if smart data was found, then add that to list
if dev_smart:
predict_datas.append(dev_smart)
predicted_result = obj_predictor.predict(predict_datas)
return predicted_result
- def predict_life_expectancy(self, devid):
+ def predict_life_expectancy(self, devid: str) -> Tuple[int, str, str]:
result = self._predict_life_expentancy(devid)
if result.lower() == 'good':
return 0, '>6w', ''
else:
return 0, 'unknown', ''
- def _reset_device_life_expectancy(self, device_id):
+ def _reset_device_life_expectancy(self, device_id: str) -> int:
result = CommandResult('')
self.send_command(result, 'mon', '', json.dumps({
'prefix': 'device rm-life-expectancy',
'failed to reset device life expectancy, %s' % outs)
return ret
- def _set_device_life_expectancy(self, device_id, from_date, to_date=None):
+ def _set_device_life_expectancy(self,
+ device_id: str,
+ from_date: str,
+ to_date: Optional[str] = None) -> int:
result = CommandResult('')
if to_date is None:
'failed to set device life expectancy, %s' % outs)
return ret
- def predict_all_devices(self):
+ def predict_all_devices(self) -> Tuple[int, str, str]:
self.log.debug('predict_all_devices')
devices = self.get('devices').get('devices', [])
for devInfo in devices:
if result.lower() == 'good':
life_expectancy_day_min = (TIME_WEEK * 6) + TIME_DAYS
- life_expectancy_day_max = None
+ life_expectancy_day_max = 0
elif result.lower() == 'warning':
life_expectancy_day_min = (TIME_WEEK * 2)
life_expectancy_day_max = (TIME_WEEK * 6)
life_expectancy_day_min = 0
life_expectancy_day_max = (TIME_WEEK * 2) - TIME_DAYS
else:
- predicted = None
- life_expectancy_day_min = None
- life_expectancy_day_max = None
+ predicted = 0
+ life_expectancy_day_min = 0
+ life_expectancy_day_max = 0
if predicted and devInfo['devid'] and life_expectancy_day_min:
from_date = None
to_date = None
try:
- if life_expectancy_day_min:
- from_date = self._convert_timestamp(predicted, life_expectancy_day_min)
+ assert life_expectancy_day_min
+ from_date = self._convert_timestamp(predicted, life_expectancy_day_min)
if life_expectancy_day_max:
to_date = self._convert_timestamp(predicted, life_expectancy_day_max)
import json
import pickle
import logging
+from typing import Any, Dict, List, Optional, Sequence, Tuple
import numpy as np
-def get_diskfailurepredictor_path():
+def get_diskfailurepredictor_path() -> str:
path = os.path.abspath(__file__)
dir_path = os.path.dirname(path)
return dir_path
+DevSmartT = Dict[str, Any]
+AttrNamesT = List[str]
+AttrDiffsT = List[Dict[str, int]]
+
+
class Predictor:
@classmethod
def create(cls, name: str) -> Optional['Predictor']:
else:
return None
- def initialize(self, model_dir: str) -> str:
+ def initialize(self, model_dir: str) -> None:
raise NotImplementedError()
def predict(self, dataset: Sequence[DevSmartT]) -> str:
LOGGER = logging.getLogger()
- def __init__(self):
+ def __init__(self) -> None:
"""
This function may throw exception due to wrong file operation.
"""
self.model_dirpath = ""
- self.model_context = {}
+ self.model_context: Dict[str, List[str]] = {}
- def initialize(self, model_dirpath):
+ def initialize(self, model_dirpath: str) -> None:
"""Initialize all models. Save paths of all trained model files to list
Arguments:
self.model_dirpath = model_dirpath
- def __preprocess(self, disk_days, manufacturer):
+ def __preprocess(self, disk_days: Sequence[DevSmartT], manufacturer: str) -> Optional[np.ndarray]:
"""Scales and transforms input dataframe to feed it to prediction model
Arguments:
return featurized
@staticmethod
- def __get_manufacturer(model_name):
+ def __get_manufacturer(model_name: str) -> Optional[str]:
"""Returns the manufacturer name for a given hard drive model name
Arguments:
"""
for prefix, manufacturer in RHDiskFailurePredictor.MANUFACTURER_MODELNAME_PREFIXES.items():
if model_name.startswith(prefix):
- return manufacturer
+ return manufacturer.lower()
# print error message
RHDiskFailurePredictor.LOGGER.debug(
- "Could not infer manufacturer from model name {}".format(model_name)
- )
+ f"Could not infer manufacturer from model name {model_name}")
+ return None
- def predict(self, disk_days):
+ def predict(self, disk_days: Sequence[DevSmartT]) -> str:
# get manufacturer preferably as a smartctl attribute
# if not available then infer using model name
manufacturer = disk_days[0].get("vendor")
'"vendor" field not found in smartctl output. Will try to infer manufacturer from model name.'
)
manufacturer = RHDiskFailurePredictor.__get_manufacturer(
- disk_days[0].get("model_name", "")
- ).lower()
+ disk_days[0].get("model_name", ""))
# print error message, return Unknown, and continue execution
if manufacturer is None:
CONFIG_FILE = "config.json"
EXCLUDED_ATTRS = ["smart_9_raw", "smart_241_raw", "smart_242_raw"]
- def __init__(self):
+ def __init__(self) -> None:
"""
This function may throw exception due to wrong file operation.
"""
self.model_dirpath = ""
- self.model_context = {}
+ self.model_context: Dict[str, List[str]] = {}
- def initialize(self, model_dirpath):
+ def initialize(self, model_dirpath: str) -> None:
"""
Initialize all models.
self.model_dirpath = model_dirpath
- def __preprocess(self, disk_days):
+ def __preprocess(self, disk_days: Sequence[DevSmartT]) -> Sequence[DevSmartT]:
"""
Preprocess disk attributes.
return new_disk_days
@staticmethod
- def __get_diff_attrs(disk_days):
+ def __get_diff_attrs(disk_days: Sequence[DevSmartT]) -> Tuple[AttrNamesT, AttrDiffsT]:
"""
Get 5 days differential attributes.
all_attrs = [set(disk_day.keys()) for disk_day in disk_days]
attr_list = list(set.intersection(*all_attrs))
- attr_list = disk_days[0].keys()
+ attr_list = list(disk_days[0].keys())
prev_days = disk_days[:-1]
curr_days = disk_days[1:]
diff_disk_days = []
return attr_list, diff_disk_days
- def __get_best_models(self, attr_list):
+ def __get_best_models(self, attr_list: AttrNamesT) -> Optional[Dict[str, List[str]]]:
"""
Find the best model from model list according to given attribute list.
print("Too few matched attributes")
return None
- best_models = {}
+ best_models: Dict[str, List[str]] = {}
best_model_indices = [
idx for idx, score in enumerate(scores) if score > max_score - 2
]
# return os.path.join(self.model_dirpath, model_name), model_attrlist
@staticmethod
- def __get_ordered_attrs(disk_days, model_attrlist):
+ def __get_ordered_attrs(disk_days: Sequence[DevSmartT], model_attrlist: List[str]) -> List[List[float]]:
"""
Return ordered attributes of given disk days.
return ordered_attrs
- def predict(self, disk_days):
+ def predict(self, disk_days: Sequence[DevSmartT]) -> str:
"""
Predict using given 6-days disk S.M.A.R.T. attributes.