# Prometheus API.
# Module-level registry of alerts that were already sent upstream —
# presumably used to de-duplicate alert reports between runs; verify
# against the alert-sending code elsewhere in this module.
sent_alerts = {}
def get_prometheus_url(mgr: Any) -> str:
    """
    Return the base URL of the cluster's Prometheus HTTP API.

    Locates the prometheus daemon through the cephadm orchestrator and
    builds ``http://<host>:<port>/api/v1`` from its registration data.

    :param mgr: manager module instance (must provide ``remote``).
    :return: base URL string, e.g. ``http://10.0.0.5:9095/api/v1``.
    :raises Exception: if the daemon cannot be located or lacks
        host/port information.
    """
    daemon_list = mgr.remote('cephadm', 'list_daemons', service_name='prometheus')
    if daemon_list.exception_str:
        raise Exception(f"Alert report: Error finding the Prometheus instance: {daemon_list.exception_str}")
    # Fix: the two messages below had pointless f-string prefixes (no
    # placeholders); also prefer truthiness over len(...) < 1.
    if not daemon_list.result:
        raise Exception("Alert report: Can't find the Prometheus instance")

    # Use the first daemon reported; assumes a single prometheus instance
    # (or that any instance is equally usable) — TODO confirm for HA setups.
    d = daemon_list.result[0]
    host = d.ip if d.ip else d.hostname  # ip is of type str
    port = str(d.ports[0]) if d.ports else ""  # ports is a list of ints
    if not (host and port):
        raise Exception("Can't get Prometheus IP and/or port from manager")

    return f"http://{host}:{port}/api/v1"
