]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: Diskprediction_cloud action when config changes 25165/head
authorhsiang41 <rick.chen@prophetstor.com>
Tue, 20 Nov 2018 02:39:21 +0000 (10:39 +0800)
committerhsiang41 <rick.chen@prophetstor.com>
Fri, 23 Nov 2018 09:28:50 +0000 (17:28 +0800)
Trigger diskprediction cloud action when receive notify configuration changed.

Signed-off-by: Rick Chen <rick.chen@prophetstor.com>
src/pybind/mgr/devicehealth/module.py
src/pybind/mgr/diskprediction_cloud/agent/metrics/ceph_mon_osd.py
src/pybind/mgr/diskprediction_cloud/agent/metrics/db_relay.py
src/pybind/mgr/diskprediction_cloud/agent/predictor.py [new file with mode: 0644]
src/pybind/mgr/diskprediction_cloud/common/__init__.py
src/pybind/mgr/diskprediction_cloud/common/grpcclient.py
src/pybind/mgr/diskprediction_cloud/module.py
src/pybind/mgr/diskprediction_cloud/task.py
src/pybind/mgr/diskprediction_local/module.py

index ad16ebff9fa85223c6f338a20ceee5583186bebd..5c85b878a093fbfee20bcc22b6ee14b25fac6170 100644 (file)
@@ -585,8 +585,9 @@ class Module(MgrModule):
         else:
             return -1, '', 'unable to enable any disk prediction model[local/cloud]'
         try:
-            if self.remote(plugin_name, 'can_run'):
-                return self.remote(plugin_name, 'predict_life_expentancy', devid=devid)
+            can_run, _ = self.remote(plugin_name, 'can_run')
+            if can_run:
+                return self.remote(plugin_name, 'predict_life_expectancy', devid=devid)
         except:
             return -1, '', 'unable to invoke diskprediction local or remote plugin'
 
@@ -600,7 +601,8 @@ class Module(MgrModule):
         else:
             return -1, '', 'unable to enable any disk prediction model[local/cloud]'
         try:
-            if self.remote(plugin_name, 'can_run'):
+            can_run, _ = self.remote(plugin_name, 'can_run')
+            if can_run:
                 return self.remote(plugin_name, 'predict_all_devices')
         except:
             return -1, '', 'unable to invoke diskprediction local or remote plugin'
index 138618695ec0a814377cc632d59c87d7cf235a50..c8d9ae92f70ce44de4843bbbf48f07f8c451ef4d 100644 (file)
@@ -117,8 +117,8 @@ class CephMonOsdAgent(MetricsAgent):
                 e_osd.fields['name'] = n\r
                 e_osd.tags['cluster_id'] = cluster_id\r
                 e_osd.fields['agenthost'] = socket.gethostname()\r
-                e_osd.tags['agenthost_domain_id'] = '%s_%s' % (cluster_id, socket.gethostname)\r
-                e_osd.tags['host_domain_id'] = '%s_%s' % (cluster_id, socket.gethostname)\r
+                e_osd.tags['agenthost_domain_id'] = '%s_%s' % (cluster_id, socket.gethostname())\r
+                e_osd.tags['host_domain_id'] = '%s_%s' % (cluster_id, socket.gethostname())\r
                 for k in n_value.keys():\r
                     e_osd.fields[k] = str(n_value[k])\r
                 self.data.append(e_osd)\r
@@ -131,8 +131,8 @@ class CephMonOsdAgent(MetricsAgent):
                 n_node = CephOsdTree()\r
                 n_node.tags['cluster_id'] = cluster_id\r
                 n_node.fields['agenthost'] = socket.gethostname()\r
-                n_node.tags['agenthost_domain_id'] = '%s_%s' % (cluster_id, socket.gethostname)\r
-                n_node.tags['host_domain_id'] = '%s_%s' % (cluster_id, socket.gethostname)\r
+                n_node.tags['agenthost_domain_id'] = '%s_%s' % (cluster_id, socket.gethostname())\r
+                n_node.tags['host_domain_id'] = '%s_%s' % (cluster_id, socket.gethostname())\r
                 n_node.fields['children'] = ','.join(str(x) for x in node.get('children', []))\r
                 n_node.fields['type_id'] = str(node.get('type_id', ''))\r
                 n_node.fields['id'] = str(node.get('id', ''))\r
