From: Yaarit Hatuka Date: Mon, 4 Aug 2025 00:28:21 +0000 (-0400) Subject: call_home: add service events X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ff28a9be61af737e02e8eb82397319f25d0a9ee7;p=ceph-ci.git call_home: add service events Open a support case when these events are detected by Prometheus: CephMonDiskspaceCritical, CephOSDFull CephFilesystemOffline CephFilesystemDamaged CephPGsInactive CephObjectMissing Users need to populate the IBM Customer Number (ICN) and customer_country_code fields. Logs of level 1 and 2 will be uploaded to the ticket after it's open. Resolves: rhbz#2242911 Signed-off-by: Yaarit Hatuka (cherry picked from commit 66f20c29299047d2e365da5a6779b287c2f572af) --- diff --git a/src/pybind/mgr/call_home_agent/module.py b/src/pybind/mgr/call_home_agent/module.py index 96c3f7be651..a31f1184d9e 100644 --- a/src/pybind/mgr/call_home_agent/module.py +++ b/src/pybind/mgr/call_home_agent/module.py @@ -24,6 +24,8 @@ import sched import time #from threading import Event import threading +import importlib.util +import pathlib from ceph.cryptotools.select import get_crypto_caller @@ -100,11 +102,11 @@ class CallHomeAgent(MgrModule): desc='Time frequency for the alerts report' ), Option( - name='interval_performance_report_seconds', + name='interval_service_report_seconds', type='int', min=0, - default = int(os.environ.get('CHA_INTERVAL_PERFORMANCE_REPORT_SECONDS', 60 * 5)), # 5 minutes - desc='Time frequency for the performance report' + default = int(os.environ.get('CHA_INTERVAL_SERVICE_REPORT_SECONDS', 60 * 60)), # 60 minutes + desc='Time frequency for checking for new alerts for service events' ), Option( name='customer_email', @@ -250,6 +252,12 @@ class CallHomeAgent(MgrModule): default=3600 * 2, desc='Time interval in seconds to allow a cooldown between level 2 upload snap requests' ), + Option( + name='disable_service_events', + type='bool', + default=False, + desc='Disable service events' + ), ] def 
__init__(self, *args: Any, **kwargs: Any) -> None: @@ -293,12 +301,12 @@ class CallHomeAgent(MgrModule): # set up some members to enable the serve() method and shutdown() self.run = True - # Module options - self.refresh_options() - # Health checks self.health_checks: Dict[str, Dict[str, Any]] = dict() + # Module options + self.refresh_options() + # Unsolicited Request support # identify messages that we received in the past self.stale_timeout seconds (10 days). such messages will be ignored and removed from the queue. @@ -332,7 +340,8 @@ class CallHomeAgent(MgrModule): self.log.info("Cleaning old module's db_operations") self.set_store('db_operations', None) - + self.scheduler = sched.scheduler(time.time, time.sleep) + def get_jwt_jti(self) -> str: # Extract jti from JWT. This is another way to identify clusters in addition to the ICN. jwt_jti = "" @@ -375,6 +384,9 @@ class CallHomeAgent(MgrModule): self.jwt_jti = self.get_jwt_jti() + # validate icn and country_code + self.validate_config() + def ceph_command(self, srv_type: str, prefix: str, srv_spec: Optional[str] = '', inbuf: str = '', **kwargs): # Note: A simplified version of the function used in dashboard ceph services """ @@ -399,6 +411,96 @@ class CallHomeAgent(MgrModule): self.log.error(f"Execution of command '{prefix}' failed: {ex}") return outb + + def validate_country_code(self, country_code: str) -> Tuple[bool, str]: + """ + Validates a 2-letter country code using ISO 3166-1 alpha-2 codes + """ + # Set of valid ISO 3166-1 alpha-2 country codes + VALID_COUNTRY_CODES = { + 'AF', 'AX', 'AL', 'DZ', 'AS', 'AD', 'AO', 'AI', 'AQ', 'AG', + 'AR', 'AM', 'AW', 'AU', 'AT', 'AZ', 'BS', 'BH', 'BD', 'BB', + 'BY', 'BE', 'BZ', 'BJ', 'BM', 'BT', 'BO', 'BQ', 'BA', 'BW', + 'BV', 'BR', 'IO', 'BN', 'BG', 'BF', 'BI', 'KH', 'CM', 'CA', + 'CV', 'KY', 'CF', 'TD', 'CL', 'CN', 'CX', 'CC', 'CO', 'KM', + 'CG', 'CD', 'CK', 'CR', 'CI', 'HR', 'CU', 'CW', 'CY', 'CZ', + 'DK', 'DJ', 'DM', 'DO', 'EC', 'EG', 'SV', 'GQ', 'ER', 'EE', + 
'SZ', 'ET', 'FK', 'FO', 'FJ', 'FI', 'FR', 'GF', 'PF', 'TF', + 'GA', 'GM', 'GE', 'DE', 'GH', 'GI', 'GR', 'GL', 'GD', 'GP', + 'GU', 'GT', 'GG', 'GN', 'GW', 'GY', 'HT', 'HM', 'VA', 'HN', + 'HK', 'HU', 'IS', 'IN', 'ID', 'IR', 'IQ', 'IE', 'IM', 'IL', + 'IT', 'JM', 'JP', 'JE', 'JO', 'KZ', 'KE', 'KI', 'KP', 'KR', + 'KW', 'KG', 'LA', 'LV', 'LB', 'LS', 'LR', 'LY', 'LI', 'LT', + 'LU', 'MO', 'MG', 'MW', 'MY', 'MV', 'ML', 'MT', 'MH', 'MQ', + 'MR', 'MU', 'YT', 'MX', 'FM', 'MD', 'MC', 'MN', 'ME', 'MS', + 'MA', 'MZ', 'MM', 'NA', 'NR', 'NP', 'NL', 'NC', 'NZ', 'NI', + 'NE', 'NG', 'NU', 'NF', 'MK', 'MP', 'NO', 'OM', 'PK', 'PW', + 'PS', 'PA', 'PG', 'PY', 'PE', 'PH', 'PN', 'PL', 'PT', 'PR', + 'QA', 'RE', 'RO', 'RU', 'RW', 'BL', 'SH', 'KN', 'LC', 'MF', + 'PM', 'VC', 'WS', 'SM', 'ST', 'SA', 'SN', 'RS', 'SC', 'SL', + 'SG', 'SX', 'SK', 'SI', 'SB', 'SO', 'ZA', 'GS', 'SS', 'ES', + 'LK', 'SD', 'SR', 'SJ', 'SE', 'CH', 'SY', 'TW', 'TJ', 'TZ', + 'TH', 'TL', 'TG', 'TK', 'TO', 'TT', 'TN', 'TR', 'TM', 'TC', + 'TV', 'UG', 'UA', 'AE', 'GB', 'US', 'UM', 'UY', 'UZ', 'VU', + 'VE', 'VN', 'VG', 'VI', 'WF', 'EH', 'YE', 'ZM', 'ZW' + } + + # if the user did not populate the customer_country_code config option yet + if country_code is None: + return False, "Country code is not configured" + + if country_code.upper() not in VALID_COUNTRY_CODES: + return False, f"'{country_code}' must be a valid ISO 3166-1 alpha-2 country code" + + return True, f"'{country_code}' is a valid country code" + + def validate_icn(self, icn: str) -> Tuple[bool, str]: + # if the user did not populate the ICN config option yet + if icn is None: + return False, f"IBM Customer Number (ICN) is not configured" + + # ICN is exactly 7 alphanumeric characters long + if len(icn) != 7 or not icn.isalnum(): + return False, f"Invalid ICN: '{icn}'. 
ICN must be exactly 7 alphanumeric characters long" + + return True, "ICN is valid" + + def validate_config(self) -> None: + """ + Validate the mandatory fields in the config options + """ + # ICN is a mandatory field for service events + is_valid, msg = self.validate_icn(self.icn) + if not is_valid: + self.log.warning(f"CHA_INVALID_ICN: {msg}") + self.health_checks.update({ + 'CHA_INVALID_ICN': { + 'severity': 'warning', + 'summary': 'Call Home Agent: IBM Customer Number (ICN) is invalid. ' + 'Run "ceph callhome set icn " to set it.', + 'detail': [msg] + } + }) + else: + self.health_checks.pop('CHA_INVALID_ICN', None) + + # Country code is a mandatory field for service events + is_valid, msg = self.validate_country_code(self.customer_country_code) + if not is_valid: + self.log.warning(f"CHA_INVALID_COUNTRY_CODE: {msg}") + self.health_checks.update({ + 'CHA_INVALID_COUNTRY_CODE': { + 'severity': 'warning', + 'summary': 'Call Home Agent: Country code is invalid. ' + 'Run "ceph callhome set country-code " to set it.', + 'detail': [msg] + } + }) + else: + self.health_checks.pop('CHA_INVALID_COUNTRY_CODE', None) + + self.set_health_checks(self.health_checks) + def connectivity_update(self, response: dict) -> None: """ Validate that the response is from IBM call home and update the connectivity check struct @@ -625,7 +727,7 @@ class CallHomeAgent(MgrModule): """ This only affects changes in ceph config options. To change configuration using env. vars a restart of the module - will be neeed or the change in one config option will refresh + will be needed or the change in one config option will refresh configuration coming from env vars """ self.refresh_options() @@ -636,7 +738,6 @@ class CallHomeAgent(MgrModule): def serve(self): self.log.info('Starting IBM Ceph Call Home Agent') - self.scheduler = sched.scheduler(time.time, time.sleep) self.schedule_tasks() while self.run: # Passing False causes the scheduler.run() to return the time until the next event. 
therefore we're not blocked in @@ -879,3 +980,44 @@ class CallHomeAgent(MgrModule): self.set_store('ur_cooldown', json.dumps(self.ur_cooldown)) return HandleCommandResult(stdout="Success") + + @CLIWriteCommand('callhome set country-code') + def cli_set_country_code(self, country_code: str) -> Tuple[int, str, str]: + """ + Set a 2 letter country code + """ + is_valid, msg = self.validate_country_code(country_code) + + if not is_valid: + self.log.error(f"Invalid country code: {msg}") + return HandleCommandResult(retval=1, stderr=f'Invalid country code: {msg}') + + try: + self.set_module_option('customer_country_code', country_code.upper()) + except Exception as e: + return HandleCommandResult(retval=1, stderr=str(e)) + else: + return HandleCommandResult(stdout=f'country code is set to {country_code}') + finally: + self.refresh_options() # This will always run, no matter what. + + @CLIWriteCommand('callhome set icn') + def cli_set_icn(self, icn: str) -> Tuple[int, str, str]: + """ + Set an IBM customer number (7-character alphanumeric string) + """ + is_valid, msg = self.validate_icn(icn) + + if not is_valid: + self.log.warning(msg) + return HandleCommandResult(retval=1, stderr=msg) + + try: + self.set_module_option('icn', icn) + except Exception as e: + return HandleCommandResult(retval=1, stderr=str(e)) + else: + return HandleCommandResult() + finally: + self.refresh_options() # This will always run, no matter what. 
+ diff --git a/src/pybind/mgr/call_home_agent/report.py b/src/pybind/mgr/call_home_agent/report.py index 7e62a9c5a8b..706c8e87d66 100644 --- a/src/pybind/mgr/call_home_agent/report.py +++ b/src/pybind/mgr/call_home_agent/report.py @@ -27,15 +27,20 @@ class Report: self.report_type = report_type self.event_classes = event_classes self.report_event_id = None + self.events = [] + + def add_event(self, event): + self.events.append(event) + self.data['events'].append(event.data) def compile(self) -> Optional[dict]: report_times = ReportTimes() - report = self.get_report_headers(report_times, self.report_event_id) + self.set_headers(report_times, self.report_event_id) for event_class in self.event_classes: event = event_class(self.agent).generate(report_times) - report['events'].append(event.data) + self.add_event(event) - return report + return self.data def run(self) -> Optional[str]: compiled = self.compile() @@ -43,7 +48,7 @@ class Report: return None return self.send(compiled) - def get_report_headers(self, report_times: ReportTimes, report_event_id = None) -> dict: + def set_headers(self, report_times: ReportTimes, report_event_id = None) -> None: try: secrets = self.agent.get_secrets() except Exception as e: @@ -55,8 +60,9 @@ class Report: if not report_event_id: report_event_id = f"IBM_chc_event_RedHatMarine_ceph_{self.agent.ceph_cluster_id}_{self.report_type}_report_{report_times.time_ms}" + self.report_event_id = report_event_id - header = { + self.data = { "agent": "RedHat_Marine_firmware_agent", "api_key": secrets['api_key'], "private_key": secrets['private_key'], @@ -86,9 +92,7 @@ class Report: "events": [] } - #header.update(self._header_times(report_timestamp)) - - return header + #data.update(self._header_times(report_timestamp)) # TODO: check def send(self, report: dict, force: bool = False) -> str: resp = None @@ -105,7 +109,7 @@ class Report: data=json.dumps(report), proxies=self.agent.proxies, timeout=60) - self.agent.log.debug(f"Report response: 
{resp.text}") + self.agent.log.debug(f"Report response: {resp.text}") # TODO: remove keys resp.raise_for_status() ch_response = resp.json() diff --git a/src/pybind/mgr/call_home_agent/report_last_contact_service_query.py b/src/pybind/mgr/call_home_agent/report_last_contact_service_query.py new file mode 100644 index 00000000000..5b421ebe9b7 --- /dev/null +++ b/src/pybind/mgr/call_home_agent/report_last_contact_service_query.py @@ -0,0 +1,28 @@ +from .report import Report, ReportTimes, EventGeneric +from .report_last_contact import EventLastContact +from typing import Optional +import time + +class ReportLastContactServiceQuery(Report): + def __init__(self, agent, filter_event_id) -> None: + super().__init__(agent, 'last_contact') + + self.filter_event_id = filter_event_id + + def compile(self) -> Optional[dict]: + # We override compile() because this event gets a non standard generate arguments + report_times = ReportTimes() + self.set_headers(report_times, self.report_event_id) + event = EventLastContactServiceQuery(self.agent).generate(report_times, self.filter_event_id) + self.add_event(event) + return self.data + + +class EventLastContactServiceQuery(EventLastContact): + def generate(self, report_times: ReportTimes, filter_event_id): + super().generate(report_times) + + self.data["body"]["enable_response_detail_filter"] = [filter_event_id] + return self + + diff --git a/src/pybind/mgr/call_home_agent/report_service.py b/src/pybind/mgr/call_home_agent/report_service.py new file mode 100644 index 00000000000..cc20f239b3c --- /dev/null +++ b/src/pybind/mgr/call_home_agent/report_service.py @@ -0,0 +1,104 @@ +from .report import Report, ReportTimes, EventGeneric +from typing import Optional +import time +import json + +class ReportService(Report): + def __init__(self, agent, alerts: list) -> None: + super().__init__(agent, 'service') + self.alerts = alerts + + def compile(self) -> Optional[dict]: + # We override compile() because this event gets a non standard 
generate() arguments + report_times = ReportTimes() + self.set_headers(report_times, self.report_event_id) + event = EventService(self.agent).generate(report_times, self.alerts) + self.add_event(event) + self.agent.log.debug("Generated service event:") + self.agent.log.debug(json.dumps(self.data)) # TODO: remove keys from the output + return self.data + +class EventService(EventGeneric): + def generate(self, report_times: ReportTimes, alerts: list): + super().generate('service', 'ibm_redhat_ceph_service_manager', 'Ceph service request', report_times) + + r, outb, outs = self.agent.mon_command({ + 'prefix': 'status', + 'format': 'text' + }) + note = outb + ''' + # in case we add the new alerts to the 'notes' field: + note += "\n\n" + note += "Alerts:\n" + # print(f"------------------------- [{alerts}] ---------------------") + note += "\n".join(a.get('labels', {}).get('alertname', "Unknown alert") for a in alerts) + "\n" + ''' + + r, outb, outs = self.agent.mon_command({ + 'prefix': 'versions', + 'format': 'json' + }) + versions = json.loads(outb) + + # We use the first alert as a base for the code (subject) text of the ticket. + # We do that by stable sorting the list of alerts. + alerts_sorted = sorted(alerts, key = lambda d: json.dumps(d, sort_keys = True)) + first_alert = alerts_sorted[0] + alert_name = first_alert['labels']['alertname'] # this value must exist since we sorted the original alert list by it + alert_instance = first_alert['labels'].get('instance') # this value might not exist + alert_subject = alert_name + ((':' + alert_instance) if alert_instance else '') + alert_subject = alert_subject[:140] # Call home 'code' field is limited to 140 characters. 
+ + description = json.dumps(alerts_sorted, sort_keys = True, indent=4) + description = description[:10000] + + now = time.time() + + self.data['body'].update( { + "customer": self.agent.icn, + "country": self.agent.customer_country_code, + "error_type": "software", + "error_software_type": "distributed", + "routing_identifier": "5900AVA00", + "product_code_identifier": "SCSTZ", + "record_type": 1, + "test": False, + "code": alert_subject, # the title of the error that we're seeing; max 140 characters + "note": note, # currently the "note" field contains the output of `ceph -s` + "context": { + "origin": 2, + "timestamp": int(now), # time in seconds + "transid": int(now * 1000) # time in milliseconds + }, + "description": description, # semi described, 10K character limit + "object_instance_virtual_id": self.agent.ceph_cluster_id, + "object_instance": self.agent.ceph_cluster_id, + "object_type": "ceph", + "object_category": "RedHat", + "object_group": "Storage", + "object_version": "612", # a static value for now since it's not used + "object_logical_name": "01t3p00000TKZPjAAP", # hardcoded for now; can be used in the future to describe a subsystem + "submitter_instance_virtual_id": "ceph_storage_00001", # hardcoded for now; can be used in case of a proxy submitter, like fusion + "submitter_instance": "infrastructure", + "submitter_type": "ceph", + "submitter_category": "RedHat", + "submitter_group": "Storage", + # for now these fields are not required: + # "contact_email": "admin@company.com", + # "contact_name": "John Doe", + # "contact_organization": "Company", + # "contact_phone": "123-456-6789", + # "contact_address": "123 Main Street, City, CA 12345, US", + ############### + "payload": { + "ceph_versions": versions, + "software": { + "diagnostic_provided": True, + "ibm_ceph_version": "7.1.0" # TODO: change to 9.0.0 after the value is available + } + } + } ) + + return self + diff --git a/src/pybind/mgr/call_home_agent/report_status_alerts.py 
b/src/pybind/mgr/call_home_agent/report_status_alerts.py index 7396ca4d090..1a0d33c0ad0 100644 --- a/src/pybind/mgr/call_home_agent/report_status_alerts.py +++ b/src/pybind/mgr/call_home_agent/report_status_alerts.py @@ -1,8 +1,10 @@ from .report import Report, ReportTimes, EventGeneric from .prometheus import Prometheus +from .workflow_service_events import WorkFlowServiceEvents import time import json import requests +from datetime import datetime from typing import Optional class ReportStatusAlerts(Report): @@ -24,14 +26,14 @@ class ReportStatusAlerts(Report): def compile(self) -> Optional[dict]: report_times = ReportTimes() - report = self.get_report_headers(report_times) + self.set_headers(report_times) event = EventStatusAlerts(self.agent).generate(report_times) # If there are no alerts to send then return and dont send the report if not event.has_content: return None - report['events'].append(event.data) - return report + self.add_event(event) + return self.data #self.send(report) @staticmethod @@ -60,9 +62,52 @@ class EventStatusAlerts(EventGeneric): self.set_content(content) return self + def service_events(self, alerts: list) -> None: + if self.agent.disable_service_events: + return + + # Service Events (opening a support case) + service_events_alerts = list(filter(self.is_alert_relevant_service_events, alerts)) + if not service_events_alerts: + self.agent.log.debug(f"No alerts for service events") + return + + last_service_events_sent = int(self.agent.get_store('last_service_events_sent', '0')) + now = int(time.time()) + self.agent.log.debug(f"Now = {datetime.fromtimestamp(now).strftime('%Y-%m-%d %H:%M:%S')}, \ + last_service_events_sent = {datetime.fromtimestamp(last_service_events_sent).strftime('%Y-%m-%d %H:%M:%S')}") + + if now - last_service_events_sent < self.agent.interval_service_report_seconds: # 60 minutes by default + self.agent.log.debug(f"Waiting to send the next service event. 
Now = {now}, last_service_events_sent = {last_service_events_sent}") + return # don't send more than one service event per hour by default + + new_events = list(filter( + lambda e: 'activeAt' in e and datetime.fromisoformat(e['activeAt'].replace("Z", "+00:00")).timestamp() > last_service_events_sent, + service_events_alerts + )) + if not new_events: + self.agent.log.debug(f"Found alerts for service events, but we already handled them") + return + + self.agent.log.debug(f"Found new alerts for service events, starting workflow") + # We set last_service_events_sent here to avoid loopers in case of + # issues with the backend, e.g. the events are not sent successfully + # due to a server error, thus we keep trying to send them forever. + # Instead, we currently try to send several times, and if it fails we + # will try to send the active alerts after + # interval_service_report_seconds). + self.agent.set_store('last_service_events_sent', str(now)) + WorkFlowServiceEvents(self.agent, new_events).run() + def gather(self) -> dict: + all_prometheus_alerts = self.get_prometheus_alerts() + self.agent.log.debug(f"all_prometheus_alerts: {all_prometheus_alerts}") + + self.service_events(all_prometheus_alerts) + + # Sending alerts via a dedicated status report # Filter the alert list - current_alerts_list = list(filter(self.is_alert_relevant, self.get_prometheus_alerts())) + current_alerts_list = list(filter(self.is_alert_relevant_status_alerts, all_prometheus_alerts)) current_alerts = {self.alert_uid(a):a for a in current_alerts_list} # Find all new alerts - alerts that are currently active but were not sent until now (not in sent_alerts) @@ -81,7 +126,7 @@ class EventStatusAlerts(EventGeneric): """ return json.dumps(alert['labels'], sort_keys=True) + alert['activeAt'] + alert['value'] - def is_alert_relevant(self, alert: dict) -> bool: + def is_alert_relevant_status_alerts(self, alert: dict) -> bool: """ Returns True if this alert should be sent, False if it should be filtered 
out of the report """ @@ -90,6 +135,19 @@ class EventStatusAlerts(EventGeneric): return state == 'firing' and severity == 'critical' + def is_alert_relevant_service_events(self, alert: dict) -> bool: + # This list holds the names of the alerts for which we open support cases: + alerts_for_service = [ + 'CephMonDiskspaceCritical', + 'CephOSDFull', + 'CephFilesystemOffline', + 'CephFilesystemDamaged', + 'CephPGsInactive', + 'CephObjectMissing', + ] + + return alert.get('labels', {}).get('alertname', "Unknown alert") in alerts_for_service + def get_prometheus_alerts(self): """ Returns a list of all the alerts currently active in Prometheus diff --git a/src/pybind/mgr/call_home_agent/tests/test_agent.py b/src/pybind/mgr/call_home_agent/tests/test_agent.py index e3bb83429d4..7fb14369a9e 100644 --- a/src/pybind/mgr/call_home_agent/tests/test_agent.py +++ b/src/pybind/mgr/call_home_agent/tests/test_agent.py @@ -3,17 +3,19 @@ import time import json import os from collections import defaultdict +from typing import Optional from unittest.mock import MagicMock, Mock, patch #from call_home_agent.module import Report from call_home_agent.module import CallHomeAgent -from call_home_agent.ReportLastContact import ReportLastContact, EventLastContact -from call_home_agent.ReportInventory import ReportInventory, EventInventory -from call_home_agent.ReportStatusAlerts import ReportStatusAlerts -from call_home_agent.ReportStatusHealth import ReportStatusHealth -from call_home_agent.WorkFlowUploadSnap import WorkFlowUploadSnap -from call_home_agent.Report import Report, ReportTimes +from call_home_agent.report_last_contact import ReportLastContact, EventLastContact +from call_home_agent.report_inventory import ReportInventory, EventInventory +from call_home_agent.report_status_alerts import ReportStatusAlerts +from call_home_agent.report_status_health import ReportStatusHealth +from call_home_agent.workflow_upload_snap import WorkFlowUploadSnap +from call_home_agent.report import 
Report, ReportTimes +from call_home_agent.workflow_service_events import WorkFlowServiceEvents import mgr_module import traceback @@ -24,7 +26,6 @@ JWT_REG_CREDS_DICT = {"url": "test.icr.io", "username": "test_username", "passwo JWT_REG_CREDS =json.dumps(JWT_REG_CREDS_DICT) PLAIN_PASSWORD_REG_CREDS_DICT = {"url": "test.icr.io", "username": "test_username", "password": "plain_password"} - class MockedMgr(): class Log: def error(self, msg): @@ -67,7 +68,15 @@ class MockedMgr(): return ['mock_serverA', 'mock_serverB'] def mon_command(self, command): - return 0, json.dumps({'health': {'status': 'mocked health status mon_cmd'}}), "" + if command['prefix'] == 'status': + if command['format'] == 'text': + return 0, "\nceph_status:\n cluster:\n id: 4a7851de-8b13-11ef-85fe-525400f89c16\n health: HEALTH_WARN\n mon y789-node-00 is low on available space\n\n services:\n mon: 3 daemons, quorum y789-node-00,y789-node-02,y789-node-01 (age 11d)\n mgr: y789-node-00.bxrhew(active, since 9d), standbys: y789-node-02.gbigcx\n osd: 3 osds: 3 up (since 3M), 3 in (since 3M)\n\n data:\n pools: 1 pools, 1 pgs\n objects: 2 objects, 449 KiB\n usage: 328 MiB used, 15 GiB / 15 GiB avail\n pgs: 1 active+clean", "" + else: + return 0, json.dumps({'health': {'status': 'mocked health status mon_cmd'}}), "" + elif command['prefix'] == 'versions' and command['format'] == 'json': + return 0, json.dumps({"mon":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":3},"mgr":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":2},"osd":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":3},"overall":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":8}}), "" + else: + raise Exception(f"Unknown mon_command {command}") def remote(self, component, command, service_name=None, hostname=None, sos_params=None): m = MagicMock() @@ -114,31 +123,32 @@ def 
mocked_ceph_command(self, srv_type, prefix, key=None, mgr=None, detail=None) else: raise Exception(f"Unknown ceph command [{prefix}], please mock it") -def mocked_requests_get(url, auth=None, data=None, headers=None, proxies=None, params=None): - """ - Used by ReportStatusAlerts to query prometheous - """ - m = Mock() - if "api/v1/query" in url: - # This is a request for Prometheous - m.json.return_value = {'data': {'result': [{'value': "1234"}] }} - elif "api/v1/targets" in url: - m.json.return_value = {'data': {'activeTargets': [{'health': "up"}] }} - else: - m.json.return_value = {'data': {'alerts': [] }} - return m - original_time_time = time.time test_object = None -debug = False -verbose = False +debug = True +verbose = True def mock_glob(pattern: str): print(f"mock_glob: globbing {pattern}") current_dir = os.path.dirname(os.path.abspath(__file__)) return [f"{current_dir}/testfile1", f"{current_dir}/testfile2", f"{current_dir}/testfile3"] +def prometheus_make_alert(name: str, activeAt: Optional[str] = None) -> dict: + return { + 'labels': { + 'alertname': name, + 'severity': 'critical' + }, + 'annotations': { + 'description': "some alert" + }, + 'state': 'firing', + # 'activeAt' and 'value' are here for alert_uid() to work. 
They should be '0' so that we won't send this alert again and again + 'activeAt': activeAt if activeAt else "2025-08-15T10:12:13.123Z", + 'value': '0' + } + #@patch('mgr_module.MgrModule.version', '99.9') class TestAgent(unittest.TestCase): @@ -156,6 +166,20 @@ class TestAgent(unittest.TestCase): #print("".join(traceback.format_stack())) ########################### HTTP requests ############################ + def mocked_requests_get(self, url, auth=None, data=None, headers=None, proxies=None, params=None): + """ + Used by ReportStatusAlerts to query prometheous + """ + m = Mock() + if "api/v1/query" in url: + # This is a request for Prometheous + m.json.return_value = {'data': {'result': [{'value': "1234"}] }} + elif "api/v1/targets" in url: + m.json.return_value = {'data': {'activeTargets': [{'health': "up"}] }} + else: + m.json.return_value = {'data': {'alerts': self.prometheus_alerts }} + return m + def mocked_requests_post(self, url, auth=None, data=None, headers=None, proxies=None, timeout=None): print("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv request.post vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv") print(f" URL: {url}") @@ -163,13 +187,14 @@ class TestAgent(unittest.TestCase): event_type = None if data: try: - pretty = json.dumps(json.loads(data), indent=4) + data_dict = json.loads(data) + pretty = json.dumps(data_dict, indent=4) try: - event_type = json.loads(data)['events'][0]['header']['event_type'] + event_type = data_dict['events'][0]['header']['event_type'] if event_type == 'confirm_response': component = 'NA' else: - component = json.loads(data)['events'][0]['body']['component'] + component = data_dict['events'][0]['body']['component'] print(f" event_type={event_type} component={component}") self.sent_events[f"{event_type}-{component}"] += 1 # so we can assertEqual on it later except: @@ -190,18 +215,40 @@ class TestAgent(unittest.TestCase): return m elif 'esupport' in url: # call home if event_type == 'last_contact': - if 
self.mock_requests_send_has_ur: - self.mock_requests_send_has_ur -= 1 - print("mocked_requests_post(): Returning yes UR") - m.text = self.mocked_last_contact_response_yes_ur - if self.mock_requests_cooldown_pmr: - # replace the pmr so that it will be different UR for stale, but same for cooldown - new_pmr = self.mock_requests_cooldown_pmr.pop() - m.text = self.mocked_last_contact_response_yes_ur.replace('TS1234567', new_pmr) - print(f"####### replacing PMR to {new_pmr}") + if data_dict["events"][0]["body"]["enable_response_detail_filter"] == ["Unsolicited_Storage_Insights_RedHatMarine_ceph_Request"]: + if self.mock_requests_send_has_ur: + self.mock_requests_send_has_ur -= 1 + print("mocked_requests_post(): Returning yes UR") + m.text = self.mocked_last_contact_response_yes_ur + if self.mock_requests_cooldown_pmr: + # replace the pmr so that it will be different UR for stale, but same for cooldown + new_pmr = self.mock_requests_cooldown_pmr.pop() + m.text = self.mocked_last_contact_response_yes_ur.replace('TS1234567', new_pmr) + print(f"####### replacing PMR to {new_pmr}") + else: + m.text = self.mocked_last_contact_response_no_ur + m.json.return_value = json.loads(m.text) else: - m.text = self.mocked_last_contact_response_no_ur - m.json.return_value = json.loads(m.text) + # Currently we only have two types of last_contact - one with a filter for unsolicited requests + # and one with a filter for a specific event_id, part of the service events workflow. + + # The WorkFlowServiceEvents code retries querying for the service creation 10 times. + # we'll mock 3 replies of "not ready yet" before returning a good reply with the ticket's problem_id + if self.mock_service_event_no_case_opened_replies > 0: + # Return "ticket not opened yet" for 3 tries". 
+ self.mock_service_event_no_case_opened_replies -= 1 + m.json.return_value = {'response_state' : {'transactions': { } } } + m.text = json.dumps(m.json.return_value) + else: + # Return ticket opened, with the ticket ID (T1234) nested correctly. + event_id = next(iter((data_dict["events"][0]["body"]["enable_response_detail_filter"]))) # Get the first key of the dict. This should be the event_id. + m.json.return_value = {'response_state' : {'transactions': { event_id : {'response_object': {'problem_id': 'T1234'} } } } } + m.text = json.dumps(m.json.return_value) + + elif event_type == 'service': + print(f"Got service event") + m.json.return_value = { 'response_state': 123 } + m.text = json.dumps(m.json.return_value) return m else: raise Exception(f"Unknown mocked_requests_post URL [{url}], please mock it") @@ -212,7 +259,8 @@ class TestAgent(unittest.TestCase): CallHomeAgent.__bases__ = (MockedMgr,) #patch('mgr_module.MgrModule.version', '99.9').start() patch('call_home_agent.module.CallHomeAgent.ceph_command', mocked_ceph_command).start() - patch('call_home_agent.WorkFlowUploadSnap.DIAGS_FOLDER', '/tmp').start() + #patch('call_home_agent.WorkFlowUploadSnap.DIAGS_FOLDER', '/tmp').start() + patch('call_home_agent.workflow_upload_snap.DIAGS_FOLDER', '/tmp').start() patch('call_home_agent.module.CallHomeAgent.get_secrets', return_value={'api_key': 'mocked_api_key', 'private_key': 'mocked_private_key', @@ -222,7 +270,7 @@ class TestAgent(unittest.TestCase): patch('requests.post', self.mocked_requests_post).start() - patch('requests.get', mocked_requests_get).start() + patch('requests.get', self.mocked_requests_get).start() patch('glob.glob', mock_glob).start() patch('os.remove', Mock()).start() patch('os.path.getsize', Mock(return_value=42)).start() @@ -243,6 +291,8 @@ class TestAgent(unittest.TestCase): self.mock_requests_send_has_ur = 0 self.mock_requests_cooldown_pmr = [] self.sent_events = defaultdict(int) + self.mock_service_event_no_case_opened_replies = 3 + 
self.prometheus_alerts = [] # Load the json answers that requests.post() should return with open(os.path.dirname(__file__) + '/response_no_pending_ur.json', 'r') as resp: @@ -393,3 +443,65 @@ class TestAgent(unittest.TestCase): status = agent.get_call_home_status() self.assertEqual(status['connectivity'], True) self.assertEqual(status['connectivity_error'], 'Success') + + def test_service_event(self): + with patch('time.time', side_effect=self.mocked_time_time), patch('time.sleep', side_effect=self.mocked_sleep), patch('threading.Event.wait', side_effect=self.mocked_sleep): + self.mock_service_event_no_case_opened_replies = 3 + agent = CallHomeAgent() + self.agent = agent + WorkFlowServiceEvents(agent, [prometheus_make_alert('alert_a'), prometheus_make_alert('alert_b')]).run() + #ReportLastContact(agent).run() + + self.test_end = self.mocked_time_time() + 15 * 60 + agent.serve() + + self.assertEqual(self.sent_events['status-ceph_log_upload'], 0) + self.assertEqual(self.sent_events['confirm_response-NA'], 1) + self.assertEqual(len(agent.ur_queue), 0) + self.assertEqual(len(agent.ur_stale), 0) + self.assertEqual(len(agent.ur_cooldown), 0) + + def test_service_event_no_case_opened(self): + with patch('time.time', side_effect=self.mocked_time_time), patch('time.sleep', side_effect=self.mocked_sleep), patch('threading.Event.wait', side_effect=self.mocked_sleep): + self.mock_service_event_no_case_opened_replies = 60 * 60 # Greater than the number of tries that the WorkFlowServiceEvents tries. 
+ agent = CallHomeAgent() + self.agent = agent + WorkFlowServiceEvents(agent, [prometheus_make_alert('alert_a'), prometheus_make_alert('alert_b')]).run() + #ReportLastContact(agent).run() + + self.test_end = self.mocked_time_time() + 60 * 60 + agent.serve() + + self.assertEqual(self.sent_events['status-ceph_log_upload'], 0) + self.assertEqual(self.sent_events['confirm_response-NA'], 0) + self.assertEqual(self.sent_events['last_contact-ceph_last_contact'], 13) # 10 from the WorkFlowServiceEvents, 3 from generic last_contact messages + self.assertEqual(len(agent.ur_queue), 0) + self.assertEqual(len(agent.ur_stale), 0) + self.assertEqual(len(agent.ur_cooldown), 0) + + def test_service_event_mixed_prom_alerts(self): + # a mix of relevant and non relevant prometheus alerts + + with patch('time.time', side_effect=self.mocked_time_time), patch('time.sleep', side_effect=self.mocked_sleep), patch('threading.Event.wait', side_effect=self.mocked_sleep): + self.mock_service_event_no_case_opened_replies = 3 + self.prometheus_alerts = [ prometheus_make_alert("some_alert_1"), prometheus_make_alert("CephOSDFull"), + prometheus_make_alert("CephObjectMissing"), prometheus_make_alert("some_alert_2") ] + + # we wait 1 hour between service_events. we test it by now - last_sent. last_sent == 0, therefore + # now must be at least 3600 for the test to work. 
so we changed our mocked time to >3600 + self.mocked_now = 4000 + + agent = CallHomeAgent() + self.agent = agent + ReportStatusAlerts(agent).run() + + self.test_end = self.mocked_time_time() + 15 * 60 + #agent.serve() + + self.assertEqual(self.sent_events['status-ceph_log_upload'], 0) + self.assertEqual(self.sent_events['confirm_response-NA'], 1) + self.assertEqual(len(agent.ur_queue), 0) + self.assertEqual(len(agent.ur_stale), 0) + self.assertEqual(len(agent.ur_cooldown), 0) + + diff --git a/src/pybind/mgr/call_home_agent/workflow_service_events.py b/src/pybind/mgr/call_home_agent/workflow_service_events.py new file mode 100644 index 00000000000..1a4eac9538c --- /dev/null +++ b/src/pybind/mgr/call_home_agent/workflow_service_events.py @@ -0,0 +1,76 @@ +from .report import Report, ReportTimes, Event +from .report_ur_error import ReportURError +from .report_service import ReportService +from .report_last_contact_service_query import ReportLastContactServiceQuery +from .workflow_upload_snap import WorkFlowUploadSnap +from .exceptions import * +import time +import urllib.parse +from typing import Tuple, Optional +import glob +import os +import traceback +import re +import requests +import json + +class WorkFlowServiceEvents: + def __init__(self, agent, alerts: list): + self.agent = agent + self.alerts = alerts + self.service_query_num_tries = 0 + + def run(self) -> None: + self.agent.log.info(f"WorkFlowServiceEvents: Processing new request") + try: + report_service = ReportService(self.agent, self.alerts) + response_text = report_service.run() + response = json.loads(response_text) + + # Validate the response to the ReportService request + if 'response_state' not in response: + self.agent.log.error(f'WorkFlowServiceEvents: Bad reply to ReportService(): {response_text}') + return + + self.report_service_event_id = report_service.events[0].event_event_id + + # Ticket will take time to be created. Poll for the creation completion. 
+ self.schedule_service_query() + + except Exception as ex: + self.agent.log.error(f'Error in creating service event. Exception={ex} trace={traceback.format_exc()}') + + def schedule_service_query(self) -> None: + if self.service_query_num_tries >= 10: + self.agent.log.warning(f"WorkFlowServiceEvents: Did not receive a service_query reply in 10 tries.") + return + self.service_query_num_tries += 1 + self.agent.scheduler.enter(30, 1, self.run_scheduled_service_query) # we might get the timeout from a conf option in a later version + + def run_scheduled_service_query(self) -> None: + response_text = ReportLastContactServiceQuery(self.agent, self.report_service_event_id).run() + try: + response = json.loads(response_text) + transactions = response['response_state']['transactions'] # Will raise if we got a bad reply + if not transactions or self.report_service_event_id not in transactions: # No reply yet + self.schedule_service_query() + return + else: + problem_id = transactions[self.report_service_event_id]['response_object']['problem_id'] + except Exception as ex: + self.agent.log.error(f'WorkFlowServiceEvents: Error querying for service creation. 
Exception={ex} response={response_text}')
+            return
+
+        try:
+            request = {
+                'options': {
+                    'pmr': problem_id,
+                    'level': 2 # Includes an SOS report
+                    # No need to simulate si_requestid
+                }
+            }
+            self.agent.log.debug(f"Starting WorkFlowUploadSnap for problem_id: {problem_id}")
+            WorkFlowUploadSnap(self.agent, request, 'service_request', None, False).run()
+        except Exception as ex:
+            self.agent.log.error(f"Error sending service request diagnostics: {ex}")
+
diff --git a/src/pybind/mgr/call_home_agent/workflow_upload_snap.py b/src/pybind/mgr/call_home_agent/workflow_upload_snap.py
index 90e740c1e56..12c96960af5 100644
--- a/src/pybind/mgr/call_home_agent/workflow_upload_snap.py
+++ b/src/pybind/mgr/call_home_agent/workflow_upload_snap.py
@@ -22,13 +22,17 @@ OPERATION_STATUS_REQUEST_REJECTED = 'REQUEST_REJECTED'
 DIAGS_FOLDER = '/var/log/ceph'
 
 class WorkFlowUploadSnap:
-    def __init__(self, agent, req, req_id, report_event_id):
+    def __init__(self, agent, req, req_id, report_event_id, send_status_log_upload = True):
+        """
+        send_status_log_upload: Should the workflow send percent complete reports to SI. Set to False in service_events. 
+ """ self.agent = agent self.req = req self.req_id = req_id # unique ID for this request self.pmr = self.req.get('options', {}).get('pmr', None) self.report_event_id = report_event_id self._event_id_counter = 0 + self.send_status_log_upload = send_status_log_upload self.si_requestid = self.req.get('options', {}).get('si_requestid', '') def next_event_id(self): @@ -41,7 +45,7 @@ class WorkFlowUploadSnap: self.agent.log.info(f"WorkFlowUploadSnap <{self.req_id}> : Processing new request {self.req}") if not self.pmr: self.agent.log.warning(f"WorkFlowUploadSnap <{self.req_id}> : Error - No PMR in request.") - ReportURError(self.agent, self.next_event_id()) + ReportURError(self.agent, self.next_event_id()).run() return try: @@ -61,7 +65,8 @@ class WorkFlowUploadSnap: self.agent.log.info(f"WorkFlowUploadSnap <{self.req_id}> : Completed operation") except Exception as ex: self.agent.log.error(f'Operations ({self.req_id}): Error processing operation {self.req}. Exception={ex} trace={traceback.format_exc()}') - ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, 0, f"ERROR: {ex}", OPERATION_STATUS_ERROR).run() + if self.send_status_log_upload: + ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, 0, f"ERROR: {ex}", OPERATION_STATUS_ERROR).run() # if it was ok or not, we always report the state ReportConfirmResponse(self.agent, self.next_event_id()).run() @@ -269,13 +274,14 @@ class WorkFlowUploadSnap: resp.raise_for_status() start_byte += chunk_size part_sent += 1 - if chunk_pattern: - percent_progress = int(part_sent/len(files_to_upload) * 100) - status = OPERATION_STATUS_COMPLETE if percent_progress == 100 else OPERATION_STATUS_IN_PROGRESS - ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_progress, f"file <{file_name}> is being sent", status).run() - else: - status = OPERATION_STATUS_COMPLETE if percent_complete == 100 else OPERATION_STATUS_IN_PROGRESS - ReportStatusLogUpload(self.agent, 
self.next_event_id(), self.si_requestid, percent_complete, status, status).run() + if self.send_status_log_upload: + if chunk_pattern: + percent_progress = int(part_sent/len(files_to_upload) * 100) + status = OPERATION_STATUS_COMPLETE if percent_progress == 100 else OPERATION_STATUS_IN_PROGRESS + ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_progress, f"file <{file_name}> is being sent", status).run() + else: + status = OPERATION_STATUS_COMPLETE if percent_complete == 100 else OPERATION_STATUS_IN_PROGRESS + ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_complete, status, status).run() except Exception as ex: explanation = resp.text if resp else "" raise SendError(f'WorkFlowUploadSnap <{self.req_id}> : Failed to send <{file_path}> to <{ecurep_file_upload_url}>: {ex}: {explanation} trace={traceback.format_exc()}') @@ -292,10 +298,10 @@ class ReportStatusLogUpload(Report): def compile(self) -> Optional[dict]: # We override run because this event gets a non standard generate arguments report_times = ReportTimes() - report = self.get_report_headers(report_times, self.report_event_id) + self.set_headers(report_times, self.report_event_id) event = EventStatusLogUpload(self.agent).generate(report_times, self.si_requestid, self.percent_progress, self.description, self.status) - report['events'].append(event.data) - return report + self.add_event(event) + return self.data class EventStatusLogUpload(Event): def gather(self) -> dict: