import time
#from threading import Event
import threading
+import importlib.util
+import pathlib
from ceph.cryptotools.select import get_crypto_caller
desc='Time frequency for the alerts report'
),
Option(
- name='interval_performance_report_seconds',
+ name='interval_service_report_seconds',
type='int',
min=0,
- default = int(os.environ.get('CHA_INTERVAL_PERFORMANCE_REPORT_SECONDS', 60 * 5)), # 5 minutes
- desc='Time frequency for the performance report'
+ default = int(os.environ.get('CHA_INTERVAL_SERVICE_REPORT_SECONDS', 60 * 60)), # 60 minutes
+ desc='Time frequency for checking for new alerts for service events'
),
Option(
name='customer_email',
default=3600 * 2,
desc='Time interval in seconds to allow a cooldown between level 2 upload snap requests'
),
+ Option(
+ name='disable_service_events',
+ type='bool',
+ default=False,
+ desc='Disable service events'
+ ),
]
def __init__(self, *args: Any, **kwargs: Any) -> None:
# set up some members to enable the serve() method and shutdown()
self.run = True
- # Module options
- self.refresh_options()
-
# Health checks
self.health_checks: Dict[str, Dict[str, Any]] = dict()
+ # Module options
+ self.refresh_options()
+
# Unsolicited Request support
# identify messages that we received in the past self.stale_timeout seconds (10 days). such messages will be ignored and removed from the queue.
self.log.info("Cleaning old module's db_operations")
self.set_store('db_operations', None)
-
+ self.scheduler = sched.scheduler(time.time, time.sleep)
+
def get_jwt_jti(self) -> str:
# Extract jti from JWT. This is another way to identify clusters in addition to the ICN.
jwt_jti = ""
self.jwt_jti = self.get_jwt_jti()
+ # validate icn and country_code
+ self.validate_config()
+
def ceph_command(self, srv_type: str, prefix: str, srv_spec: Optional[str] = '', inbuf: str = '', **kwargs):
# Note: A simplified version of the function used in dashboard ceph services
"""
self.log.error(f"Execution of command '{prefix}' failed: {ex}")
return outb
+
+ def validate_country_code(self, country_code: str) -> Tuple[bool, str]:
+ """
+ Validates a 2-letter country code using ISO 3166-1 alpha-2 codes
+ """
+ # Set of valid ISO 3166-1 alpha-2 country codes
+ VALID_COUNTRY_CODES = {
+ 'AF', 'AX', 'AL', 'DZ', 'AS', 'AD', 'AO', 'AI', 'AQ', 'AG',
+ 'AR', 'AM', 'AW', 'AU', 'AT', 'AZ', 'BS', 'BH', 'BD', 'BB',
+ 'BY', 'BE', 'BZ', 'BJ', 'BM', 'BT', 'BO', 'BQ', 'BA', 'BW',
+ 'BV', 'BR', 'IO', 'BN', 'BG', 'BF', 'BI', 'KH', 'CM', 'CA',
+ 'CV', 'KY', 'CF', 'TD', 'CL', 'CN', 'CX', 'CC', 'CO', 'KM',
+ 'CG', 'CD', 'CK', 'CR', 'CI', 'HR', 'CU', 'CW', 'CY', 'CZ',
+ 'DK', 'DJ', 'DM', 'DO', 'EC', 'EG', 'SV', 'GQ', 'ER', 'EE',
+ 'SZ', 'ET', 'FK', 'FO', 'FJ', 'FI', 'FR', 'GF', 'PF', 'TF',
+ 'GA', 'GM', 'GE', 'DE', 'GH', 'GI', 'GR', 'GL', 'GD', 'GP',
+ 'GU', 'GT', 'GG', 'GN', 'GW', 'GY', 'HT', 'HM', 'VA', 'HN',
+ 'HK', 'HU', 'IS', 'IN', 'ID', 'IR', 'IQ', 'IE', 'IM', 'IL',
+ 'IT', 'JM', 'JP', 'JE', 'JO', 'KZ', 'KE', 'KI', 'KP', 'KR',
+ 'KW', 'KG', 'LA', 'LV', 'LB', 'LS', 'LR', 'LY', 'LI', 'LT',
+ 'LU', 'MO', 'MG', 'MW', 'MY', 'MV', 'ML', 'MT', 'MH', 'MQ',
+ 'MR', 'MU', 'YT', 'MX', 'FM', 'MD', 'MC', 'MN', 'ME', 'MS',
+ 'MA', 'MZ', 'MM', 'NA', 'NR', 'NP', 'NL', 'NC', 'NZ', 'NI',
+ 'NE', 'NG', 'NU', 'NF', 'MK', 'MP', 'NO', 'OM', 'PK', 'PW',
+ 'PS', 'PA', 'PG', 'PY', 'PE', 'PH', 'PN', 'PL', 'PT', 'PR',
+ 'QA', 'RE', 'RO', 'RU', 'RW', 'BL', 'SH', 'KN', 'LC', 'MF',
+ 'PM', 'VC', 'WS', 'SM', 'ST', 'SA', 'SN', 'RS', 'SC', 'SL',
+ 'SG', 'SX', 'SK', 'SI', 'SB', 'SO', 'ZA', 'GS', 'SS', 'ES',
+ 'LK', 'SD', 'SR', 'SJ', 'SE', 'CH', 'SY', 'TW', 'TJ', 'TZ',
+ 'TH', 'TL', 'TG', 'TK', 'TO', 'TT', 'TN', 'TR', 'TM', 'TC',
+ 'TV', 'UG', 'UA', 'AE', 'GB', 'US', 'UM', 'UY', 'UZ', 'VU',
+ 'VE', 'VN', 'VG', 'VI', 'WF', 'EH', 'YE', 'ZM', 'ZW'
+ }
+
+ # if the user did not populate the customer_country_code config option yet
+ if country_code is None:
+ return False, "Country code is not configured"
+
+ if country_code.upper() not in VALID_COUNTRY_CODES:
+ return False, f"'{country_code}' must be a valid ISO 3166-1 alpha-2 country code"
+
+ return True, f"'{country_code}' is a valid country code"
+
+ def validate_icn(self, icn: str) -> Tuple[bool, str]:
+ # if the user did not populate the ICN config option yet
+ if icn is None:
+ return False, f"IBM Customer Number (ICN) is not configured"
+
+ # ICN is exactly 7 alphanumeric characters long
+ if len(icn) != 7 or not icn.isalnum():
+ return False, f"Invalid ICN: '{icn}'. ICN must be exactly 7 alphanumeric characters long"
+
+ return True, "ICN is valid"
+
+ def validate_config(self) -> None:
+ """
+ Validate the mandatory fields in the config options
+ """
+ # ICN is a mandatory field for service events
+ is_valid, msg = self.validate_icn(self.icn)
+ if not is_valid:
+ self.log.warning(f"CHA_INVALID_ICN: {msg}")
+ self.health_checks.update({
+ 'CHA_INVALID_ICN': {
+ 'severity': 'warning',
+ 'summary': 'Call Home Agent: IBM Customer Number (ICN) is invalid. '
+ 'Run "ceph callhome set icn <icn>" to set it.',
+ 'detail': [msg]
+ }
+ })
+ else:
+ self.health_checks.pop('CHA_INVALID_ICN', None)
+
+ # Country code is a mandatory field for service events
+ is_valid, msg = self.validate_country_code(self.customer_country_code)
+ if not is_valid:
+ self.log.warning(f"CHA_INVALID_COUNTRY_CODE: {msg}")
+ self.health_checks.update({
+ 'CHA_INVALID_COUNTRY_CODE': {
+ 'severity': 'warning',
+ 'summary': 'Call Home Agent: Country code is invalid. '
+ 'Run "ceph callhome set country-code <country-code>" to set it.',
+ 'detail': [msg]
+ }
+ })
+ else:
+ self.health_checks.pop('CHA_INVALID_COUNTRY_CODE', None)
+
+ self.set_health_checks(self.health_checks)
+
def connectivity_update(self, response: dict) -> None:
"""
Validate that the response is from IBM call home and update the connectivity check struct
"""
This only affects changes in ceph config options.
To change configuration using env. vars a restart of the module
- will be neeed or the change in one config option will refresh
+ will be needed or the change in one config option will refresh
configuration coming from env vars
"""
self.refresh_options()
def serve(self):
self.log.info('Starting IBM Ceph Call Home Agent')
- self.scheduler = sched.scheduler(time.time, time.sleep)
self.schedule_tasks()
while self.run:
# Passing False causes the scheduler.run() to return the time until the next event. therefore we're not blocked in
self.set_store('ur_cooldown', json.dumps(self.ur_cooldown))
return HandleCommandResult(stdout="Success")
+
+ @CLIWriteCommand('callhome set country-code')
+ def cli_set_country_code(self, country_code: str) -> Tuple[int, str, str]:
+ """
+ Set a 2 letter country code
+ """
+ is_valid, msg = self.validate_country_code(country_code)
+
+ if not is_valid:
+ self.log.error(f"Invalid country code: {msg}")
+ return HandleCommandResult(retval=1, stderr=f'Invalid country code: {msg}')
+
+ try:
+ self.set_module_option('customer_country_code', country_code.upper())
+ except Exception as e:
+ return HandleCommandResult(retval=1, stderr=str(e))
+ else:
+ return HandleCommandResult(stdout=f'country code is set to {country_code}')
+ finally:
+ self.refresh_options() # This will always run, no matter what.
+
+ @CLIWriteCommand('callhome set icn')
+ def cli_set_icn(self, icn: str) -> Tuple[int, str, str]:
+ """
+ Set an IBM customer number (7-character alphanumeric string)
+ """
+ is_valid, msg = self.validate_icn(icn)
+
+ if not is_valid:
+ self.log.warning(msg)
+ return HandleCommandResult(retval=1, stderr=msg)
+
+ try:
+ self.set_module_option('icn', icn)
+ except Exception as e:
+ return HandleCommandResult(retval=1, stderr=str(e))
+ else:
+ return HandleCommandResult()
+ finally:
+ self.refresh_options() # This will always run, no matter what.
+
self.report_type = report_type
self.event_classes = event_classes
self.report_event_id = None
+ self.events = []
+
    def add_event(self, event):
        # Keep the event object itself (so callers can read e.g. its event_id)
        # and mirror its raw payload into the report body that will be sent.
        self.events.append(event)
        self.data['events'].append(event.data)
