From: Guillaume Abrioux Date: Fri, 1 Dec 2023 08:11:31 +0000 (+0000) Subject: node-proxy: Add a `NodeProxyManager` class X-Git-Tag: v19.3.0~102^2~22 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8d129efd850dff1306c4084f52da84594d8146ef;p=ceph.git node-proxy: Add a `NodeProxyManager` class The current approach with `init_node_proxy()` and `node_proxy_loop_check()` is 'cumbersome' and gives the heebie-jeebies. Sub-classing `Thread()` makes the code a bit clearer and more readable. Signed-off-by: Guillaume Abrioux --- diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py index e7193ccc29b6..879aec568b5b 100755 --- a/src/cephadm/cephadm.py +++ b/src/cephadm/cephadm.py @@ -28,9 +28,8 @@ from functools import wraps from glob import glob from io import StringIO from threading import Thread, Event -from urllib.request import urlopen, Request from pathlib import Path -from cephadmlib.node_proxy.main import NodeProxy, NodeProxyInitialization, NodeProxyFetchOobError +from cephadmlib.node_proxy.main import NodeProxy from cephadmlib.constants import ( # default images @@ -178,6 +177,7 @@ from cephadmlib.daemons import ( SNMPGateway, Tracing, ) +from cephadmlib.agent import http_query FuncT = TypeVar('FuncT', bound=Callable) @@ -1312,6 +1312,71 @@ class MgrListener(Thread): raise RuntimeError('No valid data received.') +class NodeProxyManager(Thread): + def __init__(self, agent: 'CephadmAgent', event: Event): + super().__init__() + self.agent = agent + self.event = event + self.stop = False + + def run(self) -> None: + self.event.wait() + self.ssl_ctx = self.agent.ssl_ctx + self.init() + self.loop() + + def init(self) -> None: + node_proxy_meta = { + 'cephx': { + 'name': self.agent.host, + 'secret': self.agent.keyring + } + } + status, result = http_query(addr=self.agent.target_ip, + port=self.agent.target_port, + data=json.dumps(node_proxy_meta).encode('ascii'), + endpoint='/node-proxy/oob', + ssl_ctx=self.ssl_ctx) + if status != 200: 
+ msg = f'No out of band tool details could be loaded: {status}, {result}' + logger.debug(msg) + raise RuntimeError(msg) + + result_json = json.loads(result) + kwargs = { + 'host': result_json['result']['addr'], + 'username': result_json['result']['username'], + 'password': result_json['result']['password'], + 'cephx': node_proxy_meta['cephx'], + 'mgr_target_ip': self.agent.target_ip, + 'mgr_target_port': self.agent.target_port + } + if result_json['result'].get('port'): + kwargs['port'] = result_json['result']['port'] + + self.node_proxy = NodeProxy(**kwargs) + self.node_proxy.start() + + def loop(self) -> None: + while not self.stop: + try: + status = self.node_proxy.check_status() + label = 'Ok' if status else 'Critical' + logger.debug(f'node-proxy status: {label}') + except Exception as e: + logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}') + time.sleep(120) + self.init() + else: + logger.debug('node-proxy alive, next check in 60sec.') + time.sleep(60) + + def shutdown(self) -> None: + self.stop = True + # if `self.node_proxy.shutdown()` is called before self.start(), it will fail. 
+ if self.__dict__.get('node_proxy'): + self.node_proxy.shutdown() + @register_daemon_form class CephadmAgent(DaemonForm): @@ -1365,7 +1430,11 @@ class CephadmAgent(DaemonForm): self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0] self.recent_iteration_index: int = 0 self.cached_ls_values: Dict[str, Dict[str, str]] = {} - self.t_node_proxy: Optional["NodeProxy"] = None + self.ssl_ctx = ssl.create_default_context() + self.ssl_ctx.check_hostname = True + self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED + self.node_proxy_mgr_event = Event() + self.node_proxy_mgr = NodeProxyManager(self, self.node_proxy_mgr_event) def validate(self, config: Dict[str, str] = {}) -> None: # check for the required files @@ -1421,6 +1490,8 @@ class CephadmAgent(DaemonForm): def shutdown(self) -> None: self.stop = True + if self.node_proxy_mgr.is_alive(): + self.node_proxy_mgr.shutdown() if self.mgr_listener.is_alive(): self.mgr_listener.shutdown() if self.ls_gatherer.is_alive(): @@ -1459,92 +1530,12 @@ class CephadmAgent(DaemonForm): self.device_enhanced_scan = True self.volume_gatherer.update_func(lambda: self._ceph_volume(enhanced=self.device_enhanced_scan)) - def query_endpoint(self, - addr: str = '', - port: str = '', - data: Optional[Union[Dict[str, str], str]] = None, - endpoint: str = '', - ssl_ctx: Optional[Any] = None, - timeout: Optional[int] = 10) -> Tuple[int, str]: - _addr = addr if addr else self.target_ip - _port = port if port else self.target_port - url = f'https://{_addr}:{_port}{endpoint}' - logger.info(f"sending query to {url}") - try: - req = Request(url, data, {'Content-Type': 'application/json'}) - send_time = time.monotonic() - with urlopen(req, context=ssl_ctx, timeout=timeout) as response: - response_str = response.read() - response_json = json.loads(response_str) - total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds() - logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds 
after sending request.') - response_status = response.status - except HTTPError as e: - logger.debug(f"{e.code} {e.reason}") - response_status = e.code - response_str = e.reason - except URLError as e: - logger.debug(f"{e.reason}") - response_status = -1 - response_str = e.reason - except Exception: - raise - return (response_status, response_str) - - def node_proxy_loop_check(self, ssl_ctx: Any) -> None: - while True: - try: - if isinstance(self.t_node_proxy, NodeProxy): - status = self.t_node_proxy.check_status() - label = 'Ok' if status else 'Critical' - logger.debug(f'node-proxy status: {label}') - else: - raise NodeProxyInitialization("starting node-proxy...") - except Exception as e: - logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}') - try: - self.init_node_proxy(ssl_ctx) - except NodeProxyFetchOobError: - logger.info("No oob details could be loaded. " - "Aborting node-proxy initialization. " - "Will retry in 120s.") - time.sleep(120) - - def init_node_proxy(self, ssl_ctx: Any) -> None: - node_proxy_meta = { - 'cephx': { - 'name': self.host, - 'secret': self.keyring - } - } - status, result = self.query_endpoint(data=json.dumps(node_proxy_meta).encode('ascii'), - endpoint='/node-proxy/oob', - ssl_ctx=ssl_ctx) - if status != 200: - msg = f"Couldn't load oob details: {status}, {result}" - logger.debug(msg) - raise NodeProxyFetchOobError(msg) - result_json = json.loads(result) - kwargs = { - 'host': result_json['result']['addr'], - 'username': result_json['result']['username'], - 'password': result_json['result']['password'], - 'cephx': node_proxy_meta['cephx'], - 'mgr_target_ip': self.target_ip, - 'mgr_target_port': self.target_port - } - if result_json['result'].get('port'): - kwargs['port'] = result_json['result']['port'] - - self.t_node_proxy = NodeProxy(**kwargs) - self.t_node_proxy.start() - def run(self) -> None: self.pull_conf_settings() - ssl_ctx = ssl.create_default_context() - ssl_ctx.check_hostname = True - ssl_ctx.verify_mode = 
ssl.CERT_REQUIRED - ssl_ctx.load_verify_locations(self.ca_path) + self.ssl_ctx.load_verify_locations(self.ca_path) + # only after self.pull_conf_settings() was called we can actually start + # node-proxy + self.node_proxy_mgr_event.set() try: for _ in range(1001): @@ -1566,9 +1557,8 @@ class CephadmAgent(DaemonForm): if not self.volume_gatherer.is_alive(): self.volume_gatherer.start() - # initiate node-proxy thread - node_proxy_loop_thread = Thread(target=self.node_proxy_loop_check, args=(ssl_ctx,)) - node_proxy_loop_thread.start() + if not self.node_proxy_mgr.is_alive(): + self.node_proxy_mgr.start() while not self.stop: start_time = time.monotonic() @@ -1596,9 +1586,18 @@ class CephadmAgent(DaemonForm): data = data.encode('ascii') try: - self.query_endpoint(data=data, - endpoint='/data', - ssl_ctx=ssl_ctx) + send_time = time.monotonic() + status, response = http_query(addr=self.target_ip, + port=self.target_port, + data=data, + endpoint='/data', + ssl_ctx=self.ssl_ctx) + response_json = json.loads(response) + if status != 200: + logger.error(f'HTTP error {status} while querying agent endpoint: {response}') + raise RuntimeError + total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds() + logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds after sending request.') except Exception as e: logger.error(f'Failed to send metadata to mgr: {e}') diff --git a/src/cephadm/cephadmlib/agent.py b/src/cephadm/cephadmlib/agent.py new file mode 100644 index 000000000000..71924c39ccfe --- /dev/null +++ b/src/cephadm/cephadmlib/agent.py @@ -0,0 +1,33 @@ +from urllib.error import HTTPError, URLError +from urllib.request import urlopen, Request +from typing import Optional, Any, Tuple +import logging + +logger = logging.getLogger() + + +def http_query(addr: str = '', + port: str = '', + data: Optional[bytes] = None, + endpoint: str = '', + ssl_ctx: Optional[Any] = None, + timeout: Optional[int] = 10) -> 
Tuple[int, str]: + + url = f'https://{addr}:{port}{endpoint}' + logger.debug(f'sending query to {url}') + try: + req = Request(url, data, {'Content-Type': 'application/json'}) + with urlopen(req, context=ssl_ctx, timeout=timeout) as response: + response_str = response.read() + response_status = response.status + except HTTPError as e: + logger.debug(f'{e.code} {e.reason}') + response_status = e.code + response_str = e.reason + except URLError as e: + logger.debug(f'{e.reason}') + response_status = -1 + response_str = e.reason + except Exception: + raise + return (response_status, response_str) diff --git a/src/cephadm/cephadmlib/node_proxy/main.py b/src/cephadm/cephadmlib/node_proxy/main.py index 0575340d0ecd..339d0d2c8533 100644 --- a/src/cephadm/cephadmlib/node_proxy/main.py +++ b/src/cephadm/cephadmlib/node_proxy/main.py @@ -23,14 +23,6 @@ DEFAULT_CONFIG = { } -class NodeProxyInitialization(Exception): - pass - - -class NodeProxyFetchOobError(Exception): - pass - - class NodeProxy(Thread): def __init__(self, **kw: Dict[str, Any]) -> None: super().__init__() @@ -46,6 +38,12 @@ class NodeProxy(Thread): self.exc = e return + def shutdown(self) -> None: + self.log.logger.info('Shutting down node-proxy...') + self.system.client.logout() + self.system.stop_update_loop() + self.reporter_agent.stop() + def check_auth(self, realm: str, username: str, password: str) -> bool: return self.__dict__['username'] == username and \ self.__dict__['password'] == password diff --git a/src/cephadm/tests/test_agent.py b/src/cephadm/tests/test_agent.py index 4904cb4f61fc..52cce74e1fb8 100644 --- a/src/cephadm/tests/test_agent.py +++ b/src/cephadm/tests/test_agent.py @@ -416,7 +416,7 @@ def test_agent_get_ls(_ls_subset, _ls, cephadm_fs): @mock.patch("threading.Event.clear") @mock.patch("threading.Event.wait") @mock.patch("urllib.request.Request.__init__") -@mock.patch("cephadm.urlopen") +@mock.patch("cephadmlib.agent.urlopen") @mock.patch("cephadm.list_networks") 
@mock.patch("cephadm.HostFacts.dump") @mock.patch("cephadm.HostFacts.__init__", lambda _, __: None) @@ -531,7 +531,7 @@ def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start, 'port': str(open_listener_port) } _RQ_init.assert_called_with( - f'https://{target_ip}:{target_port}/data/', + f'https://{target_ip}:{target_port}/data', json.dumps(expected_data).encode('ascii'), {'Content-Type': 'application/json'} )