--- /dev/null
+from __future__ import absolute_import
+
+
+class PredictAgent(object):
+    """Pull disk-failure predictions from the cloud service into the mgr module.
+
+    ``run()`` walks the devices reported by the mgr module, asks the client
+    for each device's prediction and stores every non-empty answer in
+    ``mgr_module.prediction_result`` keyed by device id.
+    """
+
+    measurement = 'predictor'
+
+    def __init__(self, mgr_module, obj_sender, timeout=30):
+        """Bind the agent to its mgr module and query client.
+
+        :param mgr_module: object exposing ``log``, ``get()`` and a
+            ``prediction_result`` dict (all are used by this class).
+        :param obj_sender: client exposing
+            ``query_info(host_domain_id, disk_domain_id, measurement)``.
+        :param timeout: kept on the instance as ``_timeout``; this class does
+            not read it itself.
+        """
+        self.data = []
+        self._client = obj_sender
+        self._logger = mgr_module.log
+        self._module_inst = mgr_module
+        self._timeout = timeout
+
+    def __bool__(self):
+        """Report True only while the agent is bound to a mgr module."""
+        return bool(self._module_inst)
+
+    # Python 2 consults ``__nonzero__`` and Python 3 consults ``__bool__``;
+    # define both so truth testing behaves the same on either interpreter.
+    __nonzero__ = __bool__
+
+    def run(self):
+        """Refresh the cached prediction of every device known to the module.
+
+        :return: ``(-1, '', message)`` when the module reports no devices;
+            otherwise ``None`` after the cache has been updated.
+        """
+        result = self._module_inst.get('devices')
+        if not result:
+            return -1, '', 'unable to get all devices for prediction'
+        # The cluster id is only needed once there are devices to query.
+        cluster_id = self._module_inst.get('mon_map').get('fsid')
+        for dev in result.get('devices', []):
+            devid = dev.get('devid')
+            if not devid:
+                # Nothing to query or to key the cache with.
+                continue
+            for location in dev.get('location', []):
+                host_domain_id = '{}_{}'.format(cluster_id, location.get('host'))
+                prediction_data = self._get_cloud_prediction_result(host_domain_id, devid)
+                if prediction_data:
+                    self._module_inst.prediction_result[devid] = prediction_data
+
+    def _get_cloud_prediction_result(self, host_domain_id, disk_domain_id):
+        """Query the client for one disk's prediction.
+
+        :param host_domain_id: ``<cluster fsid>_<host>`` identifier.
+        :param disk_domain_id: device id of the disk.
+        :return: the decoded JSON body on HTTP 200, otherwise an empty dict.
+            Failures are logged, never raised.
+        """
+        result = {}
+        try:
+            query_info = self._client.query_info(host_domain_id, disk_domain_id, 'sai_disk_prediction')
+            if query_info.status_code == 200:
+                result = query_info.json()
+            else:
+                error = query_info.json().get('error')
+                if error:
+                    self._logger.error(str(error))
+        except Exception as e:
+            # Deliberately best effort: one failing query must not abort the
+            # refresh of the remaining devices.
+            self._logger.error('failed to get %s prediction result %s', disk_domain_id, e)
+        return result
"""
from __future__ import absolute_import
+import base64
from datetime import datetime
import errno
import json
from mgr_module import MgrModule
import os
+from string import maketrans
from threading import Event
from .common import DP_MGR_STAT_ENABLED, DP_MGR_STAT_DISABLED
-from .task import MetricsRunner, SmartRunner, TestRunner
+from .task import MetricsRunner, SmartRunner, PredictRunner, TestRunner
TIME_DAYS = 24*60*60
TIME_WEEK = TIME_DAYS * 7
-DP_AGENTS = [MetricsRunner, SmartRunner]
+DP_AGENTS = [MetricsRunner, SmartRunner, PredictRunner]
+CUSTOMER_ALPHABET = "ABCDEFG&HIJKLMN@OQRS.TUV(WXYZabcd)efghijlmn-opqrstu*vwxyz0123=45"
+ORIGIN_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
+
+# Ordinal-keyed tables are accepted by text ``translate`` on both Python 2
+# (unicode) and Python 3 (str).
+_ENCODE_TABLE = dict((ord(o), ord(c)) for o, c in zip(ORIGIN_ALPHABET, CUSTOMER_ALPHABET))
+_DECODE_TABLE = dict((ord(c), ord(o)) for o, c in zip(ORIGIN_ALPHABET, CUSTOMER_ALPHABET))
+
+
+def encode_string(value):
+    """Obfuscate *value* as unpadded base64 spelled in CUSTOMER_ALPHABET.
+
+    :param value: byte string, or text (encoded as UTF-8 first).
+    :return: native ``str`` holding the obfuscated form.
+    """
+    if not isinstance(value, bytes):
+        value = value.encode('utf-8')
+    # '=' is also a data symbol of CUSTOMER_ALPHABET, so all base64 padding
+    # (zero, one or two characters) has to go before translating; chopping
+    # exactly one character only round-trips when len(value) % 3 == 2.
+    encoded = base64.b64encode(value).decode('ascii').rstrip('=')
+    return str(encoded.translate(_ENCODE_TABLE))
+
+
+def decode_string(value):
+    """Reverse :func:`encode_string`.
+
+    :param value: obfuscated string as produced by ``encode_string``.
+    :return: the original bytes, as returned by ``base64.b64decode``.
+    """
+    text = value if isinstance(value, bytes) else str(value)
+    if isinstance(text, bytes):
+        # Python 2 native strings are bytes; translate() below needs text.
+        text = text.decode('ascii')
+    translated = text.translate(_DECODE_TABLE)
+    # Restore the padding that encode_string() stripped.
+    padded = translated + '=' * (-len(translated) % 4)
+    return base64.b64decode(padded.encode('ascii'))
class Module(MgrModule):
{
'name': 'sleep_interval',
'default': str(600),
- },
- {
- 'name': 'predict_interval',
- 'default': str(86400),
}
]
def __init__(self, *args, **kwargs):
+ # Initialise module state: status, wake-up events, agent list and the prediction cache.
super(Module, self).__init__(*args, **kwargs)
self.status = {'status': DP_MGR_STAT_DISABLED}
- self.shutdown_event = Event()
+ # serve() sleeps on this event; config_notify() and shutdown() set it to wake the loop early.
+ self._event = Event()
+ # Set by _agent_call_back(); _refresh_cloud_prediction_result() waits on it.
+ self._predict_event = Event()
self._agents = []
self._activated_cloud = False
- self._prediction_result = {}
+ # Public name because PredictAgent.run() writes into it; entries are keyed by device id.
+ self.prediction_result = {}
self.config = dict()
+ # serve() keeps looping while this is True; shutdown() sets it to False.
+ self._run = True
+
+ def config_notify(self):
+     """Reload every declared option and wake serve() when the mode flips.
+
+     Each entry of ``self.OPTIONS`` is copied onto the instance under its
+     own name, falling back to the declared default when no value is
+     configured. ``self._event`` is then set if the configured
+     ``device_failure_prediction_mode`` disagrees with whether cloud
+     prediction is currently active.
+     """
+     for option in self.OPTIONS:
+         key = option['name']
+         value = self.get_config(key) or option['default']
+         setattr(self, key, value)
+         self.log.debug(' %s = %s', key, getattr(self, key))
+     wants_cloud = self.get_option('device_failure_prediction_mode') == 'cloud'
+     # Inactive-but-wanted and active-but-unwanted both require serve() to act.
+     if wants_cloud != bool(self._activated_cloud):
+         self._event.set()
@property
def config_keys(self):
_agent.event.set()
self.set_config('diskprediction_server', cmd['server'])
self.set_config('diskprediction_user', cmd['user'])
- self.set_config('diskprediction_password', cmd['password'])
+ self.set_config('diskprediction_password', encode_string(cmd['password']))
if cmd.get('port'):
self.set_config('diskprediction_port', cmd['port'])
return 0, 'succeed to config cloud mode connection', ''
def _status(self, cmd):
return 0, json.dumps(self.status), ''
- def _get_cloud_prediction_result(self, host_domain_id, disk_domain_id):
- result = {}
- obj_sender = None
- from .common.grpcclient import GRPcClient, gen_configuration
- conf = gen_configuration(
- host=self.get_configuration('diskprediction_server'),
- user=self.get_configuration('diskprediction_user'),
- password=self.get_configuration('diskprediction_password'),
- port=self.get_configuration('diskprediction_port'),
- cert_context=self.get_configuration('diskprediction_cert_context'),
- mgr_inst=self,
- ssl_target_name=self.get_configuration('diskprediction_ssl_target_name_override'),
- default_authority=self.get_configuration('diskprediction_default_authority'))
- try:
- obj_sender = GRPcClient(conf)
- if obj_sender:
- query_info = obj_sender.query_info(host_domain_id, disk_domain_id, 'sai_disk_prediction')
- status_code = query_info.status_code
- if status_code == 200:
- result = query_info.json()
- else:
- resp = query_info.json()
- if resp.get('error'):
- self.log.error(str(resp['error']))
- finally:
- if obj_sender:
- obj_sender.close()
- return result
+ def _refresh_cloud_prediction_result(self):
+ # Ask the PredictRunner agent for a new round of predictions and wait
+ # (bounded) until it reports back through _agent_call_back().
+ for _agent in self._agents:
+ if isinstance(_agent, PredictRunner):
+ # Drop any stale notification so the wait below only observes this request.
+ self._predict_event.clear()
+ # Signal the agent's own event -- presumably this wakes the runner so it
+ # runs now instead of at its next interval; verify against PredictRunner.
+ _agent.event.set()
+ # Block for at most 300 seconds; _agent_call_back() sets the event.
+ self._predict_event.wait(300)
+ if self._predict_event.is_set():
+ # Consume the notification so the next refresh starts clean.
+ self._predict_event.clear()
+ # Stop scanning the remaining agents.
+ break
def predict_life_expectancy(self, devid):
assert devid
- self.refresh_config()
- prediction_data = {}
result = self.get('device {}'.format(devid))
if not result:
return -1, '', 'device {} not found'.format(devid)
dev_info = result.get('device', {})
if not dev_info:
return -1, '', 'device {} not found'.format(devid)
- cluster_id = self.get('mon_map').get('fsid', '')
- for location in dev_info.get('location', []):
- host = location.get('host')
- host_domain_id = '{}_{}'.format(cluster_id, host)
- prediction_data = self._get_cloud_prediction_result(host_domain_id, devid)
- if prediction_data:
- break
+ self._refresh_cloud_prediction_result()
+ prediction_data = self.prediction_result.get(devid)
if not prediction_data:
return -1, '', 'device {} prediction data not ready'.format(devid)
elif prediction_data.get('near_failure', '').lower() == 'good':
def predict_all_devices(self):
if not self._activated_cloud:
return -1, '', 'diskprecition_cloud not ready'
- prediction_data = {}
self.refresh_config()
result = self.get('devices')
- cluster_id = self.get('mon_map').get('fsid')
if not result:
return -1, '', 'unable to get all devices for prediction'
+ self._refresh_cloud_prediction_result()
for dev in result.get('devices', []):
- for location in dev.get('location', []):
- host = location.get('host')
- host_domain_id = '{}_{}'.format(cluster_id, host)
- prediction_data = self._get_cloud_prediction_result(host_domain_id, dev.get('devid'))
- if prediction_data:
- break
+ devid = dev.get('devid')
+ if not devid:
+ continue
+ prediction_data = self.prediction_result.get(devid)
+ if prediction_data:
+ break
if not prediction_data:
return -1, '', 'device {} prediction data not ready'.format(dev.get('devid'))
else:
def serve(self):
+ # Main loop of the module: on every pass re-read the configuration and the
+ # device_failure_prediction_mode option, check each agent's is_timeout(),
+ # then sleep until the interval elapses or self._event is set.
self.log.info('Starting diskprediction module')
+ self.config_notify()
self.status = {'status': DP_MGR_STAT_ENABLED}
- while True:
+ while self._run:
self.refresh_config()
mode = self.get_option('device_failure_prediction_mode')
if mode == 'cloud':
self.stop_disk_prediction()
# Check agent hang is?
- restartAgent = False
+ restart_agent = False
try:
for dp_agent in self._agents:
if dp_agent.is_timeout():
+ # NOTE(review): '{]' is not a valid replacement field, so this format()
+ # call raises ValueError instead of logging the agent name; the except
+ # clause below catches it and logs a different message. Should be '{}'.
self.log.error('agent name: {] timeout'.format(dp_agent.task_name))
- restartAgent = True
+ restart_agent = True
break
+ # NOTE(review): 'as IOError' binds the caught exception to the name IOError,
+ # hiding the builtin inside this function; the bound value is never used.
except Exception as IOError:
self.log.error('disk prediction plugin faield to started and try to restart')
- restartAgent = True
+ restart_agent = True
- if restartAgent:
+ if restart_agent:
self.stop_disk_prediction()
else:
+ # A configured interval of 0 falls back to 60 seconds.
sleep_interval = int(self.sleep_interval) or 60
- self.shutdown_event.wait(sleep_interval)
- if self.shutdown_event.is_set():
- break
+ # Sleep, waking early when config_notify() or shutdown() sets the event.
+ self._event.wait(sleep_interval)
+ # Re-arm the event for the next pass; loop exit is decided by self._run.
+ self._event.clear()
self.stop_disk_prediction()
+ def _agent_call_back(self):
+     """Callback passed to PredictRunner by start_cloud_disk_prediction().
+
+     Sets ``_predict_event`` so that a waiting
+     ``_refresh_cloud_prediction_result()`` can proceed.
+     """
+     notifier = self._predict_event
+     self.log.debug('notify refresh devices prediction result')
+     notifier.set()
+
def start_cloud_disk_prediction(self):
assert not self._activated_cloud
for dp_agent in DP_AGENTS:
- obj_agent = dp_agent(self, 300)
+ if dp_agent == PredictRunner:
+ obj_agent = dp_agent(self, 300, self._agent_call_back)
+ else:
+ obj_agent = dp_agent(self, 300)
if obj_agent:
obj_agent.start()
else:
self.log.error('failed to stop disk prediction clould plugin')
def shutdown(self):
+ # Stop serve(): clear the loop flag first, then set the event so a sleeping
+ # loop wakes up and re-checks the flag before the base class shuts down.
- self.shutdown_event.set()
+ self._run = False
+ self._event.set()
super(Module, self).shutdown()
def self_test(self):
+ # Run a TestRunner task once against this module, then log completion.
- objTest = TestRunner(self)
- objTest.run()
+ obj_test = TestRunner(self)
+ obj_test.run()
self.log.info('self test completed')