]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
call_home: add service events
authorYaarit Hatuka <yhatuka@ibm.com>
Mon, 4 Aug 2025 00:28:21 +0000 (20:28 -0400)
committerYaarit Hatuka <yhatuka@ibm.com>
Fri, 17 Oct 2025 11:43:55 +0000 (07:43 -0400)
Open a support case when these events are detected by Prometheus:

CephMonDiskspaceCritical,
CephOSDFull
CephFilesystemOffline
CephFilesystemDamaged
CephPGsInactive
CephObjectMissing

Users need to populate the IBM Customer Number (ICN) and
customer_country_code fields.

Logs of level 1 and 2 will be uploaded to the ticket after it's opened.

Resolves: rhbz#2242911
Signed-off-by: Yaarit Hatuka <yhatuka@ibm.com>
(cherry picked from commit 66f20c29299047d2e365da5a6779b287c2f572af)

src/pybind/mgr/call_home_agent/module.py
src/pybind/mgr/call_home_agent/report.py
src/pybind/mgr/call_home_agent/report_last_contact_service_query.py [new file with mode: 0644]
src/pybind/mgr/call_home_agent/report_service.py [new file with mode: 0644]
src/pybind/mgr/call_home_agent/report_status_alerts.py
src/pybind/mgr/call_home_agent/tests/test_agent.py
src/pybind/mgr/call_home_agent/workflow_service_events.py [new file with mode: 0644]
src/pybind/mgr/call_home_agent/workflow_upload_snap.py

index 96c3f7be651d0281e681f1389b7467f541528ef3..a31f1184d9e046d7497caa38c6af9f777787a474 100644 (file)
@@ -24,6 +24,8 @@ import sched
 import time
 #from threading import Event
 import threading
+import importlib.util
+import pathlib
 
 from ceph.cryptotools.select import get_crypto_caller
 
@@ -100,11 +102,11 @@ class CallHomeAgent(MgrModule):
             desc='Time frequency for the alerts report'
         ),
         Option(
-            name='interval_performance_report_seconds',
+            name='interval_service_report_seconds',
             type='int',
             min=0,
-            default = int(os.environ.get('CHA_INTERVAL_PERFORMANCE_REPORT_SECONDS', 60 * 5)),  # 5 minutes
-            desc='Time frequency for the performance report'
+            default = int(os.environ.get('CHA_INTERVAL_SERVICE_REPORT_SECONDS', 60 * 60)),  # 60 minutes
+            desc='Time frequency for checking for new alerts for service events'
         ),
         Option(
             name='customer_email',
@@ -250,6 +252,12 @@ class CallHomeAgent(MgrModule):
             default=3600 * 2,
             desc='Time interval in seconds to allow a cooldown between level 2 upload snap requests'
         ),
+        Option(
+            name='disable_service_events',
+            type='bool',
+            default=False,
+            desc='Disable service events'
+        ),
     ]
 
     def __init__(self, *args: Any, **kwargs: Any) -> None:
@@ -293,12 +301,12 @@ class CallHomeAgent(MgrModule):
         # set up some members to enable the serve() method and shutdown()
         self.run = True
 
-        # Module options
-        self.refresh_options()
-
         # Health checks
         self.health_checks: Dict[str, Dict[str, Any]] = dict()
 
+        # Module options
+        self.refresh_options()
+
         # Unsolicited Request support
 
         # identify messages that we received in the past self.stale_timeout seconds (10 days). such messages will be ignored and removed from the queue.
@@ -332,7 +340,8 @@ class CallHomeAgent(MgrModule):
             self.log.info("Cleaning old module's db_operations")
             self.set_store('db_operations', None)
 
-        
+        self.scheduler = sched.scheduler(time.time, time.sleep)
+
     def get_jwt_jti(self) -> str:
         # Extract jti from JWT. This is another way to identify clusters in addition to the ICN.
         jwt_jti = ""
@@ -375,6 +384,9 @@ class CallHomeAgent(MgrModule):
 
         self.jwt_jti = self.get_jwt_jti()
 
