]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
node-proxy: Add a `NodeProxyManager` class
authorGuillaume Abrioux <gabrioux@ibm.com>
Fri, 1 Dec 2023 08:11:31 +0000 (08:11 +0000)
committerGuillaume Abrioux <gabrioux@ibm.com>
Thu, 25 Jan 2024 15:07:20 +0000 (15:07 +0000)
The current approach with `init_node_proxy()` and `node_proxy_loop_check()`
is 'cumbersome' and gives the heebie-jeebies.

Sub-classing `Thread()` makes the code a bit more clearer and readable.

Signed-off-by: Guillaume Abrioux <gabrioux@ibm.com>
src/cephadm/cephadm.py
src/cephadm/cephadmlib/agent.py [new file with mode: 0644]
src/cephadm/cephadmlib/node_proxy/main.py
src/cephadm/tests/test_agent.py

index e7193ccc29b6b3ba63af20c27e4c3617a44b74cc..879aec568b5b13b5f2da06206ee62b31ef3aeb64 100755 (executable)
@@ -28,9 +28,8 @@ from functools import wraps
 from glob import glob
 from io import StringIO
 from threading import Thread, Event
-from urllib.request import urlopen, Request
 from pathlib import Path
-from cephadmlib.node_proxy.main import NodeProxy, NodeProxyInitialization, NodeProxyFetchOobError
+from cephadmlib.node_proxy.main import NodeProxy
 
 from cephadmlib.constants import (
     # default images
@@ -178,6 +177,7 @@ from cephadmlib.daemons import (
     SNMPGateway,
     Tracing,
 )
+from cephadmlib.agent import http_query
 
 
 FuncT = TypeVar('FuncT', bound=Callable)
@@ -1312,6 +1312,71 @@ class MgrListener(Thread):
             raise RuntimeError('No valid data received.')
 
 
+class NodeProxyManager(Thread):
+    def __init__(self, agent: 'CephadmAgent', event: Event):
+        super().__init__()
+        self.agent = agent
+        self.event = event
+        self.stop = False
+
+    def run(self) -> None:
+        self.event.wait()
+        self.ssl_ctx = self.agent.ssl_ctx
+        self.init()
+        self.loop()
+
+    def init(self) -> None:
+        node_proxy_meta = {
+            'cephx': {
+                'name': self.agent.host,
+                'secret': self.agent.keyring
+            }
+        }
+        status, result = http_query(addr=self.agent.target_ip,
+                                    port=self.agent.target_port,
+                                    data=json.dumps(node_proxy_meta).encode('ascii'),
+                                    endpoint='/node-proxy/oob',
+                                    ssl_ctx=self.ssl_ctx)
+        if status != 200:
+            msg = f'No out of band tool details could be loaded: {status}, {result}'
+            logger.debug(msg)
+            raise RuntimeError(msg)
+
+        result_json = json.loads(result)
+        kwargs = {
+            'host': result_json['result']['addr'],
+            'username': result_json['result']['username'],
+            'password': result_json['result']['password'],
+            'cephx': node_proxy_meta['cephx'],
+            'mgr_target_ip': self.agent.target_ip,
+            'mgr_target_port': self.agent.target_port
+        }
+        if result_json['result'].get('port'):
+            kwargs['port'] = result_json['result']['port']
+
+        self.node_proxy = NodeProxy(**kwargs)
+        self.node_proxy.start()
+
+    def loop(self) -> None:
+        while not self.stop:
+            try:
+                status = self.node_proxy.check_status()
+                label = 'Ok' if status else 'Critical'
+                logger.debug(f'node-proxy status: {label}')
+            except Exception as e:
+                logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}')
+                time.sleep(120)
+                self.init()
+            else:
+                logger.debug('node-proxy alive, next check in 60sec.')
+                time.sleep(60)
+
+    def shutdown(self) -> None:
+        self.stop = True
+        # if `self.node_proxy.shutdown()` is called before self.start(), it will fail.
+        if self.__dict__.get('node_proxy'):
+            self.node_proxy.shutdown()
+
 
 @register_daemon_form
 class CephadmAgent(DaemonForm):
@@ -1365,7 +1430,11 @@ class CephadmAgent(DaemonForm):
         self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
         self.recent_iteration_index: int = 0
         self.cached_ls_values: Dict[str, Dict[str, str]] = {}
