from cherrypy._cpserver import Server # type: ignore
from threading import Thread, Event
from typing import Dict, Any, List
+from ceph_node_proxy.protocols import SystemBackend
from ceph_node_proxy.util import Config, get_logger, write_tmp_file
-from ceph_node_proxy.basesystem import BaseSystem
from ceph_node_proxy.reporter import Reporter
from typing import TYPE_CHECKING, Optional
from ceph_node_proxy.main import NodeProxyManager
+# Admin endpoints (start/stop/reload) are not mounted by default.
+# To enable, mount: cherrypy.tree.mount(Admin(api), '/admin', config=config)
@cherrypy.tools.auth_basic(on=True)
@cherrypy.tools.allow(methods=['PUT'])
@cherrypy.tools.json_out()
class API(Server):
def __init__(self,
- backend: 'BaseSystem',
+ backend: SystemBackend,
reporter: 'Reporter',
config: 'Config',
addr: str = '0.0.0.0',
self.log = get_logger(__name__)
def get_component_spec_overrides(self) -> Dict[str, Dict[str, Any]]:
- return {'power': {'path': 'PowerSubsystem'}}
\ No newline at end of file
+ return {'power': {'path': 'PowerSubsystem'}}
import concurrent.futures
import dataclasses
-import json
-from dataclasses import dataclass
from ceph_node_proxy.basesystem import BaseSystem
+from ceph_node_proxy.redfish import (
+ ComponentUpdateSpec,
+ Endpoint,
+ EndpointMgr,
+ update_component,
+)
from ceph_node_proxy.redfish_client import RedFishClient
-from time import sleep
from ceph_node_proxy.util import get_logger, to_snake_case, normalize_dict
+from time import sleep
from typing import Dict, Any, List, Callable, Optional
-from urllib.error import HTTPError, URLError
-
-
-@dataclass
-class ComponentUpdateSpec:
- collection: str
- path: str
- fields: List[str]
- attribute: Optional[str] = None
-
-
-class EndpointMgr:
- NAME: str = 'EndpointMgr'
-
- def __init__(self,
- client: RedFishClient,
- prefix: str = RedFishClient.PREFIX) -> None:
- self.log = get_logger(f'{__name__}:{EndpointMgr.NAME}')
- self.prefix: str = prefix
- self.client: RedFishClient = client
- # Use explicit dictionary instead of dynamic attributes
- self._endpoints: Dict[str, Endpoint] = {}
- self._session_url: str = ''
-
- def __getitem__(self, index: str) -> 'Endpoint':
- if index not in self._endpoints:
- raise KeyError(f"'{index}' is not a valid endpoint. Available: {list(self._endpoints.keys())}")
- return self._endpoints[index]
-
- def get(self, name: str, default: Any = None) -> Any:
- return self._endpoints.get(name, default)
-
- def list_endpoints(self) -> List[str]:
- return list(self._endpoints.keys())
-
- @property
- def session(self) -> str:
- return self._session_url
-
- def init(self) -> None:
- error_msg: str = "Can't discover entrypoint(s)"
- try:
- _, _data, _ = self.client.query(endpoint=self.prefix)
- json_data: Dict[str, Any] = json.loads(_data)
-
- # Discover endpoints
- for k, v in json_data.items():
- if isinstance(v, dict) and '@odata.id' in v:
- name: str = to_snake_case(k)
- url: str = v['@odata.id']
- self.log.info(f'entrypoint found: {name} = {url}')
- self._endpoints[name] = Endpoint(url, self.client)
-
- # Extract session URL if available
- try:
- self._session_url = json_data['Links']['Sessions']['@odata.id']
- except (KeyError, TypeError):
- self.log.warning('Session URL not found in root response')
- self._session_url = ''
-
- except (URLError, KeyError, json.JSONDecodeError) as e:
- msg = f'{error_msg}: {e}'
- self.log.error(msg)
- raise RuntimeError(msg) from e
-
-
-class Endpoint:
- NAME: str = 'Endpoint'
-
- def __init__(self, url: str, client: RedFishClient) -> None:
- self.log = get_logger(f'{__name__}:{Endpoint.NAME}')
- self.url: str = url
- self.client: RedFishClient = client
- self._children: Dict[str, 'Endpoint'] = {}
- self.data: Dict[str, Any] = self.get_data()
- self.id: str = ''
- self.members_names: List[str] = []
-
- if self.has_members:
- self.members_names = self.get_members_names()
-
- if self.data:
- try:
- self.id = self.data['Id']
- except KeyError:
- self.id = self.data['@odata.id'].split('/')[-1]
- else:
- self.log.warning(f'No data could be loaded for {self.url}')
-
- def __getitem__(self, key: str) -> 'Endpoint':
- if not isinstance(key, str) or not key or '/' in key:
- raise KeyError(key)
-
- if key not in self._children:
- child_url: str = f'{self.url.rstrip("/")}/{key}'
- self._children[key] = Endpoint(child_url, self.client)
-
- return self._children[key]
-
- def list_children(self) -> List[str]:
- return list(self._children.keys())
-
- def query(self, url: str) -> Dict[str, Any]:
- data: Dict[str, Any] = {}
- try:
- self.log.debug(f'Querying {url}')
- _, _data, _ = self.client.query(endpoint=url)
- if not _data:
- self.log.warning(f'Empty response from {url}')
- else:
- data = json.loads(_data)
- except KeyError as e:
- self.log.error(f'KeyError while querying {url}: {e}')
- except HTTPError as e:
- self.log.error(f'HTTP error while querying {url} - {e.code} - {e.reason}')
- except json.JSONDecodeError as e:
- self.log.error(f'JSON decode error while querying {url}: {e}')
- except Exception as e:
- self.log.error(f'Unexpected error while querying {url}: {type(e).__name__}: {e}')
- return data
-
- def get_data(self) -> Dict[str, Any]:
- return self.query(self.url)
-
- def get_members_names(self) -> List[str]:
- result: List[str] = []
- if self.has_members:
- for member in self.data['Members']:
- name: str = member['@odata.id'].split('/')[-1]
- result.append(name)
- return result
-
- def get_name(self, endpoint: str) -> str:
- return endpoint.split('/')[-1]
-
- def get_members_endpoints(self) -> Dict[str, str]:
- members: Dict[str, str] = {}
-
- self.log.error(f'get_members_endpoints called on {self.url}, has_members={self.has_members}')
-
- if self.has_members:
- url_parts = self.url.split('/redfish/v1/')
- if len(url_parts) > 1:
- base_path = '/redfish/v1/' + url_parts[1].split('/')[0]
- else:
- base_path = None
-
- for member in self.data['Members']:
- name = self.get_name(member['@odata.id'])
- endpoint_url = member['@odata.id']
- self.log.debug(f'Found member: {name} -> {endpoint_url}')
-
- if base_path and not endpoint_url.startswith(base_path):
- self.log.warning(
- f'Member endpoint {endpoint_url} does not match base path {base_path} '
- f'from {self.url}. Skipping this member.'
- )
- continue
-
- members[name] = endpoint_url
- else:
- if self.data:
- name = self.get_name(self.url)
- members[name] = self.url
- self.log.warning(f'No Members array, using endpoint itself: {name} -> {self.url}')
- else:
- self.log.debug(f'Endpoint {self.url} has no data and no Members array')
-
- return members
-
- def get_members_data(self) -> Dict[str, Any]:
- result: Dict[str, Any] = {}
- self.log.debug(f'get_members_data called on {self.url}, has_members={self.has_members}')
-
- if self.has_members:
- self.log.debug(f'Endpoint {self.url} has Members array: {self.data.get("Members", [])}')
- members_endpoints = self.get_members_endpoints()
-
- # If no valid members after filtering, fall back to using the endpoint itself
- if not members_endpoints:
- self.log.warning(
- f'Endpoint {self.url} has Members array but no valid members after filtering. '
- f'Using endpoint itself as singleton resource.'
- )
- if self.data:
- name = self.get_name(self.url)
- result[name] = self.data
- else:
- for member, endpoint_url in members_endpoints.items():
- self.log.info(f'Fetching data for member: {member} at {endpoint_url}')
- result[member] = self.query(endpoint_url)
- else:
- self.log.debug(f'Endpoint {self.url} has no Members array, returning own data')
- if self.data:
- name = self.get_name(self.url)
- result[name] = self.data
- else:
- self.log.warning(f'Endpoint {self.url} has no members and empty data')
-
- return result
-
- @property
- def has_members(self) -> bool:
- return bool(self.data and 'Members' in self.data and isinstance(self.data['Members'], list))
class BaseRedfishSystem(BaseSystem):
self.username: str = kw['username']
self.password: str = kw['password']
# move the following line (class attribute?)
- self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password)
+ self.client: RedFishClient = RedFishClient(
+ host=self.host, port=self.port, username=self.username, password=self.password
+ )
self.endpoints: EndpointMgr = EndpointMgr(self.client)
self.log.info(f'redfish system initialization, host: {self.host}, user: {self.username}')
self.data_ready: bool = False
f = getattr(self, func)
self.update_funcs.append(f)
- def build_data(self,
- data: Dict[str, Any],
- fields: List[str],
- attribute: Optional[str] = None) -> Dict[str, Dict[str, Dict]]:
- result: Dict[str, Dict[str, Optional[Dict]]] = dict()
- member_id: str = ''
-
- def process_data(m_id: str, fields: List[str], data: Dict[str, Any]) -> Dict[str, Any]:
- result: Dict[str, Any] = {}
- for field in fields:
- try:
- result[to_snake_case(field)] = data[field]
- except KeyError:
- self.log.debug(f'Could not find field: {field} in data: {data}')
- result[to_snake_case(field)] = None
- return result
-
- try:
- if attribute is not None:
- data_items = data[attribute]
- else:
- # The following is a hack to re-inject the key to the dict
- # as we have the following structure when `attribute` is passed:
- # "PowerSupplies": [ {"MemberId": "0", ...}, {"MemberId": "1", ...} ]
- # vs. this structure in the opposite case:
- # { "CPU.Socket.2": { "Id": "CPU.Socket.2", "Manufacturer": "Intel" }, "CPU.Socket.1": {} }
- # With the first case, we clearly use the field "MemberId".
- # With the second case, we use the key of the dict.
- # This is mostly for avoiding code duplication.
- data_items = [{'MemberId': k, **v} for k, v in data.items()]
- self.log.error(f"GUITS_DEBUG: data_items= {data_items}")
- for d in data_items:
- member_id = d.get('MemberId')
- result[member_id] = {}
- result[member_id] = process_data(member_id, fields, d)
- except (KeyError, TypeError, AttributeError) as e:
- self.log.error(f"Can't build data: {e}")
- raise
- return normalize_dict(result)
-
def update(self,
collection: str,
component: str,
path: str,
fields: List[str],
attribute: Optional[str] = None) -> None:
- members: List[str] = self.endpoints[collection].get_members_names()
- result: Dict[str, Any] = {}
- data: Dict[str, Any] = {}
- data_built: Dict[str, Any] = {}
- if not members:
- data = self.endpoints[collection][path].get_members_data()
- data_built = self.build_data(data=data, fields=fields, attribute=attribute)
- result = data_built
- else:
- for member in members:
- data_built = {}
- try:
- if attribute is None:
- data = self.endpoints[collection][member][path].get_members_data()
- else:
- data = self.endpoints[collection][member][path].data
- except HTTPError as e:
- self.log.error(f'Error while updating {component}: {e}')
- else:
- data_built = self.build_data(data=data, fields=fields, attribute=attribute)
- result[member] = data_built
- self._sys[component] = result
+ update_component(
+ self.endpoints, collection, component, path, fields,
+ self._sys, self.log, attribute=attribute,
+ )
def main(self) -> None:
self.stop = False
raise NotImplementedError()
def schedule_reboot_job(self, job_id: str) -> int:
- raise NotImplementedError()
\ No newline at end of file
+ raise NotImplementedError()
--- /dev/null
+from typing import TYPE_CHECKING
+
+from ceph_node_proxy.config import CephadmConfig, get_node_proxy_config
+from ceph_node_proxy.util import DEFAULTS, write_tmp_file
+
+if TYPE_CHECKING:
+ from ceph_node_proxy.main import NodeProxyManager
+
+
+def create_node_proxy_manager(cephadm_config: CephadmConfig) -> 'NodeProxyManager':
+    """
+    Build NodeProxyManager from cephadm bootstrap config.
+    Creates temporary CA file and loads node-proxy YAML config.
+
+    Returns a fully wired (but not yet started) NodeProxyManager.
+    """
+    # Imported lazily: main imports this module, so a top-level import
+    # would create a circular dependency.
+    from ceph_node_proxy.main import NodeProxyManager
+
+    # The mgr root certificate arrives inline in the JSON payload; the
+    # manager's SSL context loads it from a file path, hence the temp file.
+    ca_file = write_tmp_file(
+        cephadm_config.root_cert_pem,
+        prefix_name='cephadm-endpoint-root-cert-',
+    )
+    config = get_node_proxy_config(
+        path=cephadm_config.node_proxy_config_path,
+        defaults=DEFAULTS,
+    )
+
+    manager = NodeProxyManager(
+        mgr_host=cephadm_config.target_ip,
+        cephx_name=cephadm_config.name,
+        cephx_secret=cephadm_config.keyring,
+        mgr_agent_port=cephadm_config.target_port,
+        ca_path=ca_file.name,
+        api_ssl_crt=cephadm_config.listener_crt,
+        api_ssl_key=cephadm_config.listener_key,
+        config_path=cephadm_config.node_proxy_config_path,
+        config=config,
+    )
+    # Keep temp file alive for the lifetime of the manager
+    # (presumably a NamedTemporaryFile that is removed on GC —
+    # TODO confirm write_tmp_file's return type).
+    manager._ca_temp_file = ca_file
+    return manager
--- /dev/null
+from __future__ import annotations
+
+import json
+import os
+from dataclasses import dataclass
+from typing import Any, Dict, Optional
+
+from ceph_node_proxy.util import DEFAULTS, Config
+
+
+# Keys that must be present in the cephadm --config JSON payload.
+# The dotted names mirror the file names cephadm writes; they are mapped
+# to underscore attribute names by CephadmConfig.from_dict().
+REQUIRED_CEPHADM_KEYS = (
+    'target_ip',
+    'target_port',
+    'keyring',
+    'root_cert.pem',
+    'listener.crt',
+    'listener.key',
+    'name',
+)
+
+
+@dataclass
+class CephadmConfig:
+    """Parsed cephadm bootstrap config (from --config JSON file).
+
+    JSON keys containing dots ('root_cert.pem', 'listener.crt',
+    'listener.key') are mapped to underscore attribute names.
+    """
+    target_ip: str
+    target_port: str
+    keyring: str
+    root_cert_pem: str
+    listener_crt: str
+    listener_key: str
+    name: str
+    node_proxy_config_path: str
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> CephadmConfig:
+        """Build a CephadmConfig from the decoded JSON payload.
+
+        Raises ValueError naming every missing required key at once,
+        rather than failing on the first one.
+        """
+        missing = [key for key in REQUIRED_CEPHADM_KEYS if key not in data]
+        if missing:
+            raise ValueError(f"Missing required cephadm config key: {', '.join(missing)}")
+        # 'node_proxy_config' is optional: fall back to the env var, then
+        # the packaged default.  os.environ.get() is given a default, so
+        # this can never be None (the former `assert ... is not None` was
+        # dead code and would be stripped under `python -O` anyway).
+        node_proxy_config_path = data.get('node_proxy_config') or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml')
+        return cls(
+            target_ip=data['target_ip'],
+            target_port=data['target_port'],
+            keyring=data['keyring'],
+            root_cert_pem=data['root_cert.pem'],
+            listener_crt=data['listener.crt'],
+            listener_key=data['listener.key'],
+            name=data['name'],
+            node_proxy_config_path=node_proxy_config_path,
+        )
+
+
+def load_cephadm_config(path: str) -> CephadmConfig:
+    """
+    Load and validate cephadm bootstrap config from a JSON file.
+    Raises FileNotFoundError if path does not exist, ValueError if invalid
+    (not JSON, not a JSON object, or missing a required key — the latter
+    raised by CephadmConfig.from_dict).
+    """
+    if not os.path.exists(path):
+        raise FileNotFoundError(f'Config file not found: {path}')
+    with open(path, 'r') as f:
+        try:
+            data = json.load(f)
+        except json.JSONDecodeError as e:
+            # Chain the original decode error for easier debugging.
+            raise ValueError(f'Invalid JSON config: {e}') from e
+    if not isinstance(data, dict):
+        raise ValueError('Config must be a JSON object')
+    return CephadmConfig.from_dict(data)
+
+
+def get_node_proxy_config(
+    path: Optional[str] = None,
+    defaults: Optional[Dict[str, Any]] = None,
+) -> Config:
+    """Load the node-proxy YAML config.
+
+    Path resolution order: explicit *path* argument, then the
+    NODE_PROXY_CONFIG environment variable, then /etc/ceph/node-proxy.yml.
+    *defaults* falls back to the package-wide DEFAULTS.
+    """
+    # os.environ.get() is given a default here, so effective_path can
+    # never be None — the former assert was dead code (and asserts are
+    # stripped under `python -O`).
+    effective_path = path or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml')
+    return Config(effective_path, defaults=defaults or DEFAULTS)
from ceph_node_proxy.api import NodeProxyApi
-from ceph_node_proxy.atollon import AtollonSystem
-from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
-from ceph_node_proxy.redfishdellsystem import RedfishDellSystem
+from ceph_node_proxy.bootstrap import create_node_proxy_manager
+from ceph_node_proxy.config import load_cephadm_config
+from ceph_node_proxy.registry import get_system_class
from ceph_node_proxy.reporter import Reporter
-from ceph_node_proxy.util import Config, DEFAULTS, get_logger, http_req, write_tmp_file
+from ceph_node_proxy.util import Config, DEFAULTS, get_logger, http_req
from urllib.error import HTTPError
-from typing import Dict, Any, Optional, Type
+from typing import Dict, Any, Optional
import argparse
+import json
import os
+import signal
import ssl
-import json
import time
-import signal
-
-
-REDFISH_SYSTEM_CLASSES: Dict[str, Type[BaseRedfishSystem]] = {
- 'generic': BaseRedfishSystem,
- 'dell': RedfishDellSystem,
- 'atollon': AtollonSystem,
-}
class NodeProxyManager:
- def __init__(self, **kw: Any) -> None:
+ def __init__(
+ self,
+ *,
+ mgr_host: str,
+ cephx_name: str,
+ cephx_secret: str,
+ ca_path: str,
+ api_ssl_crt: str,
+ api_ssl_key: str,
+ mgr_agent_port: str,
+ config: Optional[Config] = None,
+ config_path: Optional[str] = None,
+ reporter_scheme: str = 'https',
+ reporter_endpoint: str = '/node-proxy/data',
+ ) -> None:
self.exc: Optional[Exception] = None
self.log = get_logger(__name__)
- self.mgr_host: str = kw['mgr_host']
- self.cephx_name: str = kw['cephx_name']
- self.cephx_secret: str = kw['cephx_secret']
- self.ca_path: str = kw['ca_path']
- self.api_ssl_crt: str = kw['api_ssl_crt']
- self.api_ssl_key: str = kw['api_ssl_key']
- self.mgr_agent_port: str = str(kw['mgr_agent_port'])
+ self.mgr_host = mgr_host
+ self.cephx_name = cephx_name
+ self.cephx_secret = cephx_secret
+ self.ca_path = ca_path
+ self.api_ssl_crt = api_ssl_crt
+ self.api_ssl_key = api_ssl_key
+ self.mgr_agent_port = str(mgr_agent_port)
self.stop: bool = False
self.ssl_ctx = ssl.create_default_context()
self.ssl_ctx.check_hostname = True
self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
self.ssl_ctx.load_verify_locations(self.ca_path)
- self.reporter_scheme: str = kw.get('reporter_scheme', 'https')
- self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data')
+ self.reporter_scheme = reporter_scheme
+ self.reporter_endpoint = reporter_endpoint
self.cephx = {'cephx': {'name': self.cephx_name,
'secret': self.cephx_secret}}
- config_path = kw.get('config_path') or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml')
- self.config = Config(config_path, defaults=DEFAULTS)
+ if config is not None:
+ self.config = config
+ else:
+ path = (
+ config_path or os.environ.get('NODE_PROXY_CONFIG')
+ or '/etc/ceph/node-proxy.yml'
+ )
+ self.config = Config(path, defaults=DEFAULTS)
self.username: str = ''
self.password: str = ''
raise SystemExit(1)
try:
vendor = self.config.get('system', {}).get('vendor', 'generic')
- system_cls = REDFISH_SYSTEM_CLASSES.get(vendor, BaseRedfishSystem)
+ system_cls = get_system_class(vendor)
self.system = system_cls(host=oob_details['host'],
port=oob_details['port'],
username=oob_details['username'],
def init_reporter(self) -> None:
try:
+ max_retries = self.config.get('reporter', {}).get('push_data_max_retries', 30)
self.reporter_agent = Reporter(self.system,
self.cephx,
reporter_scheme=self.reporter_scheme,
reporter_hostname=self.mgr_host,
reporter_port=self.mgr_agent_port,
- reporter_endpoint=self.reporter_endpoint)
+ reporter_endpoint=self.reporter_endpoint,
+ max_retries=max_retries)
self.reporter_agent.start()
except RuntimeError:
self.log.error("Can't initialize the reporter.")
raise
def loop(self) -> None:
+ check_interval = 20
+ min_interval = 20
+ max_interval = 300
+ backoff_factor = 1.5
+ consecutive_failures = 0
+
while not self.stop:
- for thread in [self.system, self.reporter_agent]:
- try:
+ try:
+ for thread in [self.system, self.reporter_agent]:
status = thread.check_status()
label = 'Ok' if status else 'Critical'
self.log.debug(f'{thread} status: {label}')
- except Exception as e:
- self.log.error(f'{thread} not running: {e.__class__.__name__}: {e}')
+ consecutive_failures = 0
+ check_interval = min_interval
+ self.log.debug('All threads are alive, next check in %ds.', check_interval)
+ except Exception as e:
+ consecutive_failures += 1
+ self.log.error(
+ f'{consecutive_failures} failure(s): thread not running: '
+ f'{e.__class__.__name__}: {e}'
+ )
+ for thread in [self.system, self.reporter_agent]:
thread.shutdown()
- self.init_system()
- self.init_reporter()
- self.log.debug('All threads are alive, next check in 20sec.')
- time.sleep(20)
+ self.init_system()
+ self.init_reporter()
+ check_interval = min(int(check_interval * backoff_factor), max_interval)
+ self.log.debug('Next check in %ds (backoff).', check_interval)
+ time.sleep(check_interval)
def shutdown(self) -> None:
self.stop = True
def handler(signum: Any, frame: Any, t_mgr: 'NodeProxyManager') -> None:
- t_mgr.system.pending_shutdown = True
+ if hasattr(t_mgr, 'system') and t_mgr.system is not None:
+ t_mgr.system.pending_shutdown = True
t_mgr.log.info('SIGTERM caught, shutting down threads...')
t_mgr.shutdown()
- t_mgr.log.info('Logging out from RedFish API')
- t_mgr.system.client.logout()
+ if hasattr(t_mgr, 'system') and t_mgr.system is not None and hasattr(t_mgr.system, 'client') and t_mgr.system.client is not None:
+ t_mgr.log.info('Logging out from RedFish API')
+ t_mgr.system.client.logout()
raise SystemExit(0)
if args.debug:
DEFAULTS['logging']['level'] = 10
- if not os.path.exists(args.config):
- raise Exception(f'No config file found at provided config path: {args.config}')
+ try:
+ cephadm_config = load_cephadm_config(args.config)
+ except FileNotFoundError as e:
+ raise SystemExit(f'Config error: {e}')
+ except ValueError as e:
+ raise SystemExit(f'Config error: {e}')
- with open(args.config, 'r') as f:
- try:
- config_json = f.read()
- config = json.loads(config_json)
- except Exception as e:
- raise Exception(f'Failed to load json config: {str(e)}')
-
- target_ip = config['target_ip']
- target_port = config['target_port']
- keyring = config['keyring']
- root_cert = config['root_cert.pem']
- listener_cert = config['listener.crt']
- listener_key = config['listener.key']
- name = config['name']
-
- ca_file = write_tmp_file(root_cert,
- prefix_name='cephadm-endpoint-root-cert')
-
- config_path = config.get('node_proxy_config') or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml')
-
- node_proxy_mgr = NodeProxyManager(mgr_host=target_ip,
- cephx_name=name,
- cephx_secret=keyring,
- mgr_agent_port=target_port,
- ca_path=ca_file.name,
- api_ssl_crt=listener_cert,
- api_ssl_key=listener_key,
- config_path=config_path)
+ node_proxy_mgr = create_node_proxy_manager(cephadm_config)
signal.signal(signal.SIGTERM,
lambda signum, frame: handler(signum, frame, node_proxy_mgr))
node_proxy_mgr.run()
--- /dev/null
+from threading import Lock
+from typing import Any, Dict, Protocol, runtime_checkable
+
+
+@runtime_checkable
+class SystemBackend(Protocol):
+    """Structural interface a hardware backend must satisfy.
+
+    The get_* methods return component data as plain dicts; the LED and
+    power-control methods return an int status (presumably an HTTP-style
+    status code — confirm against the concrete Redfish implementations).
+    """
+    def get_memory(self) -> Dict[str, Any]:
+        ...
+
+    def get_network(self) -> Dict[str, Any]:
+        ...
+
+    def get_storage(self) -> Dict[str, Any]:
+        ...
+
+    def get_processors(self) -> Dict[str, Any]:
+        ...
+
+    def get_power(self) -> Dict[str, Any]:
+        ...
+
+    def get_fans(self) -> Dict[str, Any]:
+        ...
+
+    def get_firmwares(self) -> Dict[str, Any]:
+        ...
+
+    def get_led(self) -> Dict[str, Any]:
+        ...
+
+    def set_led(self, data: Dict[str, str]) -> int:
+        ...
+
+    def get_chassis_led(self) -> Dict[str, Any]:
+        ...
+
+    def chassis_led_on(self) -> int:
+        ...
+
+    def chassis_led_off(self) -> int:
+        ...
+
+    def get_device_led(self, device: str) -> Dict[str, Any]:
+        ...
+
+    def device_led_on(self, device: str) -> int:
+        ...
+
+    def device_led_off(self, device: str) -> int:
+        ...
+
+    def shutdown_host(self, force: bool = False) -> int:
+        ...
+
+    def powercycle(self) -> int:
+        ...
+
+    def flush(self) -> None:
+        ...
+
+
+@runtime_checkable
+class SystemForReporter(Protocol):
+    """The slice of a system object that the Reporter thread relies on."""
+
+    # Guards concurrent access to the system's collected data.
+    lock: Lock
+    # Flag consulted before sending — set once data collection has run.
+    data_ready: bool
+    # Set by the SIGTERM handler to request a clean stop.
+    pending_shutdown: bool
+    # Presumably the last payload sent, used for change detection —
+    # verify against Reporter.main.
+    previous_data: Dict[str, Any]
+
+    def get_system(self) -> Dict[str, Any]:
+        ...
--- /dev/null
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from typing import Any, Dict, List, Optional
+
+from ceph_node_proxy.redfish_client import RedFishClient
+from ceph_node_proxy.util import get_logger, to_snake_case, normalize_dict
+from urllib.error import HTTPError, URLError
+
+
+@dataclass
+class ComponentUpdateSpec:
+    """Declarative description of how to extract one component's data
+    from a Redfish collection (consumed by update_component)."""
+    # Name of the root Redfish collection endpoint (snake_cased).
+    collection: str
+    # Sub-path under each collection member to query.
+    path: str
+    # Redfish field names to project into the result.
+    fields: List[str]
+    # Optional key holding a list of member dicts (see build_data).
+    attribute: Optional[str] = None
+
+
+class EndpointMgr:
+    """Manages Redfish root endpoints (Systems, Chassis, etc.) discovered from the service root."""
+
+    NAME: str = 'EndpointMgr'
+
+    def __init__(self,
+                 client: RedFishClient,
+                 prefix: str = RedFishClient.PREFIX) -> None:
+        self.log = get_logger(f'{__name__}:{EndpointMgr.NAME}')
+        self.prefix: str = prefix
+        self.client: RedFishClient = client
+        # Populated by init(); keyed by snake_cased service-root entry name.
+        self._endpoints: Dict[str, 'Endpoint'] = {}
+        self._session_url: str = ''
+
+    def __getitem__(self, index: str) -> Endpoint:
+        """Return a discovered endpoint, raising a descriptive KeyError otherwise."""
+        if index not in self._endpoints:
+            raise KeyError(f"'{index}' is not a valid endpoint. Available: {list(self._endpoints.keys())}")
+        return self._endpoints[index]
+
+    def get(self, name: str, default: Any = None) -> Any:
+        """dict.get-style lookup that never raises."""
+        return self._endpoints.get(name, default)
+
+    def list_endpoints(self) -> List[str]:
+        """Names of all endpoints discovered by init()."""
+        return list(self._endpoints.keys())
+
+    @property
+    def session(self) -> str:
+        # Empty string when the service root exposed no session link.
+        return self._session_url
+
+    def init(self) -> None:
+        """Query the service root and register one Endpoint per entry.
+
+        Raises RuntimeError (chained to the original error) when the
+        service root cannot be queried or decoded.
+        """
+        error_msg: str = "Can't discover entrypoint(s)"
+        try:
+            _, _data, _ = self.client.query(endpoint=self.prefix)
+            json_data: Dict[str, Any] = json.loads(_data)
+
+            # Every value carrying an '@odata.id' is an entry point.
+            for k, v in json_data.items():
+                if isinstance(v, dict) and '@odata.id' in v:
+                    name: str = to_snake_case(k)
+                    url: str = v['@odata.id']
+                    self.log.info(f'entrypoint found: {name} = {url}')
+                    self._endpoints[name] = Endpoint(url, self.client)
+
+            try:
+                self._session_url = json_data['Links']['Sessions']['@odata.id']
+            except (KeyError, TypeError):
+                # Missing session link is non-fatal.
+                self.log.warning('Session URL not found in root response')
+                self._session_url = ''
+
+        except (URLError, KeyError, json.JSONDecodeError) as e:
+            msg = f'{error_msg}: {e}'
+            self.log.error(msg)
+            raise RuntimeError(msg) from e
+
+
+class Endpoint:
+    """Single Redfish resource or collection; supports lazy child resolution and member listing."""
+
+    NAME: str = 'Endpoint'
+
+    def __init__(self, url: str, client: RedFishClient) -> None:
+        """Eagerly fetch *url* and derive id / member names from the payload."""
+        self.log = get_logger(f'{__name__}:{Endpoint.NAME}')
+        self.url: str = url
+        self.client: RedFishClient = client
+        # Cache of child Endpoints built lazily by __getitem__.
+        self._children: Dict[str, 'Endpoint'] = {}
+        # Empty dict when the query fails (query() never raises).
+        self.data: Dict[str, Any] = self.get_data()
+        self.id: str = ''
+        self.members_names: List[str] = []
+
+        if self.has_members:
+            self.members_names = self.get_members_names()
+
+        if self.data:
+            try:
+                self.id = self.data['Id']
+            except KeyError:
+                # Fall back to the last path segment of the odata id.
+                self.id = self.data['@odata.id'].split('/')[-1]
+        else:
+            self.log.warning(f'No data could be loaded for {self.url}')
+
+    def __getitem__(self, key: str) -> 'Endpoint':
+        """Return (and cache) the child endpoint at <url>/<key>.
+
+        Raises KeyError for non-string, empty, or '/'-containing keys.
+        """
+        if not isinstance(key, str) or not key or '/' in key:
+            raise KeyError(key)
+
+        if key not in self._children:
+            child_url: str = f'{self.url.rstrip("/")}/{key}'
+            self._children[key] = Endpoint(child_url, self.client)
+
+        return self._children[key]
+
+    def list_children(self) -> List[str]:
+        """Names of children already resolved through __getitem__."""
+        return list(self._children.keys())
+
+    def query(self, url: str) -> Dict[str, Any]:
+        """GET *url* and decode the JSON body.
+
+        Never raises: every failure is logged and an empty dict returned.
+        """
+        data: Dict[str, Any] = {}
+        try:
+            self.log.debug(f'Querying {url}')
+            _, _data, _ = self.client.query(endpoint=url)
+            if not _data:
+                self.log.warning(f'Empty response from {url}')
+            else:
+                data = json.loads(_data)
+        except KeyError as e:
+            self.log.error(f'KeyError while querying {url}: {e}')
+        except HTTPError as e:
+            self.log.error(f'HTTP error while querying {url} - {e.code} - {e.reason}')
+        except json.JSONDecodeError as e:
+            self.log.error(f'JSON decode error while querying {url}: {e}')
+        except Exception as e:
+            # Last-resort catch: keep collection loops alive on unexpected errors.
+            self.log.error(f'Unexpected error while querying {url}: {type(e).__name__}: {e}')
+        return data
+
+    def get_data(self) -> Dict[str, Any]:
+        """Re-query this endpoint's own URL."""
+        return self.query(self.url)
+
+    def get_members_names(self) -> List[str]:
+        """Last path segment of each '@odata.id' in the 'Members' array."""
+        result: List[str] = []
+        if self.has_members:
+            for member in self.data['Members']:
+                name: str = member['@odata.id'].split('/')[-1]
+                result.append(name)
+        return result
+
+    def get_name(self, endpoint: str) -> str:
+        """Last path segment of an endpoint URL."""
+        return endpoint.split('/')[-1]
+
+    def get_members_endpoints(self) -> Dict[str, str]:
+        """Map member name -> member URL.
+
+        Members whose URL does not share this endpoint's
+        /redfish/v1/<first-segment> prefix are skipped; without a
+        'Members' array the endpoint itself is returned as a singleton.
+        """
+        members: Dict[str, str] = {}
+        self.log.debug(f'get_members_endpoints called on {self.url}, has_members={self.has_members}')
+
+        if self.has_members:
+            # Base path = first segment after /redfish/v1/ in our own URL.
+            url_parts = self.url.split('/redfish/v1/')
+            if len(url_parts) > 1:
+                base_path = '/redfish/v1/' + url_parts[1].split('/')[0]
+            else:
+                base_path = None
+
+            for member in self.data['Members']:
+                name = self.get_name(member['@odata.id'])
+                endpoint_url = member['@odata.id']
+                self.log.debug(f'Found member: {name} -> {endpoint_url}')
+
+                if base_path and not endpoint_url.startswith(base_path):
+                    self.log.warning(
+                        f'Member endpoint {endpoint_url} does not match base path {base_path} '
+                        f'from {self.url}. Skipping this member.'
+                    )
+                    continue
+
+                members[name] = endpoint_url
+        else:
+            if self.data:
+                name = self.get_name(self.url)
+                members[name] = self.url
+                self.log.warning(f'No Members array, using endpoint itself: {name} -> {self.url}')
+            else:
+                self.log.debug(f'Endpoint {self.url} has no data and no Members array')
+
+        return members
+
+    def get_members_data(self) -> Dict[str, Any]:
+        """Fetch every member's payload (or this endpoint's own data).
+
+        Returns member name -> decoded JSON payload.
+        """
+        result: Dict[str, Any] = {}
+        self.log.debug(f'get_members_data called on {self.url}, has_members={self.has_members}')
+
+        if self.has_members:
+            self.log.debug(f'Endpoint {self.url} has Members array: {self.data.get("Members", [])}')
+            members_endpoints = self.get_members_endpoints()
+
+            # All members may have been filtered out by the base-path check.
+            if not members_endpoints:
+                self.log.warning(
+                    f'Endpoint {self.url} has Members array but no valid members after filtering. '
+                    f'Using endpoint itself as singleton resource.'
+                )
+                if self.data:
+                    name = self.get_name(self.url)
+                    result[name] = self.data
+            else:
+                for member, endpoint_url in members_endpoints.items():
+                    self.log.info(f'Fetching data for member: {member} at {endpoint_url}')
+                    result[member] = self.query(endpoint_url)
+        else:
+            self.log.debug(f'Endpoint {self.url} has no Members array, returning own data')
+            if self.data:
+                name = self.get_name(self.url)
+                result[name] = self.data
+            else:
+                self.log.warning(f'Endpoint {self.url} has no members and empty data')
+
+        return result
+
+    @property
+    def has_members(self) -> bool:
+        """True when the payload carries a list-typed 'Members' key."""
+        return bool(self.data and 'Members' in self.data and isinstance(self.data['Members'], list))
+
+
+def build_data(
+    data: Dict[str, Any],
+    fields: List[str],
+    log: Any,
+    attribute: Optional[str] = None,
+) -> Dict[str, Dict[str, Dict]]:
+    """Project *fields* out of a Redfish payload, keyed by member id.
+
+    With *attribute* set, data[attribute] is expected to be a list of
+    dicts each carrying a 'MemberId' field.  Without it, *data* itself is
+    a mapping of member-id -> dict and the key is re-injected as
+    'MemberId' so both shapes share one code path.
+
+    Missing fields are logged and recorded as None.  Re-raises
+    KeyError/TypeError/AttributeError after logging.
+    """
+    result: Dict[str, Dict[str, Optional[Dict]]] = {}
+
+    def process_data(m_id: str, flds: List[str], d: Dict[str, Any]) -> Dict[str, Any]:
+        out: Dict[str, Any] = {}
+        for field in flds:
+            try:
+                out[to_snake_case(field)] = d[field]
+            except KeyError:
+                log.debug(f'Could not find field: {field} in data: {d}')
+                out[to_snake_case(field)] = None
+        return out
+
+    try:
+        if attribute is not None:
+            data_items = data[attribute]
+        else:
+            # Re-inject the mapping key as 'MemberId' so both payload
+            # shapes can be processed identically below.
+            data_items = [{'MemberId': k, **v} for k, v in data.items()]
+        log.debug(f"build_data: data_items count={len(data_items)}")
+        for d in data_items:
+            # NOTE(review): entries lacking 'MemberId' end up under a None
+            # key — confirm upstream payloads always carry it.
+            member_id = d.get('MemberId')
+            # (Removed the redundant `result[member_id] = {}` dead store —
+            # it was immediately overwritten.)
+            result[member_id] = process_data(member_id, fields, d)
+    except (KeyError, TypeError, AttributeError) as e:
+        log.error(f"Can't build data: {e}")
+        raise
+    return normalize_dict(result)
+
+
+def update_component(
+    endpoints: EndpointMgr,
+    collection: str,
+    component: str,
+    path: str,
+    fields: List[str],
+    _sys: Dict[str, Any],
+    log: Any,
+    attribute: Optional[str] = None,
+) -> None:
+    """Update _sys[component] from Redfish endpoints using the given spec.
+
+    Collections without named members are queried through *path* directly;
+    otherwise each member's *path* sub-resource is fetched and projected
+    via build_data().
+
+    NOTE(review): on HTTPError a member is skipped entirely here, whereas
+    the previous BaseRedfishSystem.update recorded it as an empty dict —
+    confirm consumers of _sys tolerate missing member keys.
+    """
+    members: List[str] = endpoints[collection].get_members_names()
+    result: Dict[str, Any] = {}
+    if not members:
+        data = endpoints[collection][path].get_members_data()
+        result = build_data(data=data, fields=fields, log=log, attribute=attribute)
+    else:
+        for member in members:
+            try:
+                if attribute is None:
+                    data = endpoints[collection][member][path].get_members_data()
+                else:
+                    # With an attribute, the raw payload is consumed as-is.
+                    data = endpoints[collection][member][path].data
+            except HTTPError as e:
+                log.error(f'Error while updating {component}: {e}')
+                continue
+            result[member] = build_data(data=data, fields=fields, log=log, attribute=attribute)
+    _sys[component] = result
--- /dev/null
+from typing import Dict, Type
+
+from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
+
+# Built-in implementations
+from ceph_node_proxy.atollon import AtollonSystem
+from ceph_node_proxy.redfishdellsystem import RedfishDellSystem
+from ceph_node_proxy.util import get_logger
+
+# Vendor name -> implementation class.  'generic' doubles as the fallback
+# used by get_system_class() for unknown vendors; entries may be added at
+# import time from entry points.
+REDFISH_SYSTEM_CLASSES: Dict[str, Type[BaseRedfishSystem]] = {
+    'generic': BaseRedfishSystem,
+    'dell': RedfishDellSystem,
+    'atollon': AtollonSystem,
+}
+
+logger = get_logger(__name__)
+
+
+def _load_entry_point_systems() -> None:
+    """Register third-party Redfish system classes advertised through the
+    'ceph_node_proxy.systems' entry-point group.
+
+    Prefers the stdlib importlib.metadata API; pkg_resources is deprecated
+    and removed from recent setuptools releases, so it is only a fallback.
+    Load failures are logged and skipped so one bad plugin cannot break
+    the built-in registry.
+    """
+    group = "ceph_node_proxy.systems"
+    try:
+        from importlib.metadata import entry_points
+    except ImportError:
+        entry_points = None  # Python without importlib.metadata
+
+    if entry_points is not None:
+        eps = entry_points()
+        # Python >= 3.10 returns a selectable EntryPoints object; older
+        # versions return a dict keyed by group name.
+        found = (eps.select(group=group) if hasattr(eps, 'select')
+                 else eps.get(group, []))
+    else:
+        try:
+            import pkg_resources
+        except ImportError:
+            logger.debug(
+                "No entry-point support available; only built-in Redfish systems will be used."
+            )
+            return
+        found = pkg_resources.iter_entry_points(group)
+
+    for ep in found:
+        # ModuleNotFoundError is a subclass of ImportError, so it need
+        # not be listed separately.
+        try:
+            REDFISH_SYSTEM_CLASSES[ep.name] = ep.load()
+        except (ImportError, AttributeError) as e:
+            logger.warning(
+                "Failed to load Redfish system entry point %s: %s", ep.name, e
+            )
+
+
+def get_system_class(vendor: str) -> Type[BaseRedfishSystem]:
+    """Return the Redfish system class for the given vendor.
+
+    Falls back to the generic BaseRedfishSystem for unknown vendors.
+    """
+    return REDFISH_SYSTEM_CLASSES.get(vendor, BaseRedfishSystem)
+
+
+# Import-time side effect: pick up externally registered system classes
+# so they are available before get_system_class() is first called.
+_load_entry_point_systems()
import time
import json
+from ceph_node_proxy.protocols import SystemForReporter
from ceph_node_proxy.util import get_logger, http_req, BaseThread
from urllib.error import HTTPError, URLError
from typing import Dict, Any
+# Default cap on consecutive POST attempts per reporting cycle.
+DEFAULT_MAX_RETRIES = 30
+# Seconds slept between failed attempts in Reporter._send_with_retries().
+RETRY_SLEEP_SEC = 5
+

