"""
import json
import datetime
-import _strptime
from threading import Event
import time
# Importing scipy early appears to avoid a deadlock when we later do
#
#   from .predictor import get_diskfailurepredictor_path
#
# in a command thread. See https://tracker.ceph.com/issues/42764
-import scipy
+import scipy # noqa: ignore=F401
TIME_FORMAT = '%Y%m%d-%H%M%S'
-TIME_DAYS = 24*60*60
+TIME_DAYS = 24 * 60 * 60
TIME_WEEK = TIME_DAYS * 7
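# Hedged illustration (hypothetical helper, not part of the module): the
# constants above describe compact '%Y%m%d-%H%M%S' timestamp keys and a
# one-week retention window expressed in seconds.
def _example_is_older_than_a_week(ts_key):
    """Return True if a TIME_FORMAT-style key is more than a week old."""
    ts = time.mktime(datetime.datetime.strptime(ts_key, TIME_FORMAT).timetuple())
    return time.time() - ts > TIME_WEEK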
# get normalized smart values
if attr.get('value') is not None:
dev_smart['smart_%s_normalized' % attr.get('id')] = \
- attr.get('value')
+ attr.get('value')
# add power on hours manually if not available in smart attributes
if s_val.get('power_on_time', {}).get('hours') is not None:
dev_smart['smart_9_raw'] = int(s_val['power_on_time']['hours'])
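# Hedged illustration (hypothetical helper, not the module's real loop) of how
# the snippet above is typically driven: 's_val' is one smartctl-style JSON
# sample and 'attr' iterates the rows of its ATA attribute table.
def _example_extract_smart(s_val):
    dev_smart = {}
    # normalized value of each reported SMART attribute, keyed by attribute id
    for attr in s_val.get('ata_smart_attributes', {}).get('table', []):
        if attr.get('value') is not None:
            dev_smart['smart_%s_normalized' % attr.get('id')] = attr.get('value')
    # power-on hours taken from the top-level field when present
    if s_val.get('power_on_time', {}).get('hours') is not None:
        dev_smart['smart_9_raw'] = int(s_val['power_on_time']['hours'])
    return dev_smart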
import logging
import numpy as np
-from scipy import stats
def get_diskfailurepredictor_path():
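    # A minimal sketch of the missing body (hedged guess, not necessarily the
    # real implementation): the pre-trained model files are expected to live
    # next to this source file, so return the directory containing this module.
    import os
    return os.path.dirname(os.path.abspath(__file__))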
# get the attributes that were used to train model for current manufacturer
try:
model_smart_attr = self.model_context[manufacturer]
- except KeyError as e:
+ except KeyError:
RHDiskFailurePredictor.LOGGER.debug(
"No context (SMART attributes on which model has been trained) found for manufacturer: {}".format(
manufacturer
struc_dtypes = [(attr, np.float64) for attr in model_smart_attr]
values = [tuple(day[attr] for attr in model_smart_attr) for day in disk_days]
disk_days_sa = np.array(values, dtype=struc_dtypes)
- except KeyError as e:
+ except KeyError:
RHDiskFailurePredictor.LOGGER.debug(
"Mismatch in SMART attributes used to train model and SMART attributes available"
)
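# Hedged toy example (made-up attribute names and values) of the structured-
# array construction above: each day of SMART data becomes one record whose
# fields are exactly the attributes the model was trained on, e.g.
#
#   model_smart_attr = ['smart_5_raw', 'smart_9_raw', 'user_capacity']
#   disk_days = [
#       {'smart_5_raw': 0.0, 'smart_9_raw': 12000.0, 'user_capacity': 4e12},
#       {'smart_5_raw': 0.0, 'smart_9_raw': 12024.0, 'user_capacity': 4e12},
#   ]
#   # -> disk_days_sa records (0.0, 12000.0, 4e12) and (0.0, 12024.0, 4e12),
#   #    with dtype [('smart_5_raw', np.float64), ('smart_9_raw', np.float64),
#   #                ('user_capacity', np.float64)]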
# view structured array as 2d array for applying rolling window transforms
# do not include capacity_bytes in this. only use smart_attrs
disk_days_attrs = disk_days_sa[[attr for attr in model_smart_attr if 'smart_' in attr]]\
- .view(np.float64).reshape(disk_days_sa.shape + (-1,))
+ .view(np.float64).reshape(disk_days_sa.shape + (-1,))
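# Hedged aside on the .view/.reshape trick above: when every field is float64,
# viewing the record buffer as plain float64 flattens it, and the reshape
# restores one row per day, e.g.
#
#   sa = np.array([(1., 2.), (3., 4.)],
#                 dtype=[('a', np.float64), ('b', np.float64)])
#   sa.view(np.float64).reshape(sa.shape + (-1,))
#   # -> array([[1., 2.],
#   #           [3., 4.]])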
# featurize n (6 to 12) days of data - mean, std, coefficient of variation
# current model is trained on 6 days of data because that is what will be
# available at prediction time
roll_window_size = 6
# rolling means generator
- gen = (disk_days_attrs[i: i + roll_window_size, ...].mean(axis=0) \
- for i in range(0, disk_days_attrs.shape[0] - roll_window_size + 1))
+ gen = (disk_days_attrs[i: i + roll_window_size, ...].mean(axis=0)
+ for i in range(0, disk_days_attrs.shape[0] - roll_window_size + 1))
means = np.vstack(gen)
# rolling stds generator
- gen = (disk_days_attrs[i: i + roll_window_size, ...].std(axis=0, ddof=1) \
- for i in range(0, disk_days_attrs.shape[0] - roll_window_size + 1))
+ gen = (disk_days_attrs[i: i + roll_window_size, ...].std(axis=0, ddof=1)
+ for i in range(0, disk_days_attrs.shape[0] - roll_window_size + 1))
stds = np.vstack(gen)
# coefficient of variation
cvs = stds / means
cvs[np.isnan(cvs)] = 0
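# Hedged worked sketch (toy numbers) of the rolling-window featurization above:
# every block of roll_window_size consecutive days is collapsed into its mean,
# its sample std (ddof=1) and the coefficient of variation std/mean, with NaNs
# from zero means zeroed out. An (8, 2) input and a window of 6 give three
# windows, so means/stds/cvs all end up with shape (3, 2):
#
#   toy = np.arange(16, dtype=np.float64).reshape(8, 2)   # 8 days, 2 attrs
#   m = np.vstack([toy[i:i + 6].mean(axis=0) for i in range(0, 8 - 6 + 1)])
#   s = np.vstack([toy[i:i + 6].std(axis=0, ddof=1) for i in range(0, 8 - 6 + 1)])
#   cv = s / m
#   cv[np.isnan(cv)] = 0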
- featurized = np.hstack((
- means,
+ featurized = np.hstack((means,
stds,
cvs,
disk_days_sa['user_capacity'][: disk_days_attrs.shape[0] - roll_window_size + 1].reshape(-1, 1)