+
def get_status(mgr: Any) -> dict:
r, outb, outs = mgr.mon_command({
'prefix': 'status',
status_dict = json.loads(outb)
status_dict["ceph_version"] = mgr.version
status_dict["health_detail"] = json.loads(mgr.get('health')['json'])
+ status_dict["support"] = get_support_metrics(mgr)
+ status_dict["support"]["health_status"] = status_dict["health_detail"]["status"]
+ status_dict["support"]["health_summary"] = get_health_summary(status_dict["health_detail"])
return status_dict
except Exception as ex:
mgr.log.exception(str(ex))
return {'exception': str(ex)}
def get_health_summary(ceph_health: dict) -> str:
    """
    Flatten the Ceph health "checks" structure into a readable text summary.

    Each check contributes one ``NAME(severity): summary`` line followed by
    the check's detail messages (one per line).

    :param ceph_health: parsed ``ceph health`` JSON (must contain "checks").
    :return: multi-line summary string ("" when there are no checks).
    """
    # Collect per-check fragments and join once: avoids the quadratic cost
    # of repeated string += inside the loop.
    fragments = []
    for error_key, error_details in ceph_health["checks"].items():
        # "detail" may be absent for a check; treat that as no extra lines.
        msg = "\n".join(item["message"] for item in error_details.get("detail", []))
        fragments.append(
            f'{error_key}({error_details["severity"]}): {error_details["summary"]["message"]}\n{msg}\n'
        )
    return "".join(fragments)
+
def get_support_metrics(mgr) -> dict:
    """
    Collect cluster metrics needed for Ceph support team tools.

    Runs a fixed table of PromQL instant queries against the cluster's
    Prometheus instance and returns the scalar results keyed by metric
    name, plus the total collection time in milliseconds.

    :param mgr: manager module instance (provides remote, log and
        module options).
    :return: dict of metric name -> float; may be partial (or empty)
        when queries fail — errors are logged, never raised.
    """
    support_metrics = {}
    # Report interval: environment override first, module option fallback.
    # NOTE(review): the option is named *seconds* but the value is
    # interpolated below with the PromQL "m" (minutes) range suffix —
    # confirm the intended units.
    status_interval_minutes = os.environ.get('CHA_INTERVAL_STATUS_REPORT_SECONDS',
                                             mgr.get_module_option('interval_status_report_seconds'))
    try:
        query_url = f"{get_prometheus_url(mgr)}/query"
        # Metric name -> PromQL expression. Every expression must return a
        # single scalar sample, hence the sum/count/avg aggregations and
        # the "or on() vector(0)" fallbacks for possibly-empty result sets.
        queries = {
            'total_capacity_bytes': 'sum(ceph_osd_stat_bytes)',
            'total_raw_usage_bytes': 'sum(ceph_osd_stat_bytes_used)',
            'usage_percentage': '(sum(ceph_osd_stat_bytes_used)/sum(ceph_osd_stat_bytes)) * 100',
            'slow_ops_total': 'sum(ceph_daemon_health_metrics{type="SLOW_OPS", ceph_daemon=~"osd.*"})',
            'osds_total_with_slow_ops': 'count(ceph_daemon_health_metrics{type="SLOW_OPS", ceph_daemon=~"osd.*"}>0) or on() vector(0)',
            'pg_total': 'sum(ceph_pg_total)',
            'pg_active': 'sum(ceph_pg_active)',
            'pg_clean': 'sum(ceph_pg_clean)',
            'pg_degraded': 'sum(ceph_pg_degraded)',
            'pg_unknown': 'sum(ceph_pg_unknown)',
            'pg_down': 'sum(ceph_pg_down)',
            'pg_scrubbing': 'sum(ceph_pg_scrubbing)',
            # NOTE(review): queries ceph_pg_deep, not ceph_pg_deep_scrubbing —
            # confirm against the exporter's metric name.
            'pg_deep_scrubbing': 'sum(ceph_pg_deep)',
            'network_receive_errors': f'avg(increase(node_network_receive_errs_total{{device!="lo"}}[{status_interval_minutes}m]))',
            'network_send_errors': f'avg(increase(node_network_transmit_errs_total{{device!="lo"}}[{status_interval_minutes}m]))',
            'network_receive_packet_drops': f'avg(increase(node_network_receive_drop_total{{device!="lo"}}[{status_interval_minutes}m]))',
            'network_transmit_packet_drops': f'avg(increase(node_network_transmit_drop_total{{device!="lo"}}[{status_interval_minutes}m]))',
            # Counts interfaces whose MTU deviates from the per-device median
            # (either above the max-vs-median or below the min-vs-median
            # comparison); falls back to 0 when there is no deviation.
            'inconsistent_mtu': 'sum(node_network_mtu_bytes * (node_network_up{device!="lo"} > 0) == scalar(max by (device) (node_network_mtu_bytes * (node_network_up{device!="lo"} > 0)) != quantile by (device) (.5, node_network_mtu_bytes * (node_network_up{device!="lo"} > 0)) )or node_network_mtu_bytes * (node_network_up{device!="lo"} > 0) == scalar(min by (device) (node_network_mtu_bytes * (node_network_up{device!="lo"} > 0)) != quantile by (device) (.5, node_network_mtu_bytes * (node_network_up{device!="lo"} > 0))) or vector(0))',
            'pool_number': 'count(ceph_pool_bytes_used)',
            'raw_capacity_bytes': 'sum(ceph_osd_stat_bytes)',
            'raw_capacity_consumed_bytes': 'sum(ceph_pool_bytes_used)',
            'logical_stored_bytes': 'sum(ceph_pool_stored)',
            'pool_growth_bytes': f'sum(delta(ceph_pool_stored[{status_interval_minutes}m]))',
            'pool_bandwidth_bytes': f'sum(rate(ceph_pool_rd_bytes[{status_interval_minutes}m]) + rate(ceph_pool_wr_bytes[{status_interval_minutes}m]))',
            'pg_per_osd_ratio':'(avg(ceph_osd_numpg)/sum(ceph_pg_total))*100',
            'monitors_number': 'count(ceph_mon_metadata)',
            'monitors_not_in_quorum_number': 'count(ceph_mon_quorum_status!=1) or on() vector(0)',
            'clock_skews_number': 'ceph_health_detail{name="MON_CLOCK_SKEW"} or on() vector(0)',
        }

        t1 = time.time()
        for k,q in queries.items():
            # exec_prometheus_query raises on transport/HTTP errors; such a
            # failure aborts the remaining queries via the outer handler.
            data = exec_prometheus_query(query_url, q)
            try:
                # Instant query result: data.result[0].value == [ts, "value"].
                # The inner try only covers unexpected result shapes; those
                # lose this one metric but let the loop continue.
                support_metrics[k] = float(data['data']['result'][0]['value'][1])
            except Exception as ex:
                mgr.log.error(f"Error reading status metric for support <{k}>: {ex} - {data}")
        total_time = round((time.time() - t1) * 1000, 2)
        support_metrics['time_to_get_support_data_ms'] = total_time
        mgr.log.info(f"Time to get support data for status report: {total_time} ms")
    except Exception as ex:
        # Best-effort: support metrics must never break the status report.
        mgr.log.error(f"Error collecting support data for status report: {ex}")

    return support_metrics
+
def exec_prometheus_query(query_url: str, prom_query: str) -> dict:
    """
    Execute a Prometheus query and return the result as a dict.

    :param query_url: full URL of the Prometheus query endpoint.
    :param prom_query: PromQL expression to evaluate.
    :return: decoded JSON response.
    :raises Exception: on connection errors, non-2xx HTTP status or an
        unparseable body; the message includes any partial result decoded.
    """
    result = {}
    try:
        r = requests.get(query_url, params={'query': prom_query})
        # Parse the body before raise_for_status so an HTTP error message
        # can still include Prometheus' own error payload in `result`.
        result = json.loads(r.text)
        r.raise_for_status()
    except Exception as ex:
        # Fix: removed the dead `r = None` initialization (never read) and
        # chain the original exception for complete tracebacks in mgr logs.
        raise Exception(f"Error executing Prometheus query: {ex}-{result}") from ex
    return result
+
def inventory_get_hardware_status(mgr: Any) -> dict:
try:
hw_status = mgr.remote('orchestrator', 'node_proxy_summary')
Returns a list of all the alerts currently active in Prometheus
"""
try:
- daemon_list = mgr.remote('cephadm', 'list_daemons', service_name='prometheus')
- if daemon_list.exception_str:
- raise Exception(f"Alert report: Error finding the Prometheus instance: {daemon_list.exception_str}")
- if len(daemon_list.result) < 1:
- raise Exception(f"Alert report: Can't find the Prometheus instance")
-
- d = daemon_list.result[0]
- host = d.ip if d.ip else d.hostname # ip is of type str
- port = str(d.ports[0]) if d.ports else "" # ports is a list of ints
- if not (host and port):
- raise Exception(f"Can't get Prometheus IP and/or port from manager")
-
+ alerts_url = f"{get_prometheus_url(mgr)}/alerts"
# Get the alerts
resp = {}
try:
- resp = requests.get(f"http://{host}:{port}/api/v1/alerts").json()
+ resp = requests.get(alerts_url).json()
except Exception as e:
- raise Exception(f"Error getting alerts from Prometheus at {host}:{port} : {e}")
+ raise Exception(f"Error getting alerts from Prometheus at {alerts_url} : {e}")
if 'data' not in resp or 'alerts' not in resp['data']:
raise Exception(f"Prometheus returned a bad reply: {resp}")
return HandleCommandResult(stderr=str(ex))
else:
return HandleCommandResult(stdout=output)
+
from unittest.mock import MagicMock, Mock, patch
-from call_home_agent.module import Report
+from call_home_agent.module import Report, exec_prometheus_query
TEST_JWT_TOKEN = r"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJ0ZXN0IiwiaWF0IjoxNjkxNzUzNDM5LCJqdGkiOiIwMTIzNDU2Nzg5MDEyMzQ1Njc4OTAwMTIzNDU2Nzg5MCJ9.0F66k81_PmKoSd9erQoxnq73760SXs8WQTd3s8pqEFY\\"
EXPECTED_JTI = '01234567890123456789001234567890'
self.report.send(force=True)
mock_post.assert_called()
    @patch('requests.get')
    def test_exec_prometheus_query(self, mock_get):
        """Exercise exec_prometheus_query: happy path plus both failure modes."""
        # Happy path: the server answers 200 with a valid instant-query body.
        request_get_response = MagicMock(status_code=200, reason='pepe', text='{"status":"success","data":{"resultType":"vector","result":[{"metric":{"ceph_health":"HEALTH_OK"},"value":[1616414100,"1"]}]}}')
        mock_get.return_value = request_get_response
        result = exec_prometheus_query("http://prom/query/v1", "ceph_health")
        assert result['status'] == "success"

        # Test metric error (server is ok, but something wrong executing the query):
        # raise_for_status fires after the body was parsed, and the wrapper
        # exception must still carry the original message.
        request_get_response.raise_for_status = MagicMock(side_effect=Exception("Error in metrics"))
        with self.assertRaises(Exception) as exception_context:
            result = exec_prometheus_query("http://prom/query/v1", "ceph_health")
        self.assertRegex(str(exception_context.exception), "Error in metrics")

        # Result metrics not returned because a Prometheus server problem
        # (requests.get itself raises before any response exists).
        mock_get.side_effect=Exception("Server error")
        with self.assertRaises(Exception) as exception_context:
            result = exec_prometheus_query("http://prom/query/v1", "ceph_health")
        self.assertRegex(str(exception_context.exception), "Server error")
+
    def tearDown(self):
        # Stop the patcher installed in setUp (not visible in this chunk)
        # so mocks don't leak between test methods.
        self.patcher.stop()