def get_settings() -> dict:
    """
    Return the identification settings decrypted with the current key.

    Every value of the module-level ``settings`` dict is decrypted with the
    key returned by ``_load_key()``.

    Raises:
        Exception: when the settings were encrypted with a different key
            (``InvalidToken``) or when any other error happens while
            decrypting. The original error is chained as ``__cause__``.
    """
    key = _load_key()
    try:
        return {name: _decrypt(key, value) for name, value in settings.items()}
    except InvalidToken as ex:
        raise Exception("Settings encrypted with a different token. "
                        "A new settings dict must be generated with the new token") from ex
    except Exception as ex:
        # The message used to be built as Exception(fmt, ex), which never
        # interpolates "%s": str() of the exception was the repr of a tuple.
        raise Exception("Error getting encrypted settings: %s" % ex) from ex


def generate_settings(source: dict) -> dict:
    """
    Return a copy of ``source`` with every (str) value encrypted with the
    key returned by ``_load_key()``.
    """
    key = _load_key()
    return {name: _encrypt(key, value) for name, value in source.items()}


def _decrypt(key: bytes, value: bytes) -> bytes:
    """Decrypt the Fernet token ``value`` using ``key``."""
    return Fernet(key).decrypt(value)


def _encrypt(key: bytes, value: str) -> bytes:
    """Encode ``value`` and encrypt it as a Fernet token using ``key``."""
    return Fernet(key).encrypt(value.encode())