+        # validate icn and country_code
+        self.validate_config()
+
     def ceph_command(self, srv_type: str, prefix: str, srv_spec: Optional[str] = '', inbuf: str = '', **kwargs):
         # Note: A simplified version of the function used in dashboard ceph services
         """
@@ -399,6 +411,96 @@ class CallHomeAgent(MgrModule):
             self.log.error(f"Execution of command '{prefix}' failed: {ex}")
             return outb
 
+
+    def validate_country_code(self, country_code: str) -> Tuple[bool, str]:
+        """
+        Validates a 2-letter country code using ISO 3166-1 alpha-2 codes
+        """
+        # Set of valid ISO 3166-1 alpha-2 country codes
+        VALID_COUNTRY_CODES = {
+            'AF', 'AX', 'AL', 'DZ', 'AS', 'AD', 'AO', 'AI', 'AQ', 'AG',
+            'AR', 'AM', 'AW', 'AU', 'AT', 'AZ', 'BS', 'BH', 'BD', 'BB',
+            'BY', 'BE', 'BZ', 'BJ', 'BM', 'BT', 'BO', 'BQ', 'BA', 'BW',
+            'BV', 'BR', 'IO', 'BN', 'BG', 'BF', 'BI', 'KH', 'CM', 'CA',
+            'CV', 'KY', 'CF', 'TD', 'CL', 'CN', 'CX', 'CC', 'CO', 'KM',
+            'CG', 'CD', 'CK', 'CR', 'CI', 'HR', 'CU', 'CW', 'CY', 'CZ',
+            'DK', 'DJ', 'DM', 'DO', 'EC', 'EG', 'SV', 'GQ', 'ER', 'EE',
+            'SZ', 'ET', 'FK', 'FO', 'FJ', 'FI', 'FR', 'GF', 'PF', 'TF',
+            'GA', 'GM', 'GE', 'DE', 'GH', 'GI', 'GR', 'GL', 'GD', 'GP',
+            'GU', 'GT', 'GG', 'GN', 'GW', 'GY', 'HT', 'HM', 'VA', 'HN',
+            'HK', 'HU', 'IS', 'IN', 'ID', 'IR', 'IQ', 'IE', 'IM', 'IL',
+            'IT', 'JM', 'JP', 'JE', 'JO', 'KZ', 'KE', 'KI', 'KP', 'KR',
+            'KW', 'KG', 'LA', 'LV', 'LB', 'LS', 'LR', 'LY', 'LI', 'LT',
+            'LU', 'MO', 'MG', 'MW', 'MY', 'MV', 'ML', 'MT', 'MH', 'MQ',
+            'MR', 'MU', 'YT', 'MX', 'FM', 'MD', 'MC', 'MN', 'ME', 'MS',
+            'MA', 'MZ', 'MM', 'NA', 'NR', 'NP', 'NL', 'NC', 'NZ', 'NI',
+            'NE', 'NG', 'NU', 'NF', 'MK', 'MP', 'NO', 'OM', 'PK', 'PW',
+            'PS', 'PA', 'PG', 'PY', 'PE', 'PH', 'PN', 'PL', 'PT', 'PR',
+            'QA', 'RE', 'RO', 'RU', 'RW', 'BL', 'SH', 'KN', 'LC', 'MF',
+            'PM', 'VC', 'WS', 'SM', 'ST', 'SA', 'SN', 'RS', 'SC', 'SL',
+            'SG', 'SX', 'SK', 'SI', 'SB', 'SO', 'ZA', 'GS', 'SS', 'ES',
+            'LK', 'SD', 'SR', 'SJ', 'SE', 'CH', 'SY', 'TW', 'TJ', 'TZ',
+            'TH', 'TL', 'TG', 'TK', 'TO', 'TT', 'TN', 'TR', 'TM', 'TC',
+            'TV', 'UG', 'UA', 'AE', 'GB', 'US', 'UM', 'UY', 'UZ', 'VU',
+            'VE', 'VN', 'VG', 'VI', 'WF', 'EH', 'YE', 'ZM', 'ZW'
+        }
+
+        # if the user did not populate the customer_country_code config option yet
+        if country_code is None:
+            return False, "Country code is not configured"
+
+        if country_code.upper() not in VALID_COUNTRY_CODES:
+            return False, f"'{country_code}' must be a valid ISO 3166-1 alpha-2 country code"
+
+        return True, f"'{country_code}' is a valid country code"
+
+    def validate_icn(self, icn: str) -> Tuple[bool, str]:
+        # if the user did not populate the ICN config option yet
+        if icn is None:
+            return False, f"IBM Customer Number (ICN) is not configured"
+
+        # ICN is exactly 7 alphanumeric characters long
+        if len(icn) != 7 or not icn.isalnum():
+            return False, f"Invalid ICN: '{icn}'. ICN must be exactly 7 alphanumeric characters long"
+
+        return True, "ICN is valid"
+
+    def validate_config(self) -> None:
+        """
+        Validate the mandatory fields in the config options
+        """
+        # ICN is a mandatory field for service events
+        is_valid, msg = self.validate_icn(self.icn)
+        if not is_valid:
+            self.log.warning(f"CHA_INVALID_ICN: {msg}")
+            self.health_checks.update({
+                'CHA_INVALID_ICN': {
+                    'severity': 'warning',
+                    'summary': 'Call Home Agent: IBM Customer Number (ICN) is invalid. '
+                               'Run "ceph callhome set icn <icn>" to set it.',
+                    'detail': [msg]
+                }
+            })
+        else:
+            self.health_checks.pop('CHA_INVALID_ICN', None)
+
+        # Country code is a mandatory field for service events
+        is_valid, msg = self.validate_country_code(self.customer_country_code)
+        if not is_valid:
+            self.log.warning(f"CHA_INVALID_COUNTRY_CODE: {msg}")
+            self.health_checks.update({
+                'CHA_INVALID_COUNTRY_CODE': {
+                    'severity': 'warning',
+                    'summary': 'Call Home Agent: Country code is invalid. '
+                               'Run "ceph callhome set country-code <country-code>" to set it.',
+                    'detail': [msg]
+                }
+            })
+        else:
+            self.health_checks.pop('CHA_INVALID_COUNTRY_CODE', None)
+
+        self.set_health_checks(self.health_checks)
+
     def connectivity_update(self, response: dict) -> None:
         """
         Validate that the response is from IBM call home and update the connectivity check struct
@@ -625,7 +727,7 @@ class CallHomeAgent(MgrModule):
         """
         This only affects changes in ceph config options.
         To change configuration using env. vars a restart of the module
-        will be neeed or the change in one config option will refresh
+        will be needed or the change in one config option will refresh
         configuration coming from env vars
         """
         self.refresh_options()
@@ -636,7 +738,6 @@ class CallHomeAgent(MgrModule):
 
     def serve(self):
         self.log.info('Starting IBM Ceph Call Home Agent')
-        self.scheduler = sched.scheduler(time.time, time.sleep)
         self.schedule_tasks()
         while self.run:
             # Passing False causes the scheduler.run() to return the time until the next event. therefore we're not blocked in
@@ -879,3 +980,44 @@ class CallHomeAgent(MgrModule):
         self.set_store('ur_cooldown', json.dumps(self.ur_cooldown))
 
         return HandleCommandResult(stdout="Success")
+
+    @CLIWriteCommand('callhome set country-code')
+    def cli_set_country_code(self, country_code: str) -> Tuple[int, str, str]:
+        """
+        Set a 2 letter country code
+        """
+        is_valid, msg = self.validate_country_code(country_code)
+
+        if not is_valid:
+            self.log.error(f"Invalid country code: {msg}")
+            return HandleCommandResult(retval=1, stderr=f'Invalid country code: {msg}')
+
+        try:
+            self.set_module_option('customer_country_code', country_code.upper())
+        except Exception as e:
+            return HandleCommandResult(retval=1, stderr=str(e))
+        else:
+            return HandleCommandResult(stdout=f'country code is set to {country_code}')
+        finally:
+            self.refresh_options()  # always runs, so cached options reflect the new value
+
+    @CLIWriteCommand('callhome set icn')
+    def cli_set_icn(self, icn: str) -> Tuple[int, str, str]:
+        """
+        Set an IBM customer number (7-character alphanumeric string)
+        """
+        is_valid, msg = self.validate_icn(icn)
+
+        if not is_valid:
+            self.log.warning(msg)
+            return HandleCommandResult(retval=1, stderr=msg)
+
+        try:
+            self.set_module_option('icn', icn)
+        except Exception as e:
+            return HandleCommandResult(retval=1, stderr=str(e))
+        else:
+            return HandleCommandResult()
+        finally:
+            self.refresh_options()  # always runs, so cached options reflect the new value
+
index 7e62a9c5a8ba91a1b0b36e826e528d6c3ba72433..706c8e87d66f63977a01c21a18e31867faf2f8ff 100644 (file)
@@ -27,15 +27,20 @@ class Report:
         self.report_type = report_type
         self.event_classes = event_classes
         self.report_event_id = None