index ed4dad35e34e96666e393e4377fafc1e75ccba47..4a92a079a01f58d0629bd22b2236085f3d96edc4 100644 (file)
@@ -574,7 +574,7 @@ class DBRelayAgent(MetricsAgent):
         cluster_host = socket.gethostname()\r
         for d_name, d_node in self._dev_nodes.items():\r
             keys = {'data_osd': 'DataDiskOfOSD',\r
-                    'fs_journal_osd': 'FsJounalDiskOfOSD',\r
+                    'fs_journal_osd': 'FsJournalDiskOfOSD',\r
                     'bs_db_osd': 'BsDBDiskOfOSD',\r
                     'bs_wal_osd': 'BsWalDiskOfOSD'}\r
             for k, v in keys.items():\r
diff --git a/src/pybind/mgr/diskprediction_cloud/agent/predictor.py b/src/pybind/mgr/diskprediction_cloud/agent/predictor.py
new file mode 100644 (file)
index 0000000..1fdea46
--- /dev/null
@@ -0,0 +1,48 @@
+from __future__ import absolute_import
+
+
+class PredictAgent(object):
+
+    measurement = 'predictor'
+
+    def __init__(self, mgr_module, obj_sender, timeout=30):
+        self.data = []
+        self._client = None
+        self._client = obj_sender
+        self._logger = mgr_module.log
+        self._module_inst = mgr_module
+        self._timeout = timeout
+
+    def __nonzero__(self):
+        if not self._module_inst:
+            return False
+        else:
+            return True
+
+    def run(self):
+        result = self._module_inst.get('devices')
+        cluster_id = self._module_inst.get('mon_map').get('fsid')
+        if not result:
+            return -1, '', 'unable to get all devices for prediction'
+        for dev in result.get('devices', []):
+            for location in dev.get('location', []):
+                host = location.get('host')
+                host_domain_id = '{}_{}'.format(cluster_id, host)
+                prediction_data = self._get_cloud_prediction_result(host_domain_id, dev.get('devid'))
+                if prediction_data:
+                    self._module_inst.prediction_result[dev.get('devid')] = prediction_data
+
+    def _get_cloud_prediction_result(self, host_domain_id, disk_domain_id):
+        result = {}
+        try:
+            query_info = self._client.query_info(host_domain_id, disk_domain_id, 'sai_disk_prediction')
+            status_code = query_info.status_code
+            if status_code == 200:
+                result = query_info.json()
+            else:
+                resp = query_info.json()
+                if resp.get('error'):
+                    self._logger.error(str(resp['error']))
+        except Exception as e:
+            self._logger.error('failed to get %s prediction result %s' % (disk_domain_id, str(e)))
+        return result
index cbc3c30ebf68795231427f4455990ded725e96fc..ce5131b8f2b906a749717e7b6be1b41930d6596d 100644 (file)
@@ -1,7 +1,6 @@
 from __future__ import absolute_import\r
 import errno\r
 from functools import wraps\r
-from six.moves.http_client import BAD_REQUEST\r
 import os\r
 import signal\r
 \r
@@ -17,11 +16,14 @@ class DummyResonse:
     def __init__(self):\r
         self.resp_json = dict()\r
         self.content = 'DummyResponse'\r
-        self.status_code = BAD_REQUEST\r
+        self.status_code = 404\r
 \r
     def json(self):\r
         return self.resp_json\r
 \r
+    def __str__(self):\r
+        return '{}'.format({'resp': self.resp_json, 'content': self.content, 'status_code': self.status_code})\r
+\r
 \r
 class TimeoutError(Exception):\r
     pass\r
index 9a9f0a12ba8063fdc03daf8119edf1241d023a01..5a1d5e7e2afdb51a7cf53109dafe094bec03072e 100644 (file)
@@ -22,7 +22,7 @@ def gen_configuration(**kwargs):
     return configuration
 
 
-class GRPcClient:
+class GRPcClient(object):
 
     def __init__(self, configuration):
         self.auth = None
@@ -71,6 +71,7 @@ class GRPcClient:
         try:
             stub_accout = client_pb2_grpc.AccountStub(self._client)
             result = stub_accout.AccountHeartbeat(client_pb2.Empty())
