git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
node-proxy: split out config, bootstrap and redfish logic
authorGuillaume Abrioux <gabrioux@ibm.com>
Fri, 30 Jan 2026 15:02:28 +0000 (16:02 +0100)
committerGuillaume Abrioux <gabrioux@ibm.com>
Thu, 5 Feb 2026 16:40:10 +0000 (17:40 +0100)
refactor config, bootstrap, redfish layer, and monitoring:

this:
- adds a config module (CephadmConfig, load_cephadm_config and
get_node_proxy_config) and protocols for api/reporter.
- extracts redfish logic to redfish.py
- adds a vendor registry with entrypoints.
- simplifies main() and NodeProxyManager().

Fixes: https://tracker.ceph.com/issues/74749
Signed-off-by: Guillaume Abrioux <gabrioux@ibm.com>
src/ceph-node-proxy/ceph_node_proxy/api.py
src/ceph-node-proxy/ceph_node_proxy/atollon.py
src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py
src/ceph-node-proxy/ceph_node_proxy/bootstrap.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/config.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/main.py
src/ceph-node-proxy/ceph_node_proxy/protocols.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/redfish.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/registry.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/reporter.py
src/ceph-node-proxy/setup.py

index 676d7c1c61d21b0daf75e77079a8b3ea5a589189..a6cf5b8436a4716609f0de743b4e71384c49a9e8 100644 (file)
@@ -3,8 +3,8 @@ from urllib.error import HTTPError
 from cherrypy._cpserver import Server  # type: ignore
 from threading import Thread, Event
 from typing import Dict, Any, List
+from ceph_node_proxy.protocols import SystemBackend
 from ceph_node_proxy.util import Config, get_logger, write_tmp_file
-from ceph_node_proxy.basesystem import BaseSystem
 from ceph_node_proxy.reporter import Reporter
 from typing import TYPE_CHECKING, Optional
 
@@ -12,6 +12,8 @@ if TYPE_CHECKING:
     from ceph_node_proxy.main import NodeProxyManager
 
 
+# Admin endpoints (start/stop/reload) are not mounted by default.
+# To enable, mount: cherrypy.tree.mount(Admin(api), '/admin', config=config)
 @cherrypy.tools.auth_basic(on=True)
 @cherrypy.tools.allow(methods=['PUT'])
 @cherrypy.tools.json_out()