class Reporter(BaseThread):
def __init__(self,
- system: Any,
+ system: SystemForReporter,
cephx: Dict[str, Any],
reporter_scheme: str = 'https',
reporter_hostname: str = '',
reporter_port: str = '443',
- reporter_endpoint: str = '/node-proxy/data') -> None:
+ reporter_endpoint: str = '/node-proxy/data',
+ max_retries: int = DEFAULT_MAX_RETRIES) -> None:
super().__init__()
self.system = system
self.data: Dict[str, Any] = {}
self.reporter_hostname: str = reporter_hostname
self.reporter_port: str = reporter_port
self.reporter_endpoint: str = reporter_endpoint
+ self.max_retries: int = max_retries
self.log = get_logger(__name__)
self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:'
f'{reporter_port}{reporter_endpoint}')
self.log.info(f'Reporter url set to {self.reporter_url}')
+    def _send_with_retries(self) -> bool:
+        """POST ``self.data`` to the mgr endpoint.
+
+        Makes up to ``self.max_retries`` attempts, sleeping
+        ``RETRY_SLEEP_SEC`` seconds between failures (but not after the
+        final one).  Returns True on the first success, False once every
+        attempt has failed.
+        """
+        attempt = 0
+        while attempt < self.max_retries:
+            attempt += 1
+            try:
+                self.log.info(f'sending data to {self.reporter_url} (attempt {attempt}/{self.max_retries})')
+                http_req(hostname=self.reporter_hostname,
+                         port=self.reporter_port,
+                         method='POST',
+                         headers={'Content-Type': 'application/json'},
+                         endpoint=self.reporter_endpoint,
+                         scheme=self.reporter_scheme,
+                         data=json.dumps(self.data))
+            except (HTTPError, URLError) as e:
+                self.log.error(
+                    f"The reporter couldn't send data to the mgr (attempt {attempt}/{self.max_retries}): {e}"
+                )
+                if attempt < self.max_retries:
+                    time.sleep(RETRY_SLEEP_SEC)
+            else:
+                return True
+        return False
+
def main(self) -> None:
while not self.stop:
- # Any logic to avoid sending the all the system
- # information every loop can go here. In a real
- # scenario probably we should just send the sub-parts
- # that have changed to minimize the traffic in
- # dense clusters
self.log.debug('waiting for a lock in reporter loop.')
with self.system.lock:
if not self.system.pending_shutdown:
self.log.debug('lock acquired in reporter loop.')
if self.system.data_ready:
self.log.debug('data ready to be sent to the mgr.')
- if not self.system.get_system() == self.system.previous_data:
+ if self.system.get_system() != self.system.previous_data:
self.log.info('data has changed since last iteration.')
self.data['patch'] = self.system.get_system()
- try:
- # TODO: add a timeout parameter to the reporter in the config file
- self.log.info(f'sending data to {self.reporter_url}')
- http_req(hostname=self.reporter_hostname,
- port=self.reporter_port,
- method='POST',
- headers={'Content-Type': 'application/json'},
- endpoint=self.reporter_endpoint,
- scheme=self.reporter_scheme,
- data=json.dumps(self.data))
- except (HTTPError, URLError) as e:
- self.log.error(f"The reporter couldn't send data to the mgr: {e}")
- raise
- # Need to add a new parameter 'max_retries' to the reporter if it can't
- # send the data for more than x times, maybe the daemon should stop altogether
- else:
+ if self._send_with_retries():
self.system.previous_data = self.system.get_system()
+ else:
+ self.log.error(
+ f'Failed to send data after {self.max_retries} retries; '
+ 'will retry on next cycle.'
+ )
else:
self.log.debug('no diff, not sending data to the mgr.')
self.log.debug('lock released in reporter loop.')
'tox',
'ceph',
],
- entry_points=dict(
- console_scripts=[
+ entry_points={
+ 'console_scripts': [
'ceph-node-proxy = ceph_node_proxy.main:main',
],
- ),
+        # Third-party vendor packages register additional Redfish system
+        # implementations in this entry-point group from their own setup.py,
+        # e.g.: 'myvendor = mypackage.redfish_system:MyVendorSystem'
+ 'ceph_node_proxy.systems': [],
+ },
classifiers=[
'Environment :: Console',
'Intended Audience :: Information Technology',