]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/ccha: IBM Ceph Call Home Agent ccha
authorJuan Miguel Olmo Martínez <jolmomar@redhat.com>
Wed, 5 Apr 2023 17:11:35 +0000 (19:11 +0200)
committerJuan Miguel Olmo Martínez <jolmomar@redhat.com>
Wed, 17 May 2023 15:04:44 +0000 (17:04 +0200)
- basic functionality for sending/printing reports
- Use environment vars preferentially as module options
- Critical situations are reflected in the Ceph Health status

Signed-off-by: Juan Miguel Olmo Martínez <jolmomar@ibm.com>
src/pybind/mgr/call_home_agent/__init__.py [new file with mode: 0644]
src/pybind/mgr/call_home_agent/config.src [new file with mode: 0644]
src/pybind/mgr/call_home_agent/dataCollectors.py [new file with mode: 0644]
src/pybind/mgr/call_home_agent/module.py [new file with mode: 0644]
src/pybind/mgr/call_home_agent/options.py [new file with mode: 0644]

diff --git a/src/pybind/mgr/call_home_agent/__init__.py b/src/pybind/mgr/call_home_agent/__init__.py
new file mode 100644 (file)
index 0000000..f31cb65
--- /dev/null
@@ -0,0 +1 @@
+from .module import CallHomeAgent
diff --git a/src/pybind/mgr/call_home_agent/config.src b/src/pybind/mgr/call_home_agent/config.src
new file mode 100644 (file)
index 0000000..8fdced8
--- /dev/null
@@ -0,0 +1,47 @@
+import os
+import sys
+
+from cryptography.fernet import Fernet, InvalidToken
+
+defaultKeyFile = ''
+defaultKey = b''
+settings = {
+    'api_key': b'',
+    'private_key': b''
+}
+
+sys.tracebacklimit = 0
+
+def get_settings() -> dict:
+    key = _load_key()
+    try:
+        d = {k: _decrypt(key, v) for k, v in settings.items()}
+        return d
+    except InvalidToken:
+        raise Exception("Settings encrypted with a different token. A new settings dict must be generated with the new token")
+    except Exception as ex:
+        raise Exception("Error getting encrypted settings: %s", (ex))
+
+def generate_settings(source: dict) -> dict:
+    key = _load_key()
+    return {k: _encrypt(key, v) for k, v in source.items()}
+
+def _decrypt(key: bytes, value: bytes) -> bytes:
+    return Fernet(key).decrypt(value)
+
+def _encrypt(key: bytes, value: str) -> bytes:
+    return Fernet(key).encrypt(value.encode())
+
+def _write_key() -> None:
+    if os.path.isfile(defaultKeyFile):
+        raise Exception("Not allowed to generate a new key file when a previous one exists")
+    else:
+        with open(defaultKeyFile, "wb") as key_file:
+            key_file.write(Fernet.generate_key())
+
+def _load_key() -> bytes:
+    if os.path.isfile(defaultKeyFile):
+        with open(defaultKeyFile, 'rb') as f:
+            return f.read()
+    else:
+        return defaultKey
diff --git a/src/pybind/mgr/call_home_agent/dataCollectors.py b/src/pybind/mgr/call_home_agent/dataCollectors.py
new file mode 100644 (file)
index 0000000..32768e3
--- /dev/null
@@ -0,0 +1,66 @@
+from typing import List, Any, Tuple, Dict, Optional
+
+sample_inventory = {
+    "_source": {
+        "id": "c549a36a-b377-11ed-b55e-525400bfa136",
+        "daemons": [
+            {
+                "mon": 3
+            },
+            {
+                "mgr": 2
+            },
+            {
+                "osd": 12,
+                "up": 12,
+                "in": 12
+            },
+            {
+                "rgw": 3
+            },
+            {
+                "alertmanager": 1
+            },
+            {
+                "prometheus": 1
+            },
+            {
+                "ceph-exporter": 3
+            },
+            {
+                "node-      },exporter": 3
+            },
+            {
+                "grafana": 1
+            }
+        ],
+        "health": "HEALTH_WARN",
+        "pools": 12,
+        "pgs": 3,
+        "objects": 200,
+        "capacity_used_Gib": 0.7,
+        "capacity_available_Gib": 15,
+        "alerts": [
+            {
+                "name": "cephadmdaemonfailed",
+                "summary": "A ceph daemon manged by cephadm is down",
+                "description": "A daemon managed by cephadm is no longer active. Determine, which daemon is down with 'ceph health detail'. you may start daemons with the 'ceph orch daemon start <daemon_id>'",
+                "severity": "critical",
+                "state": "active",
+                "started": "7 hours ago"
+            },
+            {
+                "name": "CephHealthWarning",
+                "summary": "Ceph is in the WARNING state",
+                "description": "The cluster state has been HEALTH_WARN for more than 15 minutes. Please check 'ceph health detail' for more information.",
+                "severity": "warning",
+                "state": "active",
+                "started": "7 hours ago"
+            }
+        ]
+    }
+}
+
+
+def inventory() -> Dict[str, Dict[str, Any]]:
+    return sample_inventory
diff --git a/src/pybind/mgr/call_home_agent/module.py b/src/pybind/mgr/call_home_agent/module.py
new file mode 100644 (file)
index 0000000..7e9ee6d
--- /dev/null
@@ -0,0 +1,322 @@
+"""
+IBM Ceph Call Home Agent
+"""
+
+from typing import List, Any, Tuple, Dict, Optional, Set, Callable
+from datetime import datetime
+import json
+import requests
+import asyncio
+import os
+
+from mgr_module import Option, CLIReadCommand, MgrModule, HandleCommandResult, NotifyType
+
+from .config import get_settings
+from .options import CHES_ENDPOINT, INTERVAL_INVENTORY_REPORT_MINUTES, INTERVAL_PERFORMANCE_REPORT_MINUTES
+
+# Module with the functions to be used in telemetry and in CCHA for retrieving information from the ceph cluster
+from .dataCollectors import inventory
+
+reports_header = {
+    "agent": "RedHat_Marina_firmware_agent",
+    "api_key": "",
+    "private_key": "",
+    "target_space": "dev",
+    "asset": "ceph",
+    "asset_id": "",
+    "asset_type": "RedHatMarine",
+    "asset_vendor": "IBM",
+    "asset_virtual_id": "",
+    "country_code": "",
+    "event_id": "",
+    "event_time": "",
+    "event_time_ms": 0,
+    "local_event_time": "",
+    "software_level": {
+        "name": "ceph_software",
+        "vrmf": "8.3.0.1"
+    },
+    "type": "eccnext_apisv1s",
+    "version": "1.0.0.1",
+    "analytics_event_source_type": "asset_event",
+    "analytics_type": "ceph",
+    "analytics_instance": "",
+    "analytics_virtual_id": "",
+    "analytics_group": "Storage",
+    "analytics_category": "RedHatMarine",
+    "events": []
+}
+
+# TODO: report functions providing the json paylod should be imported from the data collectors module
+
+
+def report_inventory() -> str:
+    """
+    Produce the inventory report
+    """
+    dt = datetime.timestamp(datetime.now())
+    # Set timestamps
+    reports_header['event_time'] = datetime.fromtimestamp(dt).strftime("%Y-%m-%d %H:%M:%S")
+    reports_header['event_time_ms'] = str(int(dt))
+    reports_header['local_event_time'] = datetime.fromtimestamp(dt).strftime("%a %b %d %H:%M:%S %Z")
+
+    # Set event id
+    reports_header['event_id'] = "IBM_chc_event_RedHatMarine_ceph_{}_daily_report_{}".format(
+                                            reports_header['asset_virtual_id'],
+                                            reports_header['event_time_ms'])
+
+    # return json.dumps({**reports_header, **inventory()}) <---- TODO ADD INVENTORY (from datacollector) as event
+    return json.dumps(reports_header)
+
+
+def report_performance() -> str:
+    """
+    TODO: temporally uses inventory report
+    """
+    return report_inventory()
+
+
+class Report:
+    def __init__(self, name: str, fn: Callable[[], str], url: str, minutes_interval: int):
+        self.name = name                       # name of the report
+        self.fn = fn                           # function used to retrieve the data
+        self.url = url                         # url to send the report
+        self.interval = minutes_interval * 60  # interval to send the report (seconds)
+
+    def print(self) -> str:
+        if reports_header['private_key'] and reports_header['api_key']:
+            return self.fn()
+        else:
+            raise Exception('Not able to print <%s> report. Identification keys are not available' % self.name)
+
+    def send(self) -> str:
+        fail_reason = ""
+        resp = None
+        if reports_header['private_key'] and reports_header['api_key']:
+            try:
+                resp = requests.post(url=self.url,
+                                     headers={'accept': 'application/json', 'content-type': 'application/json'},
+                                     data=self.fn())
+                resp.raise_for_status()
+            except Exception as e:
+                explanation = resp.content if resp else ''
+                fail_reason = 'Failed to send <%s> to <%s>: %s \n%s' % (self.name, self.url, str(e), explanation)
+        else:
+            fail_reason = 'Not able to send <%s> report. Identification keys are not available' % self.name
+        return fail_reason
+
+
+class CallHomeAgent(MgrModule):
+    MODULE_OPTIONS: List[Option] = [
+        Option(
+            name='ches_endpoint',
+            type='str',
+            default=CHES_ENDPOINT,
+            desc='Call Home Event streamer end point'
+        ),
+        Option(
+            name='interval_inventory_report_minutes',
+            type='int',
+            default=INTERVAL_INVENTORY_REPORT_MINUTES,
+            desc='Time frequency for the inventory report'
+        ),
+        Option(
+            name='interval_performance_report_minutes',
+            type='int',
+            default=INTERVAL_PERFORMANCE_REPORT_MINUTES,
+            desc='Time frequency for the performance report'
+        ),
+        Option(
+            name='customer_email',
+            type='str',
+            default='',
+            desc='Customer contact email'
+        ),
+        Option(
+            name='country_code',
+            type='str',
+            default='',
+            desc='Customer country code'
+        )
+    ]
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super(CallHomeAgent, self).__init__(*args, **kwargs)
+
+        # set up some members to enable the serve() method and shutdown()
+        self.run = True
+
+        # Init module options
+        # Env vars (if they exist) have preference over module options
+        self.ches_url = str(os.environ.get('CHA_CHES_ENDPOINT', self.get_module_option('ches_endpoint')))
+        self.interval_performance_minutes = int(
+            os.environ.get('CHA_INTERVAL_INVENTORY_REPORT_MINUTES',
+                           self.get_module_option('interval_inventory_report_minutes')))  # type: ignore
+        self.interval_inventory_minutes = int(
+            os.environ.get('CHA_INTERVAL_PERFORMANCE_REPORT_MINUTES',
+                           self.get_module_option('interval_performance_report_minutes')))  # type: ignore
+        self.customer_email = os.environ.get('CHA_CUSTOMER_EMAIL', self.get_module_option('customer_email'))
+        self.country_code = os.environ.get('CHA_COUNTRY_CODE', self.get_module_option('country_code'))
+        # Health checks
+        self.health_checks: Dict[str, Dict[str, Any]] = dict()
+
+        # configure common report headers
+        self._configure_headers()
+
+        # Prepare reports
+        self.reports = {'inventory': Report('inventory', report_inventory,
+                                            self.ches_url,
+                                            self.interval_performance_minutes),
+                        'performance': Report('performance', report_performance,
+                                              self.ches_url,
+                                              self.interval_inventory_minutes)
+                        }
+
+    async def report_task(self, rpt: Report) -> None:
+        """
+            Coroutine for sending the report passed as parameter
+        """
+        self.log.info('Launched task for <%s> report each %s seconds)' % (rpt.name, rpt.interval))
+        while self.run:
+            try:
+                self.log.info('Sending <%s> report to <%s>' % (rpt.name, rpt.url))
+                send_error = rpt.send()
+            except Exception as ex:
+                send_error = str(ex)
+
+            if send_error:
+                self.log.error(send_error)
+                self.health_checks.update({
+                    'CCHA_ERROR_SENDING_REPORT': {
+                        'severity': 'error',
+                        'summary': 'Ceph Call Home Agent manager module: error sending <%s> report to IBM Storage '
+                                   'Insights systems' % rpt.name,
+                        'detail': [send_error]
+                    }
+                })
+            else:
+                self.health_checks = {}
+                self.log.info('Successfully sent <%s> report to <%s>' % (rpt.name, rpt.url))
+
+            self.set_health_checks(self.health_checks)
+            await asyncio.sleep(rpt.interval)
+
+    def launch_coroutines(self) -> None:
+        """
+         Launch module coroutines (reports or any other async task)
+        """
+        loop = asyncio.new_event_loop()  # type: ignore
+        try:
+            for rptName, rpt in self.reports.items():
+                t = loop.create_task(self.report_task(rpt))
+            loop.run_forever()
+        except Exception as ex:
+            self.log.error(ex)
+
+    def serve(self) -> None:
+        """
+            TODO:
+            - Register instance in CHES???: TODO
+            - Launch ccha web server: TODO
+            - Launch coroutines report tasks
+        """
+        self.log.info('Starting IBM Ceph Call Home Agent')
+
+        # Launch coroutines for the reports
+        self.launch_coroutines()
+
+        self.log.info('IBM Call home agent finished')
+
+    def shutdown(self) -> None:
+        """
+        This method is called by the mgr when the module needs to shut
+        down (i.e., when the serve() function needs to exit).
+        """
+        self.log.info('Stopping IBM Ceph Call Home Agent')
+        self.run = False
+
+    def notify(self, notify_type: NotifyType, tag: str) -> None:
+        """
+            TODO:
+            Way to detect changes in
+            osd_map, mon_map, fs_map, mon_status, health, pg_summary, command, service_map
+            Generate an "inventory change report" and send to CHES changes endpoint
+        """
+        pass
+
+    def config_notify(self) -> None:
+        """
+        This method is called whenever one of our config options is changed.
+        """
+        # This is some boilerplate that stores MODULE_OPTIONS in a class
+        # member, so that, for instance, the 'emphatic' option is always
+        # available as 'self.emphatic'.
+        for opt in self.MODULE_OPTIONS:
+            pass
+
+    def _configure_headers(self) -> None:
+        id_data = {'private_key': b'', 'api_key': b''}
+        try:
+            id_data = get_settings()
+            self.health_checks = {}
+        except Exception as ex:
+            self.log.error('Error getting encrypted identification keys: %s. '
+                           'Provide keys and restart Ceph Call Home module', ex)
+            self.health_checks.update({
+                'CCHA_ID_KEYS_NOT_AVAILABLE': {
+                    'severity': 'error',
+                    'summary': 'Ceph Call Home Agent manager module: The private identification keys needed to connect '
+                               'with storage insights system are not available',
+                    'detail': ['Provide the right keys and restart the Ceph Call Home manager module']
+                }
+            })
+        self.set_health_checks(self.health_checks)
+
+        cluster_config = self.get('config')
+        reports_header['private_key'] = id_data['private_key'].decode('utf-8')  # type: ignore
+        reports_header['api_key'] = id_data['api_key'].decode('utf-8')  # type: ignore
+        reports_header['asset_id'] = cluster_config['fsid']
+        reports_header['asset_virtual_id'] =  cluster_config['fsid']
+        reports_header['country_code'] = self.get_module_option('country_code')
+        reports_header['analytics_instance'] = cluster_config['fsid']
+        reports_header['analytics_virtual_id'] = cluster_config['fsid']
+
+    def alert_worker(self, alerts: str) -> None:
+        """
+            TODO:
+            send the alerts to the ches alert endpoint
+        """
+
+    @CLIReadCommand('callhome print report')
+    def print_report_cmd(self, report_name: str) -> Tuple[int, str, str]:
+        """
+            Prints the report requested.
+            Example:
+                ceph callhome print report inventory
+            Available reports: inventory
+        """
+        return HandleCommandResult(stdout=f'report:, {self.reports[report_name].print()}')
+
+    @CLIReadCommand('callhome send report')
+    def send_report_cmd(self, report_name: str) -> Tuple[int, str, str]:
+        """
+            Command for sending the report requested.
+        """
+        send_error = self.reports[report_name].send()
+        if send_error:
+            return HandleCommandResult(stdout=send_error)
+        else:
+            return HandleCommandResult(stdout=f'{report_name} report successfully sent')
+
+    # Temporal Data collectors (they will be imported from a manager scope module library)
+    def ccha_web_server(self) -> None:
+        """
+            TODO:
+            - configure https web server
+            - Initialize route endpoints.
+                In first phase we will have only the alert receiver for processing alert manager notifications
+                - https://<mgr_host>:<port>/ccha_receiver
+                  it calls ccha.alertWorker
+            - start web server
+        """
+
diff --git a/src/pybind/mgr/call_home_agent/options.py b/src/pybind/mgr/call_home_agent/options.py
new file mode 100644 (file)
index 0000000..24d2183
--- /dev/null
@@ -0,0 +1,5 @@
+# Configurable options for IBM Ceph Call Home Agent
+
+CHES_ENDPOINT = "https://stg-edge-cdt.eventsgslb.ibm.com/connect/api/v1"
+INTERVAL_INVENTORY_REPORT_MINUTES = 1440
+INTERVAL_PERFORMANCE_REPORT_MINUTES = 5