@@ -53,7 +55,7 @@ class Admin():
 
 class API(Server):
     def __init__(self,
-                 backend: 'BaseSystem',
+                 backend: SystemBackend,
                  reporter: 'Reporter',
                  config: 'Config',
                  addr: str = '0.0.0.0',
index fd67ef47f186b5ec9681682e5fa7e9c9ae8691ea..5d71b7730425849e726de5115f172a7ef54794ea 100644 (file)
@@ -9,4 +9,4 @@ class AtollonSystem(BaseRedfishSystem):
         self.log = get_logger(__name__)
 
     def get_component_spec_overrides(self) -> Dict[str, Dict[str, Any]]:
-        return {'power': {'path': 'PowerSubsystem'}}
\ No newline at end of file
+        return {'power': {'path': 'PowerSubsystem'}}
index 88a84fb6f0c4bcfd2077b898bf576560440ae8cf..98b88d8416e944d5ef6200c1986630516e857e26 100644 (file)
 import concurrent.futures
 import dataclasses
-import json
-from dataclasses import dataclass
 from ceph_node_proxy.basesystem import BaseSystem
+from ceph_node_proxy.redfish import (
+    ComponentUpdateSpec,
+    Endpoint,
+    EndpointMgr,
+    update_component,
+)
 from ceph_node_proxy.redfish_client import RedFishClient
-from time import sleep
 from ceph_node_proxy.util import get_logger, to_snake_case, normalize_dict
+from time import sleep
 from typing import Dict, Any, List, Callable, Optional
-from urllib.error import HTTPError, URLError
-
-
-@dataclass
-class ComponentUpdateSpec:
-    collection: str
-    path: str
-    fields: List[str]
-    attribute: Optional[str] = None
-
-
-class EndpointMgr:
-    NAME: str = 'EndpointMgr'
-
-    def __init__(self,
-                 client: RedFishClient,
-                 prefix: str = RedFishClient.PREFIX) -> None:
-        self.log = get_logger(f'{__name__}:{EndpointMgr.NAME}')
-        self.prefix: str = prefix
-        self.client: RedFishClient = client
-        # Use explicit dictionary instead of dynamic attributes
-        self._endpoints: Dict[str, Endpoint] = {}
-        self._session_url: str = ''
-
-    def __getitem__(self, index: str) -> 'Endpoint':
-        if index not in self._endpoints:
-            raise KeyError(f"'{index}' is not a valid endpoint. Available: {list(self._endpoints.keys())}")
-        return self._endpoints[index]
-
-    def get(self, name: str, default: Any = None) -> Any:
-        return self._endpoints.get(name, default)
-
-    def list_endpoints(self) -> List[str]:
-        return list(self._endpoints.keys())
-
-    @property
-    def session(self) -> str:
-        return self._session_url
-
-    def init(self) -> None:
-        error_msg: str = "Can't discover entrypoint(s)"
-        try:
-            _, _data, _ = self.client.query(endpoint=self.prefix)
-            json_data: Dict[str, Any] = json.loads(_data)
-
-            # Discover endpoints
-            for k, v in json_data.items():
-                if isinstance(v, dict) and '@odata.id' in v:
-                    name: str = to_snake_case(k)
-                    url: str = v['@odata.id']
-                    self.log.info(f'entrypoint found: {name} = {url}')
-                    self._endpoints[name] = Endpoint(url, self.client)
-
-            # Extract session URL if available
-            try:
-                self._session_url = json_data['Links']['Sessions']['@odata.id']
-            except (KeyError, TypeError):
-                self.log.warning('Session URL not found in root response')
-                self._session_url = ''
-
-        except (URLError, KeyError, json.JSONDecodeError) as e:
-            msg = f'{error_msg}: {e}'
-            self.log.error(msg)
-            raise RuntimeError(msg) from e
-
-
-class Endpoint:
-    NAME: str = 'Endpoint'
-
-    def __init__(self, url: str, client: RedFishClient) -> None:
-        self.log = get_logger(f'{__name__}:{Endpoint.NAME}')
-        self.url: str = url
-        self.client: RedFishClient = client
-        self._children: Dict[str, 'Endpoint'] = {}
-        self.data: Dict[str, Any] = self.get_data()
-        self.id: str = ''
-        self.members_names: List[str] = []
-
-        if self.has_members:
-            self.members_names = self.get_members_names()
-
-        if self.data:
-            try:
-                self.id = self.data['Id']
-            except KeyError:
-                self.id = self.data['@odata.id'].split('/')[-1]
-        else:
-            self.log.warning(f'No data could be loaded for {self.url}')
-
-    def __getitem__(self, key: str) -> 'Endpoint':
-        if not isinstance(key, str) or not key or '/' in key:
-            raise KeyError(key)
-
-        if key not in self._children:
-            child_url: str = f'{self.url.rstrip("/")}/{key}'
-            self._children[key] = Endpoint(child_url, self.client)
-
-        return self._children[key]
-
-    def list_children(self) -> List[str]:
-        return list(self._children.keys())
-
-    def query(self, url: str) -> Dict[str, Any]:
-        data: Dict[str, Any] = {}
-        try:
-            self.log.debug(f'Querying {url}')
-            _, _data, _ = self.client.query(endpoint=url)
-            if not _data:
-                self.log.warning(f'Empty response from {url}')
-            else:
-                data = json.loads(_data)
-        except KeyError as e:
-            self.log.error(f'KeyError while querying {url}: {e}')
-        except HTTPError as e:
-            self.log.error(f'HTTP error while querying {url} - {e.code} - {e.reason}')
-        except json.JSONDecodeError as e:
-            self.log.error(f'JSON decode error while querying {url}: {e}')
-        except Exception as e:
-            self.log.error(f'Unexpected error while querying {url}: {type(e).__name__}: {e}')
-        return data
-
-    def get_data(self) -> Dict[str, Any]:
-        return self.query(self.url)
-
-    def get_members_names(self) -> List[str]:
-        result: List[str] = []
-        if self.has_members:
-            for member in self.data['Members']:
-                name: str = member['@odata.id'].split('/')[-1]
-                result.append(name)
-        return result
-
-    def get_name(self, endpoint: str) -> str:
-        return endpoint.split('/')[-1]
-
-    def get_members_endpoints(self) -> Dict[str, str]:
-        members: Dict[str, str] = {}
-
-        self.log.error(f'get_members_endpoints called on {self.url}, has_members={self.has_members}')
-
-        if self.has_members:
-            url_parts = self.url.split('/redfish/v1/')
-            if len(url_parts) > 1:
-                base_path = '/redfish/v1/' + url_parts[1].split('/')[0]
-            else:
-                base_path = None
-
-            for member in self.data['Members']:
-                name = self.get_name(member['@odata.id'])
-                endpoint_url = member['@odata.id']
-                self.log.debug(f'Found member: {name} -> {endpoint_url}')
-                
-                if base_path and not endpoint_url.startswith(base_path):
-                    self.log.warning(
-                        f'Member endpoint {endpoint_url} does not match base path {base_path} '
-                        f'from {self.url}. Skipping this member.'
-                    )
-                    continue
-
-                members[name] = endpoint_url
-        else:
-            if self.data:
-                name = self.get_name(self.url)
-                members[name] = self.url
-                self.log.warning(f'No Members array, using endpoint itself: {name} -> {self.url}')
-            else:
-                self.log.debug(f'Endpoint {self.url} has no data and no Members array')
-
-        return members
-
-    def get_members_data(self) -> Dict[str, Any]:
-        result: Dict[str, Any] = {}
-        self.log.debug(f'get_members_data called on {self.url}, has_members={self.has_members}')
-        
-        if self.has_members:
-            self.log.debug(f'Endpoint {self.url} has Members array: {self.data.get("Members", [])}')
-            members_endpoints = self.get_members_endpoints()
-            
-            # If no valid members after filtering, fall back to using the endpoint itself
-            if not members_endpoints:
-                self.log.warning(
-                    f'Endpoint {self.url} has Members array but no valid members after filtering. '
-                    f'Using endpoint itself as singleton resource.'
-                )
-                if self.data:
-                    name = self.get_name(self.url)
-                    result[name] = self.data
-            else:
-                for member, endpoint_url in members_endpoints.items():
-                    self.log.info(f'Fetching data for member: {member} at {endpoint_url}')
-                    result[member] = self.query(endpoint_url)
-        else:
-            self.log.debug(f'Endpoint {self.url} has no Members array, returning own data')
-            if self.data:
-                name = self.get_name(self.url)
-                result[name] = self.data
-            else:
-                self.log.warning(f'Endpoint {self.url} has no members and empty data')
-
-        return result
-
-    @property
-    def has_members(self) -> bool:
-        return bool(self.data and 'Members' in self.data and isinstance(self.data['Members'], list))
 
 
 class BaseRedfishSystem(BaseSystem):
@@ -242,7 +42,9 @@ class BaseRedfishSystem(BaseSystem):
         self.username: str = kw['username']
         self.password: str = kw['password']
         # move the following line (class attribute?)
-        self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password)
+        self.client: RedFishClient = RedFishClient(
+            host=self.host, port=self.port, username=self.username, password=self.password
+        )
         self.endpoints: EndpointMgr = EndpointMgr(self.client)
         self.log.info(f'redfish system initialization, host: {self.host}, user: {self.username}')
         self.data_ready: bool = False
@@ -268,74 +70,16 @@ class BaseRedfishSystem(BaseSystem):
                 f = getattr(self, func)
                 self.update_funcs.append(f)
 