+        self.events = []
+
+    def add_event(self, event):
+        self.events.append(event)
+        self.data['events'].append(event.data)
 
     def compile(self) -> Optional[dict]:
         report_times = ReportTimes()
-        report = self.get_report_headers(report_times, self.report_event_id)
+        self.set_headers(report_times, self.report_event_id)
         for event_class in self.event_classes:
             event = event_class(self.agent).generate(report_times)
-            report['events'].append(event.data)
+            self.add_event(event)
 
-        return report
+        return self.data
 
     def run(self) -> Optional[str]:
         compiled = self.compile()
@@ -43,7 +48,7 @@ class Report:
             return None
         return self.send(compiled)
 
-    def get_report_headers(self, report_times: ReportTimes, report_event_id = None) -> dict:
+    def set_headers(self, report_times: ReportTimes, report_event_id = None) -> None:
         try:
             secrets = self.agent.get_secrets()
         except Exception as e:
@@ -55,8 +60,9 @@ class Report:
 
         if not report_event_id:
             report_event_id = f"IBM_chc_event_RedHatMarine_ceph_{self.agent.ceph_cluster_id}_{self.report_type}_report_{report_times.time_ms}"
+        self.report_event_id = report_event_id
 
-        header = {
+        self.data = {
                 "agent": "RedHat_Marine_firmware_agent",
                 "api_key": secrets['api_key'],
                 "private_key": secrets['private_key'],
@@ -86,9 +92,7 @@ class Report:
                 "events": []
             }
 
-        #header.update(self._header_times(report_timestamp))
-
-        return header
+        #data.update(self._header_times(report_timestamp))  # TODO: check
 
     def send(self, report: dict, force: bool = False) -> str:
         resp = None
@@ -105,7 +109,7 @@ class Report:
                                  data=json.dumps(report),
                                  proxies=self.agent.proxies,
                                  timeout=60)
-            self.agent.log.debug(f"Report response: {resp.text}")
+            self.agent.log.debug(f"Report response: {resp.text}")  # TODO: remove keys
             resp.raise_for_status()
 
             ch_response = resp.json()
diff --git a/src/pybind/mgr/call_home_agent/report_last_contact_service_query.py b/src/pybind/mgr/call_home_agent/report_last_contact_service_query.py
new file mode 100644 (file)
index 0000000..5b421eb
--- /dev/null
@@ -0,0 +1,28 @@
+from .report import Report, ReportTimes, EventGeneric
+from .report_last_contact import EventLastContact
+from typing import Optional
+import time
+
+class ReportLastContactServiceQuery(Report):
+    def __init__(self, agent, filter_event_id) -> None:
+        super().__init__(agent, 'last_contact')
+
+        self.filter_event_id = filter_event_id
+
+    def compile(self) -> Optional[dict]:
+        # We override compile() because this event's generate() takes non-standard arguments
+        report_times = ReportTimes()
+        self.set_headers(report_times, self.report_event_id)
+        event = EventLastContactServiceQuery(self.agent).generate(report_times, self.filter_event_id)
+        self.add_event(event)
+        return self.data
+
+
+class EventLastContactServiceQuery(EventLastContact):
+    def generate(self, report_times: ReportTimes, filter_event_id):
+        super().generate(report_times)
+
+        self.data["body"]["enable_response_detail_filter"] = [filter_event_id]
+        return self
+
+
diff --git a/src/pybind/mgr/call_home_agent/report_service.py b/src/pybind/mgr/call_home_agent/report_service.py
new file mode 100644 (file)
index 0000000..cc20f23
--- /dev/null
@@ -0,0 +1,104 @@
+from .report import Report, ReportTimes, EventGeneric
+from typing import Optional
+import time
+import json
+
+class ReportService(Report):
+    def __init__(self, agent, alerts: list) -> None:
+        super().__init__(agent, 'service')
+        self.alerts = alerts
+
+    def compile(self) -> Optional[dict]:
+        # We override compile() because this event's generate() takes non-standard arguments
+        report_times = ReportTimes()
+        self.set_headers(report_times, self.report_event_id)
+        event = EventService(self.agent).generate(report_times, self.alerts)
+        self.add_event(event)
+        self.agent.log.debug("Generated service event:")
+        self.agent.log.debug(json.dumps(self.data)) # TODO: remove keys from the output
+        return self.data
+
+class EventService(EventGeneric):
+    def generate(self, report_times: ReportTimes, alerts: list):
+        super().generate('service', 'ibm_redhat_ceph_service_manager', 'Ceph service request', report_times)
+
+        r, outb, outs = self.agent.mon_command({
+            'prefix': 'status',
+            'format': 'text'
+        })
+        note = outb
+        '''
+        # in case we add the new alerts to the 'notes' field:
+        note += "\n\n"
+        note += "Alerts:\n"
+        # print(f"------------------------- [{alerts}] ---------------------")
+        note += "\n".join(a.get('labels', {}).get('alertname', "Unknown alert") for a in alerts) + "\n"
+        '''
+
+        r, outb, outs = self.agent.mon_command({
+            'prefix': 'versions',
+            'format': 'json'
+        })
+        versions = json.loads(outb)
+
+        # We use the first alert as a base for the code (subject) text of the ticket.
+        # We do that by stable sorting the list of alerts.
+        alerts_sorted = sorted(alerts, key = lambda d: json.dumps(d, sort_keys = True))
+        first_alert = alerts_sorted[0]
+        alert_name = first_alert['labels']['alertname']  # this value must exist since we sorted the original alert list by it
+        alert_instance = first_alert['labels'].get('instance')  # this value might not exist
+        alert_subject = alert_name + ((':' + alert_instance) if alert_instance else '')
+        alert_subject = alert_subject[:140]  # Call home 'code' field is limited to 140 characters.
+
+        description = json.dumps(alerts_sorted, sort_keys = True, indent=4)
+        description = description[:10000]
+
+        now = time.time()
+
+        self.data['body'].update( {
+            "customer": self.agent.icn,
+            "country": self.agent.customer_country_code,
+            "error_type": "software",
+            "error_software_type": "distributed",
+            "routing_identifier": "5900AVA00",
+            "product_code_identifier": "SCSTZ",
+            "record_type": 1,
+            "test": False,
+            "code": alert_subject,  # the title of the error that we're seeing; max 140 characters
+            "note": note,  # currently the "note" field contains the output of `ceph -s`
+            "context": {
+                "origin": 2,
+                "timestamp": int(now),  # time in seconds
+                "transid": int(now * 1000)  # time in milliseconds
+            },
+            "description": description, # semi described, 10K character limit
+            "object_instance_virtual_id": self.agent.ceph_cluster_id,
+            "object_instance": self.agent.ceph_cluster_id,
+            "object_type": "ceph",
+            "object_category": "RedHat",
+            "object_group": "Storage",
+            "object_version": "612",  # a static value for now since it's not used
+            "object_logical_name": "01t3p00000TKZPjAAP",  # hardcoded for now; can be used in the future to describe a subsystem
+            "submitter_instance_virtual_id": "ceph_storage_00001",  # hardcoded for now; can be used in case of a proxy submitter, like fusion
+            "submitter_instance": "infrastructure",
+            "submitter_type": "ceph",
+            "submitter_category": "RedHat",
+            "submitter_group": "Storage",
+            # for now these fields are not required:
+            # "contact_email": "admin@company.com",
+            # "contact_name": "John Doe",
+            # "contact_organization": "Company",
+            # "contact_phone": "123-456-6789",
+            # "contact_address": "123 Main Street, City, CA 12345, US",
+            ###############
+            "payload": {
+                "ceph_versions": versions,
+                "software": {
+                    "diagnostic_provided": True,
+                    "ibm_ceph_version": "7.1.0"  # TODO: change to 9.0.0 after the value is available
+                }
+            }
+        } )
+
+        return self
+
index 7396ca4d090ae7fcb4f4b5cebdd1f7ac1546a1cd..1a0d33c0ad0897791362ad4f532719e0018e8c1a 100644 (file)
@@ -1,8 +1,10 @@
 from .report import Report, ReportTimes, EventGeneric
 from .prometheus import Prometheus