def compile(self) -> Optional[dict]:
report_times = ReportTimes()
- report = self.get_report_headers(report_times, self.report_event_id)
+ self.set_headers(report_times, self.report_event_id)
for event_class in self.event_classes:
event = event_class(self.agent).generate(report_times)
- report['events'].append(event.data)
+ self.add_event(event)
- return report
+ return self.data
def run(self) -> Optional[str]:
compiled = self.compile()
return None
return self.send(compiled)
- def get_report_headers(self, report_times: ReportTimes, report_event_id = None) -> dict:
+ def set_headers(self, report_times: ReportTimes, report_event_id = None) -> None:
try:
secrets = self.agent.get_secrets()
except Exception as e:
if not report_event_id:
report_event_id = f"IBM_chc_event_RedHatMarine_ceph_{self.agent.ceph_cluster_id}_{self.report_type}_report_{report_times.time_ms}"
+ self.report_event_id = report_event_id
- header = {
+ self.data = {
"agent": "RedHat_Marine_firmware_agent",
"api_key": secrets['api_key'],
"private_key": secrets['private_key'],
"events": []
}
- #header.update(self._header_times(report_timestamp))
-
- return header
+ #data.update(self._header_times(report_timestamp)) # TODO: check
def send(self, report: dict, force: bool = False) -> str:
resp = None
data=json.dumps(report),
proxies=self.agent.proxies,
timeout=60)
- self.agent.log.debug(f"Report response: {resp.text}")
+ self.agent.log.debug(f"Report response: {resp.text}") # TODO: remove keys
resp.raise_for_status()
ch_response = resp.json()
--- /dev/null
+from .report import Report, ReportTimes, EventGeneric
+from .report_last_contact import EventLastContact
+from typing import Optional
+import time
+
class ReportLastContactServiceQuery(Report):
    """
    A 'last_contact' report that asks the backend to return the response
    details of one specific event. Used by the service-events workflow to
    poll for support-ticket creation.
    """
    def __init__(self, agent, filter_event_id) -> None:
        super().__init__(agent, 'last_contact')

        # The event_id whose response details the backend should return
        self.filter_event_id = filter_event_id

    def compile(self) -> Optional[dict]:
        # We override compile() because this event gets a non standard generate arguments
        report_times = ReportTimes()
        self.set_headers(report_times, self.report_event_id)
        event = EventLastContactServiceQuery(self.agent).generate(report_times, self.filter_event_id)
        self.add_event(event)
        return self.data