-    def build_data(self,
-                   data: Dict[str, Any],
-                   fields: List[str],
-                   attribute: Optional[str] = None) -> Dict[str, Dict[str, Dict]]:
-        result: Dict[str, Dict[str, Optional[Dict]]] = dict()
-        member_id: str = ''
-
-        def process_data(m_id: str, fields: List[str], data: Dict[str, Any]) -> Dict[str, Any]:
-            result: Dict[str, Any] = {}
-            for field in fields:
-                try:
-                    result[to_snake_case(field)] = data[field]
-                except KeyError:
-                    self.log.debug(f'Could not find field: {field} in data: {data}')
-                    result[to_snake_case(field)] = None
-            return result
-
-        try:
-            if attribute is not None:
-                data_items = data[attribute]
-            else:
-                # The following is a hack to re-inject the key to the dict
-                # as we have the following structure when `attribute` is passed:
-                # "PowerSupplies": [ {"MemberId": "0", ...}, {"MemberId": "1", ...} ]
-                # vs. this structure in the opposite case:
-                # { "CPU.Socket.2": { "Id": "CPU.Socket.2", "Manufacturer": "Intel" }, "CPU.Socket.1": {} }
-                # With the first case, we clearly use the field "MemberId".
-                # With the second case, we use the key of the dict.
-                # This is mostly for avoiding code duplication.
-                data_items = [{'MemberId': k, **v} for k, v in data.items()]
-            self.log.error(f"GUITS_DEBUG: data_items= {data_items}")
-            for d in data_items:
-                member_id = d.get('MemberId')
-                result[member_id] = {}
-                result[member_id] = process_data(member_id, fields, d)
-        except (KeyError, TypeError, AttributeError) as e:
-            self.log.error(f"Can't build data: {e}")
-            raise
-        return normalize_dict(result)
-
     def update(self,
                collection: str,
                component: str,
                path: str,
                fields: List[str],
                attribute: Optional[str] = None) -> None:
-        members: List[str] = self.endpoints[collection].get_members_names()
-        result: Dict[str, Any] = {}
-        data: Dict[str, Any] = {}
-        data_built: Dict[str, Any] = {}
-        if not members:
-            data = self.endpoints[collection][path].get_members_data()
-            data_built = self.build_data(data=data, fields=fields, attribute=attribute)
-            result = data_built
-        else:
-            for member in members:
-                data_built = {}
-                try:
-                    if attribute is None:
-                        data = self.endpoints[collection][member][path].get_members_data()
-                    else:
-                        data = self.endpoints[collection][member][path].data
-                except HTTPError as e:
-                    self.log.error(f'Error while updating {component}: {e}')
-                else:
-                    data_built = self.build_data(data=data, fields=fields, attribute=attribute)
-                    result[member] = data_built
-        self._sys[component] = result
+        update_component(
+            self.endpoints, collection, component, path, fields,
+            self._sys, self.log, attribute=attribute,
+        )
 
     def main(self) -> None:
         self.stop = False
@@ -546,4 +290,4 @@ class BaseRedfishSystem(BaseSystem):
         raise NotImplementedError()
 
     def schedule_reboot_job(self, job_id: str) -> int:
-        raise NotImplementedError()
\ No newline at end of file
+        raise NotImplementedError()
diff --git a/src/ceph-node-proxy/ceph_node_proxy/bootstrap.py b/src/ceph-node-proxy/ceph_node_proxy/bootstrap.py
new file mode 100644 (file)
index 0000000..c691680
--- /dev/null
@@ -0,0 +1,39 @@
+from typing import TYPE_CHECKING
+
+from ceph_node_proxy.config import CephadmConfig, get_node_proxy_config
+from ceph_node_proxy.util import DEFAULTS, write_tmp_file
+
+if TYPE_CHECKING:
+    from ceph_node_proxy.main import NodeProxyManager
+
+
+def create_node_proxy_manager(cephadm_config: CephadmConfig) -> 'NodeProxyManager':
+    """
+    Build NodeProxyManager from cephadm bootstrap config.
+    Creates temporary CA file and loads node-proxy YAML config.
+    """
+    from ceph_node_proxy.main import NodeProxyManager
+
+    ca_file = write_tmp_file(
+        cephadm_config.root_cert_pem,
+        prefix_name='cephadm-endpoint-root-cert-',
+    )
+    config = get_node_proxy_config(
+        path=cephadm_config.node_proxy_config_path,
+        defaults=DEFAULTS,
+    )
+
+    manager = NodeProxyManager(
+        mgr_host=cephadm_config.target_ip,
+        cephx_name=cephadm_config.name,
+        cephx_secret=cephadm_config.keyring,
+        mgr_agent_port=cephadm_config.target_port,
+        ca_path=ca_file.name,
+        api_ssl_crt=cephadm_config.listener_crt,
+        api_ssl_key=cephadm_config.listener_key,
+        config_path=cephadm_config.node_proxy_config_path,
+        config=config,
+    )
+    # Keep temp file alive for the lifetime of the manager
+    manager._ca_temp_file = ca_file
+    return manager
diff --git a/src/ceph-node-proxy/ceph_node_proxy/config.py b/src/ceph-node-proxy/ceph_node_proxy/config.py
new file mode 100644 (file)
index 0000000..8ff4aa6
--- /dev/null
@@ -0,0 +1,77 @@
+from __future__ import annotations
+
+import json
+import os
+from dataclasses import dataclass
+from typing import Any, Dict, Optional
+
+from ceph_node_proxy.util import DEFAULTS, Config
+
+
+REQUIRED_CEPHADM_KEYS = (
+    'target_ip',
+    'target_port',
+    'keyring',
+    'root_cert.pem',
+    'listener.crt',
+    'listener.key',
+    'name',
+)
+
+
+@dataclass
+class CephadmConfig:
+    """Parsed cephadm bootstrap config (from --config JSON file)"""
+    target_ip: str
+    target_port: str
+    keyring: str
+    root_cert_pem: str
+    listener_crt: str
+    listener_key: str
+    name: str
+    node_proxy_config_path: str
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> CephadmConfig:
+        for key in REQUIRED_CEPHADM_KEYS:
+            if key not in data:
+                raise ValueError(f'Missing required cephadm config key: {key}')
+        # Normalize key with dot to attribute name
+        node_proxy_config_path = data.get('node_proxy_config') or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml')
+        assert node_proxy_config_path is not None
+        return cls(
+            target_ip=data['target_ip'],
+            target_port=data['target_port'],
+            keyring=data['keyring'],
+            root_cert_pem=data['root_cert.pem'],
+            listener_crt=data['listener.crt'],
+            listener_key=data['listener.key'],
+            name=data['name'],
+            node_proxy_config_path=node_proxy_config_path,
+        )
+
+
+def load_cephadm_config(path: str) -> CephadmConfig:
+    """
+    Load and validate cephadm bootstrap config from a JSON file.
+    Raises FileNotFoundError if path does not exist, ValueError if invalid.
+    """
+    if not os.path.exists(path):
+        raise FileNotFoundError(f'Config file not found: {path}')
+    with open(path, 'r') as f:
+        try:
+            data = json.load(f)
+        except json.JSONDecodeError as e:
+            raise ValueError(f'Invalid JSON config: {e}') from e
+    if not isinstance(data, dict):
+        raise ValueError('Config must be a JSON object')
+    return CephadmConfig.from_dict(data)
+
+
+def get_node_proxy_config(
+    path: Optional[str] = None,
+    defaults: Optional[Dict[str, Any]] = None,
+) -> Config:
+    effective_path = path or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml')
+    assert effective_path is not None
+    return Config(effective_path, defaults=defaults or DEFAULTS)
index 13887112589402d2fbb55aae1cb55a96b5f104e9..5bd37d090fdc57ae5de3aaa05548be906a2f65b4 100644 (file)
@@ -1,49 +1,62 @@
 from ceph_node_proxy.api import NodeProxyApi