+from .workflow_service_events import WorkFlowServiceEvents
 import time
 import json
 import requests
+from datetime import datetime
 from typing import Optional
 
 class ReportStatusAlerts(Report):
@@ -24,14 +26,14 @@ class ReportStatusAlerts(Report):
 
     def compile(self) -> Optional[dict]:
         report_times = ReportTimes()
-        report = self.get_report_headers(report_times)
+        self.set_headers(report_times)
         event = EventStatusAlerts(self.agent).generate(report_times)
         # If there are no alerts to send then return and dont send the report
         if not event.has_content:
             return None
 
-        report['events'].append(event.data)
-        return report
+        self.add_event(event)
+        return self.data
         #self.send(report)
 
     @staticmethod
@@ -60,9 +62,52 @@ class EventStatusAlerts(EventGeneric):
         self.set_content(content)
         return self
 
+    def service_events(self, alerts: list) -> None:
+        if self.agent.disable_service_events:
+            return
+
+        # Service Events (opening a support case)
+        service_events_alerts = list(filter(self.is_alert_relevant_service_events, alerts))
+        if not service_events_alerts:
+            self.agent.log.debug(f"No alerts for service events")
+            return
+
+        last_service_events_sent = int(self.agent.get_store('last_service_events_sent', '0'))
+        now = int(time.time())
+        self.agent.log.debug(f"Now = {datetime.fromtimestamp(now).strftime('%Y-%m-%d %H:%M:%S')}, \
+            last_service_events_sent = {datetime.fromtimestamp(last_service_events_sent).strftime('%Y-%m-%d %H:%M:%S')}")
+
+        if now - last_service_events_sent < self.agent.interval_service_report_seconds:  # 60 minutes by default
+            self.agent.log.debug(f"Waiting to send the next service event. Now = {now}, last_service_events_sent = {last_service_events_sent}")
+            return  # don't send more than one service event per hour by default
+
+        new_events = list(filter(
+            lambda e: 'activeAt' in e and datetime.fromisoformat(e['activeAt'].replace("Z", "+00:00")).timestamp() > last_service_events_sent,
+            service_events_alerts
+        ))
+        if not new_events:
+            self.agent.log.debug(f"Found alerts for service events, but we already handled them")
+            return
+
+        self.agent.log.debug(f"Found new alerts for service events, starting workflow")
+        # We set last_service_events_sent here to avoid loops in case of
+        # issues with the backend, e.g. if the events are not sent successfully
+        # due to a server error we would otherwise keep retrying forever.
+        # Instead, we currently try to send several times, and if that fails we
+        # will try to send the active alerts again after
+        # interval_service_report_seconds.
+        self.agent.set_store('last_service_events_sent', str(now))
+        WorkFlowServiceEvents(self.agent, new_events).run()
+
     def gather(self) -> dict:
+        all_prometheus_alerts = self.get_prometheus_alerts()
+        self.agent.log.debug(f"all_prometheus_alerts: {all_prometheus_alerts}")
+
+        self.service_events(all_prometheus_alerts)
+
+        # Sending alerts via a dedicated status report
         # Filter the alert list
-        current_alerts_list = list(filter(self.is_alert_relevant, self.get_prometheus_alerts()))
+        current_alerts_list = list(filter(self.is_alert_relevant_status_alerts, all_prometheus_alerts))
 
         current_alerts = {self.alert_uid(a):a for a in current_alerts_list}
         # Find all new alerts - alerts that are currently active but were not sent until now (not in sent_alerts)
