From 204dcd5ccccba1525b1f5d38607cbabbd1ccde25 Mon Sep 17 00:00:00 2001 From: Guillaume Abrioux Date: Fri, 1 Dec 2023 08:11:31 +0000 Subject: [PATCH] node-proxy: Add a `NodeProxyManager` class The current approach with `init_node_proxy()` and `node_proxy_loop_check()` is 'cumbersome' and gives the heebie-jeebies. Sub-classing `Thread()` makes the code a bit clearer and more readable. Signed-off-by: Guillaume Abrioux (cherry picked from commit 2a840d7ce4d64dd38f7aea381207bbecfef629cd) --- src/cephadm/cephadm.py | 213 ++++++++++++---------- src/cephadm/cephadmlib/node_proxy/main.py | 14 +- src/cephadm/tests/test_agent.py | 4 +- 3 files changed, 128 insertions(+), 103 deletions(-) diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py index 5c3084acd47..0aa63ca9d7f 100755 --- a/src/cephadm/cephadm.py +++ b/src/cephadm/cephadm.py @@ -37,10 +37,11 @@ from functools import wraps from glob import glob from io import StringIO from threading import Thread, Event +from pathlib import Path from urllib.error import HTTPError, URLError from urllib.request import urlopen, Request -from pathlib import Path -from cephadmlib.node_proxy.main import NodeProxy, NodeProxyInitialization, NodeProxyFetchOobError +from cephadmlib.node_proxy.main import NodeProxy + FuncT = TypeVar('FuncT', bound=Callable) @@ -137,6 +138,33 @@ async def concurrent_tasks(func: Callable, cmd_list: List[str]) -> List[Any]: return data +def http_query(addr: str = '', + port: str = '', + data: Optional[bytes] = None, + endpoint: str = '', + ssl_ctx: Optional[Any] = None, + timeout: Optional[int] = 10) -> Tuple[int, str]: + + url = f'https://{addr}:{port}{endpoint}' + logger.debug(f'sending query to {url}') + try: + req = Request(url, data, {'Content-Type': 'application/json'}) + with urlopen(req, context=ssl_ctx, timeout=timeout) as response: + response_str = response.read() + response_status = response.status + except HTTPError as e: + logger.debug(f'{e.code} {e.reason}') + response_status = e.code + 
response_str = e.reason + except URLError as e: + logger.debug(f'{e.reason}') + response_status = -1 + response_str = e.reason + except Exception: + raise + return (response_status, response_str) + + class EndPoint: """EndPoint representing an ip:port format""" @@ -4751,6 +4779,71 @@ class MgrListener(Thread): raise RuntimeError('No valid data received.') +class NodeProxyManager(Thread): + def __init__(self, agent: 'CephadmAgent', event: Event): + super().__init__() + self.agent = agent + self.event = event + self.stop = False + + def run(self) -> None: + self.event.wait() + self.ssl_ctx = self.agent.ssl_ctx + self.init() + self.loop() + + def init(self) -> None: + node_proxy_meta = { + 'cephx': { + 'name': self.agent.host, + 'secret': self.agent.keyring + } + } + status, result = http_query(addr=self.agent.target_ip, + port=self.agent.target_port, + data=json.dumps(node_proxy_meta).encode('ascii'), + endpoint='/node-proxy/oob', + ssl_ctx=self.ssl_ctx) + if status != 200: + msg = f'No out of band tool details could be loaded: {status}, {result}' + logger.debug(msg) + raise RuntimeError(msg) + + result_json = json.loads(result) + kwargs = { + 'host': result_json['result']['addr'], + 'username': result_json['result']['username'], + 'password': result_json['result']['password'], + 'cephx': node_proxy_meta['cephx'], + 'mgr_target_ip': self.agent.target_ip, + 'mgr_target_port': self.agent.target_port + } + if result_json['result'].get('port'): + kwargs['port'] = result_json['result']['port'] + + self.node_proxy = NodeProxy(**kwargs) + self.node_proxy.start() + + def loop(self) -> None: + while not self.stop: + try: + status = self.node_proxy.check_status() + label = 'Ok' if status else 'Critical' + logger.debug(f'node-proxy status: {label}') + except Exception as e: + logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}') + time.sleep(120) + self.init() + else: + logger.debug('node-proxy alive, next check in 60sec.') + time.sleep(60) + + def shutdown(self) 
-> None: + self.stop = True + # if `self.node_proxy.shutdown()` is called before self.start(), it will fail. + if self.__dict__.get('node_proxy'): + self.node_proxy.shutdown() + class CephadmAgent(): @@ -4791,7 +4884,11 @@ class CephadmAgent(): self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0] self.recent_iteration_index: int = 0 self.cached_ls_values: Dict[str, Dict[str, str]] = {} - self.t_node_proxy: Optional["NodeProxy"] = None + self.ssl_ctx = ssl.create_default_context() + self.ssl_ctx.check_hostname = True + self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED + self.node_proxy_mgr_event = Event() + self.node_proxy_mgr = NodeProxyManager(self, self.node_proxy_mgr_event) def validate(self, config: Dict[str, str] = {}) -> None: # check for the required files @@ -4863,6 +4960,8 @@ WantedBy=ceph-{fsid}.target def shutdown(self) -> None: self.stop = True + if self.node_proxy_mgr.is_alive(): + self.node_proxy_mgr.shutdown() if self.mgr_listener.is_alive(): self.mgr_listener.shutdown() if self.ls_gatherer.is_alive(): @@ -4901,92 +5000,12 @@ WantedBy=ceph-{fsid}.target self.device_enhanced_scan = True self.volume_gatherer.update_func(lambda: self._ceph_volume(enhanced=self.device_enhanced_scan)) - def query_endpoint(self, - addr: str = '', - port: str = '', - data: Optional[Union[Dict[str, str], str]] = None, - endpoint: str = '', - ssl_ctx: Optional[Any] = None, - timeout: Optional[int] = 10) -> Tuple[int, str]: - _addr = addr if addr else self.target_ip - _port = port if port else self.target_port - url = f'https://{_addr}:{_port}{endpoint}' - logger.info(f"sending query to {url}") - try: - req = Request(url, data, {'Content-Type': 'application/json'}) - send_time = time.monotonic() - with urlopen(req, context=ssl_ctx, timeout=timeout) as response: - response_str = response.read() - response_json = json.loads(response_str) - total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds() - logger.info(f'Received mgr response: 
"{response_json["result"]}" {total_request_time} seconds after sending request.') - response_status = response.status - except HTTPError as e: - logger.debug(f"{e.code} {e.reason}") - response_status = e.code - response_str = e.reason - except URLError as e: - logger.debug(f"{e.reason}") - response_status = -1 - response_str = e.reason - except Exception: - raise - return (response_status, response_str) - - def node_proxy_loop_check(self, ssl_ctx: Any) -> None: - while True: - try: - if isinstance(self.t_node_proxy, NodeProxy): - status = self.t_node_proxy.check_status() - label = 'Ok' if status else 'Critical' - logger.debug(f'node-proxy status: {label}') - else: - raise NodeProxyInitialization("starting node-proxy...") - except Exception as e: - logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}') - try: - self.init_node_proxy(ssl_ctx) - except NodeProxyFetchOobError: - logger.info("No oob details could be loaded. " - "Aborting node-proxy initialization. " - "Will retry in 120s.") - time.sleep(120) - - def init_node_proxy(self, ssl_ctx: Any) -> None: - node_proxy_meta = { - 'cephx': { - 'name': self.host, - 'secret': self.keyring - } - } - status, result = self.query_endpoint(data=json.dumps(node_proxy_meta).encode('ascii'), - endpoint='/node-proxy/oob', - ssl_ctx=ssl_ctx) - if status != 200: - msg = f"Couldn't load oob details: {status}, {result}" - logger.debug(msg) - raise NodeProxyFetchOobError(msg) - result_json = json.loads(result) - kwargs = { - 'host': result_json['result']['addr'], - 'username': result_json['result']['username'], - 'password': result_json['result']['password'], - 'cephx': node_proxy_meta['cephx'], - 'mgr_target_ip': self.target_ip, - 'mgr_target_port': self.target_port - } - if result_json['result'].get('port'): - kwargs['port'] = result_json['result']['port'] - - self.t_node_proxy = NodeProxy(**kwargs) - self.t_node_proxy.start() - def run(self) -> None: self.pull_conf_settings() - ssl_ctx = ssl.create_default_context() 
- ssl_ctx.check_hostname = True - ssl_ctx.verify_mode = ssl.CERT_REQUIRED - ssl_ctx.load_verify_locations(self.ca_path) + self.ssl_ctx.load_verify_locations(self.ca_path) + # only after self.pull_conf_settings() was called we can actually start + # node-proxy + self.node_proxy_mgr_event.set() try: for _ in range(1001): @@ -5008,9 +5027,8 @@ WantedBy=ceph-{fsid}.target if not self.volume_gatherer.is_alive(): self.volume_gatherer.start() - # initiate node-proxy thread - node_proxy_loop_thread = Thread(target=self.node_proxy_loop_check, args=(ssl_ctx,)) - node_proxy_loop_thread.start() + if not self.node_proxy_mgr.is_alive(): + self.node_proxy_mgr.start() while not self.stop: start_time = time.monotonic() @@ -5038,9 +5056,18 @@ WantedBy=ceph-{fsid}.target data = data.encode('ascii') try: - self.query_endpoint(data=data, - endpoint='/data', - ssl_ctx=ssl_ctx) + send_time = time.monotonic() + status, response = http_query(addr=self.target_ip, + port=self.target_port, + data=data, + endpoint='/data', + ssl_ctx=self.ssl_ctx) + response_json = json.loads(response) + if status != 200: + logger.error(f'HTTP error {status} while querying agent endpoint: {response}') + raise RuntimeError + total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds() + logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds after sending request.') except Exception as e: logger.error(f'Failed to send metadata to mgr: {e}') diff --git a/src/cephadm/cephadmlib/node_proxy/main.py b/src/cephadm/cephadmlib/node_proxy/main.py index 0575340d0ec..339d0d2c853 100644 --- a/src/cephadm/cephadmlib/node_proxy/main.py +++ b/src/cephadm/cephadmlib/node_proxy/main.py @@ -23,14 +23,6 @@ DEFAULT_CONFIG = { } -class NodeProxyInitialization(Exception): - pass - - -class NodeProxyFetchOobError(Exception): - pass - - class NodeProxy(Thread): def __init__(self, **kw: Dict[str, Any]) -> None: super().__init__() @@ -46,6 +38,12 @@ class 
NodeProxy(Thread): self.exc = e return + def shutdown(self) -> None: + self.log.logger.info('Shutting down node-proxy...') + self.system.client.logout() + self.system.stop_update_loop() + self.reporter_agent.stop() + def check_auth(self, realm: str, username: str, password: str) -> bool: return self.__dict__['username'] == username and \ self.__dict__['password'] == password diff --git a/src/cephadm/tests/test_agent.py b/src/cephadm/tests/test_agent.py index f9cf201e275..c90d0385f3d 100644 --- a/src/cephadm/tests/test_agent.py +++ b/src/cephadm/tests/test_agent.py @@ -412,7 +412,7 @@ def test_agent_get_ls(_ls_subset, _ls, cephadm_fs): @mock.patch("threading.Event.clear") @mock.patch("threading.Event.wait") @mock.patch("urllib.request.Request.__init__") -@mock.patch("cephadm.urlopen") +@mock.patch("cephadmlib.agent.urlopen") @mock.patch("cephadm.list_networks") @mock.patch("cephadm.HostFacts.dump") @mock.patch("cephadm.HostFacts.__init__", lambda _, __: None) @@ -527,7 +527,7 @@ def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start, 'port': str(open_listener_port) } _RQ_init.assert_called_with( - f'https://{target_ip}:{target_port}/data/', + f'https://{target_ip}:{target_port}/data', json.dumps(expected_data).encode('ascii'), {'Content-Type': 'application/json'} ) -- 2.39.5