-from ceph_node_proxy.atollon import AtollonSystem
-from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
-from ceph_node_proxy.redfishdellsystem import RedfishDellSystem
+from ceph_node_proxy.bootstrap import create_node_proxy_manager
+from ceph_node_proxy.config import load_cephadm_config
+from ceph_node_proxy.registry import get_system_class
 from ceph_node_proxy.reporter import Reporter
-from ceph_node_proxy.util import Config, DEFAULTS, get_logger, http_req, write_tmp_file
+from ceph_node_proxy.util import Config, DEFAULTS, get_logger, http_req
 from urllib.error import HTTPError
-from typing import Dict, Any, Optional, Type
+from typing import Dict, Any, Optional
 
 import argparse
+import json
 import os
+import signal
 import ssl
-import json
 import time
-import signal
-
-
-REDFISH_SYSTEM_CLASSES: Dict[str, Type[BaseRedfishSystem]] = {
-    'generic': BaseRedfishSystem,
-    'dell': RedfishDellSystem,
-    'atollon': AtollonSystem,
-}
 
 
 class NodeProxyManager:
-    def __init__(self, **kw: Any) -> None:
+    def __init__(
+        self,
+        *,
+        mgr_host: str,
+        cephx_name: str,
+        cephx_secret: str,
+        ca_path: str,
+        api_ssl_crt: str,
+        api_ssl_key: str,
+        mgr_agent_port: str,
+        config: Optional[Config] = None,
+        config_path: Optional[str] = None,
+        reporter_scheme: str = 'https',
+        reporter_endpoint: str = '/node-proxy/data',
+    ) -> None:
         self.exc: Optional[Exception] = None
         self.log = get_logger(__name__)
-        self.mgr_host: str = kw['mgr_host']
-        self.cephx_name: str = kw['cephx_name']
-        self.cephx_secret: str = kw['cephx_secret']
-        self.ca_path: str = kw['ca_path']
-        self.api_ssl_crt: str = kw['api_ssl_crt']
-        self.api_ssl_key: str = kw['api_ssl_key']
-        self.mgr_agent_port: str = str(kw['mgr_agent_port'])
+        self.mgr_host = mgr_host
+        self.cephx_name = cephx_name
+        self.cephx_secret = cephx_secret
+        self.ca_path = ca_path
+        self.api_ssl_crt = api_ssl_crt
+        self.api_ssl_key = api_ssl_key
+        self.mgr_agent_port = str(mgr_agent_port)
         self.stop: bool = False
         self.ssl_ctx = ssl.create_default_context()
         self.ssl_ctx.check_hostname = True
         self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
         self.ssl_ctx.load_verify_locations(self.ca_path)
-        self.reporter_scheme: str = kw.get('reporter_scheme', 'https')
-        self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data')
+        self.reporter_scheme = reporter_scheme
+        self.reporter_endpoint = reporter_endpoint
         self.cephx = {'cephx': {'name': self.cephx_name,
                                 'secret': self.cephx_secret}}
-        config_path = kw.get('config_path') or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml')
-        self.config = Config(config_path, defaults=DEFAULTS)
+        if config is not None:
+            self.config = config
+        else:
+            path = (
+                config_path or os.environ.get('NODE_PROXY_CONFIG')
+                or '/etc/ceph/node-proxy.yml'
+            )
+            self.config = Config(path, defaults=DEFAULTS)
         self.username: str = ''
         self.password: str = ''
 
@@ -87,7 +100,7 @@ class NodeProxyManager:
             raise SystemExit(1)
         try:
             vendor = self.config.get('system', {}).get('vendor', 'generic')
-            system_cls = REDFISH_SYSTEM_CLASSES.get(vendor, BaseRedfishSystem)
+            system_cls = get_system_class(vendor)
             self.system = system_cls(host=oob_details['host'],
                                      port=oob_details['port'],
                                      username=oob_details['username'],
@@ -100,12 +113,14 @@ class NodeProxyManager:
 
     def init_reporter(self) -> None:
         try:
+            max_retries = self.config.get('reporter', {}).get('push_data_max_retries', 30)
             self.reporter_agent = Reporter(self.system,
                                            self.cephx,
                                            reporter_scheme=self.reporter_scheme,
                                            reporter_hostname=self.mgr_host,
                                            reporter_port=self.mgr_agent_port,
-                                           reporter_endpoint=self.reporter_endpoint)
+                                           reporter_endpoint=self.reporter_endpoint,
+                                           max_retries=max_retries)
             self.reporter_agent.start()
         except RuntimeError:
             self.log.error("Can't initialize the reporter.")
@@ -121,19 +136,34 @@ class NodeProxyManager:
             raise
 
     def loop(self) -> None:
+        check_interval = 20
+        min_interval = 20
+        max_interval = 300
+        backoff_factor = 1.5
+        consecutive_failures = 0
+
         while not self.stop:
-            for thread in [self.system, self.reporter_agent]:
-                try:
+            try:
+                for thread in [self.system, self.reporter_agent]:
                     status = thread.check_status()
                     label = 'Ok' if status else 'Critical'
                     self.log.debug(f'{thread} status: {label}')
-                except Exception as e:
-                    self.log.error(f'{thread} not running: {e.__class__.__name__}: {e}')
+                consecutive_failures = 0
+                check_interval = min_interval
+                self.log.debug('All threads are alive, next check in %ds.', check_interval)
+            except Exception as e:
+                consecutive_failures += 1
+                self.log.error(
+                    f'{consecutive_failures} failure(s): thread not running: '
+                    f'{e.__class__.__name__}: {e}'
+                )
+                for thread in [self.system, self.reporter_agent]:
                     thread.shutdown()
-                    self.init_system()
-                    self.init_reporter()
-            self.log.debug('All threads are alive, next check in 20sec.')
-            time.sleep(20)
+                self.init_system()
+                self.init_reporter()
+                check_interval = min(int(check_interval * backoff_factor), max_interval)
+                self.log.debug('Next check in %ds (backoff).', check_interval)
+            time.sleep(check_interval)
 
     def shutdown(self) -> None:
         self.stop = True
@@ -147,11 +177,13 @@ class NodeProxyManager:
 
 
 def handler(signum: Any, frame: Any, t_mgr: 'NodeProxyManager') -> None:
-    t_mgr.system.pending_shutdown = True
+    if hasattr(t_mgr, 'system') and t_mgr.system is not None:
+        t_mgr.system.pending_shutdown = True
     t_mgr.log.info('SIGTERM caught, shutting down threads...')
     t_mgr.shutdown()
-    t_mgr.log.info('Logging out from RedFish API')
-    t_mgr.system.client.logout()
+    if hasattr(t_mgr, 'system') and t_mgr.system is not None and hasattr(t_mgr.system, 'client') and t_mgr.system.client is not None:
+        t_mgr.log.info('Logging out from RedFish API')
+        t_mgr.system.client.logout()
     raise SystemExit(0)
 
 
@@ -174,37 +206,14 @@ def main() -> None:
     if args.debug:
         DEFAULTS['logging']['level'] = 10
 
-    if not os.path.exists(args.config):
-        raise Exception(f'No config file found at provided config path: {args.config}')
+    try:
+        cephadm_config = load_cephadm_config(args.config)
+    except FileNotFoundError as e:
+        raise SystemExit(f'Config error: {e}')
+    except ValueError as e:
+        raise SystemExit(f'Config error: {e}')
 
-    with open(args.config, 'r') as f:
-        try:
-            config_json = f.read()
-            config = json.loads(config_json)
-        except Exception as e:
-            raise Exception(f'Failed to load json config: {str(e)}')
-
-    target_ip = config['target_ip']
-    target_port = config['target_port']
-    keyring = config['keyring']
-    root_cert = config['root_cert.pem']
-    listener_cert = config['listener.crt']
-    listener_key = config['listener.key']
-    name = config['name']
-
-    ca_file = write_tmp_file(root_cert,
-                             prefix_name='cephadm-endpoint-root-cert')
-
-    config_path = config.get('node_proxy_config') or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml')
-
-    node_proxy_mgr = NodeProxyManager(mgr_host=target_ip,
-                                      cephx_name=name,
-                                      cephx_secret=keyring,
-                                      mgr_agent_port=target_port,
-                                      ca_path=ca_file.name,
-                                      api_ssl_crt=listener_cert,
-                                      api_ssl_key=listener_key,
-                                      config_path=config_path)
+    node_proxy_mgr = create_node_proxy_manager(cephadm_config)
     signal.signal(signal.SIGTERM,
                   lambda signum, frame: handler(signum, frame, node_proxy_mgr))
     node_proxy_mgr.run()
diff --git a/src/ceph-node-proxy/ceph_node_proxy/protocols.py b/src/ceph-node-proxy/ceph_node_proxy/protocols.py
new file mode 100644 (file)
index 0000000..f71d665
--- /dev/null
@@ -0,0 +1,70 @@
+from threading import Lock
+from typing import Any, Dict, Protocol, runtime_checkable
+
+
+@runtime_checkable
+class SystemBackend(Protocol):
+    def get_memory(self) -> Dict[str, Any]:
+        ...
+
+    def get_network(self) -> Dict[str, Any]:
+        ...
+
+    def get_storage(self) -> Dict[str, Any]:
+        ...
+
+    def get_processors(self) -> Dict[str, Any]:
+        ...
+
+    def get_power(self) -> Dict[str, Any]:
+        ...
+
+    def get_fans(self) -> Dict[str, Any]:
+        ...
+
+    def get_firmwares(self) -> Dict[str, Any]:
+        ...
+
+    def get_led(self) -> Dict[str, Any]:
+        ...
+
+    def set_led(self, data: Dict[str, str]) -> int:
+        ...
+
+    def get_chassis_led(self) -> Dict[str, Any]:
+        ...
+
+    def chassis_led_on(self) -> int:
+        ...
+
+    def chassis_led_off(self) -> int:
+        ...
+
+    def get_device_led(self, device: str) -> Dict[str, Any]:
+        ...
+
+    def device_led_on(self, device: str) -> int:
+        ...
+
+    def device_led_off(self, device: str) -> int:
+        ...
+
+    def shutdown_host(self, force: bool = False) -> int:
+        ...
+
+    def powercycle(self) -> int:
+        ...
+
+    def flush(self) -> None:
+        ...
+
+
+@runtime_checkable
+class SystemForReporter(Protocol):
+    lock: Lock
+    data_ready: bool
+    pending_shutdown: bool
+    previous_data: Dict[str, Any]
+
+    def get_system(self) -> Dict[str, Any]:
+        ...
diff --git a/src/ceph-node-proxy/ceph_node_proxy/redfish.py b/src/ceph-node-proxy/ceph_node_proxy/redfish.py
new file mode 100644 (file)
index 0000000..c9ef874
--- /dev/null
@@ -0,0 +1,275 @@
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from typing import Any, Dict, List, Optional
+
+from ceph_node_proxy.redfish_client import RedFishClient
+from ceph_node_proxy.util import get_logger, to_snake_case, normalize_dict
+from urllib.error import HTTPError, URLError
+
+
@dataclass
class ComponentUpdateSpec:
    """Declarative description of how to extract one component from Redfish.

    NOTE(review): not referenced anywhere in this module; presumably
    consumed by vendor system classes together with update_component() —
    confirm before removing.
    """

    collection: str                  # top-level endpoint name (e.g. a key in EndpointMgr)
    path: str                        # sub-path under each collection member
    fields: List[str]                # Redfish property names to extract
    attribute: Optional[str] = None  # optional sub-array key within the payload
