From: Guillaume Abrioux
Date: Sun, 4 Feb 2024 19:11:41 +0000 (+0000)
Subject: node-proxy: refactor entrypoint
X-Git-Tag: v19.1.0~364^2
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=ebf05d5d6cf0532d321a668a4444d2069f995d4a;p=ceph.git

node-proxy: refactor entrypoint

This commit introduces a major refactor of the main entrypoint.

- subclass threading.Thread:
  - Introduce a new class `BaseThread()` that is a `threading.Thread()`
    abstraction class in order to monitor the different threads.
  - `BaseSystem()` inherits from `BaseThread()`.
- Handle the `SIGTERM` signal in order to gracefully shut down node-proxy
  (make threads exit gracefully, log out from the RedFish API, etc.)

Additionally, this:

- drops the class `Logger()` from util.py, which added no value.
  It is now replaced with a simple `get_logger()` function.
- changes the node-proxy API port from 8080 to 9456
  (8080 being widely used for frontend apps...)
- changes the container entrypoint in order to use the `ceph-node-proxy`
  binary from the packaging

Signed-off-by: Guillaume Abrioux
---

diff --git a/src/ceph-node-proxy/ceph_node_proxy/api.py b/src/ceph-node-proxy/ceph_node_proxy/api.py
index 93e41def3bfe9..25ae03e519527 100644
--- a/src/ceph-node-proxy/ceph_node_proxy/api.py
+++ b/src/ceph-node-proxy/ceph_node_proxy/api.py
@@ -1,15 +1,15 @@
-import cherrypy
+import cherrypy  # type: ignore
 from urllib.error import HTTPError
-from cherrypy._cpserver import Server
+from cherrypy._cpserver import Server  # type: ignore
 from threading import Thread, Event
 from typing import Dict, Any, List
-from ceph_node_proxy.util import Config, Logger, write_tmp_file
+from ceph_node_proxy.util import Config, get_logger, write_tmp_file
 from ceph_node_proxy.basesystem import BaseSystem
 from ceph_node_proxy.reporter import Reporter
 from typing import TYPE_CHECKING, Optional
 
 if TYPE_CHECKING:
-    from ceph_node_proxy.main import NodeProxy
+    from ceph_node_proxy.main import NodeProxyManager
 
 
 @cherrypy.tools.auth_basic(on=True)
@@ -21,8 +21,7 @@ class Admin():
 
     @cherrypy.expose
     def start(self) -> Dict[str, str]:
-        self.api.backend.start_client()
-        # self.backend.start_update_loop()
+        self.api.backend.start()
         self.api.reporter.run()
         return {'ok': 'node-proxy daemon started'}
 
@@ -32,9 +31,8 @@
         return {'ok': 'node-proxy config reloaded'}
 
     def _stop(self) -> None:
-        self.api.backend.stop_update_loop()
-        self.api.backend.client.logout()
-        self.api.reporter.stop()
+        self.api.backend.shutdown()
+        self.api.reporter.shutdown()
 
     @cherrypy.expose
     def stop(self) -> Dict[str, str]:
@@ -61,11 +59,11 @@ class API(Server):
                  addr: str = '0.0.0.0',
                  port: int = 0) -> None:
         super().__init__()
-        self.log = Logger(__name__)
+        self.log = get_logger(__name__)
         self.backend = backend
         self.reporter = reporter
         self.config = config
-        self.socket_port = self.config.__dict__['server']['port'] if not port else port
+        self.socket_port = self.config.__dict__['api']['port'] if not port else port
         self.socket_host = addr
         self.subscribe()
 
@@ -134,10 +132,10 @@ class API(Server):
 
         if 'force' not in data.keys():
             msg = "The key 'force' wasn't passed."
-            self.log.logger.debug(msg)
+            self.log.debug(msg)
             raise cherrypy.HTTPError(400, msg)
         try:
-            result: int = self.backend.shutdown(force=data['force'])
+            result: int = self.backend.shutdown_host(force=data['force'])
         except HTTPError as e:
             raise cherrypy.HTTPError(e.code, e.reason)
         return result
@@ -167,14 +165,14 @@ class API(Server):
 
         if not led_type:
             msg = "the led type must be provided (either 'chassis' or 'drive')."
- self.log.logger.debug(msg) + self.log.debug(msg) raise cherrypy.HTTPError(400, msg) if led_type == 'drive': id_drive_required = not id_drive if id_drive_required or id_drive not in self.backend.get_storage(): msg = 'A valid device ID must be provided.' - self.log.logger.debug(msg) + self.log.debug(msg) raise cherrypy.HTTPError(400, msg) try: @@ -183,7 +181,7 @@ class API(Server): if 'state' not in data or data['state'] not in ['on', 'off']: msg = "Invalid data. 'state' must be provided and have a valid value (on|off)." - self.log.logger.error(msg) + self.log.error(msg) raise cherrypy.HTTPError(400, msg) func: Any = (self.backend.device_led_on if led_type == 'drive' and data['state'] == 'on' else @@ -228,34 +226,30 @@ class API(Server): class NodeProxyApi(Thread): - def __init__(self, - node_proxy: 'NodeProxy', - username: str, - password: str, - ssl_crt: str, - ssl_key: str) -> None: + def __init__(self, node_proxy_mgr: 'NodeProxyManager') -> None: super().__init__() - self.log = Logger(__name__) + self.log = get_logger(__name__) self.cp_shutdown_event = Event() - self.node_proxy = node_proxy - self.username = username - self.password = password - self.ssl_crt = ssl_crt - self.ssl_key = ssl_key - self.api = API(self.node_proxy.system, - self.node_proxy.reporter_agent, - self.node_proxy.config) + self.node_proxy_mgr = node_proxy_mgr + self.username = self.node_proxy_mgr.username + self.password = self.node_proxy_mgr.password + self.ssl_crt = self.node_proxy_mgr.api_ssl_crt + self.ssl_key = self.node_proxy_mgr.api_ssl_key + self.system = self.node_proxy_mgr.system + self.reporter_agent = self.node_proxy_mgr.reporter_agent + self.config = self.node_proxy_mgr.config + self.api = API(self.system, self.reporter_agent, self.config) def check_auth(self, realm: str, username: str, password: str) -> bool: return self.username == username and \ self.password == password def shutdown(self) -> None: - self.log.logger.info('Stopping node-proxy API...') + self.log.info('Stopping node-proxy API...') self.cp_shutdown_event.set() def run(self) -> None: - self.log.logger.info('node-proxy API configuration...') + self.log.info('node-proxy API configuration...') cherrypy.config.update({ 'environment': 'production', 'engine.autoreload.on': False, @@ -281,11 +275,11 @@ class NodeProxyApi(Thread): cherrypy.server.unsubscribe() try: cherrypy.engine.start() - self.log.logger.info('node-proxy API started.') + self.log.info('node-proxy API started.') self.cp_shutdown_event.wait() self.cp_shutdown_event.clear() - cherrypy.engine.stop() + cherrypy.engine.exit() cherrypy.server.httpserver = None - self.log.logger.info('node-proxy API shutdown.') + self.log.info('node-proxy API shutdown.') except Exception as e: - self.log.logger.error(f'node-proxy API error: {e}') + self.log.error(f'node-proxy API error: {e}') diff --git a/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py index e80523fed1850..ea4e65cc6ac69 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py +++ b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py @@ -2,9 +2,8 @@ import concurrent.futures import json from ceph_node_proxy.basesystem import BaseSystem from ceph_node_proxy.redfish_client import RedFishClient -from threading import Thread, Lock from time import sleep -from ceph_node_proxy.util import Logger, retry +from ceph_node_proxy.util import get_logger from typing import Dict, Any, List, Callable, Union from urllib.error import HTTPError, URLError @@ -15,20 
+14,16 @@ class BaseRedfishSystem(BaseSystem): self.common_endpoints: List[str] = kw.get('common_endpoints', ['/Systems/System.Embedded.1', '/UpdateService']) self.chassis_endpoint: str = kw.get('chassis_endpoint', '/Chassis/System.Embedded.1') - self.log = Logger(__name__) + self.log = get_logger(__name__) self.host: str = kw['host'] self.port: str = kw['port'] self.username: str = kw['username'] self.password: str = kw['password'] # move the following line (class attribute?) self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password) - self.log.logger.info(f'redfish system initialization, host: {self.host}, user: {self.username}') - - self.run: bool = False - self.thread: Thread + self.log.info(f'redfish system initialization, host: {self.host}, user: {self.username}') self.data_ready: bool = False self.previous_data: Dict = {} - self.lock: Lock = Lock() self.data: Dict[str, Dict[str, Any]] = {} self._system: Dict[str, Dict[str, Any]] = {} self._sys: Dict[str, Any] = {} @@ -44,71 +39,63 @@ class BaseRedfishSystem(BaseSystem): 'firmwares']) self.update_funcs: List[Callable] = [] for component in self.component_list: - self.log.logger.debug(f'adding: {component} to hw component gathered list.') + self.log.debug(f'adding: {component} to hw component gathered list.') func = f'_update_{component}' if hasattr(self, func): f = getattr(self, func) self.update_funcs.append(f) - self.start_client() - - def start_client(self) -> None: + def main(self) -> None: + self.stop = False self.client.login() - self.start_update_loop() - - def start_update_loop(self) -> None: - self.run = True - self.thread = Thread(target=self.update) - self.thread.start() - - def stop_update_loop(self) -> None: - self.run = False - self.thread.join() - - def update(self) -> None: - # this loop can have: - # - caching logic - while self.run: - self.log.logger.debug('waiting for a lock in the update loop.') - self.lock.acquire() - self.log.logger.debug('lock acquired in the update loop.') - try: - self._update_system() - self._update_sn() - - with concurrent.futures.ThreadPoolExecutor() as executor: - executor.map(lambda f: f(), self.update_funcs) - - self.data_ready = True - except RuntimeError as e: - self.run = False - self.log.logger.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}') - self.client.logout() - finally: - self.lock.release() - sleep(5) - self.log.logger.debug('lock released in the update loop.') + while not self.stop: + self.log.debug('waiting for a lock in the update loop.') + with self.lock: + if not self.pending_shutdown: + self.log.debug('lock acquired in the update loop.') + try: + self._update_system() + self._update_sn() + + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.map(lambda f: f(), self.update_funcs) + + self.data_ready = True + except RuntimeError as e: + self.stop = True + self.log.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}') + self.client.logout() + raise + sleep(5) + self.log.debug('lock released in the update loop.') + self.log.debug('exiting update loop.') + raise SystemExit(0) def flush(self) -> None: - self.log.logger.debug('Acquiring lock to flush data.') + self.log.debug('Acquiring lock to flush data.') self.lock.acquire() - self.log.logger.debug('Lock acquired, flushing data.') + self.log.debug('Lock acquired, flushing data.') self._system = {} self.previous_data = {} - self.log.logger.info('Data flushed.') + self.log.info('Data flushed.') 
self.data_ready = False - self.log.logger.debug('Data marked as not ready.') + self.log.debug('Data marked as not ready.') self.lock.release() - self.log.logger.debug('Released the lock after flushing data.') + self.log.debug('Released the lock after flushing data.') - @retry(retries=10, delay=2) + # @retry(retries=10, delay=2) def _get_path(self, path: str) -> Dict: + result: Dict[str, Any] = {} try: - result = self.client.get_path(path) + if not self.pending_shutdown: + self.log.debug(f'Getting path: {path}') + result = self.client.get_path(path) + else: + self.log.debug(f'Pending shutdown, aborting query to {path}') except RuntimeError: raise if result is None: - self.log.logger.error(f'The client reported an error when getting path: {path}') + self.log.error(f'The client reported an error when getting path: {path}') raise RuntimeError(f'Could not get path: {path}') return result @@ -197,7 +184,7 @@ class BaseRedfishSystem(BaseSystem): endpoint=endpoint, timeout=10) except HTTPError as e: - self.log.logger.error(f"Couldn't get the ident device LED status for device '{device}': {e}") + self.log.error(f"Couldn't get the ident device LED status for device '{device}': {e}") raise response_json = json.loads(result[1]) _result: Dict[str, Any] = {'http_code': result[2]} @@ -215,7 +202,7 @@ class BaseRedfishSystem(BaseSystem): endpoint=self._sys['storage'][device]['redfish_endpoint'] ) except (HTTPError, KeyError) as e: - self.log.logger.error(f"Couldn't set the ident device LED for device '{device}': {e}") + self.log.error(f"Couldn't set the ident device LED for device '{device}': {e}") raise return status @@ -226,7 +213,7 @@ class BaseRedfishSystem(BaseSystem): endpoint=endpoint, timeout=10) except HTTPError as e: - self.log.logger.error(f"Couldn't get the ident chassis LED status: {e}") + self.log.error(f"Couldn't get the ident chassis LED status: {e}") raise response_json = json.loads(result[1]) _result: Dict[str, Any] = {'http_code': result[2]} @@ -246,18 +233,18 @@ class BaseRedfishSystem(BaseSystem): endpoint=f'/redfish/v1{self.chassis_endpoint}' ) except HTTPError as e: - self.log.logger.error(f"Couldn't set the ident chassis LED: {e}") + self.log.error(f"Couldn't set the ident chassis LED: {e}") raise return status - def shutdown(self, force: bool = False) -> int: + def shutdown_host(self, force: bool = False) -> int: reboot_type: str = 'GracefulRebootWithForcedShutdown' if force else 'GracefulRebootWithoutForcedShutdown' try: job_id: str = self.create_reboot_job(reboot_type) status = self.schedule_reboot_job(job_id) except (HTTPError, KeyError) as e: - self.log.logger.error(f"Couldn't create the reboot job: {e}") + self.log.error(f"Couldn't create the reboot job: {e}") raise return status @@ -266,7 +253,7 @@ class BaseRedfishSystem(BaseSystem): job_id: str = self.create_reboot_job('PowerCycle') status = self.schedule_reboot_job(job_id) except (HTTPError, URLError) as e: - self.log.logger.error(f"Couldn't perform power cycle: {e}") + self.log.error(f"Couldn't perform power cycle: {e}") raise return status @@ -279,7 +266,7 @@ class BaseRedfishSystem(BaseSystem): ) job_id: str = headers['Location'].split('/')[-1] except (HTTPError, URLError) as e: - self.log.logger.error(f"Couldn't create the reboot job: {e}") + self.log.error(f"Couldn't create the reboot job: {e}") raise return job_id @@ -291,6 +278,6 @@ class BaseRedfishSystem(BaseSystem): endpoint=self.setup_job_queue_endpoint ) except (HTTPError, KeyError) as e: - self.log.logger.error(f"Couldn't schedule the reboot job: {e}") + 
self.log.error(f"Couldn't schedule the reboot job: {e}") raise return status diff --git a/src/ceph-node-proxy/ceph_node_proxy/basesystem.py b/src/ceph-node-proxy/ceph_node_proxy/basesystem.py index c2389d8dc85e7..65eca55af1f07 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/basesystem.py +++ b/src/ceph-node-proxy/ceph_node_proxy/basesystem.py @@ -1,14 +1,21 @@ import socket -from ceph_node_proxy.util import Config +from threading import Lock +from ceph_node_proxy.util import Config, get_logger, BaseThread from typing import Dict, Any from ceph_node_proxy.baseclient import BaseClient -class BaseSystem: +class BaseSystem(BaseThread): def __init__(self, **kw: Any) -> None: + super().__init__() + self.lock: Lock = Lock() self._system: Dict = {} - self.config: Config = kw['config'] + self.config: Config = kw.get('config', {}) self.client: BaseClient + self.log = get_logger(__name__) + + def main(self) -> None: + raise NotImplementedError() def get_system(self) -> Dict[str, Any]: raise NotImplementedError() @@ -76,19 +83,13 @@ class BaseSystem: def get_host(self) -> str: return socket.gethostname() - def start_update_loop(self) -> None: - raise NotImplementedError() - def stop_update_loop(self) -> None: raise NotImplementedError() - def start_client(self) -> None: - raise NotImplementedError() - def flush(self) -> None: raise NotImplementedError() - def shutdown(self, force: bool = False) -> int: + def shutdown_host(self, force: bool = False) -> int: raise NotImplementedError() def powercycle(self) -> int: diff --git a/src/ceph-node-proxy/ceph_node_proxy/main.py b/src/ceph-node-proxy/ceph_node_proxy/main.py index 689089aa609f5..2a6479c4238e9 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/main.py +++ b/src/ceph-node-proxy/ceph_node_proxy/main.py @@ -1,176 +1,84 @@ -from threading import Thread from ceph_node_proxy.redfishdellsystem import RedfishDellSystem from ceph_node_proxy.api import NodeProxyApi from ceph_node_proxy.reporter import Reporter -from ceph_node_proxy.util import Config, Logger, http_req, write_tmp_file +from ceph_node_proxy.util import Config, get_logger, http_req, write_tmp_file, CONFIG from typing import Dict, Any, Optional import argparse -import traceback -import logging import os import ssl import json import time +import signal -logger = logging.getLogger(__name__) - -DEFAULT_CONFIG = { - 'reporter': { - 'check_interval': 5, - 'push_data_max_retries': 30, - 'endpoint': 'https://127.0.0.1:7150/node-proxy/data', - }, - 'system': { - 'refresh_interval': 5 - }, - 'server': { - 'port': 8080, - }, - 'logging': { - 'level': 20, - } -} - - -class NodeProxyManager(Thread): - def __init__(self, - mgr_host: str, - cephx_name: str, - cephx_secret: str, - ca_path: str, - api_ssl_crt: str, - api_ssl_key: str, - mgr_agent_port: int = 7150): - super().__init__() - self.mgr_host = mgr_host - self.cephx_name = cephx_name - self.cephx_secret = cephx_secret - self.ca_path = ca_path - self.api_ssl_crt = api_ssl_crt - self.api_ssl_key = api_ssl_key - self.mgr_agent_port = str(mgr_agent_port) - self.stop = False + +class NodeProxyManager: + def __init__(self, **kw: Any) -> None: + self.exc: Optional[Exception] = None + self.log = get_logger(__name__) + self.mgr_host: str = kw['mgr_host'] + self.cephx_name: str = kw['cephx_name'] + self.cephx_secret: str = kw['cephx_secret'] + self.ca_path: str = kw['ca_path'] + self.api_ssl_crt: str = kw['api_ssl_crt'] + self.api_ssl_key: str = kw['api_ssl_key'] + self.mgr_agent_port: str = str(kw['mgr_agent_port']) + self.stop: bool = False self.ssl_ctx = 
ssl.create_default_context() self.ssl_ctx.check_hostname = True self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED self.ssl_ctx.load_verify_locations(self.ca_path) + self.reporter_scheme: str = kw.get('reporter_scheme', 'https') + self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data') + self.cephx = {'cephx': {'name': self.cephx_name, + 'secret': self.cephx_secret}} + self.config = Config('/etc/ceph/node-proxy.yml', config=CONFIG) def run(self) -> None: self.init() self.loop() def init(self) -> None: - node_proxy_meta = { - 'cephx': { - 'name': self.cephx_name, - 'secret': self.cephx_secret - } - } + self.init_system() + self.init_reporter() + self.init_api() + + def fetch_oob_details(self) -> Dict[str, str]: headers, result, status = http_req(hostname=self.mgr_host, port=self.mgr_agent_port, - data=json.dumps(node_proxy_meta), + data=json.dumps(self.cephx), endpoint='/node-proxy/oob', ssl_ctx=self.ssl_ctx) if status != 200: msg = f'No out of band tool details could be loaded: {status}, {result}' - logger.debug(msg) + self.log.debug(msg) raise RuntimeError(msg) result_json = json.loads(result) - kwargs = { + oob_details: Dict[str, str] = { 'host': result_json['result']['addr'], 'username': result_json['result']['username'], 'password': result_json['result']['password'], - 'cephx': node_proxy_meta['cephx'], - 'mgr_host': self.mgr_host, - 'mgr_agent_port': self.mgr_agent_port, - 'api_ssl_crt': self.api_ssl_crt, - 'api_ssl_key': self.api_ssl_key + 'port': result_json['result'].get('port', '443') } - if result_json['result'].get('port'): - kwargs['port'] = result_json['result']['port'] - - self.node_proxy: NodeProxy = NodeProxy(**kwargs) - self.node_proxy.start() - - def loop(self) -> None: - while not self.stop: - try: - status = self.node_proxy.check_status() - label = 'Ok' if status else 'Critical' - logger.debug(f'node-proxy status: {label}') - except Exception as e: - logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}') - time.sleep(120) - self.init() - else: - logger.debug('node-proxy alive, next check in 60sec.') - time.sleep(60) - - def shutdown(self) -> None: - self.stop = True - # if `self.node_proxy.shutdown()` is called before self.start(), it will fail. 
- if self.__dict__.get('node_proxy'): - self.node_proxy.shutdown() + return oob_details - -class NodeProxy(Thread): - def __init__(self, **kw: Any) -> None: - super().__init__() - self.username: str = kw.get('username', '') - self.password: str = kw.get('password', '') - self.host: str = kw.get('host', '') - self.port: int = kw.get('port', 443) - self.cephx: Dict[str, Any] = kw.get('cephx', {}) - self.reporter_scheme: str = kw.get('reporter_scheme', 'https') - self.mgr_host: str = kw.get('mgr_host', '') - self.mgr_agent_port: str = kw.get('mgr_agent_port', '') - self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data') - self.api_ssl_crt: str = kw.get('api_ssl_crt', '') - self.api_ssl_key: str = kw.get('api_ssl_key', '') - self.exc: Optional[Exception] = None - self.log = Logger(__name__) - - def run(self) -> None: + def init_system(self) -> None: + oob_details = self.fetch_oob_details() + self.username: str = oob_details['username'] + self.password: str = oob_details['password'] try: - self.main() - except Exception as e: - self.exc = e - return - - def shutdown(self) -> None: - self.log.logger.info('Shutting down node-proxy...') - self.system.client.logout() - self.system.stop_update_loop() - self.reporter_agent.stop() - - def check_status(self) -> bool: - if self.__dict__.get('system') and not self.system.run: - raise RuntimeError('node-proxy encountered an error.') - if self.exc: - traceback.print_tb(self.exc.__traceback__) - self.log.logger.error(f'{self.exc.__class__.__name__}: {self.exc}') - raise self.exc - return True - - def main(self) -> None: - # TODO: add a check and fail if host/username/password/data aren't passed - self.config = Config('/etc/ceph/node-proxy.yml', default_config=DEFAULT_CONFIG) - self.log = Logger(__name__, level=self.config.__dict__['logging']['level']) - - # create the redfish system and the obsever - self.log.logger.info('Server initialization...') - try: - self.system = RedfishDellSystem(host=self.host, - port=self.port, - username=self.username, - password=self.password, + self.system = RedfishDellSystem(host=oob_details['host'], + port=oob_details['port'], + username=oob_details['username'], + password=oob_details['password'], config=self.config) + self.system.start() except RuntimeError: - self.log.logger.error("Can't initialize the redfish system.") + self.log.error("Can't initialize the redfish system.") raise + def init_reporter(self) -> None: try: self.reporter_agent = Reporter(self.system, self.cephx, @@ -178,23 +86,54 @@ class NodeProxy(Thread): reporter_hostname=self.mgr_host, reporter_port=self.mgr_agent_port, reporter_endpoint=self.reporter_endpoint) - self.reporter_agent.run() + self.reporter_agent.start() except RuntimeError: - self.log.logger.error("Can't initialize the reporter.") + self.log.error("Can't initialize the reporter.") raise + def init_api(self) -> None: try: - self.log.logger.info('Starting node-proxy API...') - self.api = NodeProxyApi(self, - username=self.username, - password=self.password, - ssl_crt=self.api_ssl_crt, - ssl_key=self.api_ssl_key) + self.log.info('Starting node-proxy API...') + self.api = NodeProxyApi(self) self.api.start() except Exception as e: - self.log.logger.error(f"Can't start node-proxy API: {e}") + self.log.error(f"Can't start node-proxy API: {e}") raise + def loop(self) -> None: + while not self.stop: + for thread in [self.system, self.reporter_agent]: + try: + status = thread.check_status() + label = 'Ok' if status else 'Critical' + self.log.debug(f'{thread} status: {label}') + 
except Exception as e: + self.log.error(f'{thread} not running: {e.__class__.__name__}: {e}') + thread.shutdown() + self.init_system() + self.init_reporter() + self.log.debug('All threads are alive, next check in 20sec.') + time.sleep(20) + + def shutdown(self) -> None: + self.stop = True + # if `self.system.shutdown()` is called before self.start(), it will fail. + if hasattr(self, 'api'): + self.api.shutdown() + if hasattr(self, 'reporter_agent'): + self.reporter_agent.shutdown() + if hasattr(self, 'system'): + self.system.shutdown() + + +def handler(signum: Any, frame: Any, t_mgr: 'NodeProxyManager') -> None: + t_mgr.system.pending_shutdown = True + t_mgr.log.info('SIGTERM caught, shutting down threads...') + t_mgr.shutdown() + t_mgr.log.info('Logging out from RedFish API') + t_mgr.system.client.logout() + raise SystemExit(0) + def main() -> None: parser = argparse.ArgumentParser( @@ -205,8 +144,15 @@ def main() -> None: help='path of config file in json format', required=True ) + parser.add_argument( + '--debug', + help='increase logging verbosity (debug level)', + action='store_true', + ) args = parser.parse_args() + if args.debug: + CONFIG['logging']['level'] = 10 if not os.path.exists(args.config): raise Exception(f'No config file found at provided config path: {args.config}') @@ -226,18 +172,19 @@ def main() -> None: listener_key = config['listener.key'] name = config['name'] - f = write_tmp_file(root_cert, - prefix_name='cephadm-endpoint-root-cert') + ca_file = write_tmp_file(root_cert, + prefix_name='cephadm-endpoint-root-cert') node_proxy_mgr = NodeProxyManager(mgr_host=target_ip, cephx_name=name, cephx_secret=keyring, mgr_agent_port=target_port, - ca_path=f.name, + ca_path=ca_file.name, api_ssl_crt=listener_cert, api_ssl_key=listener_key) - if not node_proxy_mgr.is_alive(): - node_proxy_mgr.start() + signal.signal(signal.SIGTERM, + lambda signum, frame: handler(signum, frame, node_proxy_mgr)) + node_proxy_mgr.run() if __name__ == '__main__': diff --git a/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py b/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py index eeca2e5ba217b..08ee4170dcc6a 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py +++ b/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py @@ -1,7 +1,7 @@ import json from urllib.error import HTTPError, URLError from ceph_node_proxy.baseclient import BaseClient -from ceph_node_proxy.util import Logger, http_req +from ceph_node_proxy.util import get_logger, http_req from typing import Dict, Any, Tuple, Optional from http.client import HTTPMessage @@ -15,8 +15,8 @@ class RedFishClient(BaseClient): username: str = '', password: str = ''): super().__init__(host, username, password) - self.log: Logger = Logger(__name__) - self.log.logger.info(f'Initializing redfish client {__name__}') + self.log = get_logger(__name__) + self.log.info(f'Initializing redfish client {__name__}') self.host: str = host self.port: str = port self.url: str = f'https://{self.host}:{self.port}' @@ -25,8 +25,8 @@ class RedFishClient(BaseClient): def login(self) -> None: if not self.is_logged_in(): - self.log.logger.info('Logging in to ' - f"{self.url} as '{self.username}'") + self.log.info('Logging in to ' + f"{self.url} as '{self.username}'") oob_credentials = json.dumps({'UserName': self.username, 'Password': self.password}) headers = {'Content-Type': 'application/json'} @@ -36,27 +36,27 @@ class RedFishClient(BaseClient): headers=headers, endpoint='/redfish/v1/SessionService/Sessions/') if _status_code != 201: - 
self.log.logger.error(f"Can't log in to {self.url} as '{self.username}': {_status_code}") + self.log.error(f"Can't log in to {self.url} as '{self.username}': {_status_code}") raise RuntimeError except URLError as e: msg = f"Can't log in to {self.url} as '{self.username}': {e}" - self.log.logger.error(msg) + self.log.error(msg) raise RuntimeError self.token = _headers['X-Auth-Token'] self.location = _headers['Location'] def is_logged_in(self) -> bool: - self.log.logger.debug(f'Checking token validity for {self.url}') + self.log.debug(f'Checking token validity for {self.url}') if not self.location or not self.token: - self.log.logger.debug(f'No token found for {self.url}.') + self.log.debug(f'No token found for {self.url}.') return False headers = {'X-Auth-Token': self.token} try: _headers, _data, _status_code = self.query(headers=headers, endpoint=self.location) except URLError as e: - self.log.logger.error("Can't check token " - f'validity for {self.url}: {e}') + self.log.error("Can't check token " + f'validity for {self.url}: {e}') raise return _status_code == 200 @@ -69,7 +69,7 @@ class RedFishClient(BaseClient): endpoint=self.location) result = json.loads(_data) except URLError: - self.log.logger.error(f"Can't log out from {self.url}") + self.log.error(f"Can't log out from {self.url}") self.location = '' self.token = '' @@ -84,7 +84,7 @@ class RedFishClient(BaseClient): result_json = json.loads(result) return result_json except URLError as e: - self.log.logger.error(f"Can't get path {path}:\n{e}") + self.log.error(f"Can't get path {path}:\n{e}") raise RuntimeError def query(self, @@ -111,5 +111,5 @@ class RedFishClient(BaseClient): return response_headers, response_str, response_status except (HTTPError, URLError) as e: - self.log.logger.debug(f'{e}') + self.log.debug(f'{e}') raise diff --git a/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py b/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py index 83b73657b1aac..f0d24c667c96d 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py +++ b/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py @@ -1,12 +1,12 @@ from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem -from ceph_node_proxy.util import Logger, normalize_dict, to_snake_case +from ceph_node_proxy.util import get_logger, normalize_dict, to_snake_case from typing import Dict, Any, List class RedfishDellSystem(BaseRedfishSystem): def __init__(self, **kw: Any) -> None: super().__init__(**kw) - self.log = Logger(__name__) + self.log = get_logger(__name__) self.job_service_endpoint: str = '/redfish/v1/Managers/iDRAC.Embedded.1/Oem/Dell/DellJobService' self.create_reboot_job_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.CreateRebootJob' self.setup_job_queue_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.SetupJobQueue' @@ -23,7 +23,7 @@ class RedfishDellSystem(BaseRedfishSystem): try: result[member_id][to_snake_case(field)] = member_info[field] except KeyError: - self.log.logger.warning(f'Could not find field: {field} in member_info: {member_info}') + self.log.warning(f'Could not find field: {field} in member_info: {member_info}') return normalize_dict(result) @@ -41,7 +41,7 @@ class RedfishDellSystem(BaseRedfishSystem): try: result[_id][to_snake_case(field)] = member_elt[field] except KeyError: - self.log.logger.warning(f'Could not find field: {field} in data: {data[elt]}') + self.log.warning(f'Could not find field: {field} in data: {data[elt]}') return normalize_dict(result) def get_sn(self) -> str: 
@@ -73,7 +73,7 @@ class RedfishDellSystem(BaseRedfishSystem): def _update_network(self) -> None: fields = ['Description', 'Name', 'SpeedMbps', 'Status'] - self.log.logger.debug('Updating network') + self.log.debug('Updating network') self._sys['network'] = self.build_common_data(data=self._system['Systems'], fields=fields, path='EthernetInterfaces') @@ -86,7 +86,7 @@ class RedfishDellSystem(BaseRedfishSystem): 'Model', 'Status', 'Manufacturer'] - self.log.logger.debug('Updating processors') + self.log.debug('Updating processors') self._sys['processors'] = self.build_common_data(data=self._system['Systems'], fields=fields, path='Processors') @@ -100,7 +100,7 @@ class RedfishDellSystem(BaseRedfishSystem): 'PhysicalLocation'] entities = self.get_members(data=self._system['Systems'], path='Storage') - self.log.logger.debug('Updating storage') + self.log.debug('Updating storage') result: Dict[str, Dict[str, Dict]] = dict() for entity in entities: for drive in entity['Drives']: @@ -115,7 +115,7 @@ class RedfishDellSystem(BaseRedfishSystem): self._sys['storage'] = normalize_dict(result) def _update_sn(self) -> None: - self.log.logger.debug('Updating serial number') + self.log.debug('Updating serial number') self._sys['SKU'] = self._system['Systems']['SKU'] def _update_memory(self) -> None: @@ -123,7 +123,7 @@ class RedfishDellSystem(BaseRedfishSystem): 'MemoryDeviceType', 'CapacityMiB', 'Status'] - self.log.logger.debug('Updating memory') + self.log.debug('Updating memory') self._sys['memory'] = self.build_common_data(data=self._system['Systems'], fields=fields, path='Memory') @@ -137,7 +137,7 @@ class RedfishDellSystem(BaseRedfishSystem): 'Status' ] } - self.log.logger.debug('Updating powersupplies') + self.log.debug('Updating powersupplies') self._sys['power'] = self.build_chassis_data(fields, 'Power') def _update_fans(self) -> None: @@ -148,7 +148,7 @@ class RedfishDellSystem(BaseRedfishSystem): 'Status' ], } - self.log.logger.debug('Updating fans') + self.log.debug('Updating fans') self._sys['fans'] = self.build_chassis_data(fields, 'Thermal') def _update_firmwares(self) -> None: @@ -160,7 +160,7 @@ class RedfishDellSystem(BaseRedfishSystem): 'Updateable', 'Status', ] - self.log.logger.debug('Updating firmwares') + self.log.debug('Updating firmwares') self._sys['firmwares'] = self.build_common_data(data=self._system['UpdateService'], fields=fields, path='FirmwareInventory') diff --git a/src/ceph-node-proxy/ceph_node_proxy/reporter.py b/src/ceph-node-proxy/ceph_node_proxy/reporter.py index aa16d83421f3e..4e9c4e2798b31 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/reporter.py +++ b/src/ceph-node-proxy/ceph_node_proxy/reporter.py @@ -1,12 +1,11 @@ -from threading import Thread import time import json -from ceph_node_proxy.util import Logger, http_req +from ceph_node_proxy.util import get_logger, http_req, BaseThread from urllib.error import HTTPError, URLError from typing import Dict, Any -class Reporter: +class Reporter(BaseThread): def __init__(self, system: Any, cephx: Dict[str, Any], @@ -14,61 +13,57 @@ class Reporter: reporter_hostname: str = '', reporter_port: str = '443', reporter_endpoint: str = '/node-proxy/data') -> None: + super().__init__() self.system = system self.data: Dict[str, Any] = {} - self.finish = False + self.stop: bool = False self.cephx = cephx - self.data['cephx'] = self.cephx + self.data['cephx'] = self.cephx['cephx'] self.reporter_scheme: str = reporter_scheme self.reporter_hostname: str = reporter_hostname self.reporter_port: str = reporter_port 
self.reporter_endpoint: str = reporter_endpoint - self.log = Logger(__name__) + self.log = get_logger(__name__) self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:' f'{reporter_port}{reporter_endpoint}') - self.log.logger.info(f'Reporter url set to {self.reporter_url}') + self.log.info(f'Reporter url set to {self.reporter_url}') - def stop(self) -> None: - self.finish = True - self.thread.join() - - def run(self) -> None: - self.thread = Thread(target=self.loop) - self.thread.start() - - def loop(self) -> None: - while not self.finish: + def main(self) -> None: + while not self.stop: # Any logic to avoid sending the all the system # information every loop can go here. In a real # scenario probably we should just send the sub-parts # that have changed to minimize the traffic in # dense clusters - self.log.logger.debug('waiting for a lock in reporter loop.') - self.system.lock.acquire() - self.log.logger.debug('lock acquired in reporter loop.') - if self.system.data_ready: - self.log.logger.debug('data ready to be sent to the mgr.') - if not self.system.get_system() == self.system.previous_data: - self.log.logger.info('data has changed since last iteration.') - self.data['patch'] = self.system.get_system() - try: - # TODO: add a timeout parameter to the reporter in the config file - self.log.logger.info(f'sending data to {self.reporter_url}') - http_req(hostname=self.reporter_hostname, - port=self.reporter_port, - method='POST', - headers={'Content-Type': 'application/json'}, - endpoint=self.reporter_endpoint, - scheme=self.reporter_scheme, - data=json.dumps(self.data)) - except (HTTPError, URLError) as e: - self.log.logger.error(f"The reporter couldn't send data to the mgr: {e}") - # Need to add a new parameter 'max_retries' to the reporter if it can't - # send the data for more than x times, maybe the daemon should stop altogether - else: - self.system.previous_data = self.system.get_system() - else: - self.log.logger.debug('no diff, not sending data to the mgr.') - self.system.lock.release() - self.log.logger.debug('lock released in reporter loop.') - time.sleep(5) + self.log.debug('waiting for a lock in reporter loop.') + with self.system.lock: + if not self.system.pending_shutdown: + self.log.debug('lock acquired in reporter loop.') + if self.system.data_ready: + self.log.debug('data ready to be sent to the mgr.') + if not self.system.get_system() == self.system.previous_data: + self.log.info('data has changed since last iteration.') + self.data['patch'] = self.system.get_system() + try: + # TODO: add a timeout parameter to the reporter in the config file + self.log.info(f'sending data to {self.reporter_url}') + http_req(hostname=self.reporter_hostname, + port=self.reporter_port, + method='POST', + headers={'Content-Type': 'application/json'}, + endpoint=self.reporter_endpoint, + scheme=self.reporter_scheme, + data=json.dumps(self.data)) + except (HTTPError, URLError) as e: + self.log.error(f"The reporter couldn't send data to the mgr: {e}") + raise + # Need to add a new parameter 'max_retries' to the reporter if it can't + # send the data for more than x times, maybe the daemon should stop altogether + else: + self.system.previous_data = self.system.get_system() + else: + self.log.debug('no diff, not sending data to the mgr.') + time.sleep(5) + self.log.debug('lock released in reporter loop.') + self.log.debug('exiting reporter loop.') + raise SystemExit(0) diff --git a/src/ceph-node-proxy/ceph_node_proxy/util.py b/src/ceph-node-proxy/ceph_node_proxy/util.py index 
a94acc9ed5564..f6ed0fb483d6d 100644 --- a/src/ceph-node-proxy/ceph_node_proxy/util.py +++ b/src/ceph-node-proxy/ceph_node_proxy/util.py @@ -4,43 +4,57 @@ import os import time import re import ssl +import traceback +import threading from tempfile import NamedTemporaryFile, _TemporaryFileWrapper from urllib.error import HTTPError, URLError from urllib.request import urlopen, Request -from typing import Dict, List, Callable, Any, Optional, MutableMapping, Tuple - - -class Logger: - _Logger: List['Logger'] = [] - - def __init__(self, name: str, level: int = logging.INFO): - self.name = name - self.level = level - - Logger._Logger.append(self) - self.logger = self.get_logger() - - def get_logger(self) -> logging.Logger: - logger = logging.getLogger(self.name) - logger.setLevel(self.level) - handler = logging.StreamHandler() - handler.setLevel(self.level) - fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - handler.setFormatter(fmt) - logger.handlers.clear() - logger.addHandler(handler) - logger.propagate = False - - return logger +from typing import Dict, Callable, Any, Optional, MutableMapping, Tuple, Union + + +CONFIG: Dict[str, Any] = { + 'reporter': { + 'check_interval': 5, + 'push_data_max_retries': 30, + 'endpoint': 'https://%(mgr_host):%(mgr_port)/node-proxy/data', + }, + 'system': { + 'refresh_interval': 5 + }, + 'api': { + 'port': 9456, + }, + 'logging': { + 'level': logging.INFO, + } +} + + +def get_logger(name: str, level: Union[int, str] = logging.NOTSET) -> logging.Logger: + if level == logging.NOTSET: + log_level = CONFIG['logging']['level'] + logger = logging.getLogger(name) + logger.setLevel(log_level) + handler = logging.StreamHandler() + handler.setLevel(log_level) + fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(fmt) + logger.handlers.clear() + logger.addHandler(handler) + logger.propagate = False + + return logger + + +logger = get_logger(__name__) class Config: - def __init__(self, config_file: str = '/etc/ceph/node-proxy.yaml', - default_config: Dict[str, Any] = {}) -> None: + config: Dict[str, Any] = {}) -> None: self.config_file = config_file - self.default_config = default_config + self.config = config self.load_config() @@ -49,27 +63,56 @@ class Config: with open(self.config_file, 'r') as f: self.config = yaml.safe_load(f) else: - self.config = self.default_config + self.config = self.config - for k, v in self.default_config.items(): + for k, v in self.config.items(): if k not in self.config.keys(): self.config[k] = v for k, v in self.config.items(): setattr(self, k, v) - # TODO: need to be improved - for _l in Logger._Logger: - _l.logger.setLevel(self.logging['level']) # type: ignore - _l.logger.handlers[0].setLevel(self.logging['level']) # type: ignore - def reload(self, config_file: str = '') -> None: if config_file != '': self.config_file = config_file self.load_config() -log = Logger(__name__) +class BaseThread(threading.Thread): + def __init__(self) -> None: + super().__init__() + self.exc: Optional[Exception] = None + self.stop: bool = False + self.daemon = True + self.name = self.__class__.__name__ + self.log: logging.Logger = get_logger(__name__) + self.pending_shutdown: bool = False + + def run(self) -> None: + logger.info(f'Starting {self.name}') + try: + self.main() + except Exception as e: + self.exc = e + return + + def shutdown(self) -> None: + self.stop = True + self.pending_shutdown = True + + def check_status(self) -> bool: + logger.debug(f'Checking status of 
{self.name}') + if self.exc: + traceback.print_tb(self.exc.__traceback__) + logger.error(f'Caught exception: {self.exc.__class__.__name__}') + raise self.exc + if not self.is_alive(): + logger.info(f'{self.name} not alive') + self.start() + return True + + def main(self) -> None: + raise NotImplementedError() def to_snake_case(name: str) -> str: @@ -93,12 +136,12 @@ def retry(exceptions: Any = Exception, retries: int = 20, delay: int = 1) -> Cal _tries = retries while _tries > 1: try: - log.logger.debug('{} {} attempt(s) left.'.format(f, _tries - 1)) + logger.debug('{} {} attempt(s) left.'.format(f, _tries - 1)) return f(*args, **kwargs) except exceptions: time.sleep(delay) _tries -= 1 - log.logger.warn('{} has failed after {} tries'.format(f, retries)) + logger.warn('{} has failed after {} tries'.format(f, retries)) return f(*args, **kwargs) return _retry return decorator diff --git a/src/cephadm/cephadmlib/daemons/node_proxy.py b/src/cephadm/cephadmlib/daemons/node_proxy.py index a4cce11a53ca9..c197ded3ff6c1 100644 --- a/src/cephadm/cephadmlib/daemons/node_proxy.py +++ b/src/cephadm/cephadmlib/daemons/node_proxy.py @@ -24,7 +24,7 @@ class NodeProxy(ContainerDaemonForm): daemon_type = 'node-proxy' # TODO: update this if we make node-proxy an executable - entrypoint = 'python3' + entrypoint = '/usr/sbin/ceph-node-proxy' required_files = ['node-proxy.json'] @classmethod @@ -88,7 +88,7 @@ class NodeProxy(ContainerDaemonForm): # the config in _get_container_mounts above. They # will both need to be updated when we have a proper # location in the container for node-proxy - args.extend(['/usr/share/ceph/ceph_node_proxy/main.py', '--config', '/usr/share/ceph/node-proxy.json']) + args.extend(['--config', '/usr/share/ceph/node-proxy.json']) def validate(self): # type: () -> None @@ -130,7 +130,7 @@ class NodeProxy(ContainerDaemonForm): def container(self, ctx: CephadmContext) -> CephContainer: # So the container can modprobe iscsi_target_mod and have write perms # to configfs we need to make this a privileged container. - ctr = daemon_to_container(ctx, self, privileged=True, envs=['PYTHONPATH=$PYTHONPATH:/usr/share/ceph']) + ctr = daemon_to_container(ctx, self, privileged=True) return to_deployment_container(ctx, ctr) def config_and_keyring( diff --git a/src/pybind/mgr/cephadm/agent.py b/src/pybind/mgr/cephadm/agent.py index e09b98d0e4d73..12c03901de8d0 100644 --- a/src/pybind/mgr/cephadm/agent.py +++ b/src/pybind/mgr/cephadm/agent.py @@ -361,7 +361,7 @@ class NodeProxyEndpoint: try: headers, result, status = http_req(hostname=addr, - port='8080', + port='9456', headers=header, method=method, data=json.dumps(payload), diff --git a/src/pybind/mgr/cephadm/services/node_proxy.py b/src/pybind/mgr/cephadm/services/node_proxy.py index ebbbaf212c742..e5608ca42b51f 100644 --- a/src/pybind/mgr/cephadm/services/node_proxy.py +++ b/src/pybind/mgr/cephadm/services/node_proxy.py @@ -105,7 +105,7 @@ class NodeProxy(CephService): try: headers, result, status = http_req(hostname=addr, - port='8080', + port='9456', headers=header, method=method, data=json.dumps(payload), @@ -142,7 +142,7 @@ class NodeProxy(CephService): try: headers, result, status = http_req(hostname=addr, - port='8080', + port='9456', headers=header, data=json.dumps(payload), endpoint=endpoint, @@ -165,7 +165,7 @@ class NodeProxy(CephService): try: headers, result, status = http_req(hostname=addr, - port='8080', + port='9456', headers=header, data="{}", endpoint=endpoint,
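
Appendix: a minimal, self-contained sketch of the threading pattern this commit introduces (a `BaseThread` wrapper around `threading.Thread`, a monitor loop that re-raises worker exceptions via `check_status()`, and a SIGTERM handler for graceful shutdown). The `DemoWorker`, `handler` and simplified `get_logger` names below are illustrative only and are not part of the ceph-node-proxy code:

import logging
import signal
import threading
import time
import traceback
from typing import Any, Optional


def get_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    # simplified stand-in for ceph_node_proxy.util.get_logger()
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(name)
    logger.setLevel(level)
    return logger


class BaseThread(threading.Thread):
    # threading.Thread abstraction: subclasses implement main(); any exception
    # is stored so a monitor loop can re-raise it through check_status().
    def __init__(self) -> None:
        super().__init__()
        self.exc: Optional[Exception] = None
        self.stop: bool = False
        self.daemon = True
        self.log = get_logger(self.__class__.__name__)

    def run(self) -> None:
        try:
            self.main()
        except Exception as e:
            self.exc = e

    def main(self) -> None:
        raise NotImplementedError()

    def shutdown(self) -> None:
        self.stop = True

    def check_status(self) -> bool:
        if self.exc:
            traceback.print_tb(self.exc.__traceback__)
            raise self.exc
        return self.is_alive()


class DemoWorker(BaseThread):
    # hypothetical worker, standing in for BaseRedfishSystem / Reporter
    def main(self) -> None:
        while not self.stop:
            self.log.info('periodic work (update loop, reporter push, ...)')
            time.sleep(1)
        raise SystemExit(0)  # ends only this thread


def handler(signum: Any, frame: Any, worker: DemoWorker) -> None:
    # SIGTERM: ask the worker to exit gracefully, then stop the process.
    worker.log.info('SIGTERM caught, shutting down...')
    worker.shutdown()
    raise SystemExit(0)


if __name__ == '__main__':
    worker = DemoWorker()
    worker.start()
    signal.signal(signal.SIGTERM, lambda signum, frame: handler(signum, frame, worker))
    while True:
        # monitor loop, similar in spirit to NodeProxyManager.loop()
        worker.check_status()
        time.sleep(5)

In the commit itself, `NodeProxyManager` plays the monitor role while `BaseRedfishSystem` and `Reporter` are the `BaseThread` subclasses whose `main()` loops exit once `shutdown()` sets their stop flag.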