+
+
class EventLastContactServiceQuery(EventLastContact):
    """A last_contact event whose response-detail filter is a single event_id."""
    def generate(self, report_times: ReportTimes, filter_event_id):
        super().generate(report_times)

        # Ask call home to include response details only for this event
        self.data["body"]["enable_response_detail_filter"] = [filter_event_id]
        return self
+
+
--- /dev/null
+from .report import Report, ReportTimes, EventGeneric
+from typing import Optional
+import time
+import json
+
class ReportService(Report):
    """
    A 'service' report: wraps a single EventService that opens a support
    case for the given Prometheus alerts.
    """
    def __init__(self, agent, alerts: list) -> None:
        super().__init__(agent, 'service')
        # The alerts the support case is opened for
        self.alerts = alerts

    def compile(self) -> Optional[dict]:
        # We override compile() because this event gets a non standard generate() arguments
        report_times = ReportTimes()
        self.set_headers(report_times, self.report_event_id)
        event = EventService(self.agent).generate(report_times, self.alerts)
        self.add_event(event)
        self.agent.log.debug("Generated service event:")
        self.agent.log.debug(json.dumps(self.data)) # TODO: remove keys from the output
        return self.data
+
class EventService(EventGeneric):
    """
    A 'service' event: the payload that asks IBM call home to open a support
    case (ticket) for a set of critical Prometheus alerts.
    """
    def generate(self, report_times: ReportTimes, alerts: list):
        """
        Build the event body from the current cluster state and the alerts.

        alerts: a non-empty list of Prometheus alert dicts — callers filter
        and verify non-emptiness before calling (the first alert, after a
        stable sort, seeds the ticket's subject).
        """
        super().generate('service', 'ibm_redhat_ceph_service_manager', 'Ceph service request', report_times)

        # The ticket "note" currently contains the output of `ceph -s`.
        # (Alert names could be appended to the note in the future.)
        r, outb, outs = self.agent.mon_command({
            'prefix': 'status',
            'format': 'text'
        })
        note = outb

        r, outb, outs = self.agent.mon_command({
            'prefix': 'versions',
            'format': 'json'
        })
        versions = json.loads(outb)

        # We use the first alert as a base for the code (subject) text of the
        # ticket. We pick it deterministically by stable-sorting the alerts.
        alerts_sorted = sorted(alerts, key = lambda d: json.dumps(d, sort_keys = True))
        first_alert = alerts_sorted[0]
        # Fix: the sort above is by the JSON dump, not by alertname, so the
        # key is NOT guaranteed to exist — fall back instead of raising
        # KeyError (same fallback used when filtering alerts for service).
        alert_name = first_alert.get('labels', {}).get('alertname', "Unknown alert")
        alert_instance = first_alert.get('labels', {}).get('instance') # this value might not exist
        alert_subject = alert_name + ((':' + alert_instance) if alert_instance else '')
        alert_subject = alert_subject[:140] # Call home 'code' field is limited to 140 characters.

        # Full alert list goes into the description (call home caps it at 10K)
        description = json.dumps(alerts_sorted, sort_keys = True, indent=4)
        description = description[:10000]

        now = time.time()

        self.data['body'].update( {
            "customer": self.agent.icn,
            "country": self.agent.customer_country_code,
            "error_type": "software",
            "error_software_type": "distributed",
            "routing_identifier": "5900AVA00",
            "product_code_identifier": "SCSTZ",
            "record_type": 1,
            "test": False,
            "code": alert_subject, # the title of the error that we're seeing; max 140 characters
            "note": note, # currently the "note" field contains the output of `ceph -s`
            "context": {
                "origin": 2,
                "timestamp": int(now), # time in seconds
                "transid": int(now * 1000) # time in milliseconds
            },
            "description": description, # semi described, 10K character limit
            "object_instance_virtual_id": self.agent.ceph_cluster_id,
            "object_instance": self.agent.ceph_cluster_id,
            "object_type": "ceph",
            "object_category": "RedHat",
            "object_group": "Storage",
            "object_version": "612", # a static value for now since it's not used
            "object_logical_name": "01t3p00000TKZPjAAP", # hardcoded for now; can be used in the future to describe a subsystem
            "submitter_instance_virtual_id": "ceph_storage_00001", # hardcoded for now; can be used in case of a proxy submitter, like fusion
            "submitter_instance": "infrastructure",
            "submitter_type": "ceph",
            "submitter_category": "RedHat",
            "submitter_group": "Storage",
            # for now these fields are not required:
            # "contact_email": "admin@company.com",
            # "contact_name": "John Doe",
            # "contact_organization": "Company",
            # "contact_phone": "123-456-6789",
            # "contact_address": "123 Main Street, City, CA 12345, US",
            ###############
            "payload": {
                "ceph_versions": versions,
                "software": {
                    "diagnostic_provided": True,
                    "ibm_ceph_version": "7.1.0" # TODO: change to 9.0.0 after the value is available
                }
            }
        } )

        return self