+
+
class EndpointMgr:
    """Manages Redfish root endpoints (Systems, Chassis, etc.) discovered from the service root."""

    NAME: str = 'EndpointMgr'

    def __init__(self,
                 client: RedFishClient,
                 prefix: str = RedFishClient.PREFIX) -> None:
        self.log = get_logger(f'{__name__}:{EndpointMgr.NAME}')
        self.prefix: str = prefix
        self.client: RedFishClient = client
        # Discovered endpoints keyed by snake_case name; filled by init().
        self._endpoints: Dict[str, 'Endpoint'] = {}
        self._session_url: str = ''

    def __getitem__(self, index: str) -> Endpoint:
        try:
            return self._endpoints[index]
        except KeyError:
            raise KeyError(f"'{index}' is not a valid endpoint. Available: {list(self._endpoints.keys())}") from None

    def get(self, name: str, default: Any = None) -> Any:
        """dict.get()-style lookup over the discovered endpoints."""
        return self._endpoints.get(name, default)

    def list_endpoints(self) -> List[str]:
        """Names of all endpoints discovered so far."""
        return [*self._endpoints]

    @property
    def session(self) -> str:
        """URL of the Sessions collection ('' when not discovered)."""
        return self._session_url

    def init(self) -> None:
        """Query the service root and populate the endpoint table."""
        try:
            _, raw, _ = self.client.query(endpoint=self.prefix)
            root: Dict[str, Any] = json.loads(raw)

            for key, value in root.items():
                # Every navigable root property carries an '@odata.id' link.
                if not (isinstance(value, dict) and '@odata.id' in value):
                    continue
                ep_name: str = to_snake_case(key)
                ep_url: str = value['@odata.id']
                self.log.info(f'entrypoint found: {ep_name} = {ep_url}')
                self._endpoints[ep_name] = Endpoint(ep_url, self.client)

            links = root.get('Links') or {}
            try:
                self._session_url = links['Sessions']['@odata.id']
            except (KeyError, TypeError):
                self.log.warning('Session URL not found in root response')
                self._session_url = ''

        except (URLError, KeyError, json.JSONDecodeError) as e:
            msg = f"Can't discover entrypoint(s): {e}"
            self.log.error(msg)
            raise RuntimeError(msg) from e
+
+
class Endpoint:
    """Single Redfish resource or collection; supports lazy child resolution and member listing."""

    NAME: str = 'Endpoint'

    def __init__(self, url: str, client: RedFishClient) -> None:
        """Fetch *url* once and cache the decoded payload in self.data.

        self.data is {} when the query fails (query() logs and swallows
        errors), in which case self.id stays ''.
        """
        self.log = get_logger(f'{__name__}:{Endpoint.NAME}')
        self.url: str = url
        self.client: RedFishClient = client
        # Child Endpoints created lazily by __getitem__, keyed by path segment.
        self._children: Dict[str, 'Endpoint'] = {}
        self.data: Dict[str, Any] = self.get_data()
        self.id: str = ''
        self.members_names: List[str] = []

        if self.has_members:
            self.members_names = self.get_members_names()

        if self.data:
            _id = self.data.get('Id')
            if _id is None:
                # Some payloads carry no 'Id' property; fall back to the last
                # segment of '@odata.id', or of the request URL itself when
                # that key is also missing (previously an unhandled KeyError).
                _id = self.data.get('@odata.id', self.url).rstrip('/').split('/')[-1]
            self.id = _id
        else:
            self.log.warning(f'No data could be loaded for {self.url}')

    def __getitem__(self, key: str) -> 'Endpoint':
        """Return (and lazily create) the child resource at <url>/<key>.

        Raises KeyError for empty, non-string, or slash-containing keys.
        """
        if not isinstance(key, str) or not key or '/' in key:
            raise KeyError(key)

        if key not in self._children:
            child_url: str = f'{self.url.rstrip("/")}/{key}'
            self._children[key] = Endpoint(child_url, self.client)

        return self._children[key]

    def list_children(self) -> List[str]:
        """Names of children already materialized via __getitem__."""
        return list(self._children.keys())

    def query(self, url: str) -> Dict[str, Any]:
        """GET *url* and return the decoded JSON payload.

        All errors are logged and swallowed; an empty dict signals failure.
        """
        data: Dict[str, Any] = {}
        try:
            self.log.debug(f'Querying {url}')
            _, _data, _ = self.client.query(endpoint=url)
            if not _data:
                self.log.warning(f'Empty response from {url}')
            else:
                data = json.loads(_data)
        except KeyError as e:
            self.log.error(f'KeyError while querying {url}: {e}')
        except HTTPError as e:
            self.log.error(f'HTTP error while querying {url} - {e.code} - {e.reason}')
        except json.JSONDecodeError as e:
            self.log.error(f'JSON decode error while querying {url}: {e}')
        except Exception as e:
            self.log.error(f'Unexpected error while querying {url}: {type(e).__name__}: {e}')
        return data

    def get_data(self) -> Dict[str, Any]:
        """Fetch this endpoint's own payload."""
        return self.query(self.url)

    def get_members_names(self) -> List[str]:
        """Return the last path segment of each entry in 'Members'."""
        result: List[str] = []
        if self.has_members:
            for member in self.data['Members']:
                name: str = member['@odata.id'].split('/')[-1]
                result.append(name)
        return result

    def get_name(self, endpoint: str) -> str:
        """Return the last path segment of *endpoint*."""
        return endpoint.split('/')[-1]

    def get_members_endpoints(self) -> Dict[str, str]:
        """Map member name -> member URL for this collection.

        Members whose URL does not share this endpoint's /redfish/v1/<root>
        prefix are skipped.  A member-less endpoint with data maps to itself.
        """
        members: Dict[str, str] = {}
        self.log.debug(f'get_members_endpoints called on {self.url}, has_members={self.has_members}')

        if self.has_members:
            # Derive the '/redfish/v1/<root>' prefix used to sanity-check members.
            url_parts = self.url.split('/redfish/v1/')
            if len(url_parts) > 1:
                base_path = '/redfish/v1/' + url_parts[1].split('/')[0]
            else:
                base_path = None

            for member in self.data['Members']:
                name = self.get_name(member['@odata.id'])
                endpoint_url = member['@odata.id']
                self.log.debug(f'Found member: {name} -> {endpoint_url}')

                if base_path and not endpoint_url.startswith(base_path):
                    self.log.warning(
                        f'Member endpoint {endpoint_url} does not match base path {base_path} '
                        f'from {self.url}. Skipping this member.'
                    )
                    continue

                members[name] = endpoint_url
        else:
            if self.data:
                name = self.get_name(self.url)
                members[name] = self.url
                self.log.warning(f'No Members array, using endpoint itself: {name} -> {self.url}')
            else:
                self.log.debug(f'Endpoint {self.url} has no data and no Members array')

        return members

    def get_members_data(self) -> Dict[str, Any]:
        """Map member name -> fetched payload for this collection.

        Falls back to this endpoint's own data when there is no usable
        Members array (singleton resource).
        """
        result: Dict[str, Any] = {}
        self.log.debug(f'get_members_data called on {self.url}, has_members={self.has_members}')

        if self.has_members:
            self.log.debug(f'Endpoint {self.url} has Members array: {self.data.get("Members", [])}')
            members_endpoints = self.get_members_endpoints()

            if not members_endpoints:
                self.log.warning(
                    f'Endpoint {self.url} has Members array but no valid members after filtering. '
                    f'Using endpoint itself as singleton resource.'
                )
                if self.data:
                    name = self.get_name(self.url)
                    result[name] = self.data
            else:
                for member, endpoint_url in members_endpoints.items():
                    self.log.info(f'Fetching data for member: {member} at {endpoint_url}')
                    result[member] = self.query(endpoint_url)
        else:
            self.log.debug(f'Endpoint {self.url} has no Members array, returning own data')
            if self.data:
                name = self.get_name(self.url)
                result[name] = self.data
            else:
                self.log.warning(f'Endpoint {self.url} has no members and empty data')

        return result

    @property
    def has_members(self) -> bool:
        """True when the payload carries a list-valued 'Members' key."""
        return bool(self.data and 'Members' in self.data and isinstance(self.data['Members'], list))