-        self.t_node_proxy: Optional["NodeProxy"] = None
+        self.ssl_ctx = ssl.create_default_context()
+        self.ssl_ctx.check_hostname = True
+        self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
+        self.node_proxy_mgr_event = Event()
+        self.node_proxy_mgr = NodeProxyManager(self, self.node_proxy_mgr_event)
 
     def validate(self, config: Dict[str, str] = {}) -> None:
         # check for the required files
@@ -1421,6 +1490,8 @@ class CephadmAgent(DaemonForm):
 
     def shutdown(self) -> None:
         self.stop = True
+        if self.node_proxy_mgr.is_alive():
+            self.node_proxy_mgr.shutdown()
         if self.mgr_listener.is_alive():
             self.mgr_listener.shutdown()
         if self.ls_gatherer.is_alive():
@@ -1459,92 +1530,12 @@ class CephadmAgent(DaemonForm):
             self.device_enhanced_scan = True
         self.volume_gatherer.update_func(lambda: self._ceph_volume(enhanced=self.device_enhanced_scan))
 
-    def query_endpoint(self,
-                       addr: str = '',
-                       port: str = '',
-                       data: Optional[Union[Dict[str, str], str]] = None,
-                       endpoint: str = '',
-                       ssl_ctx: Optional[Any] = None,
-                       timeout: Optional[int] = 10) -> Tuple[int, str]:
-        _addr = addr if addr else self.target_ip
-        _port = port if port else self.target_port
-        url = f'https://{_addr}:{_port}{endpoint}'
-        logger.info(f"sending query to {url}")
-        try:
-            req = Request(url, data, {'Content-Type': 'application/json'})
-            send_time = time.monotonic()
-            with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
-                response_str = response.read()
-                response_json = json.loads(response_str)
-                total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds()
-                logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds after sending request.')
-                response_status = response.status
-        except HTTPError as e:
-            logger.debug(f"{e.code} {e.reason}")
-            response_status = e.code
-            response_str = e.reason
-        except URLError as e:
-            logger.debug(f"{e.reason}")
-            response_status = -1
-            response_str = e.reason
-        except Exception:
-            raise
-        return (response_status, response_str)
-
-    def node_proxy_loop_check(self, ssl_ctx: Any) -> None:
-        while True:
-            try:
-                if isinstance(self.t_node_proxy, NodeProxy):
-                    status = self.t_node_proxy.check_status()
-                    label = 'Ok' if status else 'Critical'
-                    logger.debug(f'node-proxy status: {label}')
-                else:
-                    raise NodeProxyInitialization("starting node-proxy...")
-            except Exception as e:
-                logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}')
-                try:
-                    self.init_node_proxy(ssl_ctx)
-                except NodeProxyFetchOobError:
-                    logger.info("No oob details could be loaded. "
-                                "Aborting node-proxy initialization. "
-                                "Will retry in 120s.")
-                    time.sleep(120)
-
-    def init_node_proxy(self, ssl_ctx: Any) -> None:
-        node_proxy_meta = {
-            'cephx': {
-                'name': self.host,
-                'secret': self.keyring
-            }
-        }
-        status, result = self.query_endpoint(data=json.dumps(node_proxy_meta).encode('ascii'),
-                                             endpoint='/node-proxy/oob',
-                                             ssl_ctx=ssl_ctx)
-        if status != 200:
-            msg = f"Couldn't load oob details: {status}, {result}"
-            logger.debug(msg)
-            raise NodeProxyFetchOobError(msg)
-        result_json = json.loads(result)
-        kwargs = {
-            'host': result_json['result']['addr'],
-            'username': result_json['result']['username'],
-            'password': result_json['result']['password'],
-            'cephx': node_proxy_meta['cephx'],
-            'mgr_target_ip': self.target_ip,
-            'mgr_target_port': self.target_port
-        }
-        if result_json['result'].get('port'):
-            kwargs['port'] = result_json['result']['port']
-
-        self.t_node_proxy = NodeProxy(**kwargs)
-        self.t_node_proxy.start()
-
     def run(self) -> None:
         self.pull_conf_settings()
-        ssl_ctx = ssl.create_default_context()
-        ssl_ctx.check_hostname = True
-        ssl_ctx.verify_mode = ssl.CERT_REQUIRED
-        ssl_ctx.load_verify_locations(self.ca_path)
+        self.ssl_ctx.load_verify_locations(self.ca_path)
+        # only after self.pull_conf_settings() was called we can actually start
+        # node-proxy
+        self.node_proxy_mgr_event.set()
 
         try:
             for _ in range(1001):
@@ -1566,9 +1557,8 @@ class CephadmAgent(DaemonForm):
         if not self.volume_gatherer.is_alive():
             self.volume_gatherer.start()
 