def _write_key() -> None:
    """
    Generate a new Fernet key and store it in ``defaultKeyFile``.

    Raises:
        Exception: when the key file already exists (an existing key is never
            overwritten).
    """
    # O_EXCL makes "check that it does not exist" and "create" one atomic
    # step (no race between the two), and mode 0o600 keeps the secret key
    # readable by its owner only instead of relying on the process umask.
    try:
        fd = os.open(defaultKeyFile, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    except FileExistsError:
        raise Exception("Not allowed to generate a new key file when a previous one exists") from None
    with os.fdopen(fd, "wb") as key_file:
        key_file.write(Fernet.generate_key())


def _load_key() -> bytes:
    """
    Return the content of ``defaultKeyFile`` when that file exists, otherwise
    the built-in ``defaultKey``.
    """
    if not os.path.isfile(defaultKeyFile):
        return defaultKey
    with open(defaultKeyFile, 'rb') as key_file:
        return key_file.read()
# Header shared by every report. The identification and cluster fields are
# filled in by CallHomeAgent._configure_headers(); the event_* fields are
# refreshed each time a report is produced.
# NOTE(review): "agent" says "Marina" while the other fields say "Marine" -
# confirm which spelling the receiving service expects before changing it.
reports_header = {
    "agent": "RedHat_Marina_firmware_agent",
    "api_key": "",
    "private_key": "",
    "target_space": "dev",
    "asset": "ceph",
    "asset_id": "",
    "asset_type": "RedHatMarine",
    "asset_vendor": "IBM",
    "asset_virtual_id": "",
    "country_code": "",
    "event_id": "",
    "event_time": "",
    "event_time_ms": 0,
    "local_event_time": "",
    "software_level": {
        "name": "ceph_software",
        "vrmf": "8.3.0.1"
    },
    "type": "eccnext_apisv1s",
    "version": "1.0.0.1",
    "analytics_event_source_type": "asset_event",
    "analytics_type": "ceph",
    "analytics_instance": "",
    "analytics_virtual_id": "",
    "analytics_group": "Storage",
    "analytics_category": "RedHatMarine",
    "events": []
}

# TODO: report functions providing the json payload should be imported from the data collectors module


def report_inventory() -> str:
    """
    Produce the inventory report.

    Refreshes the time stamps and the event id stored in the module-level
    ``reports_header`` and returns the header serialized as a JSON string.
    """
    # A timezone-aware local time is required for "%Z" to render the zone
    # name; with a naive datetime it expands to an empty string and leaves a
    # trailing space in 'local_event_time'.
    now = datetime.now().astimezone()

    # Set timestamps
    reports_header['event_time'] = now.strftime("%Y-%m-%d %H:%M:%S")
    # Milliseconds since the epoch, as an integer like the header default
    # (this used to be a string holding whole seconds).
    reports_header['event_time_ms'] = int(now.timestamp() * 1000)
    reports_header['local_event_time'] = now.strftime("%a %b %d %H:%M:%S %Z")

    # Set event id
    reports_header['event_id'] = "IBM_chc_event_RedHatMarine_ceph_{}_daily_report_{}".format(
        reports_header['asset_virtual_id'],
        reports_header['event_time_ms'])

    # TODO: add the inventory (from the data collectors module) as an event
    return json.dumps(reports_header)


def report_performance() -> str:
    """
    Produce the performance report.

    TODO: temporarily returns the inventory report.
    """
    return report_inventory()
def send(self, timeout: float = 30.0) -> str:
    """
    Post the report to ``self.url``.

    Args:
        timeout: seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block forever on an unresponsive
            endpoint.

    Returns:
        An empty string when the report was sent successfully, otherwise a
        text explaining why it could not be sent. Errors are reported through
        the return value, not through exceptions.
    """
    if not (reports_header['private_key'] and reports_header['api_key']):
        return 'Not able to send <%s> report. Identification keys are not available' % self.name

    resp = None
    try:
        resp = requests.post(url=self.url,
                             headers={'accept': 'application/json', 'content-type': 'application/json'},
                             data=self.fn(),
                             timeout=timeout)
        resp.raise_for_status()
    except Exception as e:
        # A Response is falsy for 4xx/5xx status codes, so a plain
        # "if resp" discarded the server explanation exactly when
        # raise_for_status() had failed; compare against None instead.
        # resp.text is used so the body is not rendered as a bytes repr.
        explanation = resp.text if resp is not None else ''
        return 'Failed to send <%s> to <%s>: %s \n%s' % (self.name, self.url, str(e), explanation)
    return ""
async def report_task(self, rpt: Report) -> None:
    """
    Coroutine that sends the report passed as parameter every
    ``rpt.interval`` seconds until ``self.run`` becomes False.

    The result of each attempt is reflected in the
    'CCHA_ERROR_SENDING_REPORT' health check.
    """
    self.log.info('Launched task for <%s> report each %s seconds' % (rpt.name, rpt.interval))
    while self.run:
        try:
            self.log.info('Sending <%s> report to <%s>' % (rpt.name, rpt.url))
            # NOTE: rpt.send() is a blocking call; it is kept on the loop
            # thread on purpose because the report functions update shared
            # module-level state.
            send_error = rpt.send()
        except Exception as ex:
            send_error = str(ex)

        if send_error:
            self.log.error(send_error)
            self.health_checks.update({
                'CCHA_ERROR_SENDING_REPORT': {
                    'severity': 'error',
                    'summary': 'Ceph Call Home Agent manager module: error sending <%s> report to IBM Storage '
                               'Insights systems' % rpt.name,
                    'detail': [send_error]
                }
            })
        else:
            # Clear only the check owned by this task; other health checks
            # raised by the module must survive a successful send.
            self.health_checks.pop('CCHA_ERROR_SENDING_REPORT', None)
            self.log.info('Successfully sent <%s> report to <%s>' % (rpt.name, rpt.url))

        self.set_health_checks(self.health_checks)

        # Wait for the next round in short slices so that a shutdown request
        # (self.run = False) is honoured promptly instead of after a whole
        # interval, which can be as long as a day.
        remaining = rpt.interval
        while self.run and remaining > 0:
            pause = min(remaining, 1)
            await asyncio.sleep(pause)
            remaining -= pause


def launch_coroutines(self) -> None:
    """
    Launch module coroutines (reports or any other async task) and block
    until all of them have finished.
    """
    async def run_all_reports() -> None:
        await asyncio.gather(*(self.report_task(rpt) for rpt in self.reports.values()))

    loop = asyncio.new_event_loop()  # type: ignore
    try:
        # run_until_complete() returns once every report task has ended
        # (after shutdown), whereas run_forever() never gave control back to
        # serve().
        loop.run_until_complete(run_all_reports())
    except Exception as ex:
        self.log.error(ex)
    finally:
        loop.close()
def _configure_headers(self) -> None:
    """
    Fill the fields of the common report header (``reports_header``) that
    identify this cluster: identification keys, cluster fsid and country
    code.

    When the identification keys cannot be retrieved, the
    'CCHA_ID_KEYS_NOT_AVAILABLE' health check is raised and the keys are left
    empty.
    """
    id_data = {'private_key': b'', 'api_key': b''}
    try:
        id_data = get_settings()
    except Exception as ex:
        self.log.error('Error getting encrypted identification keys: %s. '
                       'Provide keys and restart Ceph Call Home module', ex)
        self.health_checks.update({
            'CCHA_ID_KEYS_NOT_AVAILABLE': {
                'severity': 'error',
                'summary': 'Ceph Call Home Agent manager module: The private identification keys needed to connect '
                           'with storage insights system are not available',
                'detail': ['Provide the right keys and restart the Ceph Call Home manager module']
            }
        })
    else:
        # Keys are available: drop only the check owned by this method.
        self.health_checks.pop('CCHA_ID_KEYS_NOT_AVAILABLE', None)
    self.set_health_checks(self.health_checks)

    fsid = self.get('config')['fsid']
    reports_header['private_key'] = id_data['private_key'].decode('utf-8')  # type: ignore
    reports_header['api_key'] = id_data['api_key'].decode('utf-8')  # type: ignore
    reports_header['asset_id'] = fsid
    reports_header['asset_virtual_id'] = fsid
    # Same precedence used when the module options are initialised: the
    # environment variable (if present) wins over the module option.
    reports_header['country_code'] = os.environ.get('CHA_COUNTRY_CODE',
                                                    self.get_module_option('country_code'))
    reports_header['analytics_instance'] = fsid
    reports_header['analytics_virtual_id'] = fsid


def alert_worker(self, alerts: str) -> None:
    """
    TODO:
    send the alerts to the ches alert endpoint
    """


@CLIReadCommand('callhome print report')
def print_report_cmd(self, report_name: str) -> Tuple[int, str, str]:
    """
    Prints the report requested.
    Example:
        ceph callhome print report inventory
    Available reports: inventory, performance
    """
    import errno  # local import keeps this change self-contained

    report = self.reports.get(report_name)
    if report is None:
        # An unknown name used to surface as an unhandled KeyError.
        return HandleCommandResult(
            retval=-errno.EINVAL,
            stderr='Unknown report <%s>. Available reports: %s' % (report_name, ', '.join(self.reports)))
    try:
        return HandleCommandResult(stdout=f'report:, {report.print()}')
    except Exception as ex:
        # print() raises when the identification keys are not available;
        # report it as a command error instead of an unhandled exception.
        return HandleCommandResult(retval=-errno.EIO, stderr=str(ex))


# NOTE(review): sending a report is not a read-only operation; consider
# registering this command with a write-command decorator.
@CLIReadCommand('callhome send report')
def send_report_cmd(self, report_name: str) -> Tuple[int, str, str]:
    """
    Command for sending the report requested.
    """
    import errno  # local import keeps this change self-contained

    report = self.reports.get(report_name)
    if report is None:
        return HandleCommandResult(
            retval=-errno.EINVAL,
            stderr='Unknown report <%s>. Available reports: %s' % (report_name, ', '.join(self.reports)))

    send_error = report.send()
    if send_error:
        # A failure must not look like a success: non-zero return value and
        # the explanation on stderr.
        return HandleCommandResult(retval=-errno.EIO, stderr=send_error)
    return HandleCommandResult(stdout=f'{report_name} report successfully sent')
# Configurable options for IBM Ceph Call Home Agent
#
# Default values used by the module options of the same name.

# URL of the Call Home event streamer that receives the reports.
# NOTE(review): the host name starts with "stg-", which looks like a staging
# system - confirm this is the intended default.
CHES_ENDPOINT = "https://stg-edge-cdt.eventsgslb.ibm.com/connect/api/v1"

# Minutes between two inventory reports (24 hours).
INTERVAL_INVENTORY_REPORT_MINUTES = 24 * 60

# Minutes between two performance reports.
INTERVAL_PERFORMANCE_REPORT_MINUTES = 5