From: Guillaume Abrioux Date: Fri, 30 Jan 2026 15:02:28 +0000 (+0100) Subject: node-proxy: split out config, bootstrap and redfish logic X-Git-Tag: testing/wip-vshankar-testing-20260219.125903~7^2~18 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=64fc7e4daf0b680343c1eacb991618f47c9c77ba;p=ceph-ci.git node-proxy: split out config, bootstrap and redfish logic refactor config, bootstrap, redfish layer, and monitoring: this: - adds a config module (CephadmConfig, load_cephadm_config and get_node_proxy_config) and protocols for api/reporter. - extracts redfish logic to redfish.py - adds a vendor registry with entrypoints. - simplifies main() and NodeProxyManager(). Fixes: https://tracker.ceph.com/issues/74749 Signed-off-by: Guillaume Abrioux --- diff --git a/src/ceph-node-proxy/ceph_node_proxy/api.py b/src/ceph-node-proxy/ceph_node_proxy/api.py index 676d7c1c61d..a6cf5b8436a 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/api.py +++ b/src/ceph-node-proxy/ceph_node_proxy/api.py @@ -3,8 +3,8 @@ from urllib.error import HTTPError from cherrypy._cpserver import Server # type: ignore from threading import Thread, Event from typing import Dict, Any, List +from ceph_node_proxy.protocols import SystemBackend from ceph_node_proxy.util import Config, get_logger, write_tmp_file -from ceph_node_proxy.basesystem import BaseSystem from ceph_node_proxy.reporter import Reporter from typing import TYPE_CHECKING, Optional @@ -12,6 +12,8 @@ if TYPE_CHECKING: from ceph_node_proxy.main import NodeProxyManager +# Admin endpoints (start/stop/reload) are not mounted by default. 
+# To enable, mount: cherrypy.tree.mount(Admin(api), '/admin', config=config) @cherrypy.tools.auth_basic(on=True) @cherrypy.tools.allow(methods=['PUT']) @cherrypy.tools.json_out() @@ -53,7 +55,7 @@ class Admin(): class API(Server): def __init__(self, - backend: 'BaseSystem', + backend: SystemBackend, reporter: 'Reporter', config: 'Config', addr: str = '0.0.0.0', diff --git a/src/ceph-node-proxy/ceph_node_proxy/atollon.py b/src/ceph-node-proxy/ceph_node_proxy/atollon.py index fd67ef47f18..5d71b773042 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/atollon.py +++ b/src/ceph-node-proxy/ceph_node_proxy/atollon.py @@ -9,4 +9,4 @@ class AtollonSystem(BaseRedfishSystem): self.log = get_logger(__name__) def get_component_spec_overrides(self) -> Dict[str, Dict[str, Any]]: - return {'power': {'path': 'PowerSubsystem'}} \ No newline at end of file + return {'power': {'path': 'PowerSubsystem'}} diff --git a/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py index 88a84fb6f0c..98b88d8416e 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py +++ b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py @@ -1,216 +1,16 @@ import concurrent.futures import dataclasses -import json -from dataclasses import dataclass from ceph_node_proxy.basesystem import BaseSystem +from ceph_node_proxy.redfish import ( + ComponentUpdateSpec, + Endpoint, + EndpointMgr, + update_component, +) from ceph_node_proxy.redfish_client import RedFishClient -from time import sleep from ceph_node_proxy.util import get_logger, to_snake_case, normalize_dict +from time import sleep from typing import Dict, Any, List, Callable, Optional -from urllib.error import HTTPError, URLError - - -@dataclass -class ComponentUpdateSpec: - collection: str - path: str - fields: List[str] - attribute: Optional[str] = None - - -class EndpointMgr: - NAME: str = 'EndpointMgr' - - def __init__(self, - client: RedFishClient, - prefix: str = 
RedFishClient.PREFIX) -> None: - self.log = get_logger(f'{__name__}:{EndpointMgr.NAME}') - self.prefix: str = prefix - self.client: RedFishClient = client - # Use explicit dictionary instead of dynamic attributes - self._endpoints: Dict[str, Endpoint] = {} - self._session_url: str = '' - - def __getitem__(self, index: str) -> 'Endpoint': - if index not in self._endpoints: - raise KeyError(f"'{index}' is not a valid endpoint. Available: {list(self._endpoints.keys())}") - return self._endpoints[index] - - def get(self, name: str, default: Any = None) -> Any: - return self._endpoints.get(name, default) - - def list_endpoints(self) -> List[str]: - return list(self._endpoints.keys()) - - @property - def session(self) -> str: - return self._session_url - - def init(self) -> None: - error_msg: str = "Can't discover entrypoint(s)" - try: - _, _data, _ = self.client.query(endpoint=self.prefix) - json_data: Dict[str, Any] = json.loads(_data) - - # Discover endpoints - for k, v in json_data.items(): - if isinstance(v, dict) and '@odata.id' in v: - name: str = to_snake_case(k) - url: str = v['@odata.id'] - self.log.info(f'entrypoint found: {name} = {url}') - self._endpoints[name] = Endpoint(url, self.client) - - # Extract session URL if available - try: - self._session_url = json_data['Links']['Sessions']['@odata.id'] - except (KeyError, TypeError): - self.log.warning('Session URL not found in root response') - self._session_url = '' - - except (URLError, KeyError, json.JSONDecodeError) as e: - msg = f'{error_msg}: {e}' - self.log.error(msg) - raise RuntimeError(msg) from e - - -class Endpoint: - NAME: str = 'Endpoint' - - def __init__(self, url: str, client: RedFishClient) -> None: - self.log = get_logger(f'{__name__}:{Endpoint.NAME}') - self.url: str = url - self.client: RedFishClient = client - self._children: Dict[str, 'Endpoint'] = {} - self.data: Dict[str, Any] = self.get_data() - self.id: str = '' - self.members_names: List[str] = [] - - if self.has_members: - 
self.members_names = self.get_members_names() - - if self.data: - try: - self.id = self.data['Id'] - except KeyError: - self.id = self.data['@odata.id'].split('/')[-1] - else: - self.log.warning(f'No data could be loaded for {self.url}') - - def __getitem__(self, key: str) -> 'Endpoint': - if not isinstance(key, str) or not key or '/' in key: - raise KeyError(key) - - if key not in self._children: - child_url: str = f'{self.url.rstrip("/")}/{key}' - self._children[key] = Endpoint(child_url, self.client) - - return self._children[key] - - def list_children(self) -> List[str]: - return list(self._children.keys()) - - def query(self, url: str) -> Dict[str, Any]: - data: Dict[str, Any] = {} - try: - self.log.debug(f'Querying {url}') - _, _data, _ = self.client.query(endpoint=url) - if not _data: - self.log.warning(f'Empty response from {url}') - else: - data = json.loads(_data) - except KeyError as e: - self.log.error(f'KeyError while querying {url}: {e}') - except HTTPError as e: - self.log.error(f'HTTP error while querying {url} - {e.code} - {e.reason}') - except json.JSONDecodeError as e: - self.log.error(f'JSON decode error while querying {url}: {e}') - except Exception as e: - self.log.error(f'Unexpected error while querying {url}: {type(e).__name__}: {e}') - return data - - def get_data(self) -> Dict[str, Any]: - return self.query(self.url) - - def get_members_names(self) -> List[str]: - result: List[str] = [] - if self.has_members: - for member in self.data['Members']: - name: str = member['@odata.id'].split('/')[-1] - result.append(name) - return result - - def get_name(self, endpoint: str) -> str: - return endpoint.split('/')[-1] - - def get_members_endpoints(self) -> Dict[str, str]: - members: Dict[str, str] = {} - - self.log.error(f'get_members_endpoints called on {self.url}, has_members={self.has_members}') - - if self.has_members: - url_parts = self.url.split('/redfish/v1/') - if len(url_parts) > 1: - base_path = '/redfish/v1/' + url_parts[1].split('/')[0] 
- else: - base_path = None - - for member in self.data['Members']: - name = self.get_name(member['@odata.id']) - endpoint_url = member['@odata.id'] - self.log.debug(f'Found member: {name} -> {endpoint_url}') - - if base_path and not endpoint_url.startswith(base_path): - self.log.warning( - f'Member endpoint {endpoint_url} does not match base path {base_path} ' - f'from {self.url}. Skipping this member.' - ) - continue - - members[name] = endpoint_url - else: - if self.data: - name = self.get_name(self.url) - members[name] = self.url - self.log.warning(f'No Members array, using endpoint itself: {name} -> {self.url}') - else: - self.log.debug(f'Endpoint {self.url} has no data and no Members array') - - return members - - def get_members_data(self) -> Dict[str, Any]: - result: Dict[str, Any] = {} - self.log.debug(f'get_members_data called on {self.url}, has_members={self.has_members}') - - if self.has_members: - self.log.debug(f'Endpoint {self.url} has Members array: {self.data.get("Members", [])}') - members_endpoints = self.get_members_endpoints() - - # If no valid members after filtering, fall back to using the endpoint itself - if not members_endpoints: - self.log.warning( - f'Endpoint {self.url} has Members array but no valid members after filtering. ' - f'Using endpoint itself as singleton resource.' 
- ) - if self.data: - name = self.get_name(self.url) - result[name] = self.data - else: - for member, endpoint_url in members_endpoints.items(): - self.log.info(f'Fetching data for member: {member} at {endpoint_url}') - result[member] = self.query(endpoint_url) - else: - self.log.debug(f'Endpoint {self.url} has no Members array, returning own data') - if self.data: - name = self.get_name(self.url) - result[name] = self.data - else: - self.log.warning(f'Endpoint {self.url} has no members and empty data') - - return result - - @property - def has_members(self) -> bool: - return bool(self.data and 'Members' in self.data and isinstance(self.data['Members'], list)) class BaseRedfishSystem(BaseSystem): @@ -242,7 +42,9 @@ class BaseRedfishSystem(BaseSystem): self.username: str = kw['username'] self.password: str = kw['password'] # move the following line (class attribute?) - self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password) + self.client: RedFishClient = RedFishClient( + host=self.host, port=self.port, username=self.username, password=self.password + ) self.endpoints: EndpointMgr = EndpointMgr(self.client) self.log.info(f'redfish system initialization, host: {self.host}, user: {self.username}') self.data_ready: bool = False @@ -268,74 +70,16 @@ class BaseRedfishSystem(BaseSystem): f = getattr(self, func) self.update_funcs.append(f) - def build_data(self, - data: Dict[str, Any], - fields: List[str], - attribute: Optional[str] = None) -> Dict[str, Dict[str, Dict]]: - result: Dict[str, Dict[str, Optional[Dict]]] = dict() - member_id: str = '' - - def process_data(m_id: str, fields: List[str], data: Dict[str, Any]) -> Dict[str, Any]: - result: Dict[str, Any] = {} - for field in fields: - try: - result[to_snake_case(field)] = data[field] - except KeyError: - self.log.debug(f'Could not find field: {field} in data: {data}') - result[to_snake_case(field)] = None - return result - - try: - if attribute is not 
None: - data_items = data[attribute] - else: - # The following is a hack to re-inject the key to the dict - # as we have the following structure when `attribute` is passed: - # "PowerSupplies": [ {"MemberId": "0", ...}, {"MemberId": "1", ...} ] - # vs. this structure in the opposite case: - # { "CPU.Socket.2": { "Id": "CPU.Socket.2", "Manufacturer": "Intel" }, "CPU.Socket.1": {} } - # With the first case, we clearly use the field "MemberId". - # With the second case, we use the key of the dict. - # This is mostly for avoiding code duplication. - data_items = [{'MemberId': k, **v} for k, v in data.items()] - self.log.error(f"GUITS_DEBUG: data_items= {data_items}") - for d in data_items: - member_id = d.get('MemberId') - result[member_id] = {} - result[member_id] = process_data(member_id, fields, d) - except (KeyError, TypeError, AttributeError) as e: - self.log.error(f"Can't build data: {e}") - raise - return normalize_dict(result) - def update(self, collection: str, component: str, path: str, fields: List[str], attribute: Optional[str] = None) -> None: - members: List[str] = self.endpoints[collection].get_members_names() - result: Dict[str, Any] = {} - data: Dict[str, Any] = {} - data_built: Dict[str, Any] = {} - if not members: - data = self.endpoints[collection][path].get_members_data() - data_built = self.build_data(data=data, fields=fields, attribute=attribute) - result = data_built - else: - for member in members: - data_built = {} - try: - if attribute is None: - data = self.endpoints[collection][member][path].get_members_data() - else: - data = self.endpoints[collection][member][path].data - except HTTPError as e: - self.log.error(f'Error while updating {component}: {e}') - else: - data_built = self.build_data(data=data, fields=fields, attribute=attribute) - result[member] = data_built - self._sys[component] = result + update_component( + self.endpoints, collection, component, path, fields, + self._sys, self.log, attribute=attribute, + ) def main(self) -> 
None: self.stop = False @@ -546,4 +290,4 @@ class BaseRedfishSystem(BaseSystem): raise NotImplementedError() def schedule_reboot_job(self, job_id: str) -> int: - raise NotImplementedError() \ No newline at end of file + raise NotImplementedError() diff --git a/src/ceph-node-proxy/ceph_node_proxy/bootstrap.py b/src/ceph-node-proxy/ceph_node_proxy/bootstrap.py new file mode 100644 index 00000000000..c691680ecda --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/bootstrap.py @@ -0,0 +1,39 @@ +from typing import TYPE_CHECKING + +from ceph_node_proxy.config import CephadmConfig, get_node_proxy_config +from ceph_node_proxy.util import DEFAULTS, write_tmp_file + +if TYPE_CHECKING: + from ceph_node_proxy.main import NodeProxyManager + + +def create_node_proxy_manager(cephadm_config: CephadmConfig) -> 'NodeProxyManager': + """ + Build NodeProxyManager from cephadm bootstrap config. + Creates temporary CA file and loads node-proxy YAML config. + """ + from ceph_node_proxy.main import NodeProxyManager + + ca_file = write_tmp_file( + cephadm_config.root_cert_pem, + prefix_name='cephadm-endpoint-root-cert-', + ) + config = get_node_proxy_config( + path=cephadm_config.node_proxy_config_path, + defaults=DEFAULTS, + ) + + manager = NodeProxyManager( + mgr_host=cephadm_config.target_ip, + cephx_name=cephadm_config.name, + cephx_secret=cephadm_config.keyring, + mgr_agent_port=cephadm_config.target_port, + ca_path=ca_file.name, + api_ssl_crt=cephadm_config.listener_crt, + api_ssl_key=cephadm_config.listener_key, + config_path=cephadm_config.node_proxy_config_path, + config=config, + ) + # Keep temp file alive for the lifetime of the manager + manager._ca_temp_file = ca_file + return manager diff --git a/src/ceph-node-proxy/ceph_node_proxy/config.py b/src/ceph-node-proxy/ceph_node_proxy/config.py new file mode 100644 index 00000000000..8ff4aa64e70 --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/config.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import json 
+import os +from dataclasses import dataclass +from typing import Any, Dict, Optional + +from ceph_node_proxy.util import DEFAULTS, Config + + +REQUIRED_CEPHADM_KEYS = ( + 'target_ip', + 'target_port', + 'keyring', + 'root_cert.pem', + 'listener.crt', + 'listener.key', + 'name', +) + + +@dataclass +class CephadmConfig: + """Parsed cephadm bootstrap config (from --config JSON file)""" + target_ip: str + target_port: str + keyring: str + root_cert_pem: str + listener_crt: str + listener_key: str + name: str + node_proxy_config_path: str + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> CephadmConfig: + for key in REQUIRED_CEPHADM_KEYS: + if key not in data: + raise ValueError(f'Missing required cephadm config key: {key}') + # Normalize key with dot to attribute name + node_proxy_config_path = data.get('node_proxy_config') or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml') + assert node_proxy_config_path is not None + return cls( + target_ip=data['target_ip'], + target_port=data['target_port'], + keyring=data['keyring'], + root_cert_pem=data['root_cert.pem'], + listener_crt=data['listener.crt'], + listener_key=data['listener.key'], + name=data['name'], + node_proxy_config_path=node_proxy_config_path, + ) + + +def load_cephadm_config(path: str) -> CephadmConfig: + """ + Load and validate cephadm bootstrap config from a JSON file. + Raises FileNotFoundError if path does not exist, ValueError if invalid. 
+ """ + if not os.path.exists(path): + raise FileNotFoundError(f'Config file not found: {path}') + with open(path, 'r') as f: + try: + data = json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f'Invalid JSON config: {e}') from e + if not isinstance(data, dict): + raise ValueError('Config must be a JSON object') + return CephadmConfig.from_dict(data) + + +def get_node_proxy_config( + path: Optional[str] = None, + defaults: Optional[Dict[str, Any]] = None, +) -> Config: + effective_path = path or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml') + assert effective_path is not None + return Config(effective_path, defaults=defaults or DEFAULTS) diff --git a/src/ceph-node-proxy/ceph_node_proxy/main.py b/src/ceph-node-proxy/ceph_node_proxy/main.py index 13887112589..5bd37d090fd 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/main.py +++ b/src/ceph-node-proxy/ceph_node_proxy/main.py @@ -1,49 +1,62 @@ from ceph_node_proxy.api import NodeProxyApi -from ceph_node_proxy.atollon import AtollonSystem -from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem -from ceph_node_proxy.redfishdellsystem import RedfishDellSystem +from ceph_node_proxy.bootstrap import create_node_proxy_manager +from ceph_node_proxy.config import load_cephadm_config +from ceph_node_proxy.registry import get_system_class from ceph_node_proxy.reporter import Reporter -from ceph_node_proxy.util import Config, DEFAULTS, get_logger, http_req, write_tmp_file +from ceph_node_proxy.util import Config, DEFAULTS, get_logger, http_req from urllib.error import HTTPError -from typing import Dict, Any, Optional, Type +from typing import Dict, Any, Optional import argparse +import json import os +import signal import ssl -import json import time -import signal - - -REDFISH_SYSTEM_CLASSES: Dict[str, Type[BaseRedfishSystem]] = { - 'generic': BaseRedfishSystem, - 'dell': RedfishDellSystem, - 'atollon': AtollonSystem, -} class NodeProxyManager: - def __init__(self, **kw: Any) -> None: 
+ def __init__( + self, + *, + mgr_host: str, + cephx_name: str, + cephx_secret: str, + ca_path: str, + api_ssl_crt: str, + api_ssl_key: str, + mgr_agent_port: str, + config: Optional[Config] = None, + config_path: Optional[str] = None, + reporter_scheme: str = 'https', + reporter_endpoint: str = '/node-proxy/data', + ) -> None: self.exc: Optional[Exception] = None self.log = get_logger(__name__) - self.mgr_host: str = kw['mgr_host'] - self.cephx_name: str = kw['cephx_name'] - self.cephx_secret: str = kw['cephx_secret'] - self.ca_path: str = kw['ca_path'] - self.api_ssl_crt: str = kw['api_ssl_crt'] - self.api_ssl_key: str = kw['api_ssl_key'] - self.mgr_agent_port: str = str(kw['mgr_agent_port']) + self.mgr_host = mgr_host + self.cephx_name = cephx_name + self.cephx_secret = cephx_secret + self.ca_path = ca_path + self.api_ssl_crt = api_ssl_crt + self.api_ssl_key = api_ssl_key + self.mgr_agent_port = str(mgr_agent_port) self.stop: bool = False self.ssl_ctx = ssl.create_default_context() self.ssl_ctx.check_hostname = True self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED self.ssl_ctx.load_verify_locations(self.ca_path) - self.reporter_scheme: str = kw.get('reporter_scheme', 'https') - self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data') + self.reporter_scheme = reporter_scheme + self.reporter_endpoint = reporter_endpoint self.cephx = {'cephx': {'name': self.cephx_name, 'secret': self.cephx_secret}} - config_path = kw.get('config_path') or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml') - self.config = Config(config_path, defaults=DEFAULTS) + if config is not None: + self.config = config + else: + path = ( + config_path or os.environ.get('NODE_PROXY_CONFIG') + or '/etc/ceph/node-proxy.yml' + ) + self.config = Config(path, defaults=DEFAULTS) self.username: str = '' self.password: str = '' @@ -87,7 +100,7 @@ class NodeProxyManager: raise SystemExit(1) try: vendor = self.config.get('system', {}).get('vendor', 'generic') - system_cls = 
REDFISH_SYSTEM_CLASSES.get(vendor, BaseRedfishSystem) + system_cls = get_system_class(vendor) self.system = system_cls(host=oob_details['host'], port=oob_details['port'], username=oob_details['username'], @@ -100,12 +113,14 @@ class NodeProxyManager: def init_reporter(self) -> None: try: + max_retries = self.config.get('reporter', {}).get('push_data_max_retries', 30) self.reporter_agent = Reporter(self.system, self.cephx, reporter_scheme=self.reporter_scheme, reporter_hostname=self.mgr_host, reporter_port=self.mgr_agent_port, - reporter_endpoint=self.reporter_endpoint) + reporter_endpoint=self.reporter_endpoint, + max_retries=max_retries) self.reporter_agent.start() except RuntimeError: self.log.error("Can't initialize the reporter.") @@ -121,19 +136,34 @@ class NodeProxyManager: raise def loop(self) -> None: + check_interval = 20 + min_interval = 20 + max_interval = 300 + backoff_factor = 1.5 + consecutive_failures = 0 + while not self.stop: - for thread in [self.system, self.reporter_agent]: - try: + try: + for thread in [self.system, self.reporter_agent]: status = thread.check_status() label = 'Ok' if status else 'Critical' self.log.debug(f'{thread} status: {label}') - except Exception as e: - self.log.error(f'{thread} not running: {e.__class__.__name__}: {e}') + consecutive_failures = 0 + check_interval = min_interval + self.log.debug('All threads are alive, next check in %ds.', check_interval) + except Exception as e: + consecutive_failures += 1 + self.log.error( + f'{consecutive_failures} failure(s): thread not running: ' + f'{e.__class__.__name__}: {e}' + ) + for thread in [self.system, self.reporter_agent]: thread.shutdown() - self.init_system() - self.init_reporter() - self.log.debug('All threads are alive, next check in 20sec.') - time.sleep(20) + self.init_system() + self.init_reporter() + check_interval = min(int(check_interval * backoff_factor), max_interval) + self.log.debug('Next check in %ds (backoff).', check_interval) + time.sleep(check_interval) 
def shutdown(self) -> None: self.stop = True @@ -147,11 +177,13 @@ class NodeProxyManager: def handler(signum: Any, frame: Any, t_mgr: 'NodeProxyManager') -> None: - t_mgr.system.pending_shutdown = True + if hasattr(t_mgr, 'system') and t_mgr.system is not None: + t_mgr.system.pending_shutdown = True t_mgr.log.info('SIGTERM caught, shutting down threads...') t_mgr.shutdown() - t_mgr.log.info('Logging out from RedFish API') - t_mgr.system.client.logout() + if hasattr(t_mgr, 'system') and t_mgr.system is not None and hasattr(t_mgr.system, 'client') and t_mgr.system.client is not None: + t_mgr.log.info('Logging out from RedFish API') + t_mgr.system.client.logout() raise SystemExit(0) @@ -174,37 +206,14 @@ def main() -> None: if args.debug: DEFAULTS['logging']['level'] = 10 - if not os.path.exists(args.config): - raise Exception(f'No config file found at provided config path: {args.config}') + try: + cephadm_config = load_cephadm_config(args.config) + except FileNotFoundError as e: + raise SystemExit(f'Config error: {e}') + except ValueError as e: + raise SystemExit(f'Config error: {e}') - with open(args.config, 'r') as f: - try: - config_json = f.read() - config = json.loads(config_json) - except Exception as e: - raise Exception(f'Failed to load json config: {str(e)}') - - target_ip = config['target_ip'] - target_port = config['target_port'] - keyring = config['keyring'] - root_cert = config['root_cert.pem'] - listener_cert = config['listener.crt'] - listener_key = config['listener.key'] - name = config['name'] - - ca_file = write_tmp_file(root_cert, - prefix_name='cephadm-endpoint-root-cert') - - config_path = config.get('node_proxy_config') or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml') - - node_proxy_mgr = NodeProxyManager(mgr_host=target_ip, - cephx_name=name, - cephx_secret=keyring, - mgr_agent_port=target_port, - ca_path=ca_file.name, - api_ssl_crt=listener_cert, - api_ssl_key=listener_key, - config_path=config_path) + node_proxy_mgr = 
create_node_proxy_manager(cephadm_config) signal.signal(signal.SIGTERM, lambda signum, frame: handler(signum, frame, node_proxy_mgr)) node_proxy_mgr.run() diff --git a/src/ceph-node-proxy/ceph_node_proxy/protocols.py b/src/ceph-node-proxy/ceph_node_proxy/protocols.py new file mode 100644 index 00000000000..f71d66561fc --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/protocols.py @@ -0,0 +1,70 @@ +from threading import Lock +from typing import Any, Dict, Protocol, runtime_checkable + + +@runtime_checkable +class SystemBackend(Protocol): + def get_memory(self) -> Dict[str, Any]: + ... + + def get_network(self) -> Dict[str, Any]: + ... + + def get_storage(self) -> Dict[str, Any]: + ... + + def get_processors(self) -> Dict[str, Any]: + ... + + def get_power(self) -> Dict[str, Any]: + ... + + def get_fans(self) -> Dict[str, Any]: + ... + + def get_firmwares(self) -> Dict[str, Any]: + ... + + def get_led(self) -> Dict[str, Any]: + ... + + def set_led(self, data: Dict[str, str]) -> int: + ... + + def get_chassis_led(self) -> Dict[str, Any]: + ... + + def chassis_led_on(self) -> int: + ... + + def chassis_led_off(self) -> int: + ... + + def get_device_led(self, device: str) -> Dict[str, Any]: + ... + + def device_led_on(self, device: str) -> int: + ... + + def device_led_off(self, device: str) -> int: + ... + + def shutdown_host(self, force: bool = False) -> int: + ... + + def powercycle(self) -> int: + ... + + def flush(self) -> None: + ... + + +@runtime_checkable +class SystemForReporter(Protocol): + lock: Lock + data_ready: bool + pending_shutdown: bool + previous_data: Dict[str, Any] + + def get_system(self) -> Dict[str, Any]: + ... 
diff --git a/src/ceph-node-proxy/ceph_node_proxy/redfish.py b/src/ceph-node-proxy/ceph_node_proxy/redfish.py new file mode 100644 index 00000000000..c9ef874f31f --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/redfish.py @@ -0,0 +1,275 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from ceph_node_proxy.redfish_client import RedFishClient +from ceph_node_proxy.util import get_logger, to_snake_case, normalize_dict +from urllib.error import HTTPError, URLError + + +@dataclass +class ComponentUpdateSpec: + collection: str + path: str + fields: List[str] + attribute: Optional[str] = None + + +class EndpointMgr: + """Manages Redfish root endpoints (Systems, Chassis, etc.) discovered from the service root.""" + + NAME: str = 'EndpointMgr' + + def __init__(self, + client: RedFishClient, + prefix: str = RedFishClient.PREFIX) -> None: + self.log = get_logger(f'{__name__}:{EndpointMgr.NAME}') + self.prefix: str = prefix + self.client: RedFishClient = client + self._endpoints: Dict[str, 'Endpoint'] = {} + self._session_url: str = '' + + def __getitem__(self, index: str) -> Endpoint: + if index not in self._endpoints: + raise KeyError(f"'{index}' is not a valid endpoint. 
Available: {list(self._endpoints.keys())}") + return self._endpoints[index] + + def get(self, name: str, default: Any = None) -> Any: + return self._endpoints.get(name, default) + + def list_endpoints(self) -> List[str]: + return list(self._endpoints.keys()) + + @property + def session(self) -> str: + return self._session_url + + def init(self) -> None: + error_msg: str = "Can't discover entrypoint(s)" + try: + _, _data, _ = self.client.query(endpoint=self.prefix) + json_data: Dict[str, Any] = json.loads(_data) + + for k, v in json_data.items(): + if isinstance(v, dict) and '@odata.id' in v: + name: str = to_snake_case(k) + url: str = v['@odata.id'] + self.log.info(f'entrypoint found: {name} = {url}') + self._endpoints[name] = Endpoint(url, self.client) + + try: + self._session_url = json_data['Links']['Sessions']['@odata.id'] + except (KeyError, TypeError): + self.log.warning('Session URL not found in root response') + self._session_url = '' + + except (URLError, KeyError, json.JSONDecodeError) as e: + msg = f'{error_msg}: {e}' + self.log.error(msg) + raise RuntimeError(msg) from e + + +class Endpoint: + """Single Redfish resource or collection; supports lazy child resolution and member listing.""" + + NAME: str = 'Endpoint' + + def __init__(self, url: str, client: RedFishClient) -> None: + self.log = get_logger(f'{__name__}:{Endpoint.NAME}') + self.url: str = url + self.client: RedFishClient = client + self._children: Dict[str, 'Endpoint'] = {} + self.data: Dict[str, Any] = self.get_data() + self.id: str = '' + self.members_names: List[str] = [] + + if self.has_members: + self.members_names = self.get_members_names() + + if self.data: + try: + self.id = self.data['Id'] + except KeyError: + self.id = self.data['@odata.id'].split('/')[-1] + else: + self.log.warning(f'No data could be loaded for {self.url}') + + def __getitem__(self, key: str) -> 'Endpoint': + if not isinstance(key, str) or not key or '/' in key: + raise KeyError(key) + + if key not in 
self._children: + child_url: str = f'{self.url.rstrip("/")}/{key}' + self._children[key] = Endpoint(child_url, self.client) + + return self._children[key] + + def list_children(self) -> List[str]: + return list(self._children.keys()) + + def query(self, url: str) -> Dict[str, Any]: + data: Dict[str, Any] = {} + try: + self.log.debug(f'Querying {url}') + _, _data, _ = self.client.query(endpoint=url) + if not _data: + self.log.warning(f'Empty response from {url}') + else: + data = json.loads(_data) + except KeyError as e: + self.log.error(f'KeyError while querying {url}: {e}') + except HTTPError as e: + self.log.error(f'HTTP error while querying {url} - {e.code} - {e.reason}') + except json.JSONDecodeError as e: + self.log.error(f'JSON decode error while querying {url}: {e}') + except Exception as e: + self.log.error(f'Unexpected error while querying {url}: {type(e).__name__}: {e}') + return data + + def get_data(self) -> Dict[str, Any]: + return self.query(self.url) + + def get_members_names(self) -> List[str]: + result: List[str] = [] + if self.has_members: + for member in self.data['Members']: + name: str = member['@odata.id'].split('/')[-1] + result.append(name) + return result + + def get_name(self, endpoint: str) -> str: + return endpoint.split('/')[-1] + + def get_members_endpoints(self) -> Dict[str, str]: + members: Dict[str, str] = {} + self.log.debug(f'get_members_endpoints called on {self.url}, has_members={self.has_members}') + + if self.has_members: + url_parts = self.url.split('/redfish/v1/') + if len(url_parts) > 1: + base_path = '/redfish/v1/' + url_parts[1].split('/')[0] + else: + base_path = None + + for member in self.data['Members']: + name = self.get_name(member['@odata.id']) + endpoint_url = member['@odata.id'] + self.log.debug(f'Found member: {name} -> {endpoint_url}') + + if base_path and not endpoint_url.startswith(base_path): + self.log.warning( + f'Member endpoint {endpoint_url} does not match base path {base_path} ' + f'from {self.url}. 
Skipping this member.' + ) + continue + + members[name] = endpoint_url + else: + if self.data: + name = self.get_name(self.url) + members[name] = self.url + self.log.warning(f'No Members array, using endpoint itself: {name} -> {self.url}') + else: + self.log.debug(f'Endpoint {self.url} has no data and no Members array') + + return members + + def get_members_data(self) -> Dict[str, Any]: + result: Dict[str, Any] = {} + self.log.debug(f'get_members_data called on {self.url}, has_members={self.has_members}') + + if self.has_members: + self.log.debug(f'Endpoint {self.url} has Members array: {self.data.get("Members", [])}') + members_endpoints = self.get_members_endpoints() + + if not members_endpoints: + self.log.warning( + f'Endpoint {self.url} has Members array but no valid members after filtering. ' + f'Using endpoint itself as singleton resource.' + ) + if self.data: + name = self.get_name(self.url) + result[name] = self.data + else: + for member, endpoint_url in members_endpoints.items(): + self.log.info(f'Fetching data for member: {member} at {endpoint_url}') + result[member] = self.query(endpoint_url) + else: + self.log.debug(f'Endpoint {self.url} has no Members array, returning own data') + if self.data: + name = self.get_name(self.url) + result[name] = self.data + else: + self.log.warning(f'Endpoint {self.url} has no members and empty data') + + return result + + @property + def has_members(self) -> bool: + return bool(self.data and 'Members' in self.data and isinstance(self.data['Members'], list)) + + +def build_data( + data: Dict[str, Any], + fields: List[str], + log: Any, + attribute: Optional[str] = None, +) -> Dict[str, Dict[str, Dict]]: + result: Dict[str, Dict[str, Optional[Dict]]] = dict() + + def process_data(m_id: str, flds: List[str], d: Dict[str, Any]) -> Dict[str, Any]: + out: Dict[str, Any] = {} + for field in flds: + try: + out[to_snake_case(field)] = d[field] + except KeyError: + log.debug(f'Could not find field: {field} in data: {d}') + 
out[to_snake_case(field)] = None + return out + + try: + if attribute is not None: + data_items = data[attribute] + else: + data_items = [{'MemberId': k, **v} for k, v in data.items()] + log.debug(f"build_data: data_items count={len(data_items)}") + for d in data_items: + member_id = d.get('MemberId') + result[member_id] = {} + result[member_id] = process_data(member_id, fields, d) + except (KeyError, TypeError, AttributeError) as e: + log.error(f"Can't build data: {e}") + raise + return normalize_dict(result) + + +def update_component( + endpoints: EndpointMgr, + collection: str, + component: str, + path: str, + fields: List[str], + _sys: Dict[str, Any], + log: Any, + attribute: Optional[str] = None, +) -> None: + """Update _sys[component] from Redfish endpoints using the given spec.""" + members: List[str] = endpoints[collection].get_members_names() + result: Dict[str, Any] = {} + if not members: + data = endpoints[collection][path].get_members_data() + result = build_data(data=data, fields=fields, log=log, attribute=attribute) + else: + for member in members: + try: + if attribute is None: + data = endpoints[collection][member][path].get_members_data() + else: + data = endpoints[collection][member][path].data + except HTTPError as e: + log.error(f'Error while updating {component}: {e}') + continue + result[member] = build_data(data=data, fields=fields, log=log, attribute=attribute) + _sys[component] = result diff --git a/src/ceph-node-proxy/ceph_node_proxy/registry.py b/src/ceph-node-proxy/ceph_node_proxy/registry.py new file mode 100644 index 00000000000..110033b42be --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/registry.py @@ -0,0 +1,44 @@ +from typing import Dict, Type + +from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem + +# Built-in implementations +from ceph_node_proxy.atollon import AtollonSystem +from ceph_node_proxy.redfishdellsystem import RedfishDellSystem +from ceph_node_proxy.util import get_logger + +REDFISH_SYSTEM_CLASSES: 
Dict[str, Type[BaseRedfishSystem]] = { + 'generic': BaseRedfishSystem, + 'dell': RedfishDellSystem, + 'atollon': AtollonSystem, +} + +logger = get_logger(__name__) + + +def _load_entry_point_systems() -> None: + try: + import pkg_resources # type: ignore[import-not-found] + + except ImportError: + logger.debug( + "pkg_resources not available; only built-in Redfish systems will be used." + ) + return + + for ep in pkg_resources.iter_entry_points("ceph_node_proxy.systems"): + try: + REDFISH_SYSTEM_CLASSES[ep.name] = ep.load() + except (ImportError, AttributeError, ModuleNotFoundError) as e: + logger.warning( + "Failed to load Redfish system entry point %s: %s", ep.name, e + ) + + +def get_system_class(vendor: str) -> Type[BaseRedfishSystem]: + """Return the Redfish system class for the given vendor. + Falls back to generic.""" + return REDFISH_SYSTEM_CLASSES.get(vendor, BaseRedfishSystem) + + +_load_entry_point_systems() diff --git a/src/ceph-node-proxy/ceph_node_proxy/reporter.py b/src/ceph-node-proxy/ceph_node_proxy/reporter.py index 20d43b59d33..1973d6ec365 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/reporter.py +++ b/src/ceph-node-proxy/ceph_node_proxy/reporter.py @@ -1,18 +1,24 @@ import time import json +from ceph_node_proxy.protocols import SystemForReporter from ceph_node_proxy.util import get_logger, http_req, BaseThread from urllib.error import HTTPError, URLError from typing import Dict, Any +DEFAULT_MAX_RETRIES = 30 +RETRY_SLEEP_SEC = 5 + + class Reporter(BaseThread): def __init__(self, - system: Any, + system: SystemForReporter, cephx: Dict[str, Any], reporter_scheme: str = 'https', reporter_hostname: str = '', reporter_port: str = '443', - reporter_endpoint: str = '/node-proxy/data') -> None: + reporter_endpoint: str = '/node-proxy/data', + max_retries: int = DEFAULT_MAX_RETRIES) -> None: super().__init__() self.system = system self.data: Dict[str, Any] = {} @@ -23,44 +29,51 @@ class Reporter(BaseThread): self.reporter_hostname: str = 
reporter_hostname self.reporter_port: str = reporter_port self.reporter_endpoint: str = reporter_endpoint + self.max_retries: int = max_retries self.log = get_logger(__name__) self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:' f'{reporter_port}{reporter_endpoint}') self.log.info(f'Reporter url set to {self.reporter_url}') + def _send_with_retries(self) -> bool: + """Send data to mgr. Returns True on success, False after max_retries failures.""" + for attempt in range(1, self.max_retries + 1): + try: + self.log.info(f'sending data to {self.reporter_url} (attempt {attempt}/{self.max_retries})') + http_req(hostname=self.reporter_hostname, + port=self.reporter_port, + method='POST', + headers={'Content-Type': 'application/json'}, + endpoint=self.reporter_endpoint, + scheme=self.reporter_scheme, + data=json.dumps(self.data)) + return True + except (HTTPError, URLError) as e: + self.log.error( + f"The reporter couldn't send data to the mgr (attempt {attempt}/{self.max_retries}): {e}" + ) + if attempt < self.max_retries: + time.sleep(RETRY_SLEEP_SEC) + return False + def main(self) -> None: while not self.stop: - # Any logic to avoid sending the all the system - # information every loop can go here. 
In a real - # scenario probably we should just send the sub-parts - # that have changed to minimize the traffic in - # dense clusters self.log.debug('waiting for a lock in reporter loop.') with self.system.lock: if not self.system.pending_shutdown: self.log.debug('lock acquired in reporter loop.') if self.system.data_ready: self.log.debug('data ready to be sent to the mgr.') - if not self.system.get_system() == self.system.previous_data: + if self.system.get_system() != self.system.previous_data: self.log.info('data has changed since last iteration.') self.data['patch'] = self.system.get_system() - try: - # TODO: add a timeout parameter to the reporter in the config file - self.log.info(f'sending data to {self.reporter_url}') - http_req(hostname=self.reporter_hostname, - port=self.reporter_port, - method='POST', - headers={'Content-Type': 'application/json'}, - endpoint=self.reporter_endpoint, - scheme=self.reporter_scheme, - data=json.dumps(self.data)) - except (HTTPError, URLError) as e: - self.log.error(f"The reporter couldn't send data to the mgr: {e}") - raise - # Need to add a new parameter 'max_retries' to the reporter if it can't - # send the data for more than x times, maybe the daemon should stop altogether - else: + if self._send_with_retries(): self.system.previous_data = self.system.get_system() + else: + self.log.error( + f'Failed to send data after {self.max_retries} retries; ' + 'will retry on next cycle.' 
+ ) else: self.log.debug('no diff, not sending data to the mgr.') self.log.debug('lock released in reporter loop.') diff --git a/src/ceph-node-proxy/setup.py b/src/ceph-node-proxy/setup.py index 7dcc7cdf5bf..13b69433ad6 100644 --- a/src/ceph-node-proxy/setup.py +++ b/src/ceph-node-proxy/setup.py @@ -22,11 +22,14 @@ setup( 'tox', 'ceph', ], - entry_points=dict( - console_scripts=[ + entry_points={ + 'console_scripts': [ 'ceph-node-proxy = ceph_node_proxy.main:main', ], - ), + # vendors can register Redfish system implementations + # example: 'myvendor = mypackage.redfish_system:MyVendorSystem' + 'ceph_node_proxy.systems': [], + }, classifiers=[ 'Environment :: Console', 'Intended Audience :: Information Technology',