@@ -81,7 +126,7 @@ class EventStatusAlerts(EventGeneric):
         """
         return json.dumps(alert['labels'], sort_keys=True) + alert['activeAt'] + alert['value']
 
-    def is_alert_relevant(self, alert: dict) -> bool:
+    def is_alert_relevant_status_alerts(self, alert: dict) -> bool:
         """
         Returns True if this alert should be sent, False if it should be filtered out of the report
         """
@@ -90,6 +135,19 @@ class EventStatusAlerts(EventGeneric):
 
         return state == 'firing' and severity == 'critical'
 
+    def is_alert_relevant_service_events(self, alert: dict) -> bool:
+        # This list holds the names of the alerts for which we open support cases:
+        alerts_for_service = [
+                        'CephMonDiskspaceCritical',
+                        'CephOSDFull',
+                        'CephFilesystemOffline',
+                        'CephFilesystemDamaged',
+                        'CephPGsInactive',
+                        'CephObjectMissing',
+                        ]
+
+        return alert.get('labels', {}).get('alertname', "Unknown alert") in alerts_for_service
+
     def get_prometheus_alerts(self):
         """
         Returns a list of all the alerts currently active in Prometheus
index e3bb83429d474c1cb9ec834acbfad5fb2a55f726..7fb14369a9ecf575aec4168e66f56f114ad70355 100644 (file)
@@ -3,17 +3,19 @@ import time
 import json
 import os
 from collections import defaultdict
+from typing import Optional
 
 from unittest.mock import MagicMock, Mock, patch
 
 #from call_home_agent.module import Report
 from call_home_agent.module import CallHomeAgent
-from call_home_agent.ReportLastContact import ReportLastContact, EventLastContact
-from call_home_agent.ReportInventory import ReportInventory, EventInventory
-from call_home_agent.ReportStatusAlerts import ReportStatusAlerts
-from call_home_agent.ReportStatusHealth import ReportStatusHealth
-from call_home_agent.WorkFlowUploadSnap import WorkFlowUploadSnap
-from call_home_agent.Report import Report, ReportTimes
+from call_home_agent.report_last_contact import ReportLastContact, EventLastContact
+from call_home_agent.report_inventory import ReportInventory, EventInventory
+from call_home_agent.report_status_alerts import ReportStatusAlerts
+from call_home_agent.report_status_health import ReportStatusHealth
+from call_home_agent.workflow_upload_snap import WorkFlowUploadSnap
+from call_home_agent.report import Report, ReportTimes
+from call_home_agent.workflow_service_events import WorkFlowServiceEvents
 import mgr_module
 import traceback
 
@@ -24,7 +26,6 @@ JWT_REG_CREDS_DICT = {"url": "test.icr.io", "username": "test_username", "passwo
 JWT_REG_CREDS =json.dumps(JWT_REG_CREDS_DICT)
 PLAIN_PASSWORD_REG_CREDS_DICT = {"url": "test.icr.io", "username": "test_username", "password": "plain_password"}
 
-
 class MockedMgr():
     class Log:
         def error(self, msg):
@@ -67,7 +68,15 @@ class MockedMgr():
         return ['mock_serverA', 'mock_serverB']
 
     def mon_command(self, command):
-        return 0, json.dumps({'health': {'status': 'mocked health status  mon_cmd'}}), ""
+        if command['prefix'] == 'status':
+            if command['format'] == 'text':
+                return 0, "\nceph_status:\n  cluster:\n    id:     4a7851de-8b13-11ef-85fe-525400f89c16\n    health: HEALTH_WARN\n            mon y789-node-00 is low on available space\n\n  services:\n    mon: 3 daemons, quorum y789-node-00,y789-node-02,y789-node-01 (age 11d)\n    mgr: y789-node-00.bxrhew(active, since 9d), standbys: y789-node-02.gbigcx\n    osd: 3 osds: 3 up (since 3M), 3 in (since 3M)\n\n  data:\n    pools:   1 pools, 1 pgs\n    objects: 2 objects, 449 KiB\n    usage:   328 MiB used, 15 GiB / 15 GiB avail\n    pgs:     1 active+clean", ""
+            else:
+                return 0, json.dumps({'health': {'status': 'mocked health status  mon_cmd'}}), ""
+        elif command['prefix'] == 'versions' and command['format'] == 'json':
+            return 0, json.dumps({"mon":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":3},"mgr":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":2},"osd":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":3},"overall":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":8}}), ""
+        else:
+            raise Exception(f"Unknown mon_command {command}")
 
     def remote(self, component, command, service_name=None, hostname=None, sos_params=None):
         m = MagicMock()
@@ -114,31 +123,32 @@ def mocked_ceph_command(self, srv_type, prefix, key=None, mgr=None, detail=None)
     else:
         raise Exception(f"Unknown ceph command [{prefix}], please mock it")
 
-def mocked_requests_get(url, auth=None, data=None, headers=None, proxies=None, params=None):
-    """
-    Used by ReportStatusAlerts to query prometheous
-    """
-    m = Mock()
-    if "api/v1/query" in url:
-        # This is a request for Prometheous
-        m.json.return_value = {'data': {'result': [{'value': "1234"}] }}
-    elif "api/v1/targets" in url:
-        m.json.return_value = {'data': {'activeTargets': [{'health': "up"}] }}
-    else:
-        m.json.return_value = {'data': {'alerts': [] }}
-    return m
-
 
 original_time_time = time.time
 test_object = None
-debug = False
-verbose = False
+debug = True
+verbose = True
 
 def mock_glob(pattern: str):
     print(f"mock_glob: globbing {pattern}")
     current_dir = os.path.dirname(os.path.abspath(__file__))
     return [f"{current_dir}/testfile1", f"{current_dir}/testfile2", f"{current_dir}/testfile3"]
 