+
+
def build_data(
    data: Dict[str, Any],
    fields: List[str],
    log: Any,
    attribute: Optional[str] = None,
) -> Dict[str, Any]:
    """Extract *fields* from Redfish payload(s) into a normalized mapping.

    When *attribute* is given, ``data[attribute]`` must be a list of dicts
    each carrying a 'MemberId'; otherwise *data* itself is treated as a
    name -> payload mapping and each key becomes the MemberId.  Missing
    fields are recorded as None (not dropped) so the reported schema stays
    stable.  Re-raises KeyError/TypeError/AttributeError after logging.
    """
    result: Dict[str, Dict[str, Any]] = {}

    def _extract(item: Dict[str, Any]) -> Dict[str, Any]:
        # One output entry per requested field, snake_cased.
        out: Dict[str, Any] = {}
        for field in fields:
            try:
                out[to_snake_case(field)] = item[field]
            except KeyError:
                log.debug(f'Could not find field: {field} in data: {item}')
                out[to_snake_case(field)] = None
        return out

    try:
        if attribute is not None:
            data_items = data[attribute]
        else:
            data_items = [{'MemberId': k, **v} for k, v in data.items()]
        log.debug(f'build_data: data_items count={len(data_items)}')
        for d in data_items:
            result[d.get('MemberId')] = _extract(d)
    except (KeyError, TypeError, AttributeError) as e:
        log.error(f"Can't build data: {e}")
        raise
    return normalize_dict(result)
+
+
def update_component(
    endpoints: EndpointMgr,
    collection: str,
    component: str,
    path: str,
    fields: List[str],
    _sys: Dict[str, Any],
    log: Any,
    attribute: Optional[str] = None,
) -> None:
    """Update _sys[component] from Redfish endpoints using the given spec."""
    collection_ep = endpoints[collection]
    member_names: List[str] = collection_ep.get_members_names()

    if not member_names:
        # No Members array: the collection exposes the resource directly.
        payload = collection_ep[path].get_members_data()
        _sys[component] = build_data(data=payload, fields=fields, log=log, attribute=attribute)
        return

    collected: Dict[str, Any] = {}
    for name in member_names:
        try:
            member_ep = collection_ep[name][path]
            payload = member_ep.get_members_data() if attribute is None else member_ep.data
        except HTTPError as e:
            log.error(f'Error while updating {component}: {e}')
            continue
        collected[name] = build_data(data=payload, fields=fields, log=log, attribute=attribute)
    _sys[component] = collected
diff --git a/src/ceph-node-proxy/ceph_node_proxy/registry.py b/src/ceph-node-proxy/ceph_node_proxy/registry.py
new file mode 100644 (file)
index 0000000..5d53bd3
--- /dev/null
@@ -0,0 +1,44 @@
+from typing import Dict, Type
+
+from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
+
+# Built-in implementations
+from ceph_node_proxy.atollon import AtollonSystem
+from ceph_node_proxy.redfishdellsystem import RedfishDellSystem
+from ceph_node_proxy.util import get_logger
+
# Registry of Redfish system implementations, keyed by vendor identifier.
# Extended at import time by _load_entry_point_systems() with any classes
# registered under the 'ceph_node_proxy.systems' entry-point group.
REDFISH_SYSTEM_CLASSES: Dict[str, Type[BaseRedfishSystem]] = {
    'generic': BaseRedfishSystem,
    'dell': RedfishDellSystem,
    'atollon': AtollonSystem,
}

# Module-level logger for registry diagnostics.
logger = get_logger(__name__)
+
+
+def _load_entry_point_systems() -> None:
+    try:
+        import pkg_resources
+
+    except ImportError:
+        logger.debug(
+            "pkg_resources not available; only built-in Redfish systems will be used."
+        )
+        return
+
+    for ep in pkg_resources.iter_entry_points("ceph_node_proxy.systems"):
+        try:
+            REDFISH_SYSTEM_CLASSES[ep.name] = ep.load()
+        except (ImportError, AttributeError, ModuleNotFoundError) as e:
+            logger.warning(
+                "Failed to load Redfish system entry point %s: %s", ep.name, e
+            )
+
+
def get_system_class(vendor: str) -> Type[BaseRedfishSystem]:
    """Return the Redfish system class for the given vendor.
    Falls back to generic."""
    try:
        return REDFISH_SYSTEM_CLASSES[vendor]
    except KeyError:
        return BaseRedfishSystem
+
+
+_load_entry_point_systems()
index 20d43b59d332b1b3f98082b50421ac561ac6c715..1973d6ec365764bc3e510460469bbcda129bdc13 100644 (file)
@@ -1,18 +1,24 @@
 import time
 import json
+from ceph_node_proxy.protocols import SystemForReporter
 from ceph_node_proxy.util import get_logger, http_req, BaseThread
 from urllib.error import HTTPError, URLError
 from typing import Dict, Any
 
 
# Default number of POST attempts before _send_with_retries() gives up for
# the current reporting cycle (overridable via Reporter's max_retries arg).
DEFAULT_MAX_RETRIES = 30
# Seconds slept between consecutive failed attempts.
RETRY_SLEEP_SEC = 5
+
+
 class Reporter(BaseThread):
     def __init__(self,
-                 system: Any,
+                 system: SystemForReporter,
                  cephx: Dict[str, Any],
                  reporter_scheme: str = 'https',
                  reporter_hostname: str = '',
                  reporter_port: str = '443',
-                 reporter_endpoint: str = '/node-proxy/data') -> None:
+                 reporter_endpoint: str = '/node-proxy/data',
+                 max_retries: int = DEFAULT_MAX_RETRIES) -> None:
         super().__init__()
         self.system = system
         self.data: Dict[str, Any] = {}
@@ -23,44 +29,51 @@ class Reporter(BaseThread):
         self.reporter_hostname: str = reporter_hostname
         self.reporter_port: str = reporter_port
         self.reporter_endpoint: str = reporter_endpoint
+        self.max_retries: int = max_retries
         self.log = get_logger(__name__)
         self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:'
                                   f'{reporter_port}{reporter_endpoint}')
         self.log.info(f'Reporter url set to {self.reporter_url}')
 
    def _send_with_retries(self) -> bool:
        """POST self.data to the mgr endpoint, retrying on failure.

        Returns True on the first successful POST, False once max_retries
        consecutive attempts have failed.  Sleeps RETRY_SLEEP_SEC between
        attempts, but not after the final one, so the caller regains
        control immediately on exhaustion.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                self.log.info(f'sending data to {self.reporter_url} (attempt {attempt}/{self.max_retries})')
                http_req(hostname=self.reporter_hostname,
                         port=self.reporter_port,
                         method='POST',
                         headers={'Content-Type': 'application/json'},
                         endpoint=self.reporter_endpoint,
                         scheme=self.reporter_scheme,
                         data=json.dumps(self.data))
                return True
            except (HTTPError, URLError) as e:
                # HTTPError is caught explicitly for clarity even though it
                # subclasses URLError.
                self.log.error(
                    f"The reporter couldn't send data to the mgr (attempt {attempt}/{self.max_retries}): {e}"
                )
                if attempt < self.max_retries:
                    time.sleep(RETRY_SLEEP_SEC)
        return False
+
     def main(self) -> None:
         while not self.stop:
-            # Any logic to avoid sending the all the system
-            # information every loop can go here. In a real
-            # scenario probably we should just send the sub-parts
-            # that have changed to minimize the traffic in
-            # dense clusters
             self.log.debug('waiting for a lock in reporter loop.')
             with self.system.lock:
                 if not self.system.pending_shutdown:
                     self.log.debug('lock acquired in reporter loop.')
                     if self.system.data_ready:
                         self.log.debug('data ready to be sent to the mgr.')
-                        if not self.system.get_system() == self.system.previous_data:
+                        if self.system.get_system() != self.system.previous_data:
                             self.log.info('data has changed since last iteration.')
                             self.data['patch'] = self.system.get_system()
-                            try:
-                                # TODO: add a timeout parameter to the reporter in the config file
-                                self.log.info(f'sending data to {self.reporter_url}')
-                                http_req(hostname=self.reporter_hostname,
-                                         port=self.reporter_port,
-                                         method='POST',
-                                         headers={'Content-Type': 'application/json'},
-                                         endpoint=self.reporter_endpoint,
-                                         scheme=self.reporter_scheme,
-                                         data=json.dumps(self.data))
-                            except (HTTPError, URLError) as e:
-                                self.log.error(f"The reporter couldn't send data to the mgr: {e}")
-                                raise
-                                # Need to add a new parameter 'max_retries' to the reporter if it can't
-                                # send the data for more than x times, maybe the daemon should stop altogether
-                            else:
+                            if self._send_with_retries():
                                 self.system.previous_data = self.system.get_system()
+                            else:
+                                self.log.error(
+                                    f'Failed to send data after {self.max_retries} retries; '
+                                    'will retry on next cycle.'
+                                )
                         else:
                             self.log.debug('no diff, not sending data to the mgr.')
             self.log.debug('lock released in reporter loop.')
index 7dcc7cdf5bf818590641b8e6e41ead031a36cab7..13b69433ad68c30f9821b1c4e77cb640344f2105 100644 (file)
@@ -22,11 +22,14 @@ setup(
         'tox',
         'ceph',
     ],
-    entry_points=dict(
-        console_scripts=[
+    entry_points={
+        'console_scripts': [
             'ceph-node-proxy = ceph_node_proxy.main:main',
         ],
-    ),
+        # vendors can register Redfish system implementations
+        # example: 'myvendor = mypackage.redfish_system:MyVendorSystem'
+        'ceph_node_proxy.systems': [],
+    },
     classifiers=[
         'Environment :: Console',
         'Intended Audience :: Information Technology',