+            self._logger.debug('text connection result: {}'.format(str(result)))
             if result and "is alive" in str(result.message):
                 return True
             else:
@@ -225,14 +226,17 @@ class GRPcClient:
                                 for name, value in zip(columns, item):
                                     # process prediction data
                                     resp.resp_json[name] = value
+                self._logger.debug("query {}:{} result:{}".format(host_domain_id, disk_domain_id, resp))
                 return resp
             else:
                 resp.status_code = 400
                 resp.content = ''
                 resp.resp_json = {'error': ';'.join(str(predicted).split('\n\t'))}
+                self._logger.debug("query {}:{} result:{}".format(host_domain_id, disk_domain_id, resp))
                 return resp
         except Exception as e:
             resp.status_code = 400
             resp.content = ';'.join(str(e).split('\n\t'))
             resp.resp_json = {'error': resp.content}
+            self._logger.debug("query {}:{} result:{}".format(host_domain_id, disk_domain_id, resp))
             return resp
index 204184ba1a1223cca3af1fa070b74a7910c2cabf..885243ab75a7dfaa524141ce74a2d6a881d54587 100644 (file)
@@ -3,19 +3,35 @@ diskprediction with cloud predictor
 """
 from __future__ import absolute_import
 
+import base64
 from datetime import datetime
 import errno
 import json
 from mgr_module import MgrModule
 import os
+from string import maketrans
 from threading import Event
 
 from .common import DP_MGR_STAT_ENABLED, DP_MGR_STAT_DISABLED
-from .task import MetricsRunner, SmartRunner, TestRunner
+from .task import MetricsRunner, SmartRunner, PredictRunner, TestRunner
 
 TIME_DAYS = 24*60*60
 TIME_WEEK = TIME_DAYS * 7
-DP_AGENTS = [MetricsRunner, SmartRunner]
+DP_AGENTS = [MetricsRunner, SmartRunner, PredictRunner]
+CUSTOMER_ALPHABET = "ABCDEFG&HIJKLMN@OQRS.TUV(WXYZabcd)efghijlmn-opqrstu*vwxyz0123=45"
+ORIGIN_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
+
+
+def encode_string(value):
+    transtable = maketrans(ORIGIN_ALPHABET, CUSTOMER_ALPHABET)
+    e = base64.b64encode(value)
+    return e.translate(transtable)[:-1]
+
+
+def decode_string(value):
+    transtable = maketrans(CUSTOMER_ALPHABET, ORIGIN_ALPHABET)
+    e = str(value).translate(transtable) + "="
+    return base64.b64decode(e)
 
 
 class Module(MgrModule):
@@ -64,10 +80,6 @@ class Module(MgrModule):
         {
             'name': 'sleep_interval',
             'default': str(600),
-        },
-        {
-            'name': 'predict_interval',
-            'default': str(86400),
         }
     ]
 
@@ -107,11 +119,24 @@ class Module(MgrModule):
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
         self.status = {'status': DP_MGR_STAT_DISABLED}
-        self.shutdown_event = Event()
+        self._event = Event()
+        self._predict_event = Event()
         self._agents = []
         self._activated_cloud = False
-        self._prediction_result = {}
+        self.prediction_result = {}
         self.config = dict()
+        self._run = True
+
+    def config_notify(self):
+        for opt in self.OPTIONS:
+            setattr(self,
+                    opt['name'],
+                    self.get_config(opt['name']) or opt['default'])
+            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
+        if not self._activated_cloud and self.get_option('device_failure_prediction_mode') == 'cloud':
+            self._event.set()
+        if self._activated_cloud and self.get_option('device_failure_prediction_mode') != 'cloud':
+            self._event.set()
 
     @property
     def config_keys(self):
@@ -183,7 +208,7 @@ class Module(MgrModule):
                 _agent.event.set()
             self.set_config('diskprediction_server', cmd['server'])
             self.set_config('diskprediction_user', cmd['user'])
-            self.set_config('diskprediction_password', cmd['password'])
+            self.set_config('diskprediction_password', encode_string(cmd['password']))
             if cmd.get('port'):
                 self.set_config('diskprediction_port', cmd['port'])
             return 0, 'succeed to config cloud mode connection', ''
@@ -216,52 +241,26 @@ class Module(MgrModule):
     def _status(self,  cmd):
         return 0, json.dumps(self.status), ''
 
-    def _get_cloud_prediction_result(self, host_domain_id, disk_domain_id):
-        result = {}
-        obj_sender = None
-        from .common.grpcclient import GRPcClient, gen_configuration
-        conf = gen_configuration(
-            host=self.get_configuration('diskprediction_server'),
-            user=self.get_configuration('diskprediction_user'),
-            password=self.get_configuration('diskprediction_password'),
-            port=self.get_configuration('diskprediction_port'),
-            cert_context=self.get_configuration('diskprediction_cert_context'),
-            mgr_inst=self,
-            ssl_target_name=self.get_configuration('diskprediction_ssl_target_name_override'),
-            default_authority=self.get_configuration('diskprediction_default_authority'))
-        try:
-            obj_sender = GRPcClient(conf)
-            if obj_sender:
-                query_info = obj_sender.query_info(host_domain_id, disk_domain_id, 'sai_disk_prediction')
-                status_code = query_info.status_code
-                if status_code == 200:
-                    result = query_info.json()
-                else:
-                    resp = query_info.json()
-                    if resp.get('error'):
-                        self.log.error(str(resp['error']))
-        finally:
-            if obj_sender:
-                obj_sender.close()
-        return result
+    def _refresh_cloud_prediction_result(self):
+        for _agent in self._agents:
+            if isinstance(_agent, PredictRunner):
+                self._predict_event.clear()
+                _agent.event.set()
+                self._predict_event.wait(300)
+                if self._predict_event.is_set():
+                    self._predict_event.clear()
+                break
 
     def predict_life_expectancy(self, devid):
         assert devid
-        self.refresh_config()
-        prediction_data = {}
         result = self.get('device {}'.format(devid))
         if not result:
             return -1, '', 'device {} not found'.format(devid)
         dev_info = result.get('device', {})
         if not dev_info:
             return -1, '', 'device {} not found'.format(devid)
-        cluster_id = self.get('mon_map').get('fsid', '')
-        for location in dev_info.get('location', []):
-            host = location.get('host')
-            host_domain_id = '{}_{}'.format(cluster_id, host)
-            prediction_data = self._get_cloud_prediction_result(host_domain_id, devid)
-            if prediction_data:
-                break
+        self._refresh_cloud_prediction_result()
+        prediction_data = self.prediction_result.get(devid)
         if not prediction_data:
             return -1, '', 'device {} prediction data not ready'.format(devid)
         elif prediction_data.get('near_failure', '').lower() == 'good':
@@ -322,19 +321,18 @@ class Module(MgrModule):
     def predict_all_devices(self):
         if not self._activated_cloud:
             return -1, '', 'diskprecition_cloud not ready'
-        prediction_data = {}
         self.refresh_config()
         result = self.get('devices')
-        cluster_id = self.get('mon_map').get('fsid')
         if not result:
             return -1, '', 'unable to get all devices for prediction'
+        self._refresh_cloud_prediction_result()
         for dev in result.get('devices', []):
-            for location in dev.get('location', []):
-                host = location.get('host')
-                host_domain_id = '{}_{}'.format(cluster_id, host)
-                prediction_data = self._get_cloud_prediction_result(host_domain_id, dev.get('devid'))
-                if prediction_data:
-                    break
+            devid = dev.get('devid')
+            if not devid:
+                continue
+            prediction_data = self.prediction_result.get(devid)
+            if prediction_data:
+                break
             if not prediction_data:
                 return -1, '', 'device {} prediction data not ready'.format(dev.get('devid'))
             else:
@@ -367,9 +365,10 @@ class Module(MgrModule):
 
     def serve(self):
         self.log.info('Starting diskprediction module')
+        self.config_notify()
         self.status = {'status': DP_MGR_STAT_ENABLED}
 
-        while True:
+        while self._run:
             self.refresh_config()
             mode = self.get_option('device_failure_prediction_mode')
             if mode == 'cloud':
@@ -380,30 +379,36 @@ class Module(MgrModule):
                     self.stop_disk_prediction()
 
             # Check agent hang is?
-            restartAgent = False
+            restart_agent = False
             try:
                 for dp_agent in self._agents:
                     if dp_agent.is_timeout():
                         self.log.error('agent name: {] timeout'.format(dp_agent.task_name))
-                        restartAgent = True
+                        restart_agent = True
                         break
             except Exception as IOError:
                 self.log.error('disk prediction plugin faield to started and try to restart')
-                restartAgent = True
+                restart_agent = True
 
-            if restartAgent:
+            if restart_agent:
                 self.stop_disk_prediction()
             else:
                 sleep_interval = int(self.sleep_interval) or 60
-                self.shutdown_event.wait(sleep_interval)
-                if self.shutdown_event.is_set():
-                    break
+                self._event.wait(sleep_interval)
+                self._event.clear()
         self.stop_disk_prediction()
 
+    def _agent_call_back(self):
+        self.log.debug('notify refresh devices prediction result')
+        self._predict_event.set()
+
     def start_cloud_disk_prediction(self):
         assert not self._activated_cloud
         for dp_agent in DP_AGENTS:
-            obj_agent = dp_agent(self, 300)
+            if dp_agent == PredictRunner:
+                obj_agent = dp_agent(self, 300, self._agent_call_back)
+            else:
+                obj_agent = dp_agent(self, 300)
             if obj_agent:
                 obj_agent.start()
             else:
@@ -428,10 +433,11 @@ class Module(MgrModule):
             self.log.error('failed to stop disk prediction clould plugin')
 
     def shutdown(self):
-        self.shutdown_event.set()
+        self._run = False
+        self._event.set()
         super(Module, self).shutdown()
 
     def self_test(self):
-        objTest = TestRunner(self)
-        objTest.run()
+        obj_test = TestRunner(self)
+        obj_test.run()
         self.log.info('self test completed')
index d03450ed614756c159b71388196f44ba4bf1d5af..6ed04e600394a176156634e83cf972f264fac8e2 100644 (file)
@@ -3,6 +3,7 @@ from __future__ import absolute_import
 import time\r
 from threading import Event, Thread\r
 \r
+from .agent.predictor import PredictAgent\r
 from .agent.metrics.ceph_cluster import CephClusterAgent\r
 from .agent.metrics.ceph_mon_osd import CephMonOsdAgent\r
 from .agent.metrics.ceph_pool import CephPoolAgent\r
@@ -21,7 +22,7 @@ class AgentRunner(Thread):
     interval_key = ''\r
     agents = []\r
 \r
-    def __init__(self, mgr_module, agent_timeout=60):\r
+    def __init__(self, mgr_module, agent_timeout=60, call_back=None):\r
         """\r
 \r
         :param mgr_module: parent ceph mgr module\r
@@ -33,7 +34,7 @@ class AgentRunner(Thread):
         self._log = mgr_module.log\r
         self._start_time = time.time()\r
         self._th = None\r
-\r
+        self._call_back = call_back\r
         self.exit = False\r
         self.event = Event()\r
         self.task_interval = \\r
@@ -51,6 +52,8 @@ class AgentRunner(Thread):
             % (self.task_name, self.task_interval))\r
         while not self.exit:\r
             self.run_agents()\r
+            if self._call_back:\r
+                self._call_back()\r
             if self.event:\r
                 self.event.wait(int(self.task_interval))\r
                 self.event.clear()\r
@@ -147,6 +150,13 @@ class MetricsRunner(AgentRunner):
               SAIAgent]\r
 \r
 \r
+class PredictRunner(AgentRunner):\r
+\r
+    task_name = 'Predictor Agent'\r
+    interval_key = 'diskprediction_retrieve_prediction_interval'\r
+    agents = [PredictAgent]\r
+\r
+\r
 class SmartRunner(AgentRunner):\r
 \r
     task_name = 'Smart data Agent'\r
index 8662e2f75fbdbdcb7e472539cc6af413ef5c9458..98a014209f01e03653fec4b21273526acd556d80 100644 (file)
@@ -171,7 +171,7 @@ class Module(MgrModule):
             predicted_result = obj_predictor.predict(predict_datas)
         return predicted_result
 
-    def predict_life_expentancy(self, devid):
+    def predict_life_expectancy(self, devid):
         result = self._predict_life_expentancy(devid)
         if result.lower() == 'good':
             return 0, '>6w', ''