+
from .report import Report, ReportTimes, EventGeneric
from .prometheus import Prometheus
+from .workflow_service_events import WorkFlowServiceEvents
import time
import json
import requests
+from datetime import datetime
from typing import Optional
class ReportStatusAlerts(Report):
def compile(self) -> Optional[dict]:
report_times = ReportTimes()
- report = self.get_report_headers(report_times)
+ self.set_headers(report_times)
event = EventStatusAlerts(self.agent).generate(report_times)
# If there are no alerts to send then return and dont send the report
if not event.has_content:
return None
- report['events'].append(event.data)
- return report
+ self.add_event(event)
+ return self.data
#self.send(report)
@staticmethod
self.set_content(content)
return self
    def service_events(self, alerts: list) -> None:
        """
        Open support cases ("service events") for the relevant alerts.

        Rate-limited: at most one service event per
        interval_service_report_seconds, and only for alerts that became
        active after the last event we sent.
        """
        # Feature kill-switch from the module options
        if self.agent.disable_service_events:
            return

        # Service Events (opening a support case)
        service_events_alerts = list(filter(self.is_alert_relevant_service_events, alerts))
        if not service_events_alerts:
            self.agent.log.debug(f"No alerts for service events")
            return

        # Last-sent timestamp persisted in the module store ('0' on first run)
        last_service_events_sent = int(self.agent.get_store('last_service_events_sent', '0'))
        now = int(time.time())
        self.agent.log.debug(f"Now = {datetime.fromtimestamp(now).strftime('%Y-%m-%d %H:%M:%S')}, \
            last_service_events_sent = {datetime.fromtimestamp(last_service_events_sent).strftime('%Y-%m-%d %H:%M:%S')}")

        if now - last_service_events_sent < self.agent.interval_service_report_seconds: # 60 minutes by default
            self.agent.log.debug(f"Waiting to send the next service event. Now = {now}, last_service_events_sent = {last_service_events_sent}")
            return # don't send more than one service event per hour by default

        # Keep only alerts that became active after the last service event we
        # sent ('activeAt' is an RFC3339 timestamp from Prometheus; the "Z"
        # suffix is rewritten so fromisoformat() accepts it).
        new_events = list(filter(
            lambda e: 'activeAt' in e and datetime.fromisoformat(e['activeAt'].replace("Z", "+00:00")).timestamp() > last_service_events_sent,
            service_events_alerts
        ))
        if not new_events:
            self.agent.log.debug(f"Found alerts for service events, but we already handled them")
            return

        self.agent.log.debug(f"Found new alerts for service events, starting workflow")
        # We set last_service_events_sent here to avoid loops in case of
        # issues with the backend, e.g. the events are not sent successfully
        # due to a server error, thus we keep trying to send them forever.
        # Instead, we currently try to send several times, and if it fails we
        # will try to send the active alerts after
        # interval_service_report_seconds).
        self.agent.set_store('last_service_events_sent', str(now))
        WorkFlowServiceEvents(self.agent, new_events).run()