+def prometheus_make_alert(name: str, activeAt: Optional[str] = None) -> dict:
+    return {
+            'labels': {
+                'alertname': name,
+                'severity': 'critical'
+            },
+            'annotations': {
+                'description': "some alert"
+            },
+            'state': 'firing',
+            # 'activeAt' and 'value' are here for alert_uid() to work. They should be '0' so that we won't send this alert again and again
+            'activeAt': activeAt if activeAt else "2025-08-15T10:12:13.123Z",
+            'value': '0'
+    }
+
 #@patch('mgr_module.MgrModule.version', '99.9')
 class TestAgent(unittest.TestCase):
 
@@ -156,6 +166,20 @@ class TestAgent(unittest.TestCase):
         #print("".join(traceback.format_stack()))
 
     ########################### HTTP requests ############################
+    def mocked_requests_get(self, url, auth=None, data=None, headers=None, proxies=None, params=None):
+        """
+        Used by ReportStatusAlerts to query Prometheus
+        """
+        m = Mock()
+        if "api/v1/query" in url:
+            # This is a request for Prometheus
+            m.json.return_value = {'data': {'result': [{'value': "1234"}] }}
+        elif "api/v1/targets" in url:
+            m.json.return_value = {'data': {'activeTargets': [{'health': "up"}] }}
+        else:
+            m.json.return_value = {'data': {'alerts': self.prometheus_alerts }}
+        return m
+
     def mocked_requests_post(self, url, auth=None, data=None, headers=None, proxies=None, timeout=None):
         print("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv request.post vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
         print(f"  URL: {url}")
@@ -163,13 +187,14 @@ class TestAgent(unittest.TestCase):
         event_type = None
         if data:
             try:
-                pretty = json.dumps(json.loads(data), indent=4)
+                data_dict = json.loads(data)
+                pretty = json.dumps(data_dict, indent=4)
                 try:
-                    event_type = json.loads(data)['events'][0]['header']['event_type']
+                    event_type = data_dict['events'][0]['header']['event_type']
                     if event_type == 'confirm_response':
                         component = 'NA'
                     else:
-                        component = json.loads(data)['events'][0]['body']['component']
+                        component = data_dict['events'][0]['body']['component']
                     print(f"  event_type={event_type}  component={component}")
                     self.sent_events[f"{event_type}-{component}"] += 1  # so we can assertEqual on it later
                 except:
@@ -190,18 +215,40 @@ class TestAgent(unittest.TestCase):
             return m
         elif 'esupport' in url:  # call home
             if event_type == 'last_contact':
-                if self.mock_requests_send_has_ur:
-                    self.mock_requests_send_has_ur -= 1
-                    print("mocked_requests_post(): Returning yes UR")
-                    m.text = self.mocked_last_contact_response_yes_ur
-                    if self.mock_requests_cooldown_pmr:
-                        # replace the pmr so that it will be different UR for stale, but same for cooldown
-                        new_pmr = self.mock_requests_cooldown_pmr.pop()
-                        m.text = self.mocked_last_contact_response_yes_ur.replace('TS1234567', new_pmr)
-                        print(f"####### replacing PMR to {new_pmr}")
+                if data_dict["events"][0]["body"]["enable_response_detail_filter"] == ["Unsolicited_Storage_Insights_RedHatMarine_ceph_Request"]:
+                    if self.mock_requests_send_has_ur:
+                        self.mock_requests_send_has_ur -= 1
+                        print("mocked_requests_post(): Returning yes UR")
+                        m.text = self.mocked_last_contact_response_yes_ur
+                        if self.mock_requests_cooldown_pmr:
+                            # replace the pmr so that it will be different UR for stale, but same for cooldown
+                            new_pmr = self.mock_requests_cooldown_pmr.pop()
+                            m.text = self.mocked_last_contact_response_yes_ur.replace('TS1234567', new_pmr)
+                            print(f"####### replacing PMR to {new_pmr}")
+                    else:
+                        m.text = self.mocked_last_contact_response_no_ur
+                    m.json.return_value = json.loads(m.text)
                 else:
-                    m.text = self.mocked_last_contact_response_no_ur
-                m.json.return_value = json.loads(m.text)
+                    # Currently we only have two types of last_contact - one with a filter for unsolicited requests
+                    # and one with a filter for a specific event_id, part of the service events workflow.
+
+                    # The WorkFlowServiceEvents code retries querying for the service creation 10 times.
+                    # we'll mock 3 replies of "not ready yet" before returning a good reply with the ticket's problem_id
+                    if self.mock_service_event_no_case_opened_replies > 0:
+                        # Return "ticket not opened yet" for 3 tries.
+                        self.mock_service_event_no_case_opened_replies -= 1
+                        m.json.return_value = {'response_state' : {'transactions': { } } }
+                        m.text = json.dumps(m.json.return_value)
+                    else:
+                        # Return ticket opened, with the ticket ID (T1234) nested correctly.
+                        event_id =  next(iter((data_dict["events"][0]["body"]["enable_response_detail_filter"])))  # Get the first element of the filter collection; this should be the event_id.
+                        m.json.return_value = {'response_state' : {'transactions': { event_id : {'response_object': {'problem_id': 'T1234'} } } } }
+                        m.text = json.dumps(m.json.return_value)
+
+            elif event_type == 'service':
+                print(f"Got service event")
+                m.json.return_value = { 'response_state': 123 }
+                m.text = json.dumps(m.json.return_value)
             return m
         else:
             raise Exception(f"Unknown mocked_requests_post URL [{url}], please mock it")
@@ -212,7 +259,8 @@ class TestAgent(unittest.TestCase):
         CallHomeAgent.__bases__ = (MockedMgr,)
         #patch('mgr_module.MgrModule.version', '99.9').start()
         patch('call_home_agent.module.CallHomeAgent.ceph_command', mocked_ceph_command).start()
-        patch('call_home_agent.WorkFlowUploadSnap.DIAGS_FOLDER', '/tmp').start()
+        #patch('call_home_agent.WorkFlowUploadSnap.DIAGS_FOLDER', '/tmp').start()
+        patch('call_home_agent.workflow_upload_snap.DIAGS_FOLDER', '/tmp').start()
         patch('call_home_agent.module.CallHomeAgent.get_secrets',
               return_value={'api_key': 'mocked_api_key',
                             'private_key': 'mocked_private_key',
@@ -222,7 +270,7 @@ class TestAgent(unittest.TestCase):
 
 
         patch('requests.post', self.mocked_requests_post).start()
-        patch('requests.get', mocked_requests_get).start()
+        patch('requests.get', self.mocked_requests_get).start()
         patch('glob.glob', mock_glob).start()
         patch('os.remove', Mock()).start()
         patch('os.path.getsize', Mock(return_value=42)).start()
@@ -243,6 +291,8 @@ class TestAgent(unittest.TestCase):
         self.mock_requests_send_has_ur = 0
         self.mock_requests_cooldown_pmr = []
         self.sent_events = defaultdict(int)
+        self.mock_service_event_no_case_opened_replies = 3
+        self.prometheus_alerts = []
 
         # Load the json answers that requests.post() should return
         with open(os.path.dirname(__file__) + '/response_no_pending_ur.json', 'r') as resp:
@@ -393,3 +443,65 @@ class TestAgent(unittest.TestCase):
         status = agent.get_call_home_status()
         self.assertEqual(status['connectivity'], True)
         self.assertEqual(status['connectivity_error'], 'Success')
+
+    def test_service_event(self):
+        with patch('time.time', side_effect=self.mocked_time_time), patch('time.sleep', side_effect=self.mocked_sleep), patch('threading.Event.wait', side_effect=self.mocked_sleep):
+            self.mock_service_event_no_case_opened_replies = 3
+            agent = CallHomeAgent()
+            self.agent = agent
+            WorkFlowServiceEvents(agent, [prometheus_make_alert('alert_a'), prometheus_make_alert('alert_b')]).run()
+            #ReportLastContact(agent).run()
+
+            self.test_end = self.mocked_time_time() + 15 * 60
+            agent.serve()
+
+            self.assertEqual(self.sent_events['status-ceph_log_upload'], 0)
+            self.assertEqual(self.sent_events['confirm_response-NA'], 1)
+            self.assertEqual(len(agent.ur_queue), 0)
+            self.assertEqual(len(agent.ur_stale), 0)
+            self.assertEqual(len(agent.ur_cooldown), 0)
+
+    def test_service_event_no_case_opened(self):
+        with patch('time.time', side_effect=self.mocked_time_time), patch('time.sleep', side_effect=self.mocked_sleep), patch('threading.Event.wait', side_effect=self.mocked_sleep):
+            self.mock_service_event_no_case_opened_replies = 60 * 60  # Greater than the number of tries that the WorkFlowServiceEvents tries.
+            agent = CallHomeAgent()
+            self.agent = agent
+            WorkFlowServiceEvents(agent, [prometheus_make_alert('alert_a'), prometheus_make_alert('alert_b')]).run()
+            #ReportLastContact(agent).run()
+
+            self.test_end = self.mocked_time_time() + 60 * 60
+            agent.serve()
+
+            self.assertEqual(self.sent_events['status-ceph_log_upload'], 0)
+            self.assertEqual(self.sent_events['confirm_response-NA'], 0)
+            self.assertEqual(self.sent_events['last_contact-ceph_last_contact'], 13)  # 10 from the WorkFlowServiceEvents, 3 from generic last_contact messages
+            self.assertEqual(len(agent.ur_queue), 0)
+            self.assertEqual(len(agent.ur_stale), 0)
+            self.assertEqual(len(agent.ur_cooldown), 0)
+
+    def test_service_event_mixed_prom_alerts(self):
+        # a mix of relevant and non relevant prometheus alerts
+
+        with patch('time.time', side_effect=self.mocked_time_time), patch('time.sleep', side_effect=self.mocked_sleep), patch('threading.Event.wait', side_effect=self.mocked_sleep):
+            self.mock_service_event_no_case_opened_replies = 3
+            self.prometheus_alerts = [ prometheus_make_alert("some_alert_1"), prometheus_make_alert("CephOSDFull"),
+                prometheus_make_alert("CephObjectMissing"), prometheus_make_alert("some_alert_2") ]
+
+            # we wait 1 hour between service_events. we test it by now - last_sent. last_sent == 0, therefore
+            # now must be at least 3600 for the test to work. so we changed our mocked time to >3600
+            self.mocked_now = 4000
+
+            agent = CallHomeAgent()
+            self.agent = agent
+            ReportStatusAlerts(agent).run()
+
+            self.test_end = self.mocked_time_time() + 15 * 60
+            #agent.serve()
+
+            self.assertEqual(self.sent_events['status-ceph_log_upload'], 0)
+            self.assertEqual(self.sent_events['confirm_response-NA'], 1)
+            self.assertEqual(len(agent.ur_queue), 0)
+            self.assertEqual(len(agent.ur_stale), 0)
+            self.assertEqual(len(agent.ur_cooldown), 0)
+
+
diff --git a/src/pybind/mgr/call_home_agent/workflow_service_events.py b/src/pybind/mgr/call_home_agent/workflow_service_events.py
new file mode 100644 (file)
index 0000000..1a4eac9
--- /dev/null
@@ -0,0 +1,76 @@
+from .report import Report, ReportTimes, Event
+from .report_ur_error import ReportURError
+from .report_service import ReportService
+from .report_last_contact_service_query import ReportLastContactServiceQuery
+from .workflow_upload_snap import WorkFlowUploadSnap
+from .exceptions import *
+import time
+import urllib.parse
+from typing import Tuple, Optional
+import glob
+import os
+import traceback
+import re
+import requests
+import json
+
+class WorkFlowServiceEvents:
+    def __init__(self, agent, alerts: list):
+        self.agent = agent
+        self.alerts = alerts
+        self.service_query_num_tries = 0
+
+    def run(self) -> None:
+        self.agent.log.info(f"WorkFlowServiceEvents: Processing new request")
+        try:
+            report_service = ReportService(self.agent, self.alerts)
+            response_text = report_service.run()
+            response = json.loads(response_text)
+
+            # Validate the response to the ReportService request
+            if 'response_state' not in response:
+                self.agent.log.error(f'WorkFlowServiceEvents: Bad reply to ReportService(): {response_text}')
+                return
+
+            self.report_service_event_id = report_service.events[0].event_event_id
+
+            # Ticket will take time to be created. Poll for the creation completion.
+            self.schedule_service_query()
+
+        except Exception as ex:
+            self.agent.log.error(f'Error in creating service event. Exception={ex} trace={traceback.format_exc()}')
+
+    def schedule_service_query(self) -> None:
+        if self.service_query_num_tries >= 10:
+            self.agent.log.warning(f"WorkFlowServiceEvents: Did not receive a service_query reply in 10 tries.")
+            return
+        self.service_query_num_tries += 1
+        self.agent.scheduler.enter(30, 1, self.run_scheduled_service_query)  # we might get the timeout from a conf option in a later version
+
+    def run_scheduled_service_query(self) -> None:
+        response_text = ReportLastContactServiceQuery(self.agent, self.report_service_event_id).run()
+        try:
+            response = json.loads(response_text)
+            transactions = response['response_state']['transactions']  # Will raise if we got a bad reply
+            if not transactions or self.report_service_event_id not in transactions:  # No reply yet
+                self.schedule_service_query()
+                return
+            else:
+                problem_id = transactions[self.report_service_event_id]['response_object']['problem_id']
+        except Exception as ex:
+            self.agent.log.error(f'WorkFlowServiceEvents: Error querying for service creation. Exception={ex} response={response_text}')
+            return
+
+        try:
+            request = {
+                    'options': {
+                        'pmr': problem_id,
+                        'level': 2  # Includes an SOS report
+                        # No need to simulate si_requestid
+                        }
+                    }
+            self.agent.log.debug(f"Starting WorkFlowUploadSnap for problem_id: {problem_id}")
+            WorkFlowUploadSnap(self.agent, request, 'service_request', None, False).run()
+        except Exception as ex:
+            self.agent.log.error(f"Error sending service request diagnostics: {ex}")
+
index 90e740c1e567388f123fd72bec6a4dfcf4a77159..12c96960af570dab212121741070ffc0aa2d3456 100644 (file)
@@ -22,13 +22,17 @@ OPERATION_STATUS_REQUEST_REJECTED = 'REQUEST_REJECTED'
 DIAGS_FOLDER = '/var/log/ceph'
 
 class WorkFlowUploadSnap:
-    def __init__(self, agent, req, req_id, report_event_id):
+    def __init__(self, agent, req, req_id, report_event_id, send_status_log_upload = True):
+        """
+            send_status_log_upload: Should the workflow send percent complete reports to SI. Set to False in service_events.
+        """
         self.agent = agent
         self.req = req
         self.req_id = req_id  # unique ID for this request
         self.pmr = self.req.get('options', {}).get('pmr', None)
         self.report_event_id = report_event_id
         self._event_id_counter = 0
+        self.send_status_log_upload = send_status_log_upload
         self.si_requestid = self.req.get('options', {}).get('si_requestid', '')
 
     def next_event_id(self):
@@ -41,7 +45,7 @@ class WorkFlowUploadSnap:
         self.agent.log.info(f"WorkFlowUploadSnap <{self.req_id}> : Processing new request {self.req}")
         if not self.pmr:
             self.agent.log.warning(f"WorkFlowUploadSnap <{self.req_id}> : Error - No PMR in request.")
-            ReportURError(self.agent, self.next_event_id())
+            ReportURError(self.agent, self.next_event_id()).run()
             return
 
         try:
@@ -61,7 +65,8 @@ class WorkFlowUploadSnap:
             self.agent.log.info(f"WorkFlowUploadSnap <{self.req_id}> :  Completed operation")
         except Exception as ex:
             self.agent.log.error(f'Operations ({self.req_id}): Error processing operation {self.req}. Exception={ex} trace={traceback.format_exc()}')
-            ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, 0, f"ERROR: {ex}", OPERATION_STATUS_ERROR).run()
+            if self.send_status_log_upload:
+                ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, 0, f"ERROR: {ex}", OPERATION_STATUS_ERROR).run()
 
         # if it was ok or not, we always report the state
         ReportConfirmResponse(self.agent, self.next_event_id()).run()
@@ -269,13 +274,14 @@ class WorkFlowUploadSnap:
                     resp.raise_for_status()
                 start_byte += chunk_size
                 part_sent += 1
-                if chunk_pattern:
-                    percent_progress = int(part_sent/len(files_to_upload) * 100)
-                    status = OPERATION_STATUS_COMPLETE if percent_progress == 100 else OPERATION_STATUS_IN_PROGRESS
-                    ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_progress, f"file <{file_name}> is being sent", status).run()
-                else:
-                    status = OPERATION_STATUS_COMPLETE if percent_complete == 100 else OPERATION_STATUS_IN_PROGRESS
-                    ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_complete, status, status).run()
+                if self.send_status_log_upload:
+                    if chunk_pattern:
+                        percent_progress = int(part_sent/len(files_to_upload) * 100)
+                        status = OPERATION_STATUS_COMPLETE if percent_progress == 100 else OPERATION_STATUS_IN_PROGRESS
+                        ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_progress, f"file <{file_name}> is being sent", status).run()
+                    else:
+                        status = OPERATION_STATUS_COMPLETE if percent_complete == 100 else OPERATION_STATUS_IN_PROGRESS
+                        ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_complete, status, status).run()
         except Exception as ex:
             explanation = resp.text if resp else ""
             raise SendError(f'WorkFlowUploadSnap <{self.req_id}> : Failed to send <{file_path}> to <{ecurep_file_upload_url}>: {ex}: {explanation} trace={traceback.format_exc()}')
@@ -292,10 +298,10 @@ class ReportStatusLogUpload(Report):
     def compile(self) -> Optional[dict]:
         # We override run because this event gets a non standard generate arguments
         report_times = ReportTimes()
-        report = self.get_report_headers(report_times, self.report_event_id)
+        self.set_headers(report_times, self.report_event_id)
         event = EventStatusLogUpload(self.agent).generate(report_times, self.si_requestid, self.percent_progress, self.description, self.status)
-        report['events'].append(event.data)
-        return report
+        self.add_event(event)
+        return self.data
 
 class EventStatusLogUpload(Event):
     def gather(self) -> dict: