From 3b8c945a6afeb7c8ab9f862468929f579c78adc3 Mon Sep 17 00:00:00 2001
From: Guillaume Abrioux
Date: Mon, 15 Jan 2024 12:38:39 +0000
Subject: [PATCH] node-proxy: make it a separate daemon

The current implementation requires the inclusion of all the recent
modifications in the cephadm binary, which won't be backported.
Since we need the node-proxy code backported to reef, let's move the code
out and make it a separate daemon.

Signed-off-by: Guillaume Abrioux
Co-authored-by: Adam King
---
 .../ceph_node_proxy}/__init__.py | 0
 src/ceph-node-proxy/ceph_node_proxy/api.py | 291 ++++++++++++++++++
 .../ceph_node_proxy}/baseclient.py | 0
 .../ceph_node_proxy/baseredfishsystem.py | 289 +++++++++++++++++
 .../ceph_node_proxy}/basesystem.py | 34 +-
 src/ceph-node-proxy/ceph_node_proxy/main.py | 244 +++++++++++++++
 .../ceph_node_proxy}/redfish_client.py | 21 +-
 .../ceph_node_proxy}/redfishdellsystem.py | 31 +-
 .../ceph_node_proxy}/reporter.py | 2 +-
 .../ceph_node_proxy}/util.py | 17 +-
 src/cephadm/cephadm.py | 117 +------
 src/cephadm/cephadmlib/daemons/__init__.py | 2 +
 src/cephadm/cephadmlib/daemons/node_proxy.py | 145 +++++++++
 .../node_proxy/baseredfishsystem.py | 152 ---------
 src/cephadm/cephadmlib/node_proxy/main.py | 94 ------
 src/mypy.ini | 2 +
 src/pybind/mgr/cephadm/agent.py | 104 ++++---
 src/pybind/mgr/cephadm/inventory.py | 23 +-
 src/pybind/mgr/cephadm/module.py | 84 ++++-
 src/pybind/mgr/cephadm/serve.py | 3 +
 .../mgr/cephadm/services/cephadmservice.py | 12 +-
 src/pybind/mgr/cephadm/services/node_proxy.py | 180 +++++++++++
 .../mgr/cephadm/tests/test_node_proxy.py | 16 +-
 src/pybind/mgr/cephadm/utils.py | 2 +-
 src/pybind/mgr/orchestrator/_interface.py | 29 ++
 src/pybind/mgr/orchestrator/module.py | 45 +++
 .../ceph/deployment/service_spec.py | 3 +-
 src/python-common/ceph/utils.py | 48 ++-
 28 files changed, 1511 insertions(+), 479 deletions(-)
 rename src/{cephadm/cephadmlib/node_proxy => ceph-node-proxy/ceph_node_proxy}/__init__.py (100%)
 create mode 100644 src/ceph-node-proxy/ceph_node_proxy/api.py
 rename src/{cephadm/cephadmlib/node_proxy => ceph-node-proxy/ceph_node_proxy}/baseclient.py (100%)
 create mode 100644 src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py
 rename src/{cephadm/cephadmlib/node_proxy => ceph-node-proxy/ceph_node_proxy}/basesystem.py (64%)
 create mode 100644 src/ceph-node-proxy/ceph_node_proxy/main.py
 rename src/{cephadm/cephadmlib/node_proxy => ceph-node-proxy/ceph_node_proxy}/redfish_client.py (88%)
 rename src/{cephadm/cephadmlib/node_proxy => ceph-node-proxy/ceph_node_proxy}/redfishdellsystem.py (84%)
 rename src/{cephadm/cephadmlib/node_proxy => ceph-node-proxy/ceph_node_proxy}/reporter.py (98%)
 rename src/{cephadm/cephadmlib/node_proxy => ceph-node-proxy/ceph_node_proxy}/util.py (88%)
 create mode 100644 src/cephadm/cephadmlib/daemons/node_proxy.py
 delete mode 100644 src/cephadm/cephadmlib/node_proxy/baseredfishsystem.py
 delete mode 100644 src/cephadm/cephadmlib/node_proxy/main.py
 create mode 100644 src/pybind/mgr/cephadm/services/node_proxy.py

diff --git a/src/cephadm/cephadmlib/node_proxy/__init__.py b/src/ceph-node-proxy/ceph_node_proxy/__init__.py
similarity index 100%
rename from src/cephadm/cephadmlib/node_proxy/__init__.py
rename to src/ceph-node-proxy/ceph_node_proxy/__init__.py
diff --git a/src/ceph-node-proxy/ceph_node_proxy/api.py b/src/ceph-node-proxy/ceph_node_proxy/api.py
new file mode 100644
index 0000000000000..93e41def3bfe9
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/api.py
@@ -0,0 +1,291 @@
+import cherrypy
+from urllib.error import HTTPError
+from cherrypy._cpserver import Server +from threading import Thread, Event +from typing import Dict, Any, List +from ceph_node_proxy.util import Config, Logger, write_tmp_file +from ceph_node_proxy.basesystem import BaseSystem +from ceph_node_proxy.reporter import Reporter +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from ceph_node_proxy.main import NodeProxy + + +@cherrypy.tools.auth_basic(on=True) +@cherrypy.tools.allow(methods=['PUT']) +@cherrypy.tools.json_out() +class Admin(): + def __init__(self, api: 'API') -> None: + self.api = api + + @cherrypy.expose + def start(self) -> Dict[str, str]: + self.api.backend.start_client() + # self.backend.start_update_loop() + self.api.reporter.run() + return {'ok': 'node-proxy daemon started'} + + @cherrypy.expose + def reload(self) -> Dict[str, str]: + self.api.config.reload() + return {'ok': 'node-proxy config reloaded'} + + def _stop(self) -> None: + self.api.backend.stop_update_loop() + self.api.backend.client.logout() + self.api.reporter.stop() + + @cherrypy.expose + def stop(self) -> Dict[str, str]: + self._stop() + return {'ok': 'node-proxy daemon stopped'} + + @cherrypy.expose + def shutdown(self) -> Dict[str, str]: + self._stop() + cherrypy.engine.exit() + return {'ok': 'Server shutdown.'} + + @cherrypy.expose + def flush(self) -> Dict[str, str]: + self.api.backend.flush() + return {'ok': 'node-proxy data flushed'} + + +class API(Server): + def __init__(self, + backend: 'BaseSystem', + reporter: 'Reporter', + config: 'Config', + addr: str = '0.0.0.0', + port: int = 0) -> None: + super().__init__() + self.log = Logger(__name__) + self.backend = backend + self.reporter = reporter + self.config = config + self.socket_port = self.config.__dict__['server']['port'] if not port else port + self.socket_host = addr + self.subscribe() + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def memory(self) -> Dict[str, Any]: + return {'memory': self.backend.get_memory()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def network(self) -> Dict[str, Any]: + return {'network': self.backend.get_network()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def processors(self) -> Dict[str, Any]: + return {'processors': self.backend.get_processors()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def storage(self) -> Dict[str, Any]: + return {'storage': self.backend.get_storage()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def power(self) -> Dict[str, Any]: + return {'power': self.backend.get_power()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def fans(self) -> Dict[str, Any]: + return {'fans': self.backend.get_fans()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def firmwares(self) -> Dict[str, Any]: + return {'firmwares': self.backend.get_firmwares()} + + def _cp_dispatch(self, vpath: List[str]) -> 'API': + if vpath[0] == 'led' and len(vpath) > 1: # /led/{type}/{id} + _type = vpath[1] + cherrypy.request.params['type'] = _type + vpath.pop(1) # /led/{id} or # /led + if _type == 'drive' and len(vpath) > 1: # /led/{id} + _id = vpath[1] + vpath.pop(1) # /led + cherrypy.request.params['id'] = _id + vpath[0] = '_led' + # / + return self + + @cherrypy.expose + @cherrypy.tools.allow(methods=['POST']) + 
@cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.auth_basic(on=True) + def shutdown(self, **kw: Any) -> int: + data: Dict[str, bool] = cherrypy.request.json + + if 'force' not in data.keys(): + msg = "The key 'force' wasn't passed." + self.log.logger.debug(msg) + raise cherrypy.HTTPError(400, msg) + try: + result: int = self.backend.shutdown(force=data['force']) + except HTTPError as e: + raise cherrypy.HTTPError(e.code, e.reason) + return result + + @cherrypy.expose + @cherrypy.tools.allow(methods=['POST']) + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.auth_basic(on=True) + def powercycle(self, **kw: Any) -> int: + try: + result: int = self.backend.powercycle() + except HTTPError as e: + raise cherrypy.HTTPError(e.code, e.reason) + return result + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET', 'PATCH']) + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.auth_basic(on=True) + def _led(self, **kw: Any) -> Dict[str, Any]: + method: str = cherrypy.request.method + led_type: Optional[str] = kw.get('type') + id_drive: Optional[str] = kw.get('id') + result: Dict[str, Any] = dict() + + if not led_type: + msg = "the led type must be provided (either 'chassis' or 'drive')." + self.log.logger.debug(msg) + raise cherrypy.HTTPError(400, msg) + + if led_type == 'drive': + id_drive_required = not id_drive + if id_drive_required or id_drive not in self.backend.get_storage(): + msg = 'A valid device ID must be provided.' + self.log.logger.debug(msg) + raise cherrypy.HTTPError(400, msg) + + try: + if method == 'PATCH': + data: Dict[str, Any] = cherrypy.request.json + + if 'state' not in data or data['state'] not in ['on', 'off']: + msg = "Invalid data. 'state' must be provided and have a valid value (on|off)." 
+ self.log.logger.error(msg) + raise cherrypy.HTTPError(400, msg) + + func: Any = (self.backend.device_led_on if led_type == 'drive' and data['state'] == 'on' else + self.backend.device_led_off if led_type == 'drive' and data['state'] == 'off' else + self.backend.chassis_led_on if led_type != 'drive' and data['state'] == 'on' else + self.backend.chassis_led_off if led_type != 'drive' and data['state'] == 'off' else None) + + else: + func = self.backend.get_device_led if led_type == 'drive' else self.backend.get_chassis_led + + result = func(id_drive) if led_type == 'drive' else func() + + except HTTPError as e: + raise cherrypy.HTTPError(e.code, e.reason) + return result + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def get_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]: + return self.backend.get_led() + + @cherrypy.expose + @cherrypy.tools.allow(methods=['PATCH']) + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.auth_basic(on=True) + def set_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]: + data = cherrypy.request.json + rc = self.backend.set_led(data) + + if rc != 200: + cherrypy.response.status = rc + result = {'state': 'error: please, verify the data you sent.'} + else: + result = {'state': data['state'].lower()} + return result + + def stop(self) -> None: + self.unsubscribe() + super().stop() + + +class NodeProxyApi(Thread): + def __init__(self, + node_proxy: 'NodeProxy', + username: str, + password: str, + ssl_crt: str, + ssl_key: str) -> None: + super().__init__() + self.log = Logger(__name__) + self.cp_shutdown_event = Event() + self.node_proxy = node_proxy + self.username = username + self.password = password + self.ssl_crt = ssl_crt + self.ssl_key = ssl_key + self.api = API(self.node_proxy.system, + self.node_proxy.reporter_agent, + self.node_proxy.config) + + def check_auth(self, realm: str, username: str, password: str) -> bool: + return self.username == username and \ + self.password == password + + def shutdown(self) -> None: + self.log.logger.info('Stopping node-proxy API...') + self.cp_shutdown_event.set() + + def run(self) -> None: + self.log.logger.info('node-proxy API configuration...') + cherrypy.config.update({ + 'environment': 'production', + 'engine.autoreload.on': False, + 'log.screen': True, + }) + config = {'/': { + 'request.methods_with_bodies': ('POST', 'PUT', 'PATCH'), + 'tools.trailing_slash.on': False, + 'tools.auth_basic.realm': 'localhost', + 'tools.auth_basic.checkpassword': self.check_auth + }} + cherrypy.tree.mount(self.api, '/', config=config) + # cherrypy.tree.mount(admin, '/admin', config=config) + + ssl_crt = write_tmp_file(self.ssl_crt, + prefix_name='listener-crt-') + ssl_key = write_tmp_file(self.ssl_key, + prefix_name='listener-key-') + + self.api.ssl_certificate = ssl_crt.name + self.api.ssl_private_key = ssl_key.name + + cherrypy.server.unsubscribe() + try: + cherrypy.engine.start() + self.log.logger.info('node-proxy API started.') + self.cp_shutdown_event.wait() + self.cp_shutdown_event.clear() + cherrypy.engine.stop() + cherrypy.server.httpserver = None + self.log.logger.info('node-proxy API shutdown.') + except Exception as e: + self.log.logger.error(f'node-proxy API error: {e}') diff --git a/src/cephadm/cephadmlib/node_proxy/baseclient.py b/src/ceph-node-proxy/ceph_node_proxy/baseclient.py similarity index 100% rename from src/cephadm/cephadmlib/node_proxy/baseclient.py rename to src/ceph-node-proxy/ceph_node_proxy/baseclient.py diff --git 
a/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py new file mode 100644 index 0000000000000..98f117196158e --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py @@ -0,0 +1,289 @@ +import concurrent.futures +import json +from ceph_node_proxy.basesystem import BaseSystem +from ceph_node_proxy.redfish_client import RedFishClient +from threading import Thread, Lock +from time import sleep +from ceph_node_proxy.util import Logger, retry +from typing import Dict, Any, List, Callable, Union +from urllib.error import HTTPError, URLError + + +class BaseRedfishSystem(BaseSystem): + def __init__(self, **kw: Any) -> None: + super().__init__(**kw) + self.common_endpoints: List[str] = kw.get('common_endpoints', ['/Systems/System.Embedded.1', + '/UpdateService']) + self.chassis_endpoint: str = kw.get('chassis_endpoint', '/Chassis/System.Embedded.1') + self.log = Logger(__name__) + self.host: str = kw['host'] + self.port: str = kw['port'] + self.username: str = kw['username'] + self.password: str = kw['password'] + # move the following line (class attribute?) + self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password) + self.log.logger.info(f'redfish system initialization, host: {self.host}, user: {self.username}') + + self.run: bool = False + self.thread: Thread + self.data_ready: bool = False + self.previous_data: Dict = {} + self.lock: Lock = Lock() + self.data: Dict[str, Dict[str, Any]] = {} + self._system: Dict[str, Dict[str, Any]] = {} + self._sys: Dict[str, Any] = {} + self.job_service_endpoint: str = '' + self.create_reboot_job_endpoint: str = '' + self.setup_job_queue_endpoint: str = '' + + self.start_client() + + def start_client(self) -> None: + self.client.login() + self.start_update_loop() + + def start_update_loop(self) -> None: + self.run = True + self.thread = Thread(target=self.update) + self.thread.start() + + def stop_update_loop(self) -> None: + self.run = False + self.thread.join() + + def update(self) -> None: + # this loop can have: + # - caching logic + while self.run: + self.log.logger.debug('waiting for a lock in the update loop.') + self.lock.acquire() + self.log.logger.debug('lock acquired in the update loop.') + try: + self._update_system() + self._update_sn() + update_funcs = [self._update_memory, + self._update_power, + self._update_fans, + self._update_network, + self._update_processors, + self._update_storage, + self._update_firmwares] + + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.map(lambda f: f(), update_funcs) + + self.data_ready = True + sleep(5) + except RuntimeError as e: + self.run = False + self.log.logger.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}') + self.client.logout() + finally: + self.lock.release() + self.log.logger.debug('lock released in the update loop.') + + def flush(self) -> None: + self.log.logger.debug('Acquiring lock to flush data.') + self.lock.acquire() + self.log.logger.debug('Lock acquired, flushing data.') + self._system = {} + self.previous_data = {} + self.log.logger.info('Data flushed.') + self.data_ready = False + self.log.logger.debug('Data marked as not ready.') + self.lock.release() + self.log.logger.debug('Released the lock after flushing data.') + + @retry(retries=10, delay=2) + def _get_path(self, path: str) -> Dict: + try: + result = self.client.get_path(path) + except RuntimeError: + raise + if result is None: + 
self.log.logger.error(f'The client reported an error when getting path: {path}') + raise RuntimeError(f'Could not get path: {path}') + return result + + def get_members(self, data: Dict[str, Any], path: str) -> List: + _path = data[path]['@odata.id'] + _data = self._get_path(_path) + return [self._get_path(member['@odata.id']) for member in _data['Members']] + + def get_system(self) -> Dict[str, Any]: + result = { + 'host': self.get_host(), + 'sn': self.get_sn(), + 'status': { + 'storage': self.get_storage(), + 'processors': self.get_processors(), + 'network': self.get_network(), + 'memory': self.get_memory(), + 'power': self.get_power(), + 'fans': self.get_fans() + }, + 'firmwares': self.get_firmwares(), + 'chassis': {'redfish_endpoint': f'/redfish/v1{self.chassis_endpoint}'} # TODO(guits): not ideal + } + return result + + def _update_system(self) -> None: + for endpoint in self.common_endpoints: + result = self.client.get_path(endpoint) + _endpoint = endpoint.strip('/').split('/')[0] + self._system[_endpoint] = result + + def _update_sn(self) -> None: + raise NotImplementedError() + + def _update_memory(self) -> None: + raise NotImplementedError() + + def _update_power(self) -> None: + raise NotImplementedError() + + def _update_fans(self) -> None: + raise NotImplementedError() + + def _update_network(self) -> None: + raise NotImplementedError() + + def _update_processors(self) -> None: + raise NotImplementedError() + + def _update_storage(self) -> None: + raise NotImplementedError() + + def _update_firmwares(self) -> None: + raise NotImplementedError() + + def device_led_on(self, device: str) -> int: + data: Dict[str, bool] = {'LocationIndicatorActive': True} + try: + result = self.set_device_led(device, data) + except (HTTPError, KeyError): + return 0 + return result + + def device_led_off(self, device: str) -> int: + data: Dict[str, bool] = {'LocationIndicatorActive': False} + try: + result = self.set_device_led(device, data) + except (HTTPError, KeyError): + return 0 + return result + + def chassis_led_on(self) -> int: + data: Dict[str, str] = {'IndicatorLED': 'Blinking'} + result = self.set_chassis_led(data) + return result + + def chassis_led_off(self) -> int: + data: Dict[str, str] = {'IndicatorLED': 'Lit'} + result = self.set_chassis_led(data) + return result + + def get_device_led(self, device: str) -> Dict[str, Any]: + endpoint = self._sys['storage'][device]['redfish_endpoint'] + try: + result = self.client.query(method='GET', + endpoint=endpoint, + timeout=10) + except HTTPError as e: + self.log.logger.error(f"Couldn't get the ident device LED status for device '{device}': {e}") + raise + response_json = json.loads(result[1]) + _result: Dict[str, Any] = {'http_code': result[2]} + if result[2] == 200: + _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive'] + else: + _result['LocationIndicatorActive'] = None + return _result + + def set_device_led(self, device: str, data: Dict[str, bool]) -> int: + try: + _, response, status = self.client.query( + data=json.dumps(data), + method='PATCH', + endpoint=self._sys['storage'][device]['redfish_endpoint'] + ) + except (HTTPError, KeyError) as e: + self.log.logger.error(f"Couldn't set the ident device LED for device '{device}': {e}") + raise + return status + + def get_chassis_led(self) -> Dict[str, Any]: + endpoint = f'/redfish/v1/{self.chassis_endpoint}' + try: + result = self.client.query(method='GET', + endpoint=endpoint, + timeout=10) + except HTTPError as e: + self.log.logger.error(f"Couldn't get the ident 
chassis LED status: {e}") + raise + response_json = json.loads(result[1]) + _result: Dict[str, Any] = {'http_code': result[2]} + if result[2] == 200: + _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive'] + else: + _result['LocationIndicatorActive'] = None + return _result + + def set_chassis_led(self, data: Dict[str, str]) -> int: + # '{"IndicatorLED": "Lit"}' -> LocationIndicatorActive = false + # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true + try: + _, response, status = self.client.query( + data=json.dumps(data), + method='PATCH', + endpoint=f'/redfish/v1{self.chassis_endpoint}' + ) + except HTTPError as e: + self.log.logger.error(f"Couldn't set the ident chassis LED: {e}") + raise + return status + + def shutdown(self, force: bool = False) -> int: + reboot_type: str = 'GracefulRebootWithForcedShutdown' if force else 'GracefulRebootWithoutForcedShutdown' + + try: + job_id: str = self.create_reboot_job(reboot_type) + status = self.schedule_reboot_job(job_id) + except (HTTPError, KeyError) as e: + self.log.logger.error(f"Couldn't create the reboot job: {e}") + raise + return status + + def powercycle(self) -> int: + try: + job_id: str = self.create_reboot_job('PowerCycle') + status = self.schedule_reboot_job(job_id) + except (HTTPError, URLError) as e: + self.log.logger.error(f"Couldn't perform power cycle: {e}") + raise + return status + + def create_reboot_job(self, reboot_type: str) -> str: + data: Dict[str, str] = dict(RebootJobType=reboot_type) + try: + headers, response, status = self.client.query( + data=json.dumps(data), + endpoint=self.create_reboot_job_endpoint + ) + job_id: str = headers['Location'].split('/')[-1] + except (HTTPError, URLError) as e: + self.log.logger.error(f"Couldn't create the reboot job: {e}") + raise + return job_id + + def schedule_reboot_job(self, job_id: str) -> int: + data: Dict[str, Union[List[str], str]] = dict(JobArray=[job_id], StartTimeInterval='TIME_NOW') + try: + headers, response, status = self.client.query( + data=json.dumps(data), + endpoint=self.setup_job_queue_endpoint + ) + except (HTTPError, KeyError) as e: + self.log.logger.error(f"Couldn't schedule the reboot job: {e}") + raise + return status diff --git a/src/cephadm/cephadmlib/node_proxy/basesystem.py b/src/ceph-node-proxy/ceph_node_proxy/basesystem.py similarity index 64% rename from src/cephadm/cephadmlib/node_proxy/basesystem.py rename to src/ceph-node-proxy/ceph_node_proxy/basesystem.py index 4fb1b7b855347..c2389d8dc85e7 100644 --- a/src/cephadm/cephadmlib/node_proxy/basesystem.py +++ b/src/ceph-node-proxy/ceph_node_proxy/basesystem.py @@ -1,7 +1,7 @@ import socket -from .util import Config +from ceph_node_proxy.util import Config from typing import Dict, Any -from .baseclient import BaseClient +from ceph_node_proxy.baseclient import BaseClient class BaseSystem: @@ -49,6 +49,30 @@ class BaseSystem: def set_led(self, data: Dict[str, str]) -> int: raise NotImplementedError() + def get_chassis_led(self) -> Dict[str, Any]: + raise NotImplementedError() + + def set_chassis_led(self, data: Dict[str, str]) -> int: + raise NotImplementedError() + + def device_led_on(self, device: str) -> int: + raise NotImplementedError() + + def device_led_off(self, device: str) -> int: + raise NotImplementedError() + + def get_device_led(self, device: str) -> Dict[str, Any]: + raise NotImplementedError() + + def set_device_led(self, device: str, data: Dict[str, bool]) -> int: + raise NotImplementedError() + + def chassis_led_on(self) -> int: + raise 
NotImplementedError() + + def chassis_led_off(self) -> int: + raise NotImplementedError() + def get_host(self) -> str: return socket.gethostname() @@ -63,3 +87,9 @@ class BaseSystem: def flush(self) -> None: raise NotImplementedError() + + def shutdown(self, force: bool = False) -> int: + raise NotImplementedError() + + def powercycle(self) -> int: + raise NotImplementedError() diff --git a/src/ceph-node-proxy/ceph_node_proxy/main.py b/src/ceph-node-proxy/ceph_node_proxy/main.py new file mode 100644 index 0000000000000..689089aa609f5 --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/main.py @@ -0,0 +1,244 @@ +from threading import Thread +from ceph_node_proxy.redfishdellsystem import RedfishDellSystem +from ceph_node_proxy.api import NodeProxyApi +from ceph_node_proxy.reporter import Reporter +from ceph_node_proxy.util import Config, Logger, http_req, write_tmp_file +from typing import Dict, Any, Optional + +import argparse +import traceback +import logging +import os +import ssl +import json +import time + +logger = logging.getLogger(__name__) + +DEFAULT_CONFIG = { + 'reporter': { + 'check_interval': 5, + 'push_data_max_retries': 30, + 'endpoint': 'https://127.0.0.1:7150/node-proxy/data', + }, + 'system': { + 'refresh_interval': 5 + }, + 'server': { + 'port': 8080, + }, + 'logging': { + 'level': 20, + } +} + + +class NodeProxyManager(Thread): + def __init__(self, + mgr_host: str, + cephx_name: str, + cephx_secret: str, + ca_path: str, + api_ssl_crt: str, + api_ssl_key: str, + mgr_agent_port: int = 7150): + super().__init__() + self.mgr_host = mgr_host + self.cephx_name = cephx_name + self.cephx_secret = cephx_secret + self.ca_path = ca_path + self.api_ssl_crt = api_ssl_crt + self.api_ssl_key = api_ssl_key + self.mgr_agent_port = str(mgr_agent_port) + self.stop = False + self.ssl_ctx = ssl.create_default_context() + self.ssl_ctx.check_hostname = True + self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED + self.ssl_ctx.load_verify_locations(self.ca_path) + + def run(self) -> None: + self.init() + self.loop() + + def init(self) -> None: + node_proxy_meta = { + 'cephx': { + 'name': self.cephx_name, + 'secret': self.cephx_secret + } + } + headers, result, status = http_req(hostname=self.mgr_host, + port=self.mgr_agent_port, + data=json.dumps(node_proxy_meta), + endpoint='/node-proxy/oob', + ssl_ctx=self.ssl_ctx) + if status != 200: + msg = f'No out of band tool details could be loaded: {status}, {result}' + logger.debug(msg) + raise RuntimeError(msg) + + result_json = json.loads(result) + kwargs = { + 'host': result_json['result']['addr'], + 'username': result_json['result']['username'], + 'password': result_json['result']['password'], + 'cephx': node_proxy_meta['cephx'], + 'mgr_host': self.mgr_host, + 'mgr_agent_port': self.mgr_agent_port, + 'api_ssl_crt': self.api_ssl_crt, + 'api_ssl_key': self.api_ssl_key + } + if result_json['result'].get('port'): + kwargs['port'] = result_json['result']['port'] + + self.node_proxy: NodeProxy = NodeProxy(**kwargs) + self.node_proxy.start() + + def loop(self) -> None: + while not self.stop: + try: + status = self.node_proxy.check_status() + label = 'Ok' if status else 'Critical' + logger.debug(f'node-proxy status: {label}') + except Exception as e: + logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}') + time.sleep(120) + self.init() + else: + logger.debug('node-proxy alive, next check in 60sec.') + time.sleep(60) + + def shutdown(self) -> None: + self.stop = True + # if `self.node_proxy.shutdown()` is called before self.start(), it will 
fail.
+        if self.__dict__.get('node_proxy'):
+            self.node_proxy.shutdown()
+
+
+class NodeProxy(Thread):
+    def __init__(self, **kw: Any) -> None:
+        super().__init__()
+        self.username: str = kw.get('username', '')
+        self.password: str = kw.get('password', '')
+        self.host: str = kw.get('host', '')
+        self.port: int = kw.get('port', 443)
+        self.cephx: Dict[str, Any] = kw.get('cephx', {})
+        self.reporter_scheme: str = kw.get('reporter_scheme', 'https')
+        self.mgr_host: str = kw.get('mgr_host', '')
+        self.mgr_agent_port: str = kw.get('mgr_agent_port', '')
+        self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data')
+        self.api_ssl_crt: str = kw.get('api_ssl_crt', '')
+        self.api_ssl_key: str = kw.get('api_ssl_key', '')
+        self.exc: Optional[Exception] = None
+        self.log = Logger(__name__)
+
+    def run(self) -> None:
+        try:
+            self.main()
+        except Exception as e:
+            self.exc = e
+            return
+
+    def shutdown(self) -> None:
+        self.log.logger.info('Shutting down node-proxy...')
+        self.system.client.logout()
+        self.system.stop_update_loop()
+        self.reporter_agent.stop()
+
+    def check_status(self) -> bool:
+        if self.__dict__.get('system') and not self.system.run:
+            raise RuntimeError('node-proxy encountered an error.')
+        if self.exc:
+            traceback.print_tb(self.exc.__traceback__)
+            self.log.logger.error(f'{self.exc.__class__.__name__}: {self.exc}')
+            raise self.exc
+        return True
+
+    def main(self) -> None:
+        # TODO: add a check and fail if host/username/password/data aren't passed
+        self.config = Config('/etc/ceph/node-proxy.yml', default_config=DEFAULT_CONFIG)
+        self.log = Logger(__name__, level=self.config.__dict__['logging']['level'])
+
+        # create the redfish system and the observer
+        self.log.logger.info('Server initialization...')
+        try:
+            self.system = RedfishDellSystem(host=self.host,
+                                            port=self.port,
+                                            username=self.username,
+                                            password=self.password,
+                                            config=self.config)
+        except RuntimeError:
+            self.log.logger.error("Can't initialize the redfish system.")
+            raise
+
+        try:
+            self.reporter_agent = Reporter(self.system,
+                                           self.cephx,
+                                           reporter_scheme=self.reporter_scheme,
+                                           reporter_hostname=self.mgr_host,
+                                           reporter_port=self.mgr_agent_port,
+                                           reporter_endpoint=self.reporter_endpoint)
+            self.reporter_agent.run()
+        except RuntimeError:
+            self.log.logger.error("Can't initialize the reporter.")
+            raise
+
+        try:
+            self.log.logger.info('Starting node-proxy API...')
+            self.api = NodeProxyApi(self,
+                                    username=self.username,
+                                    password=self.password,
+                                    ssl_crt=self.api_ssl_crt,
+                                    ssl_key=self.api_ssl_key)
+            self.api.start()
+        except Exception as e:
+            self.log.logger.error(f"Can't start node-proxy API: {e}")
+            raise
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(
+        description='Ceph Node-Proxy for HW Monitoring',
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument(
+        '--config',
+        help='path of config file in json format',
+        required=True
+    )
+
+    args = parser.parse_args()
+
+    if not os.path.exists(args.config):
+        raise Exception(f'No config file found at provided config path: {args.config}')
+
+    with open(args.config, 'r') as f:
+        try:
+            config_json = f.read()
+            config = json.loads(config_json)
+        except Exception as e:
+            raise Exception(f'Failed to load json config: {str(e)}')
+
+    target_ip = config['target_ip']
+    target_port = config['target_port']
+    keyring = config['keyring']
+    root_cert = config['root_cert.pem']
+    listener_cert = config['listener.crt']
+    listener_key = config['listener.key']
+    name = config['name']
+
+    f = write_tmp_file(root_cert,
prefix_name='cephadm-endpoint-root-cert') + + node_proxy_mgr = NodeProxyManager(mgr_host=target_ip, + cephx_name=name, + cephx_secret=keyring, + mgr_agent_port=target_port, + ca_path=f.name, + api_ssl_crt=listener_cert, + api_ssl_key=listener_key) + if not node_proxy_mgr.is_alive(): + node_proxy_mgr.start() + + +if __name__ == '__main__': + main() diff --git a/src/cephadm/cephadmlib/node_proxy/redfish_client.py b/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py similarity index 88% rename from src/cephadm/cephadmlib/node_proxy/redfish_client.py rename to src/ceph-node-proxy/ceph_node_proxy/redfish_client.py index 040db8ce2521f..eeca2e5ba217b 100644 --- a/src/cephadm/cephadmlib/node_proxy/redfish_client.py +++ b/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py @@ -1,7 +1,7 @@ import json from urllib.error import HTTPError, URLError -from .baseclient import BaseClient -from .util import Logger, http_req +from ceph_node_proxy.baseclient import BaseClient +from ceph_node_proxy.util import Logger, http_req from typing import Dict, Any, Tuple, Optional from http.client import HTTPMessage @@ -57,21 +57,24 @@ class RedFishClient(BaseClient): except URLError as e: self.log.logger.error("Can't check token " f'validity for {self.url}: {e}') - raise RuntimeError + raise return _status_code == 200 def logout(self) -> Dict[str, Any]: + result: Dict[str, Any] = {} try: - _, _data, _status_code = self.query(method='DELETE', - headers={'X-Auth-Token': self.token}, - endpoint=self.location) + if self.is_logged_in(): + _, _data, _status_code = self.query(method='DELETE', + headers={'X-Auth-Token': self.token}, + endpoint=self.location) + result = json.loads(_data) except URLError: self.log.logger.error(f"Can't log out from {self.url}") - return {} - response_str = _data + self.location = '' + self.token = '' - return json.loads(response_str) + return result def get_path(self, path: str) -> Dict[str, Any]: if self.PREFIX not in path: diff --git a/src/cephadm/cephadmlib/node_proxy/redfishdellsystem.py b/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py similarity index 84% rename from src/cephadm/cephadmlib/node_proxy/redfishdellsystem.py rename to src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py index 71006c48bf866..0424bb38bd1ee 100644 --- a/src/cephadm/cephadmlib/node_proxy/redfishdellsystem.py +++ b/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py @@ -1,6 +1,5 @@ -import json -from .baseredfishsystem import BaseRedfishSystem -from .util import Logger, normalize_dict, to_snake_case +from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem +from ceph_node_proxy.util import Logger, normalize_dict, to_snake_case from typing import Dict, Any, List @@ -8,6 +7,9 @@ class RedfishDellSystem(BaseRedfishSystem): def __init__(self, **kw: Any) -> None: super().__init__(**kw) self.log = Logger(__name__) + self.job_service_endpoint: str = '/redfish/v1/Managers/iDRAC.Embedded.1/Oem/Dell/DellJobService' + self.create_reboot_job_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.CreateRebootJob' + self.setup_job_queue_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.SetupJobQueue' def build_common_data(self, data: Dict[str, Any], @@ -161,26 +163,3 @@ class RedfishDellSystem(BaseRedfishSystem): self._sys['firmwares'] = self.build_common_data(data=self._system['UpdateService'], fields=fields, path='FirmwareInventory') - - def get_chassis_led(self) -> Dict[str, Any]: - endpoint = f'/redfish/v1/{self.chassis_endpoint}' - result = 
self.client.query(method='GET', - endpoint=endpoint, - timeout=10) - response_json = json.loads(result[1]) - _result: Dict[str, Any] = {'http_code': result[2]} - if result[2] == 200: - _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive'] - else: - _result['LocationIndicatorActive'] = None - return _result - - def set_chassis_led(self, data: Dict[str, str]) -> int: - # '{"IndicatorLED": "Lit"}' -> LocationIndicatorActive = false - # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true - _, response, status = self.client.query( - data=json.dumps(data), - method='PATCH', - endpoint=f'/redfish/v1{self.chassis_endpoint}' - ) - return status diff --git a/src/cephadm/cephadmlib/node_proxy/reporter.py b/src/ceph-node-proxy/ceph_node_proxy/reporter.py similarity index 98% rename from src/cephadm/cephadmlib/node_proxy/reporter.py rename to src/ceph-node-proxy/ceph_node_proxy/reporter.py index fb92a45234673..9e5521aa280b3 100644 --- a/src/cephadm/cephadmlib/node_proxy/reporter.py +++ b/src/ceph-node-proxy/ceph_node_proxy/reporter.py @@ -1,7 +1,7 @@ from threading import Thread import time import json -from .util import Logger, http_req +from ceph_node_proxy.util import Logger, http_req from urllib.error import HTTPError, URLError from typing import Dict, Any diff --git a/src/cephadm/cephadmlib/node_proxy/util.py b/src/ceph-node-proxy/ceph_node_proxy/util.py similarity index 88% rename from src/cephadm/cephadmlib/node_proxy/util.py rename to src/ceph-node-proxy/ceph_node_proxy/util.py index 31c1c00a0a15b..a94acc9ed5564 100644 --- a/src/cephadm/cephadmlib/node_proxy/util.py +++ b/src/ceph-node-proxy/ceph_node_proxy/util.py @@ -4,6 +4,7 @@ import os import time import re import ssl +from tempfile import NamedTemporaryFile, _TemporaryFileWrapper from urllib.error import HTTPError, URLError from urllib.request import urlopen, Request from typing import Dict, List, Callable, Any, Optional, MutableMapping, Tuple @@ -124,9 +125,13 @@ def http_req(hostname: str = '', url: str = f'{scheme}://{hostname}:{port}{endpoint}' _data = bytes(data, 'ascii') if data else None - + _headers = headers + if data and not method: + method = 'POST' + if not _headers.get('Content-Type') and method in ['POST', 'PATCH']: + _headers['Content-Type'] = 'application/json' try: - req = Request(url, _data, headers, method=method) + req = Request(url, _data, _headers, method=method) with urlopen(req, context=ssl_ctx, timeout=timeout) as response: response_str = response.read() response_headers = response.headers @@ -136,3 +141,11 @@ def http_req(hostname: str = '', print(f'{e}') # handle error here if needed raise + + +def write_tmp_file(data: str, prefix_name: str = 'node-proxy-') -> _TemporaryFileWrapper: + f = NamedTemporaryFile(prefix=prefix_name) + os.fchmod(f.fileno(), 0o600) + f.write(data.encode('utf-8')) + f.flush() + return f diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py index 1fc5768ca968e..5a7ed9e766569 100755 --- a/src/cephadm/cephadm.py +++ b/src/cephadm/cephadm.py @@ -29,7 +29,6 @@ from glob import glob from io import StringIO from threading import Thread, Event from pathlib import Path -from cephadmlib.node_proxy.main import NodeProxy from cephadmlib.constants import ( # default images @@ -176,6 +175,7 @@ from cephadmlib.daemons import ( NFSGanesha, SNMPGateway, Tracing, + NodeProxy, ) from cephadmlib.agent import http_query @@ -226,6 +226,7 @@ def get_supported_daemons(): supported_daemons.append(CephadmAgent.daemon_type) 
supported_daemons.append(SNMPGateway.daemon_type) supported_daemons.extend(Tracing.components) + supported_daemons.append(NodeProxy.daemon_type) assert len(supported_daemons) == len(set(supported_daemons)) return supported_daemons @@ -800,6 +801,10 @@ def create_daemon_dirs( sg = SNMPGateway.init(ctx, fsid, ident.daemon_id) sg.create_daemon_conf() + elif daemon_type == NodeProxy.daemon_type: + node_proxy = NodeProxy.init(ctx, fsid, ident.daemon_id) + node_proxy.create_daemon_dirs(data_dir, uid, gid) + _write_custom_conf_files(ctx, ident, uid, gid) @@ -1287,28 +1292,13 @@ class MgrListener(Thread): self.agent.ls_gatherer.wakeup() self.agent.volume_gatherer.wakeup() logger.debug(f'Got mgr message {data}') - if 'node_proxy_oob_cmd' in data: - if data['node_proxy_oob_cmd']['action'] in ['get_led', 'set_led']: - conn.send(bytes(json.dumps(self.node_proxy_oob_cmd_result), 'utf-8')) except Exception as e: logger.error(f'Mgr Listener encountered exception: {e}') def shutdown(self) -> None: self.stop = True - def validate_node_proxy_payload(self, data: Dict[str, Any]) -> None: - if 'action' not in data.keys(): - raise RuntimeError('node-proxy oob command needs an action.') - if data['action'] in ['get_led', 'set_led']: - fields = ['type', 'id'] - if data['type'] not in ['chassis', 'drive']: - raise RuntimeError('the LED type must be either "chassis" or "drive".') - for field in fields: - if field not in data.keys(): - raise RuntimeError('Received invalid node-proxy cmd.') - def handle_json_payload(self, data: Dict[Any, Any]) -> None: - self.node_proxy_oob_cmd_result: Dict[str, Any] = {} if 'counter' in data: self.agent.ack = int(data['counter']) if 'config' in data: @@ -1321,95 +1311,10 @@ class MgrListener(Thread): f.write(config[filename]) self.agent.pull_conf_settings() self.agent.wakeup() - elif 'node_proxy_shutdown' in data: - logger.info('Received node_proxy_shutdown command.') - self.agent.shutdown() - elif 'node_proxy_oob_cmd' in data: - node_proxy_cmd: Dict[str, Any] = data['node_proxy_oob_cmd'] - try: - self.validate_node_proxy_payload(node_proxy_cmd) - except RuntimeError as e: - logger.error(f"Couldn't validate node-proxy payload:\n{node_proxy_cmd}\n{e}") - raise - logger.info(f'Received node_proxy_oob_cmd command: {node_proxy_cmd}') - if node_proxy_cmd['action'] == 'get_led': - if node_proxy_cmd['type'] == 'chassis': - self.node_proxy_oob_cmd_result = self.agent.node_proxy_mgr.node_proxy.system.get_chassis_led() - if node_proxy_cmd['action'] == 'set_led': - if node_proxy_cmd['type'] == 'chassis': - _data: Dict[str, Any] = json.loads(node_proxy_cmd['data']) - _result: int = self.agent.node_proxy_mgr.node_proxy.system.set_chassis_led(_data) - self.node_proxy_oob_cmd_result = {'http_code': _result} else: raise RuntimeError('No valid data received.') -class NodeProxyManager(Thread): - def __init__(self, agent: 'CephadmAgent', event: Event): - super().__init__() - self.agent = agent - self.event = event - self.stop = False - - def run(self) -> None: - self.event.wait() - self.ssl_ctx = self.agent.ssl_ctx - self.init() - self.loop() - - def init(self) -> None: - node_proxy_meta = { - 'cephx': { - 'name': self.agent.host, - 'secret': self.agent.keyring - } - } - status, result = http_query(addr=self.agent.target_ip, - port=self.agent.target_port, - data=json.dumps(node_proxy_meta).encode('ascii'), - endpoint='/node-proxy/oob', - ssl_ctx=self.ssl_ctx) - if status != 200: - msg = f'No out of band tool details could be loaded: {status}, {result}' - logger.debug(msg) - raise RuntimeError(msg) - - 
result_json = json.loads(result) - kwargs = { - 'host': result_json['result']['addr'], - 'username': result_json['result']['username'], - 'password': result_json['result']['password'], - 'cephx': node_proxy_meta['cephx'], - 'mgr_target_ip': self.agent.target_ip, - 'mgr_target_port': self.agent.target_port - } - if result_json['result'].get('port'): - kwargs['port'] = result_json['result']['port'] - - self.node_proxy: NodeProxy = NodeProxy(**kwargs) - self.node_proxy.start() - - def loop(self) -> None: - while not self.stop: - try: - status = self.node_proxy.check_status() - label = 'Ok' if status else 'Critical' - logger.debug(f'node-proxy status: {label}') - except Exception as e: - logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}') - time.sleep(120) - self.init() - else: - logger.debug('node-proxy alive, next check in 60sec.') - time.sleep(60) - - def shutdown(self) -> None: - self.stop = True - # if `self.node_proxy.shutdown()` is called before self.start(), it will fail. - if self.__dict__.get('node_proxy'): - self.node_proxy.shutdown() - - @register_daemon_form class CephadmAgent(DaemonForm): @@ -1465,8 +1370,6 @@ class CephadmAgent(DaemonForm): self.ssl_ctx = ssl.create_default_context() self.ssl_ctx.check_hostname = True self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED - self.node_proxy_mgr_event = Event() - self.node_proxy_mgr = NodeProxyManager(self, self.node_proxy_mgr_event) def validate(self, config: Dict[str, str] = {}) -> None: # check for the required files @@ -1522,8 +1425,6 @@ class CephadmAgent(DaemonForm): def shutdown(self) -> None: self.stop = True - if self.node_proxy_mgr.is_alive(): - self.node_proxy_mgr.shutdown() if self.mgr_listener.is_alive(): self.mgr_listener.shutdown() if self.ls_gatherer.is_alive(): @@ -1565,9 +1466,6 @@ class CephadmAgent(DaemonForm): def run(self) -> None: self.pull_conf_settings() self.ssl_ctx.load_verify_locations(self.ca_path) - # only after self.pull_conf_settings() was called we can actually start - # node-proxy - self.node_proxy_mgr_event.set() try: for _ in range(1001): @@ -1589,9 +1487,6 @@ class CephadmAgent(DaemonForm): if not self.volume_gatherer.is_alive(): self.volume_gatherer.start() - if not self.node_proxy_mgr.is_alive(): - self.node_proxy_mgr.start() - while not self.stop: start_time = time.monotonic() ack = self.ack diff --git a/src/cephadm/cephadmlib/daemons/__init__.py b/src/cephadm/cephadmlib/daemons/__init__.py index cf572d487c9bd..29f1506948323 100644 --- a/src/cephadm/cephadmlib/daemons/__init__.py +++ b/src/cephadm/cephadmlib/daemons/__init__.py @@ -7,6 +7,7 @@ from .nfs import NFSGanesha from .nvmeof import CephNvmeof from .snmp import SNMPGateway from .tracing import Tracing +from .node_proxy import NodeProxy __all__ = [ 'Ceph', @@ -21,4 +22,5 @@ __all__ = [ 'OSD', 'SNMPGateway', 'Tracing', + 'NodeProxy', ] diff --git a/src/cephadm/cephadmlib/daemons/node_proxy.py b/src/cephadm/cephadmlib/daemons/node_proxy.py new file mode 100644 index 0000000000000..a4cce11a53ca9 --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/node_proxy.py @@ -0,0 +1,145 @@ +import logging +import os + +from typing import Dict, List, Optional, Tuple + +from ..constants import DEFAULT_IMAGE +from ..container_daemon_form import ContainerDaemonForm, daemon_to_container +from ..container_types import CephContainer, extract_uid_gid +from ..context import CephadmContext +from ..context_getters import fetch_configs, get_config_and_keyring +from ..daemon_form import register as register_daemon_form +from ..daemon_identity import 
DaemonIdentity +from ..data_utils import dict_get, is_fsid +from ..deployment_utils import to_deployment_container +from ..exceptions import Error +from ..file_utils import populate_files + +logger = logging.getLogger() + + +@register_daemon_form +class NodeProxy(ContainerDaemonForm): + """Defines a node-proxy container""" + + daemon_type = 'node-proxy' + # TODO: update this if we make node-proxy an executable + entrypoint = 'python3' + required_files = ['node-proxy.json'] + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + + def __init__( + self, + ctx: CephadmContext, + ident: DaemonIdentity, + config_json: Dict, + image: str = DEFAULT_IMAGE, + ): + self.ctx = ctx + self._identity = ident + self.image = image + + # config-json options + config = dict_get(config_json, 'node-proxy.json', {}) + self.files = {'node-proxy.json': config} + + # validate the supplied args + self.validate() + + @classmethod + def init( + cls, ctx: CephadmContext, fsid: str, daemon_id: str + ) -> 'NodeProxy': + return cls.create( + ctx, DaemonIdentity(fsid, cls.daemon_type, daemon_id) + ) + + @classmethod + def create( + cls, ctx: CephadmContext, ident: DaemonIdentity + ) -> 'NodeProxy': + return cls(ctx, ident, fetch_configs(ctx), ctx.image) + + @property + def identity(self) -> DaemonIdentity: + return self._identity + + @property + def fsid(self) -> str: + return self._identity.fsid + + @property + def daemon_id(self) -> str: + return self._identity.daemon_id + + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + # TODO: update this when we have the actual location + # in the ceph container we are going to keep node-proxy + mounts.update({os.path.join(data_dir, 'node-proxy.json'): '/usr/share/ceph/node-proxy.json:z'}) + + def customize_process_args(self, ctx: CephadmContext, args: List[str]) -> None: + # TODO: this corresponds with the mount location of + # the config in _get_container_mounts above. 
They
+        # will both need to be updated when we have a proper
+        # location in the container for node-proxy
+        args.extend(['/usr/share/ceph/ceph_node_proxy/main.py', '--config', '/usr/share/ceph/node-proxy.json'])
+
+    def validate(self):
+        # type: () -> None
+        if not is_fsid(self.fsid):
+            raise Error('not an fsid: %s' % self.fsid)
+        if not self.daemon_id:
+            raise Error('invalid daemon_id: %s' % self.daemon_id)
+        if not self.image:
+            raise Error('invalid image: %s' % self.image)
+        # check for the required files
+        if self.required_files:
+            for fname in self.required_files:
+                if fname not in self.files:
+                    raise Error(
+                        'required file missing from config-json: %s' % fname
+                    )
+
+    def get_daemon_name(self):
+        # type: () -> str
+        return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+    def get_container_name(self, desc=None):
+        # type: (Optional[str]) -> str
+        cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name())
+        if desc:
+            cname = '%s-%s' % (cname, desc)
+        return cname
+
+    def create_daemon_dirs(self, data_dir, uid, gid):
+        # type: (str, int, int) -> None
+        """Create files under the container data dir"""
+        if not os.path.isdir(data_dir):
+            raise OSError('data_dir is not a directory: %s' % (data_dir))
+
+        logger.info('Writing node-proxy config...')
+        # populate files from the config-json
+        populate_files(data_dir, self.files, uid, gid)
+
+    def container(self, ctx: CephadmContext) -> CephContainer:
+        # node-proxy is run as a privileged container; PYTHONPATH is extended
+        # so the ceph_node_proxy package under /usr/share/ceph can be imported.
+        ctr = daemon_to_container(ctx, self, privileged=True, envs=['PYTHONPATH=$PYTHONPATH:/usr/share/ceph'])
+        return to_deployment_container(ctx, ctr)
+
+    def config_and_keyring(
+        self, ctx: CephadmContext
+    ) -> Tuple[Optional[str], Optional[str]]:
+        return get_config_and_keyring(ctx)
+
+    def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+        return extract_uid_gid(ctx)
+
+    def default_entrypoint(self) -> str:
+        return self.entrypoint
diff --git a/src/cephadm/cephadmlib/node_proxy/baseredfishsystem.py b/src/cephadm/cephadmlib/node_proxy/baseredfishsystem.py
deleted file mode 100644
index 45c80e0209d4e..0000000000000
--- a/src/cephadm/cephadmlib/node_proxy/baseredfishsystem.py
+++ /dev/null
@@ -1,152 +0,0 @@
-import concurrent.futures
-from .basesystem import BaseSystem
-from .redfish_client import RedFishClient
-from threading import Thread, Lock
-from time import sleep
-from .util import Logger, retry
-from typing import Dict, Any, List
-
-
-class BaseRedfishSystem(BaseSystem):
-    def __init__(self, **kw: Any) -> None:
-        super().__init__(**kw)
-        self.common_endpoints: List[str] = kw.get('common_endpoints', ['/Systems/System.Embedded.1',
-                                                                       '/UpdateService'])
-        self.chassis_endpoint: str = kw.get('chassis_endpoint', '/Chassis/System.Embedded.1')
-        self.log = Logger(__name__)
-        self.host: str = kw['host']
-        self.port: str = kw['port']
-        self.username: str = kw['username']
-        self.password: str = kw['password']
-        # move the following line (class attribute?)
- self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password) - self.log.logger.info(f'redfish system initialization, host: {self.host}, user: {self.username}') - - self.run: bool = False - self.thread: Thread - self.data_ready: bool = False - self.previous_data: Dict = {} - self.lock: Lock = Lock() - self.data: Dict[str, Dict[str, Any]] = {} - self._system: Dict[str, Dict[str, Any]] = {} - self._sys: Dict[str, Any] = {} - self.start_client() - - def start_client(self) -> None: - self.client.login() - self.start_update_loop() - - def start_update_loop(self) -> None: - self.run = True - self.thread = Thread(target=self.update) - self.thread.start() - - def stop_update_loop(self) -> None: - self.run = False - self.thread.join() - - def update(self) -> None: - # this loop can have: - # - caching logic - while self.run: - self.log.logger.debug('waiting for a lock in the update loop.') - self.lock.acquire() - self.log.logger.debug('lock acquired in the update loop.') - try: - self._update_system() - self._update_sn() - update_funcs = [self._update_memory, - self._update_power, - self._update_fans, - self._update_network, - self._update_processors, - self._update_storage, - self._update_firmwares] - - with concurrent.futures.ThreadPoolExecutor() as executor: - executor.map(lambda f: f(), update_funcs) - - self.data_ready = True - sleep(5) - except RuntimeError as e: - self.run = False - self.log.logger.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}') - self.client.logout() - finally: - self.lock.release() - self.log.logger.debug('lock released in the update loop.') - - def flush(self) -> None: - self.log.logger.debug('Acquiring lock to flush data.') - self.lock.acquire() - self.log.logger.debug('Lock acquired, flushing data.') - self._system = {} - self.previous_data = {} - self.log.logger.info('Data flushed.') - self.data_ready = False - self.log.logger.debug('Data marked as not ready.') - self.lock.release() - self.log.logger.debug('Released the lock after flushing data.') - - @retry(retries=10, delay=2) - def _get_path(self, path: str) -> Dict: - try: - result = self.client.get_path(path) - except RuntimeError: - raise - if result is None: - self.log.logger.error(f'The client reported an error when getting path: {path}') - raise RuntimeError(f'Could not get path: {path}') - return result - - def get_members(self, data: Dict[str, Any], path: str) -> List: - _path = data[path]['@odata.id'] - _data = self._get_path(_path) - return [self._get_path(member['@odata.id']) for member in _data['Members']] - - def get_system(self) -> Dict[str, Any]: - result = { - 'host': self.get_host(), - 'sn': self.get_sn(), - 'status': { - 'storage': self.get_storage(), - 'processors': self.get_processors(), - 'network': self.get_network(), - 'memory': self.get_memory(), - 'power': self.get_power(), - 'fans': self.get_fans() - }, - 'firmwares': self.get_firmwares(), - 'chassis': {'redfish_endpoint': f'/redfish/v1{self.chassis_endpoint}'} # TODO(guits): not ideal - } - return result - - def _update_system(self) -> None: - for endpoint in self.common_endpoints: - result = self.client.get_path(endpoint) - _endpoint = endpoint.strip('/').split('/')[0] - self._system[_endpoint] = result - - def _update_sn(self) -> None: - raise NotImplementedError() - - def _update_memory(self) -> None: - raise NotImplementedError() - - def _update_power(self) -> None: - raise NotImplementedError() - - def _update_fans(self) -> None: - raise 
NotImplementedError() - - def _update_network(self) -> None: - raise NotImplementedError() - - def _update_processors(self) -> None: - raise NotImplementedError() - - def _update_storage(self) -> None: - raise NotImplementedError() - - def _update_firmwares(self) -> None: - raise NotImplementedError() diff --git a/src/cephadm/cephadmlib/node_proxy/main.py b/src/cephadm/cephadmlib/node_proxy/main.py deleted file mode 100644 index 968dfd3c1ca66..0000000000000 --- a/src/cephadm/cephadmlib/node_proxy/main.py +++ /dev/null @@ -1,94 +0,0 @@ -from threading import Thread -from .redfishdellsystem import RedfishDellSystem -from .reporter import Reporter -from .util import Config, Logger -from typing import Dict, Any, Optional -import traceback - -DEFAULT_CONFIG = { - 'reporter': { - 'check_interval': 5, - 'push_data_max_retries': 30, - 'endpoint': 'https://127.0.0.1:7150/node-proxy/data', - }, - 'system': { - 'refresh_interval': 5 - }, - 'server': { - 'port': 8080, - }, - 'logging': { - 'level': 20, - } -} - - -class NodeProxy(Thread): - def __init__(self, **kw: Any) -> None: - super().__init__() - self.username: str = kw.get('username', '') - self.password: str = kw.get('password', '') - self.host: str = kw.get('host', '') - self.port: int = kw.get('port', 443) - self.cephx: Dict[str, Any] = kw.get('cephx', {}) - self.reporter_scheme: str = kw.get('reporter_scheme', 'https') - self.mgr_target_ip: str = kw.get('mgr_target_ip', '') - self.mgr_target_port: str = kw.get('mgr_target_port', '') - self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data') - self.exc: Optional[Exception] = None - self.log = Logger(__name__) - - def run(self) -> None: - try: - self.main() - except Exception as e: - self.exc = e - return - - def shutdown(self) -> None: - self.log.logger.info('Shutting down node-proxy...') - self.system.client.logout() - self.system.stop_update_loop() - self.reporter_agent.stop() - - def check_auth(self, realm: str, username: str, password: str) -> bool: - return self.username == username and \ - self.password == password - - def check_status(self) -> bool: - if self.__dict__.get('system') and not self.system.run: - raise RuntimeError('node-proxy encountered an error.') - if self.exc: - traceback.print_tb(self.exc.__traceback__) - self.log.logger.error(f'{self.exc.__class__.__name__}: {self.exc}') - raise self.exc - return True - - def main(self) -> None: - # TODO: add a check and fail if host/username/password/data aren't passed - self.config = Config('/etc/ceph/node-proxy.yml', default_config=DEFAULT_CONFIG) - self.log = Logger(__name__, level=self.config.__dict__['logging']['level']) - - # create the redfish system and the obsever - self.log.logger.info('Server initialization...') - try: - self.system = RedfishDellSystem(host=self.host, - port=self.port, - username=self.username, - password=self.password, - config=self.config) - except RuntimeError: - self.log.logger.error("Can't initialize the redfish system.") - raise - - try: - self.reporter_agent = Reporter(self.system, - self.cephx, - reporter_scheme=self.reporter_scheme, - reporter_hostname=self.mgr_target_ip, - reporter_port=self.mgr_target_port, - reporter_endpoint=self.reporter_endpoint) - self.reporter_agent.run() - except RuntimeError: - self.log.logger.error("Can't initialize the reporter.") - raise diff --git a/src/mypy.ini b/src/mypy.ini index bd4b436b49266..1b158822b526b 100755 --- a/src/mypy.ini +++ b/src/mypy.ini @@ -45,6 +45,8 @@ ignore_missing_imports = True [mypy-kubernetes.*] ignore_missing_imports 
= True
+[mypy-setuptools]
+ignore_missing_imports = True
 # Make dashboard happy:
 [mypy-coverage]
diff --git a/src/pybind/mgr/cephadm/agent.py b/src/pybind/mgr/cephadm/agent.py
index b589886e566c8..68495d3bc061f 100644
--- a/src/pybind/mgr/cephadm/agent.py
+++ b/src/pybind/mgr/cephadm/agent.py
@@ -16,14 +16,15 @@ import time
 from orchestrator import DaemonDescriptionStatus
 from orchestrator._interface import daemon_type_to_service
-from ceph.utils import datetime_now
+from ceph.utils import datetime_now, http_req
 from ceph.deployment.inventory import Devices
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 from cephadm.ssl_cert_utils import SSLCerts
 from mgr_util import test_port_allocation, PortAlreadyInUse
-from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional
+from urllib.error import HTTPError, URLError
+from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional, MutableMapping
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
@@ -138,8 +139,9 @@ class NodeProxyEndpoint:
         self.validate_node_proxy_data(data)
-        host = data["cephx"]["name"]
-        results['result'] = self.mgr.node_proxy_cache.oob.get(host)
+        # expecting name to be "node-proxy.<hostname>"
+        hostname = data['cephx']['name'][11:]
+        results['result'] = self.mgr.node_proxy_cache.oob.get(hostname, '')
         if not results['result']:
             raise cherrypy.HTTPError(400, 'The provided host has no iDrac details.')
         return results
@@ -160,13 +162,15 @@ class NodeProxyEndpoint:
             raise cherrypy.HTTPError(400, 'The field \'cephx\' must be provided.')
         elif 'name' not in data['cephx'].keys():
             cherrypy.response.status = 400
-            raise cherrypy.HTTPError(400, 'The field \'host\' must be provided.')
-        elif 'secret' not in data['cephx'].keys():
-            raise cherrypy.HTTPError(400, 'The agent keyring must be provided.')
-        elif not self.mgr.agent_cache.agent_keys.get(data['cephx']['name']):
-            raise cherrypy.HTTPError(502, f'Make sure the agent is running on {data["cephx"]["name"]}')
-        elif data['cephx']['secret'] != self.mgr.agent_cache.agent_keys[data['cephx']['name']]:
-            raise cherrypy.HTTPError(403, f'Got wrong keyring from agent on host {data["cephx"]["name"]}.')
+            raise cherrypy.HTTPError(400, 'The field \'name\' must be provided.')
+        # expecting name to be "node-proxy.<hostname>"
+ hostname = data['cephx']['name'][11:] + if 'secret' not in data['cephx'].keys(): + raise cherrypy.HTTPError(400, 'The node-proxy keyring must be provided.') + elif not self.mgr.node_proxy_cache.keyrings.get(hostname, ''): + raise cherrypy.HTTPError(502, f'Make sure the node-proxy is running on {hostname}') + elif data['cephx']['secret'] != self.mgr.node_proxy_cache.keyrings[hostname]: + raise cherrypy.HTTPError(403, f'Got wrong keyring from agent on host {hostname}.') except AttributeError: raise cherrypy.HTTPError(400, 'Malformed data received.') @@ -289,12 +293,19 @@ class NodeProxyEndpoint: :rtype: dict[str, Any] """ method: str = cherrypy.request.method + header: MutableMapping[str, str] = {} hostname: Optional[str] = kw.get('hostname') led_type: Optional[str] = kw.get('type') id_drive: Optional[str] = kw.get('id') - data: Optional[str] = None - # this method is restricted to 'GET' or 'PATCH' - action: str = 'get_led' if method == 'GET' else 'set_led' + payload: Optional[Dict[str, str]] = None + endpoint: List[Any] = ['led', led_type] + device: str = id_drive if id_drive else '' + + ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert() + ssl_ctx = ssl.create_default_context() + ssl_ctx.check_hostname = True + ssl_ctx.verify_mode = ssl.CERT_REQUIRED + ssl_ctx.load_verify_locations(cadata=ssl_root_crt) if not hostname: msg: str = "listing enclosure LED status for all nodes is not implemented." @@ -311,16 +322,32 @@ class NodeProxyEndpoint: self.mgr.log.debug(msg) raise cherrypy.HTTPError(400, msg) + if led_type == 'drive': + endpoint.append(device) + if hostname not in self.mgr.node_proxy_cache.data.keys(): # TODO(guits): update unit test for this msg = f"'{hostname}' not found." self.mgr.log.debug(msg) raise cherrypy.HTTPError(400, msg) + addr: str = self.mgr.inventory.get_addr(hostname) + if method == 'PATCH': # TODO(guits): need to check the request is authorized # allowing a specific keyring only ? (client.admin or client.agent.. ?) - data = json.dumps(cherrypy.request.json) + data: Dict[str, Any] = cherrypy.request.json + if 'state' not in data.keys(): + msg = "'state' key not provided." + raise cherrypy.HTTPError(400, msg) + if 'keyring' not in data.keys(): + msg = "'keyring' key must be provided." + raise cherrypy.HTTPError(400, msg) + if data['keyring'] != self.mgr.node_proxy_cache.keyrings.get(hostname): + msg = 'wrong keyring provided.' + raise cherrypy.HTTPError(401, msg) + payload = {} + payload['state'] = data['state'] if led_type == 'drive': if id_drive not in self.mgr.node_proxy_cache.data[hostname]['status']['storage'].keys(): @@ -329,28 +356,23 @@ class NodeProxyEndpoint: self.mgr.log.debug(msg) raise cherrypy.HTTPError(400, msg) - payload: Dict[str, Any] = {"node_proxy_oob_cmd": - {"action": action, - "type": led_type, - "id": id_drive, - "host": hostname, - "data": data}} - try: - message_thread = AgentMessageThread( - hostname, self.mgr.agent_cache.agent_ports[hostname], payload, self.mgr) - message_thread.start() - message_thread.join() # TODO(guits): Add a timeout? 
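
For clarity, here is a condensed, hypothetical sketch of the cephx check the rewritten NodeProxyEndpoint handlers above perform; the helper name and the keyrings dict are illustrative stand-ins for mgr.node_proxy_cache.keyrings, not code from the patch:

import json

def check_node_proxy_cephx(body: str, keyrings: dict) -> str:
    # Entity names are now expected to look like "node-proxy.<hostname>".
    data = json.loads(body)
    name = data['cephx']['name']              # e.g. "node-proxy.host01"
    secret = data['cephx'].get('secret', '')
    prefix = 'node-proxy.'
    if not name.startswith(prefix):
        raise ValueError('unexpected cephx entity name')
    hostname = name[len(prefix):]             # the patch slices with [11:]
    if secret != keyrings.get(hostname, ''):
        raise PermissionError(f'wrong keyring received from {hostname}')
    return hostname
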
- except KeyError: - raise cherrypy.HTTPError(502, f"{hostname}'s agent not running, please check.") - agent_response = message_thread.get_agent_response() + endpoint = f'/{"/".join(endpoint)}' + header = self.mgr.node_proxy.generate_auth_header(hostname) + try: - response_json: Dict[str, Any] = json.loads(agent_response) - except json.decoder.JSONDecodeError: - cherrypy.response.status = 503 - else: - cherrypy.response.status = response_json.get('http_code', 503) - if cherrypy.response.status != 200: - raise cherrypy.HTTPError(cherrypy.response.status, "Couldn't change the LED status.") + headers, result, status = http_req(hostname=addr, + port='8080', + headers=header, + method=method, + data=json.dumps(payload), + endpoint=endpoint, + ssl_ctx=ssl_ctx) + response_json = json.loads(result) + except HTTPError as e: + self.mgr.log.debug(e) + except URLError: + raise cherrypy.HTTPError(502, f'Make sure the node-proxy agent is deployed and running on {hostname}') + return response_json @cherrypy.expose @@ -842,16 +864,6 @@ class CephadmAgentHelpers: host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec) message_thread.start() - def _shutdown_node_proxy(self) -> None: - hosts = set([h for h in self.mgr.cache.get_hosts() if - (h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))]) - - for host in hosts: - payload: Dict[str, Any] = {'node_proxy_shutdown': host} - message_thread = AgentMessageThread( - host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr) - message_thread.start() - def _request_ack_all_not_up_to_date(self) -> None: self.mgr.agent_helpers._request_agent_acks( set([h for h in self.mgr.cache.get_hosts() if diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py index 27bf55c921c38..235737ef10e76 100644 --- a/src/pybind/mgr/cephadm/inventory.py +++ b/src/pybind/mgr/cephadm/inventory.py @@ -29,7 +29,7 @@ logger = logging.getLogger(__name__) HOST_CACHE_PREFIX = "host." SPEC_STORE_PREFIX = "spec." AGENT_CACHE_PREFIX = 'agent.' 
-NODE_PROXY_CACHE_PREFIX = 'node_proxy/data' +NODE_PROXY_CACHE_PREFIX = 'node_proxy' class HostCacheStatus(enum.Enum): @@ -1411,20 +1411,25 @@ class NodeProxyCache: self.mgr = mgr self.data: Dict[str, Any] = {} self.oob: Dict[str, Any] = {} + self.keyrings: Dict[str, str] = {} def load(self) -> None: - _oob = self.mgr.get_store('node_proxy/oob', "{}") + _oob = self.mgr.get_store(f'{NODE_PROXY_CACHE_PREFIX}/oob', '{}') self.oob = json.loads(_oob) - for k, v in self.mgr.get_store_prefix(NODE_PROXY_CACHE_PREFIX).items(): + _keyrings = self.mgr.get_store(f'{NODE_PROXY_CACHE_PREFIX}/keyrings', '{}') + self.keyrings = json.loads(_keyrings) + + for k, v in self.mgr.get_store_prefix(f'{NODE_PROXY_CACHE_PREFIX}/data').items(): host = k.split('/')[-1:][0] if host not in self.mgr.inventory.keys(): # remove entry for host that no longer exists - self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/{host}", None) + self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/data/{host}", None) try: self.oob.pop(host) self.data.pop(host) + self.keyrings.pop(host) except KeyError: pass continue @@ -1434,7 +1439,15 @@ class NodeProxyCache: def save(self, host: str = '', data: Dict[str, Any] = {}) -> None: - self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/{host}", json.dumps(data)) + self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/data/{host}", json.dumps(data)) + + def update_oob(self, host: str, host_oob_info: Dict[str, str]) -> None: + self.oob[host] = host_oob_info + self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/oob", json.dumps(self.oob)) + + def update_keyring(self, host: str, key: str) -> None: + self.keyrings[host] = key + self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/keyrings", json.dumps(self.keyrings)) def fullreport(self, **kw: Any) -> Dict[str, Any]: """ diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index b82f861ce6add..5c5e602e944fc 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -11,6 +11,7 @@ from configparser import ConfigParser from contextlib import contextmanager from functools import wraps from tempfile import TemporaryDirectory, NamedTemporaryFile +from urllib.error import HTTPError from threading import Event from cephadm.service_discovery import ServiceDiscovery @@ -72,6 +73,7 @@ from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ NodeExporterService, SNMPGatewayService, LokiService, PromtailService from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService +from .services.node_proxy import NodeProxy from .schedule import HostAssignment from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \ ClientKeyringStore, ClientKeyringSpec, TunedProfileStore, NodeProxyCache @@ -443,6 +445,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, default=3.0, desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down' ), + Option( + 'hw_monitoring', + type='bool', + default=False, + desc='Deploy hw monitoring daemon on every host.' 
+ ), Option( 'max_osd_draining_count', type='int', @@ -560,6 +568,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.agent_refresh_rate = 0 self.agent_down_multiplier = 0.0 self.agent_starting_port = 0 + self.hw_monitoring = False self.service_discovery_port = 0 self.secure_monitoring_stack = False self.apply_spec_fails: List[Tuple[str, str]] = [] @@ -640,7 +649,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService, IngressService, CustomContainerService, CephfsMirrorService, NvmeofService, CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService, - JaegerQueryService, JaegerAgentService, JaegerCollectorService + JaegerQueryService, JaegerAgentService, JaegerCollectorService, NodeProxy ] # https://github.com/python/mypy/issues/8993 @@ -651,6 +660,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd']) self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi']) self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof']) + self.node_proxy_service: NodeProxy = cast(NodeProxy, self.cephadm_services['node-proxy']) self.scheduled_async_actions: List[Callable] = [] @@ -663,6 +673,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.http_server = CephadmHttpServer(self) self.http_server.start() + + self.node_proxy = NodeProxy(self) + self.agent_helpers = CephadmAgentHelpers(self) if self.use_agent: self.agent_helpers._apply_agent() @@ -829,7 +842,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, Generate a unique random service name """ suffix = daemon_type not in [ - 'mon', 'crash', 'ceph-exporter', + 'mon', 'crash', 'ceph-exporter', 'node-proxy', 'prometheus', 'node-exporter', 'grafana', 'alertmanager', 'container', 'agent', 'snmp-gateway', 'loki', 'promtail', 'elasticsearch', 'jaeger-collector', 'jaeger-agent', 'jaeger-query' @@ -1621,13 +1634,12 @@ Then run the following: spec.oob['addr'] = spec.hostname if not spec.oob.get('port'): spec.oob['port'] = '443' - data = json.loads(self.get_store('node_proxy/oob', '{}')) - data[spec.hostname] = dict() - data[spec.hostname]['addr'] = spec.oob['addr'] - data[spec.hostname]['port'] = spec.oob['port'] - data[spec.hostname]['username'] = spec.oob['username'] - data[spec.hostname]['password'] = spec.oob['password'] - self.set_store('node_proxy/oob', json.dumps(data)) + host_oob_info = dict() + host_oob_info['addr'] = spec.oob['addr'] + host_oob_info['port'] = spec.oob['port'] + host_oob_info['username'] = spec.oob['username'] + host_oob_info['password'] = spec.oob['password'] + self.node_proxy_cache.update_oob(spec.hostname, host_oob_info) # prime crush map? 
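
As a reference point, a small sketch of the mon store layout the reworked NodeProxyCache above ends up writing; the hostname and credentials below are made up, and only the key names come from the diff:

import json

host = 'host01'                                        # hypothetical host
oob = {'addr': '10.10.10.42', 'port': '443',
       'username': 'root', 'password': 'calvin'}       # hypothetical values

# Keys written through mgr.set_store() by NodeProxyCache:
kv = {
    'node_proxy/oob': json.dumps({host: oob}),                   # update_oob()
    'node_proxy/keyrings': json.dumps({host: 'fake-keyring'}),   # update_keyring()
    f'node_proxy/data/{host}': json.dumps({'status': {}}),       # save()
}
print(list(kv))
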
if spec.location: @@ -1652,6 +1664,51 @@ Then run the following: def add_host(self, spec: HostSpec) -> str: return self._add_host(spec) + @handle_orch_error + def hardware_light(self, light_type: str, action: str, hostname: str, device: Optional[str] = None) -> Dict[str, Any]: + try: + result = self.node_proxy.led(light_type=light_type, + action=action, + hostname=hostname, + device=device) + except RuntimeError as e: + self.log.error(e) + raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}') + except HTTPError as e: + self.log.error(e) + raise OrchestratorValidationError(f"http error while querying node-proxy API: {e}") + return result + + @handle_orch_error + def hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> str: + if not yes_i_really_mean_it: + raise OrchestratorError("you must pass --yes-i-really-mean-it") + + try: + self.node_proxy.shutdown(hostname, force) + except RuntimeError as e: + self.log.error(e) + raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}') + except HTTPError as e: + self.log.error(e) + raise OrchestratorValidationError(f"Can't shutdown node {hostname}: {e}") + return f'Shutdown scheduled on {hostname}' + + @handle_orch_error + def hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> str: + if not yes_i_really_mean_it: + raise OrchestratorError("you must pass --yes-i-really-mean-it") + + try: + self.node_proxy.powercycle(hostname) + except RuntimeError as e: + self.log.error(e) + raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}') + except HTTPError as e: + self.log.error(e) + raise OrchestratorValidationError(f"Can't perform powercycle on node {hostname}: {e}") + return f'Powercycle scheduled on {hostname}' + @handle_orch_error def node_proxy_summary(self, hostname: Optional[str] = None) -> Dict[str, Any]: return self.node_proxy_cache.summary(hostname=hostname) @@ -2724,6 +2781,15 @@ Then run the following: pass deps = sorted([self.get_mgr_ip(), server_port, root_cert, str(self.device_enhanced_scan)]) + elif daemon_type == 'node-proxy': + root_cert = '' + server_port = '' + try: + server_port = str(self.http_server.agent.server_port) + root_cert = self.http_server.agent.ssl_certs.get_root_cert() + except Exception: + pass + deps = sorted([self.get_mgr_ip(), server_port, root_cert]) elif daemon_type == 'iscsi': if spec: iscsi_spec = cast(IscsiServiceSpec, spec) diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 262ecbd27f6ad..600329509a0e8 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -113,6 +113,9 @@ class CephadmServe: if self.mgr.agent_helpers._handle_use_agent_setting(): continue + if self.mgr.node_proxy_service.handle_hw_monitoring_setting(): + continue + if self.mgr.upgrade.continue_upgrade(): continue diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index 1681be0034879..115ee8d010249 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -42,7 +42,7 @@ def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEnt # the CephService class refers to service types, not daemon types if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'nvmeof', 'ingress', 'ceph-exporter']: 
return AuthEntity(f'client.{daemon_type}.{daemon_id}') - elif daemon_type in ['crash', 'agent']: + elif daemon_type in ['crash', 'agent', 'node-proxy']: if host == "": raise OrchestratorError( f'Host not provided to generate <{daemon_type}> auth entity name') @@ -1236,16 +1236,6 @@ class CephadmAgent(CephService): return daemon_spec - def pre_remove(self, daemon: DaemonDescription) -> None: - super().pre_remove(daemon) - - assert daemon.daemon_id is not None - daemon_id: str = daemon.daemon_id - - logger.info('Removing agent %s...' % daemon_id) - - self.mgr.agent_helpers._shutdown_node_proxy() - def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: agent = self.mgr.http_server.agent try: diff --git a/src/pybind/mgr/cephadm/services/node_proxy.py b/src/pybind/mgr/cephadm/services/node_proxy.py new file mode 100644 index 0000000000000..ebbbaf212c742 --- /dev/null +++ b/src/pybind/mgr/cephadm/services/node_proxy.py @@ -0,0 +1,180 @@ +import json +import ssl +import base64 + +from urllib.error import HTTPError, URLError +from typing import List, Any, Dict, Tuple, Optional, MutableMapping + +from .cephadmservice import CephadmDaemonDeploySpec, CephService +from ceph.deployment.service_spec import ServiceSpec, PlacementSpec +from ceph.utils import http_req +from orchestrator import OrchestratorError + + +class NodeProxy(CephService): + TYPE = 'node-proxy' + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + daemon_id, host = daemon_spec.daemon_id, daemon_spec.host + + if not self.mgr.http_server.agent: + raise OrchestratorError('Cannot deploy node-proxy before creating cephadm endpoint') + + keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), []) + daemon_spec.keyring = keyring + self.mgr.node_proxy_cache.update_keyring(host, keyring) + + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + + return daemon_spec + + def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: + # node-proxy is re-using the agent endpoint and therefore + # needs similar checks to see if the endpoint is ready. + self.agent_endpoint = self.mgr.http_server.agent + try: + assert self.agent_endpoint + assert self.agent_endpoint.ssl_certs.get_root_cert() + assert self.agent_endpoint.server_port + except Exception: + raise OrchestratorError( + 'Cannot deploy node-proxy daemons until cephadm endpoint has finished generating certs') + + listener_cert, listener_key = self.agent_endpoint.ssl_certs.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host)) + cfg = { + 'target_ip': self.mgr.get_mgr_ip(), + 'target_port': self.agent_endpoint.server_port, + 'name': f'node-proxy.{daemon_spec.host}', + 'keyring': daemon_spec.keyring, + 'root_cert.pem': self.agent_endpoint.ssl_certs.get_root_cert(), + 'listener.crt': listener_cert, + 'listener.key': listener_key, + } + config = {'node-proxy.json': json.dumps(cfg)} + + return config, sorted([str(self.mgr.get_mgr_ip()), str(self.agent_endpoint.server_port), + self.agent_endpoint.ssl_certs.get_root_cert()]) + + def handle_hw_monitoring_setting(self) -> bool: + # function to apply or remove node-proxy service spec depending + # on whether the hw_mointoring config option is set or not. + # It should return True when it either creates or deletes a spec + # and False otherwise. 
+ if self.mgr.hw_monitoring: + if 'node-proxy' not in self.mgr.spec_store: + spec = ServiceSpec( + service_type='node-proxy', + placement=PlacementSpec(host_pattern='*') + ) + self.mgr.spec_store.save(spec) + return True + return False + else: + if 'node-proxy' in self.mgr.spec_store: + self.mgr.spec_store.rm('node-proxy') + return True + return False + + def get_ssl_ctx(self) -> ssl.SSLContext: + ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert() + ssl_ctx = ssl.create_default_context() + ssl_ctx.check_hostname = True + ssl_ctx.verify_mode = ssl.CERT_REQUIRED + ssl_ctx.load_verify_locations(cadata=ssl_root_crt) + return ssl_ctx + + def led(self, light_type: str, action: str, hostname: str, device: Optional[str] = None) -> Dict[str, Any]: + ssl_ctx: ssl.SSLContext = self.get_ssl_ctx() + header: MutableMapping[str, str] = {} + method: str = 'PATCH' if action in ['on', 'off'] else 'GET' + payload: Optional[Dict[str, str]] = None + addr: str = self.mgr.inventory.get_addr(hostname) + endpoint: List[str] = ['led', light_type] + _device: str = device if device else '' + + if light_type == 'drive': + endpoint.append(_device) + + if method == 'PATCH': + payload = dict(state=action) + + header = self.generate_auth_header(hostname) + + endpoint = f'/{"/".join(endpoint)}' + + try: + headers, result, status = http_req(hostname=addr, + port='8080', + headers=header, + method=method, + data=json.dumps(payload), + endpoint=endpoint, + ssl_ctx=ssl_ctx) + result_json = json.loads(result) + except HTTPError as e: + self.mgr.log.error(e) + raise + except URLError as e: + raise RuntimeError(e) + + return result_json + + def generate_auth_header(self, hostname: str) -> Dict[str, str]: + try: + username = self.mgr.node_proxy_cache.oob[hostname]['username'] + password = self.mgr.node_proxy_cache.oob[hostname]['password'] + auth: bytes = f'{username}:{password}'.encode('utf-8') + auth_str: str = base64.b64encode(auth).decode('utf-8') + header = {'Authorization': f'Basic {auth_str}'} + except KeyError as e: + self.mgr.log.error(f'Check oob information is provided for {hostname}.') + raise RuntimeError(e) + return header + + def shutdown(self, hostname: str, force: Optional[bool] = False) -> Dict[str, Any]: + ssl_ctx: ssl.SSLContext = self.get_ssl_ctx() + header: Dict[str, str] = self.generate_auth_header(hostname) + addr: str = self.mgr.inventory.get_addr(hostname) + + endpoint = '/shutdown' + payload: Dict[str, Optional[bool]] = dict(force=force) + + try: + headers, result, status = http_req(hostname=addr, + port='8080', + headers=header, + data=json.dumps(payload), + endpoint=endpoint, + ssl_ctx=ssl_ctx) + result_json = json.loads(result) + except HTTPError as e: + self.mgr.log.error(e) + raise + except URLError as e: + raise RuntimeError(e) + + return result_json + + def powercycle(self, hostname: str) -> Dict[str, Any]: + ssl_ctx: ssl.SSLContext = self.get_ssl_ctx() + header: Dict[str, str] = self.generate_auth_header(hostname) + addr: str = self.mgr.inventory.get_addr(hostname) + + endpoint = '/powercycle' + + try: + headers, result, status = http_req(hostname=addr, + port='8080', + headers=header, + data="{}", + endpoint=endpoint, + ssl_ctx=ssl_ctx) + result_json = json.loads(result) + except HTTPError as e: + self.mgr.log.error(e) + raise + except URLError as e: + raise RuntimeError(e) + + return result_json diff --git a/src/pybind/mgr/cephadm/tests/test_node_proxy.py b/src/pybind/mgr/cephadm/tests/test_node_proxy.py index b713d04cd5975..0c9ee127547c4 100644 --- 
a/src/pybind/mgr/cephadm/tests/test_node_proxy.py +++ b/src/pybind/mgr/cephadm/tests/test_node_proxy.py @@ -35,8 +35,8 @@ class FakeMgr: class TestNodeProxyEndpoint(helper.CPWebCase): mgr = FakeMgr() app = NodeProxyEndpoint(mgr) - mgr.agent_cache.agent_keys = {"host01": "fake-secret01", - "host02": "fake-secret02"} + mgr.node_proxy.keyrings = {"host01": "fake-secret01", + "host02": "fake-secret02"} mgr.node_proxy.oob = {"host01": {"username": "oob-user01", "password": "oob-pass01"}, "host02": {"username": "oob-user02", @@ -68,38 +68,38 @@ class TestNodeProxyEndpoint(helper.CPWebCase): self.assertStatus('400 Bad Request') def test_oob_data_misses_secret_field(self): - data = '{"cephx": {"name": "host01"}}' + data = '{"cephx": {"name": "node-proxy.host01"}}' self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'), ('Content-Length', str(len(data)))]) self.assertStatus('400 Bad Request') def test_oob_agent_not_running(self): - data = '{"cephx": {"name": "host03", "secret": "fake-secret03"}}' + data = '{"cephx": {"name": "node-proxy.host03", "secret": "fake-secret03"}}' self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'), ('Content-Length', str(len(data)))]) self.assertStatus('502 Bad Gateway') def test_oob_wrong_keyring(self): - data = '{"cephx": {"name": "host01", "secret": "wrong-keyring"}}' + data = '{"cephx": {"name": "node-proxy.host01", "secret": "wrong-keyring"}}' self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'), ('Content-Length', str(len(data)))]) self.assertStatus('403 Forbidden') def test_oob_ok(self): - data = '{"cephx": {"name": "host01", "secret": "fake-secret01"}}' + data = '{"cephx": {"name": "node-proxy.host01", "secret": "fake-secret01"}}' self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'), ('Content-Length', str(len(data)))]) self.assertStatus('200 OK') def test_data_missing_patch(self): - data = '{"cephx": {"name": "host01", "secret": "fake-secret01"}}' + data = '{"cephx": {"name": "node-proxy.host01", "secret": "fake-secret01"}}' self.getPage("/data", method="POST", body=data, headers=[('Content-Type', 'application/json'), ('Content-Length', str(len(data)))]) self.assertStatus('400 Bad Request') def test_data_raises_alert(self): patch = node_proxy_data.full_set_with_critical - data = {"cephx": {"name": "host01", "secret": "fake-secret01"}, "patch": patch} + data = {"cephx": {"name": "node-proxy.host01", "secret": "fake-secret01"}, "patch": patch} data_str = json.dumps(data) self.getPage("/data", method="POST", body=data_str, headers=[('Content-Type', 'application/json'), ('Content-Length', str(len(data_str)))]) diff --git a/src/pybind/mgr/cephadm/utils.py b/src/pybind/mgr/cephadm/utils.py index 63672936c7cb3..3aedfbd86f008 100644 --- a/src/pybind/mgr/cephadm/utils.py +++ b/src/pybind/mgr/cephadm/utils.py @@ -31,7 +31,7 @@ RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['haproxy', 'nfs'] CEPH_UPGRADE_ORDER = CEPH_TYPES + GATEWAY_TYPES + MONITORING_STACK_TYPES # these daemon types use the ceph container image -CEPH_IMAGE_TYPES = CEPH_TYPES + ['iscsi', 'nfs'] +CEPH_IMAGE_TYPES = CEPH_TYPES + ['iscsi', 'nfs', 'node-proxy'] # these daemons do not use the ceph image. 
There are other daemons # that also don't use the ceph image, but we only care about those diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index f97b61e8f9b55..362badcc35eee 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -359,6 +359,33 @@ class Orchestrator(object): """ raise NotImplementedError() + def hardware_light(self, light_type: str, action: str, hostname: str, device: Optional[str] = None) -> OrchResult[Dict[str, Any]]: + """ + Light a chassis or device ident LED. + + :param light_type: led type (chassis or device). + :param action: set or get status led. + :param hostname: the name of the host. + :param device: the device id (when light_type = 'device') + """ + raise NotImplementedError() + + def hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> OrchResult[str]: + """ + Reboot a host. + + :param hostname: the name of the host being rebooted. + """ + raise NotImplementedError() + + def hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> OrchResult[str]: + """ + Shutdown a host. + + :param hostname: the name of the host to shutdown. + """ + raise NotImplementedError() + def hardware_status(self, hostname: Optional[str] = None, category: Optional[str] = 'summary') -> OrchResult[str]: """ Display hardware status. @@ -869,6 +896,7 @@ def daemon_type_to_service(dtype: str) -> str: 'crashcollector': 'crash', # Specific Rook Daemon 'container': 'container', 'agent': 'agent', + 'node-proxy': 'node-proxy', 'snmp-gateway': 'snmp-gateway', 'elasticsearch': 'elasticsearch', 'jaeger-agent': 'jaeger-agent', @@ -901,6 +929,7 @@ def service_to_daemon_types(stype: str) -> List[str]: 'crash': ['crash'], 'container': ['container'], 'agent': ['agent'], + 'node-proxy': ['node-proxy'], 'snmp-gateway': ['snmp-gateway'], 'elasticsearch': ['elasticsearch'], 'jaeger-agent': ['jaeger-agent'], diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index 27d91d1a4d354..22e1dd2f65271 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -591,6 +591,51 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule, return table.get_string() + class HardwareLightType(enum.Enum): + chassis = 'chassis' + device = 'drive' + + class HardwareLightAction(enum.Enum): + on = 'on' + off = 'off' + get = 'get' + + @_cli_write_command('orch hardware light') + def _hardware_light(self, + light_type: HardwareLightType, action: HardwareLightAction, + hostname: str, device: Optional[str] = None) -> HandleCommandResult: + """Enable or Disable a device or chassis LED""" + if light_type == self.HardwareLightType.device and not device: + return HandleCommandResult(stderr='you must pass a device ID.', + retval=-errno.ENOENT) + + completion = self.hardware_light(light_type.value, action.value, hostname, device) + data = raise_if_exception(completion) + output: str = '' + if action == self.HardwareLightAction.get: + status = 'on' if data["LocationIndicatorActive"] else 'off' + if light_type == self.HardwareLightType.device: + output = f'ident LED for {device} on {hostname} is: {status}' + else: + output = f'ident chassis LED for {hostname} is: {status}' + else: + pass + return HandleCommandResult(stdout=output) + + @_cli_write_command('orch hardware powercycle') + def _hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> 
HandleCommandResult: + """Reboot a host""" + completion = self.hardware_powercycle(hostname, yes_i_really_mean_it=yes_i_really_mean_it) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch hardware shutdown') + def _hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> HandleCommandResult: + """Shutdown a host""" + completion = self.hardware_shutdown(hostname, force, yes_i_really_mean_it=yes_i_really_mean_it) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + @_cli_write_command('orch host rm') def _remove_host(self, hostname: str, force: bool = False, offline: bool = False, rm_crush_entry: bool = False) -> HandleCommandResult: """Remove a host""" diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index cb81cf7b5c8b1..a8b679ee39696 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -755,7 +755,8 @@ class ServiceSpec(object): KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi nvmeof loki promtail mds mgr mon nfs ' \ 'node-exporter osd prometheus rbd-mirror rgw agent ceph-exporter ' \ 'container ingress cephfs-mirror snmp-gateway jaeger-tracing ' \ - 'elasticsearch jaeger-agent jaeger-collector jaeger-query'.split() + 'elasticsearch jaeger-agent jaeger-collector jaeger-query ' \ + 'node-proxy'.split() REQUIRES_SERVICE_ID = 'iscsi nvmeof mds nfs rgw container ingress '.split() MANAGED_CONFIG_OPTIONS = [ 'mds_join_fs', diff --git a/src/python-common/ceph/utils.py b/src/python-common/ceph/utils.py index 643be06580b61..e92a2d1de7db8 100644 --- a/src/python-common/ceph/utils.py +++ b/src/python-common/ceph/utils.py @@ -1,8 +1,15 @@ import datetime import re import string +import ssl -from typing import Optional +from typing import Optional, MutableMapping, Tuple, Any +from urllib.error import HTTPError, URLError +from urllib.request import urlopen, Request + +import logging + +log = logging.getLogger(__name__) def datetime_now() -> datetime.datetime: @@ -121,3 +128,42 @@ def is_hex(s: str, strict: bool = True) -> bool: return False return True + + +def http_req(hostname: str = '', + port: str = '443', + method: Optional[str] = None, + headers: MutableMapping[str, str] = {}, + data: Optional[str] = None, + endpoint: str = '/', + scheme: str = 'https', + ssl_verify: bool = False, + timeout: Optional[int] = None, + ssl_ctx: Optional[Any] = None) -> Tuple[Any, Any, Any]: + + if not ssl_ctx: + ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + if not ssl_verify: + ssl_ctx.check_hostname = False + ssl_ctx.verify_mode = ssl.CERT_NONE + else: + ssl_ctx.verify_mode = ssl.CERT_REQUIRED + + url: str = f'{scheme}://{hostname}:{port}{endpoint}' + _data = bytes(data, 'ascii') if data else None + _headers = headers + if data and not method: + method = 'POST' + if not _headers.get('Content-Type') and method in ['POST', 'PATCH']: + _headers['Content-Type'] = 'application/json' + try: + req = Request(url, _data, _headers, method=method) + with urlopen(req, context=ssl_ctx, timeout=timeout) as response: + response_str = response.read() + response_headers = response.headers + response_code = response.code + return response_headers, response_str.decode(), response_code + except (HTTPError, URLError) as e: + log.error(e) + # handle error here if needed + raise -- 2.39.5
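
To close, a minimal end-to-end sketch of how the pieces added by this patch interact when the mgr toggles an ident LED. The address, credentials and certificate below are placeholders; only http_req's signature, the fixed port 8080, the Basic auth header and the /led/<type>[/<id>] endpoint are taken from the diff:

import base64
import json
import ssl

from ceph.utils import http_req   # helper added by this patch

# Hypothetical inputs; in the mgr these come from NodeProxyCache.oob and
# from the cephadm agent root certificate.
addr, username, password = '10.10.10.42', 'root', 'calvin'
root_cert = open('/tmp/cephadm-root.crt').read()   # placeholder path

ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = True
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
ssl_ctx.load_verify_locations(cadata=root_cert)

auth = base64.b64encode(f'{username}:{password}'.encode()).decode()
headers = {'Authorization': f'Basic {auth}'}

# NodeProxy.led() sends PATCH /led/chassis with {"state": "on"} to turn the
# chassis ident LED on; a GET on the same endpoint returns the current state.
hdrs, body, status = http_req(hostname=addr,
                              port='8080',
                              headers=headers,
                              method='PATCH',
                              data=json.dumps({'state': 'on'}),
                              endpoint='/led/chassis',
                              ssl_ctx=ssl_ctx)
print(status, json.loads(body))
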