-        # initiate node-proxy thread
-        node_proxy_loop_thread = Thread(target=self.node_proxy_loop_check, args=(ssl_ctx,))
-        node_proxy_loop_thread.start()
+        if not self.node_proxy_mgr.is_alive():
+            self.node_proxy_mgr.start()
 
         while not self.stop:
             start_time = time.monotonic()
@@ -1596,9 +1586,18 @@ class CephadmAgent(DaemonForm):
             data = data.encode('ascii')
 
             try:
-                self.query_endpoint(data=data,
-                                    endpoint='/data',
-                                    ssl_ctx=ssl_ctx)
+                send_time = time.monotonic()
+                status, response = http_query(addr=self.target_ip,
+                                              port=self.target_port,
+                                              data=data,
+                                              endpoint='/data',
+                                              ssl_ctx=self.ssl_ctx)
+                response_json = json.loads(response)
+                if status != 200:
+                    logger.error(f'HTTP error {status} while querying agent endpoint: {response}')
+                    raise RuntimeError
+                total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds()
+                logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds after sending request.')
             except Exception as e:
                 logger.error(f'Failed to send metadata to mgr: {e}')
 
diff --git a/src/cephadm/cephadmlib/agent.py b/src/cephadm/cephadmlib/agent.py
new file mode 100644 (file)
index 0000000..71924c3
--- /dev/null
@@ -0,0 +1,33 @@
+from urllib.error import HTTPError, URLError
+from urllib.request import urlopen, Request
+from typing import Optional, Any, Tuple
+import logging
+
+logger = logging.getLogger()
+
+
+def http_query(addr: str = '',
+               port: str = '',
+               data: Optional[bytes] = None,
+               endpoint: str = '',
+               ssl_ctx: Optional[Any] = None,
+               timeout: Optional[int] = 10) -> Tuple[int, str]:
+
+    url = f'https://{addr}:{port}{endpoint}'
+    logger.debug(f'sending query to {url}')
+    try:
+        req = Request(url, data, {'Content-Type': 'application/json'})
+        with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
+            response_str = response.read()
+            response_status = response.status
+    except HTTPError as e:
+        logger.debug(f'{e.code} {e.reason}')
+        response_status = e.code
+        response_str = e.reason
+    except URLError as e:
+        logger.debug(f'{e.reason}')
+        response_status = -1
+        response_str = e.reason
+    except Exception:
+        raise
+    return (response_status, response_str)
index 0575340d0ecd20dddbc49e12dab9da454c74a8c0..339d0d2c85338e7ebe8d6539a68d2c682316bc1b 100644 (file)
@@ -23,14 +23,6 @@ DEFAULT_CONFIG = {
 }
 
 
-class NodeProxyInitialization(Exception):
-    pass
-
-
-class NodeProxyFetchOobError(Exception):
-    pass
-
-
 class NodeProxy(Thread):
     def __init__(self, **kw: Dict[str, Any]) -> None:
         super().__init__()
@@ -46,6 +38,12 @@ class NodeProxy(Thread):
             self.exc = e
             return
 
+    def shutdown(self) -> None:
+        self.log.logger.info('Shutting down node-proxy...')
+        self.system.client.logout()
+        self.system.stop_update_loop()
+        self.reporter_agent.stop()
+
     def check_auth(self, realm: str, username: str, password: str) -> bool:
         return self.__dict__['username'] == username and \
             self.__dict__['password'] == password
index 4904cb4f61fc1f8406e29c5cd402e2fe03f017ee..52cce74e1fb8f32c63411f0b4d2b799885cab096 100644 (file)
@@ -416,7 +416,7 @@ def test_agent_get_ls(_ls_subset, _ls, cephadm_fs):
 @mock.patch("threading.Event.clear")
 @mock.patch("threading.Event.wait")
 @mock.patch("urllib.request.Request.__init__")
-@mock.patch("cephadm.urlopen")
+@mock.patch("cephadmlib.agent.urlopen")
 @mock.patch("cephadm.list_networks")
 @mock.patch("cephadm.HostFacts.dump")
 @mock.patch("cephadm.HostFacts.__init__", lambda _, __: None)
@@ -531,7 +531,7 @@ def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start,
            'port': str(open_listener_port)
         }
         _RQ_init.assert_called_with(
-            f'https://{target_ip}:{target_port}/data/',
+            f'https://{target_ip}:{target_port}/data',
             json.dumps(expected_data).encode('ascii'),
             {'Content-Type': 'application/json'}
         )