+
def gather(self) -> dict:
+ all_prometheus_alerts = self.get_prometheus_alerts()
+ self.agent.log.debug(f"all_prometheus_alerts: {all_prometheus_alerts}")
+
+ self.service_events(all_prometheus_alerts)
+
+ # Sending alerts via a dedicated status report
# Filter the alert list
- current_alerts_list = list(filter(self.is_alert_relevant, self.get_prometheus_alerts()))
+ current_alerts_list = list(filter(self.is_alert_relevant_status_alerts, all_prometheus_alerts))
current_alerts = {self.alert_uid(a):a for a in current_alerts_list}
# Find all new alerts - alerts that are currently active but were not sent until now (not in sent_alerts)
"""
return json.dumps(alert['labels'], sort_keys=True) + alert['activeAt'] + alert['value']
- def is_alert_relevant(self, alert: dict) -> bool:
+ def is_alert_relevant_status_alerts(self, alert: dict) -> bool:
"""
Returns True if this alert should be sent, False if it should be filtered out of the report
"""
return state == 'firing' and severity == 'critical'
+ def is_alert_relevant_service_events(self, alert: dict) -> bool:
+ # This list holds the names of the alerts for which we open support cases:
+ alerts_for_service = [
+ 'CephMonDiskspaceCritical',
+ 'CephOSDFull',
+ 'CephFilesystemOffline',
+ 'CephFilesystemDamaged',
+ 'CephPGsInactive',
+ 'CephObjectMissing',
+ ]
+
+ return alert.get('labels', {}).get('alertname', "Unknown alert") in alerts_for_service
+
def get_prometheus_alerts(self):
"""
Returns a list of all the alerts currently active in Prometheus
import json
import os
from collections import defaultdict
+from typing import Optional
from unittest.mock import MagicMock, Mock, patch
#from call_home_agent.module import Report
from call_home_agent.module import CallHomeAgent
-from call_home_agent.ReportLastContact import ReportLastContact, EventLastContact
-from call_home_agent.ReportInventory import ReportInventory, EventInventory
-from call_home_agent.ReportStatusAlerts import ReportStatusAlerts
-from call_home_agent.ReportStatusHealth import ReportStatusHealth
-from call_home_agent.WorkFlowUploadSnap import WorkFlowUploadSnap
-from call_home_agent.Report import Report, ReportTimes
+from call_home_agent.report_last_contact import ReportLastContact, EventLastContact
+from call_home_agent.report_inventory import ReportInventory, EventInventory
+from call_home_agent.report_status_alerts import ReportStatusAlerts
+from call_home_agent.report_status_health import ReportStatusHealth
+from call_home_agent.workflow_upload_snap import WorkFlowUploadSnap
+from call_home_agent.report import Report, ReportTimes
+from call_home_agent.workflow_service_events import WorkFlowServiceEvents
import mgr_module
import traceback
JWT_REG_CREDS =json.dumps(JWT_REG_CREDS_DICT)
PLAIN_PASSWORD_REG_CREDS_DICT = {"url": "test.icr.io", "username": "test_username", "password": "plain_password"}
-
class MockedMgr():
class Log:
def error(self, msg):
return ['mock_serverA', 'mock_serverB']
def mon_command(self, command):
- return 0, json.dumps({'health': {'status': 'mocked health status mon_cmd'}}), ""
+ if command['prefix'] == 'status':
+ if command['format'] == 'text':
+ return 0, "\nceph_status:\n cluster:\n id: 4a7851de-8b13-11ef-85fe-525400f89c16\n health: HEALTH_WARN\n mon y789-node-00 is low on available space\n\n services:\n mon: 3 daemons, quorum y789-node-00,y789-node-02,y789-node-01 (age 11d)\n mgr: y789-node-00.bxrhew(active, since 9d), standbys: y789-node-02.gbigcx\n osd: 3 osds: 3 up (since 3M), 3 in (since 3M)\n\n data:\n pools: 1 pools, 1 pgs\n objects: 2 objects, 449 KiB\n usage: 328 MiB used, 15 GiB / 15 GiB avail\n pgs: 1 active+clean", ""
+ else:
+ return 0, json.dumps({'health': {'status': 'mocked health status mon_cmd'}}), ""
+ elif command['prefix'] == 'versions' and command['format'] == 'json':
+ return 0, json.dumps({"mon":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":3},"mgr":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":2},"osd":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":3},"overall":{"ceph version 19.2.1-52.el9cp (dc69009c814f9f71fd831a91c591b9da1df69ffb) squid (stable)":8}}), ""
+ else:
+ raise Exception(f"Unknown mon_command {command}")
def remote(self, component, command, service_name=None, hostname=None, sos_params=None):
m = MagicMock()
else:
raise Exception(f"Unknown ceph command [{prefix}], please mock it")
-def mocked_requests_get(url, auth=None, data=None, headers=None, proxies=None, params=None):
- """
- Used by ReportStatusAlerts to query prometheous
- """
- m = Mock()
- if "api/v1/query" in url:
- # This is a request for Prometheous
- m.json.return_value = {'data': {'result': [{'value': "1234"}] }}
- elif "api/v1/targets" in url:
- m.json.return_value = {'data': {'activeTargets': [{'health': "up"}] }}
- else:
- m.json.return_value = {'data': {'alerts': [] }}
- return m
-
original_time_time = time.time
test_object = None
-debug = False
-verbose = False
+debug = True
+verbose = True
def mock_glob(pattern: str):
print(f"mock_glob: globbing {pattern}")
current_dir = os.path.dirname(os.path.abspath(__file__))
return [f"{current_dir}/testfile1", f"{current_dir}/testfile2", f"{current_dir}/testfile3"]
def prometheus_make_alert(name: str, activeAt: Optional[str] = None) -> dict:
    """
    Build a minimal firing, critical Prometheus alert dict for tests.

    'activeAt' and 'value' are included so that alert_uid() works; 'value'
    stays '0' so the same alert is not re-sent again and again.
    """
    labels = {
        'alertname': name,
        'severity': 'critical'
    }
    annotations = {
        'description': "some alert"
    }
    return {
        'labels': labels,
        'annotations': annotations,
        'state': 'firing',
        'activeAt': activeAt or "2025-08-15T10:12:13.123Z",
        'value': '0'
    }
+
#@patch('mgr_module.MgrModule.version', '99.9')
class TestAgent(unittest.TestCase):
#print("".join(traceback.format_stack()))
########################### HTTP requests ############################
+ def mocked_requests_get(self, url, auth=None, data=None, headers=None, proxies=None, params=None):
+ """
+ Used by ReportStatusAlerts to query prometheous
+ """
+ m = Mock()
+ if "api/v1/query" in url:
+ # This is a request for Prometheous
+ m.json.return_value = {'data': {'result': [{'value': "1234"}] }}
+ elif "api/v1/targets" in url:
+ m.json.return_value = {'data': {'activeTargets': [{'health': "up"}] }}
+ else:
+ m.json.return_value = {'data': {'alerts': self.prometheus_alerts }}
+ return m
+
def mocked_requests_post(self, url, auth=None, data=None, headers=None, proxies=None, timeout=None):
print("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv request.post vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
print(f" URL: {url}")
event_type = None
if data:
try:
- pretty = json.dumps(json.loads(data), indent=4)
+ data_dict = json.loads(data)
+ pretty = json.dumps(data_dict, indent=4)
try:
- event_type = json.loads(data)['events'][0]['header']['event_type']
+ event_type = data_dict['events'][0]['header']['event_type']
if event_type == 'confirm_response':
component = 'NA'
else:
- component = json.loads(data)['events'][0]['body']['component']
+ component = data_dict['events'][0]['body']['component']
print(f" event_type={event_type} component={component}")
self.sent_events[f"{event_type}-{component}"] += 1 # so we can assertEqual on it later
except:
return m
elif 'esupport' in url: # call home
if event_type == 'last_contact':
- if self.mock_requests_send_has_ur:
- self.mock_requests_send_has_ur -= 1
- print("mocked_requests_post(): Returning yes UR")
- m.text = self.mocked_last_contact_response_yes_ur
- if self.mock_requests_cooldown_pmr:
- # replace the pmr so that it will be different UR for stale, but same for cooldown
- new_pmr = self.mock_requests_cooldown_pmr.pop()
- m.text = self.mocked_last_contact_response_yes_ur.replace('TS1234567', new_pmr)
- print(f"####### replacing PMR to {new_pmr}")
+ if data_dict["events"][0]["body"]["enable_response_detail_filter"] == ["Unsolicited_Storage_Insights_RedHatMarine_ceph_Request"]:
+ if self.mock_requests_send_has_ur:
+ self.mock_requests_send_has_ur -= 1
+ print("mocked_requests_post(): Returning yes UR")
+ m.text = self.mocked_last_contact_response_yes_ur
+ if self.mock_requests_cooldown_pmr:
+ # replace the pmr so that it will be different UR for stale, but same for cooldown
+ new_pmr = self.mock_requests_cooldown_pmr.pop()
+ m.text = self.mocked_last_contact_response_yes_ur.replace('TS1234567', new_pmr)
+ print(f"####### replacing PMR to {new_pmr}")
+ else:
+ m.text = self.mocked_last_contact_response_no_ur
+ m.json.return_value = json.loads(m.text)
else:
- m.text = self.mocked_last_contact_response_no_ur
- m.json.return_value = json.loads(m.text)
+ # Currently we only have two types of last_contact - one with a filter for unsolicited requests
+ # and one with a filter for a specific event_id, part of the service events workflow.
+
+ # The WorkFlowServiceEvents code retries querying for the service creation 10 times.
+ # we'll mock 3 replies of "not ready yet" before returning a good reply with the ticket's problem_id
+ if self.mock_service_event_no_case_opened_replies > 0:
+ # Return "ticket not opened yet" for 3 tries".
+ self.mock_service_event_no_case_opened_replies -= 1
+ m.json.return_value = {'response_state' : {'transactions': { } } }
+ m.text = json.dumps(m.json.return_value)
+ else:
+ # Return ticket opened, with the ticket ID (T1234) nested correctly.
+ event_id = next(iter((data_dict["events"][0]["body"]["enable_response_detail_filter"]))) # Get the first key of the dict. This should be the event_id.
+ m.json.return_value = {'response_state' : {'transactions': { event_id : {'response_object': {'problem_id': 'T1234'} } } } }
+ m.text = json.dumps(m.json.return_value)
+
+ elif event_type == 'service':
+ print(f"Got service event")
+ m.json.return_value = { 'response_state': 123 }
+ m.text = json.dumps(m.json.return_value)
return m
else:
raise Exception(f"Unknown mocked_requests_post URL [{url}], please mock it")
CallHomeAgent.__bases__ = (MockedMgr,)
#patch('mgr_module.MgrModule.version', '99.9').start()
patch('call_home_agent.module.CallHomeAgent.ceph_command', mocked_ceph_command).start()
- patch('call_home_agent.WorkFlowUploadSnap.DIAGS_FOLDER', '/tmp').start()
+ #patch('call_home_agent.WorkFlowUploadSnap.DIAGS_FOLDER', '/tmp').start()
+ patch('call_home_agent.workflow_upload_snap.DIAGS_FOLDER', '/tmp').start()
patch('call_home_agent.module.CallHomeAgent.get_secrets',
return_value={'api_key': 'mocked_api_key',
'private_key': 'mocked_private_key',
patch('requests.post', self.mocked_requests_post).start()
- patch('requests.get', mocked_requests_get).start()
+ patch('requests.get', self.mocked_requests_get).start()
patch('glob.glob', mock_glob).start()
patch('os.remove', Mock()).start()
patch('os.path.getsize', Mock(return_value=42)).start()
self.mock_requests_send_has_ur = 0
self.mock_requests_cooldown_pmr = []
self.sent_events = defaultdict(int)
+ self.mock_service_event_no_case_opened_replies = 3
+ self.prometheus_alerts = []
# Load the json answers that requests.post() should return
with open(os.path.dirname(__file__) + '/response_no_pending_ur.json', 'r') as resp:
status = agent.get_call_home_status()
self.assertEqual(status['connectivity'], True)
self.assertEqual(status['connectivity_error'], 'Success')
+
    def test_service_event(self):
        # Happy path: the workflow gets a few "ticket not opened yet" replies,
        # then a problem_id, and finishes cleanly with no UR state left over.
        with patch('time.time', side_effect=self.mocked_time_time), patch('time.sleep', side_effect=self.mocked_sleep), patch('threading.Event.wait', side_effect=self.mocked_sleep):
            # Backend answers "not opened yet" 3 times before returning a problem_id
            self.mock_service_event_no_case_opened_replies = 3
            agent = CallHomeAgent()
            self.agent = agent
            WorkFlowServiceEvents(agent, [prometheus_make_alert('alert_a'), prometheus_make_alert('alert_b')]).run()
            #ReportLastContact(agent).run()

            # Run the agent loop long enough for the scheduled polls to fire
            self.test_end = self.mocked_time_time() + 15 * 60
            agent.serve()

            # No upload-progress reports expected: service events disable them
            self.assertEqual(self.sent_events['status-ceph_log_upload'], 0)
            self.assertEqual(self.sent_events['confirm_response-NA'], 1)
            self.assertEqual(len(agent.ur_queue), 0)
            self.assertEqual(len(agent.ur_stale), 0)
            self.assertEqual(len(agent.ur_cooldown), 0)
+
    def test_service_event_no_case_opened(self):
        # Failure path: the backend never reports the ticket as opened, so the
        # workflow must give up after its 10 polls and send no confirmation.
        with patch('time.time', side_effect=self.mocked_time_time), patch('time.sleep', side_effect=self.mocked_sleep), patch('threading.Event.wait', side_effect=self.mocked_sleep):
            self.mock_service_event_no_case_opened_replies = 60 * 60 # Greater than the number of tries that the WorkFlowServiceEvents tries.
            agent = CallHomeAgent()
            self.agent = agent
            WorkFlowServiceEvents(agent, [prometheus_make_alert('alert_a'), prometheus_make_alert('alert_b')]).run()
            #ReportLastContact(agent).run()

            # Run the agent loop long enough for all 10 polls to fire
            self.test_end = self.mocked_time_time() + 60 * 60
            agent.serve()

            self.assertEqual(self.sent_events['status-ceph_log_upload'], 0)
            self.assertEqual(self.sent_events['confirm_response-NA'], 0)
            self.assertEqual(self.sent_events['last_contact-ceph_last_contact'], 13) # 10 from the WorkFlowServiceEvents, 3 from generic last_contact messages
            self.assertEqual(len(agent.ur_queue), 0)
            self.assertEqual(len(agent.ur_stale), 0)
            self.assertEqual(len(agent.ur_cooldown), 0)
+
    def test_service_event_mixed_prom_alerts(self):
        # a mix of relevant and non relevant prometheus alerts
        # ReportStatusAlerts.gather() should only start the service-events
        # workflow for the two case-worthy alerts.

        with patch('time.time', side_effect=self.mocked_time_time), patch('time.sleep', side_effect=self.mocked_sleep), patch('threading.Event.wait', side_effect=self.mocked_sleep):
            self.mock_service_event_no_case_opened_replies = 3
            self.prometheus_alerts = [ prometheus_make_alert("some_alert_1"), prometheus_make_alert("CephOSDFull"),
                                       prometheus_make_alert("CephObjectMissing"), prometheus_make_alert("some_alert_2") ]

            # we wait 1 hour between service_events. we test it by now - last_sent. last_sent == 0, therefore
            # now must be at least 3600 for the test to work. so we changed our mocked time to >3600
            # (presumably mocked_time_time() reads self.mocked_now — TODO confirm)
            self.mocked_now = 4000

            agent = CallHomeAgent()
            self.agent = agent
            ReportStatusAlerts(agent).run()

            # NOTE(review): serve() is commented out, so the scheduled polls
            # never run — confirm the confirm_response assertion below still
            # holds, or whether serve() should be re-enabled here.
            self.test_end = self.mocked_time_time() + 15 * 60
            #agent.serve()

            self.assertEqual(self.sent_events['status-ceph_log_upload'], 0)
            self.assertEqual(self.sent_events['confirm_response-NA'], 1)
            self.assertEqual(len(agent.ur_queue), 0)
            self.assertEqual(len(agent.ur_stale), 0)
            self.assertEqual(len(agent.ur_cooldown), 0)
+
+
--- /dev/null
+from .report import Report, ReportTimes, Event
+from .report_ur_error import ReportURError
+from .report_service import ReportService
+from .report_last_contact_service_query import ReportLastContactServiceQuery
+from .workflow_upload_snap import WorkFlowUploadSnap
+from .exceptions import *
+import time
+import urllib.parse
+from typing import Tuple, Optional
+import glob
+import os
+import traceback
+import re
+import requests
+import json
+
class WorkFlowServiceEvents:
    """
    Workflow for "service events":

      1. Send a ReportService event asking call home to open a support case
         for the given alerts.
      2. Poll call home (via last_contact queries) for the ticket id
         (problem_id), up to 10 tries, 30 seconds apart, using the agent's
         scheduler.
      3. Once the ticket exists, start a WorkFlowUploadSnap to attach
         diagnostics (SOS report) to it.
    """
    def __init__(self, agent, alerts: list):
        self.agent = agent
        self.alerts = alerts
        # How many times we polled for the ticket creation so far
        self.service_query_num_tries = 0

    def run(self) -> None:
        """Send the service event and start polling for ticket creation."""
        self.agent.log.info("WorkFlowServiceEvents: Processing new request")
        try:
            report_service = ReportService(self.agent, self.alerts)
            response_text = report_service.run()
            response = json.loads(response_text)

            # Validate the response to the ReportService request
            if 'response_state' not in response:
                self.agent.log.error(f'WorkFlowServiceEvents: Bad reply to ReportService(): {response_text}')
                return

            self.report_service_event_id = report_service.events[0].event_event_id

            # Ticket will take time to be created. Poll for the creation completion.
            self.schedule_service_query()

        except Exception as ex:
            self.agent.log.error(f'Error in creating service event. Exception={ex} trace={traceback.format_exc()}')

    def schedule_service_query(self) -> None:
        """Schedule the next ticket-creation poll, giving up after 10 tries."""
        if self.service_query_num_tries >= 10:
            self.agent.log.warning("WorkFlowServiceEvents: Did not receive a service_query reply in 10 tries.")
            return
        self.service_query_num_tries += 1
        self.agent.scheduler.enter(30, 1, self.run_scheduled_service_query) # we might get the timeout from a conf option in a later version

    def run_scheduled_service_query(self) -> None:
        """Scheduler callback: query for the ticket id and, once available, upload diagnostics."""
        response_text = ReportLastContactServiceQuery(self.agent, self.report_service_event_id).run()
        try:
            response = json.loads(response_text)
            transactions = response['response_state']['transactions'] # Will raise if we got a bad reply
            if not transactions or self.report_service_event_id not in transactions: # No reply yet
                self.schedule_service_query()
                return
            else:
                problem_id = transactions[self.report_service_event_id]['response_object']['problem_id']
        except Exception as ex:
            self.agent.log.error(f'WorkFlowServiceEvents: Error querying for service creation. Exception={ex} response={response_text}')
            return

        try:
            request = {
                'options': {
                    'pmr': problem_id,
                    'level': 2 # Includes an SOS report
                    # No need to simulate si_requestid
                }
            }
            self.agent.log.debug(f"Starting WorkFlowUploadSnap for problem_id: {problem_id}")
            WorkFlowUploadSnap(self.agent, request, 'service_request', None, False).run()
        except Exception as ex:
            # Fix: the original returned HandleCommandResult here, but that
            # name is not imported in this module, so any failure raised a
            # NameError that masked the real error. A scheduler callback's
            # return value is discarded anyway — just log the failure.
            self.agent.log.error(f"Error sending service request diagnostics: {ex}")
+
DIAGS_FOLDER = '/var/log/ceph'
class WorkFlowUploadSnap:
-    def __init__(self, agent, req, req_id, report_event_id):
+    def __init__(self, agent, req, req_id, report_event_id, send_status_log_upload = True):
+        """
+        send_status_log_upload: Should the workflow send percent complete reports to SI. Set to False in service_events.
+        """
         self.agent = agent
         self.req = req
         self.req_id = req_id # unique ID for this request
+        # PMR number from the request options; None when the request carried none.
         self.pmr = self.req.get('options', {}).get('pmr', None)
         self.report_event_id = report_event_id
+        # Counter backing next_event_id(); stamps outgoing reports for this workflow.
         self._event_id_counter = 0
+        self.send_status_log_upload = send_status_log_upload
+        # SI request id; empty string when absent (e.g. locally-triggered service events).
         self.si_requestid = self.req.get('options', {}).get('si_requestid', '')
     def next_event_id(self):
         self.agent.log.info(f"WorkFlowUploadSnap <{self.req_id}> : Processing new request {self.req}")
         if not self.pmr:
             self.agent.log.warning(f"WorkFlowUploadSnap <{self.req_id}> : Error - No PMR in request.")
-            ReportURError(self.agent, self.next_event_id())
+            # Fix: actually send the error report -- the object was previously built but never run.
+            ReportURError(self.agent, self.next_event_id()).run()
             return
         try:
             self.agent.log.info(f"WorkFlowUploadSnap <{self.req_id}> : Completed operation")
         except Exception as ex:
             self.agent.log.error(f'Operations ({self.req_id}): Error processing operation {self.req}. Exception={ex} trace={traceback.format_exc()}')
+            # Service-event workflows have no SI request to report errors to, so the
+            # error status upload is skipped for them (see __init__).
             if self.send_status_log_upload:
                 ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, 0, f"ERROR: {ex}", OPERATION_STATUS_ERROR).run()
         # if it was ok or not, we always report the state
         ReportConfirmResponse(self.agent, self.next_event_id()).run()
                 resp.raise_for_status()
                 start_byte += chunk_size
                 part_sent += 1
-                if chunk_pattern:
-                    percent_progress = int(part_sent/len(files_to_upload) * 100)
-                    status = OPERATION_STATUS_COMPLETE if percent_progress == 100 else OPERATION_STATUS_IN_PROGRESS
-                    ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_progress, f"file <{file_name}> is being sent", status).run()
-                else:
-                    status = OPERATION_STATUS_COMPLETE if percent_complete == 100 else OPERATION_STATUS_IN_PROGRESS
-                    ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_complete, status, status).run()
+                if self.send_status_log_upload:
+                    # Progress reports go to SI only when the workflow was started from an
+                    # SI request; locally-triggered service events skip them (see __init__).
+                    if chunk_pattern:
+                        # Multi-part upload: progress is the fraction of file parts sent.
+                        percent_progress = int(part_sent/len(files_to_upload) * 100)
+                        status = OPERATION_STATUS_COMPLETE if percent_progress == 100 else OPERATION_STATUS_IN_PROGRESS
+                        ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_progress, f"file <{file_name}> is being sent", status).run()
+                    else:
+                        # NOTE(review): `status` is passed as both the description and the
+                        # status argument here, unlike the branch above -- confirm intentional.
+                        status = OPERATION_STATUS_COMPLETE if percent_complete == 100 else OPERATION_STATUS_IN_PROGRESS
+                        ReportStatusLogUpload(self.agent, self.next_event_id(), self.si_requestid, percent_complete, status, status).run()
         except Exception as ex:
             explanation = resp.text if resp else ""
             raise SendError(f'WorkFlowUploadSnap <{self.req_id}> : Failed to send <{file_path}> to <{ecurep_file_upload_url}>: {ex}: {explanation} trace={traceback.format_exc()}')
     def compile(self) -> Optional[dict]:
         # We override run because this event gets a non standard generate arguments
         report_times = ReportTimes()
-        report = self.get_report_headers(report_times, self.report_event_id)
+        # Migrated to the report helper API: set_headers()/add_event() presumably
+        # accumulate into self.data instead of a locally-built dict.
+        self.set_headers(report_times, self.report_event_id)
         event = EventStatusLogUpload(self.agent).generate(report_times, self.si_requestid, self.percent_progress, self.description, self.status)
-        report['events'].append(event.data)
-        return report
+        self.add_event(event)
+        return self.data
class EventStatusLogUpload(Event):
def gather(self) -> dict: