from glob import glob
from io import StringIO
from threading import Thread, Event
-from urllib.request import urlopen, Request
from pathlib import Path
-from cephadmlib.node_proxy.main import NodeProxy, NodeProxyInitialization, NodeProxyFetchOobError
+from cephadmlib.node_proxy.main import NodeProxy
from cephadmlib.constants import (
# default images
SNMPGateway,
Tracing,
)
+from cephadmlib.agent import http_query
FuncT = TypeVar('FuncT', bound=Callable)
raise RuntimeError('No valid data received.')
+class NodeProxyManager(Thread):
+    """Background thread that starts the node-proxy and keeps watching it.
+
+    The thread blocks until ``event`` is set, asks the mgr endpoint
+    ``/node-proxy/oob`` for the out-of-band details, starts a ``NodeProxy``
+    with them and then polls ``NodeProxy.check_status()``.
+
+    A failed initialization never terminates the thread: it is logged and
+    retried from ``loop()`` after the usual 120 seconds back-off. This
+    matters because a ``Thread`` object cannot be started a second time.
+    """
+
+    def __init__(self, agent: 'CephadmAgent', event: Event):
+        """Remember the owning agent and the event that gates ``run()``.
+
+        :param agent: agent providing ``host``, ``keyring``, ``target_ip``,
+            ``target_port`` and ``ssl_ctx``.
+        :param event: ``run()`` waits on this event before reading anything
+            from ``agent``.
+        """
+        super().__init__()
+        self.agent = agent
+        self.event = event
+        self.stop = False
+        # Stays None until init() succeeded, so loop() and shutdown() can
+        # test it without poking into self.__dict__.
+        self.node_proxy: 'Optional[NodeProxy]' = None
+
+    def run(self) -> None:
+        """Thread entry point: wait for the event, initialize, then monitor."""
+        self.event.wait()
+        self.ssl_ctx = self.agent.ssl_ctx
+        # A failure here must not kill the thread; loop() retries it.
+        self._try_init()
+        self.loop()
+
+    def _try_init(self) -> bool:
+        """Run ``init()``, logging instead of propagating any failure.
+
+        :return: True when the node-proxy was started, False otherwise.
+        """
+        try:
+            self.init()
+        except Exception as e:
+            logger.error(f'node-proxy initialization failed: {e.__class__.__name__}: {e}')
+            return False
+        return True
+
+    def init(self) -> None:
+        """Fetch the out-of-band details from the mgr and start a NodeProxy.
+
+        :raises RuntimeError: when the mgr does not answer with HTTP 200.
+        """
+        node_proxy_meta = {
+            'cephx': {
+                'name': self.agent.host,
+                'secret': self.agent.keyring
+            }
+        }
+        status, result = http_query(addr=self.agent.target_ip,
+                                    port=self.agent.target_port,
+                                    data=json.dumps(node_proxy_meta).encode('ascii'),
+                                    endpoint='/node-proxy/oob',
+                                    ssl_ctx=self.ssl_ctx)
+        if status != 200:
+            msg = f'No out of band tool details could be loaded: {status}, {result}'
+            logger.debug(msg)
+            raise RuntimeError(msg)
+
+        result_json = json.loads(result)
+        kwargs = {
+            'host': result_json['result']['addr'],
+            'username': result_json['result']['username'],
+            'password': result_json['result']['password'],
+            'cephx': node_proxy_meta['cephx'],
+            'mgr_target_ip': self.agent.target_ip,
+            'mgr_target_port': self.agent.target_port
+        }
+        # 'port' is optional in the mgr answer; only forward it when set.
+        if result_json['result'].get('port'):
+            kwargs['port'] = result_json['result']['port']
+
+        self.node_proxy = NodeProxy(**kwargs)
+        self.node_proxy.start()
+
+    def loop(self) -> None:
+        """Poll the node-proxy status until ``shutdown()`` sets ``stop``.
+
+        A healthy check is repeated after 60 seconds. Any failure (including
+        a node-proxy that could not be initialized yet) is logged, followed
+        by a 120 seconds pause and a new initialization attempt.
+        """
+        while not self.stop:
+            try:
+                if self.node_proxy is None:
+                    raise RuntimeError('node-proxy is not initialized')
+                status = self.node_proxy.check_status()
+                label = 'Ok' if status else 'Critical'
+                logger.debug(f'node-proxy status: {label}')
+            except Exception as e:
+                logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}')
+                time.sleep(120)
+                # shutdown() may have been called during the pause: do not
+                # start a fresh node-proxy that nothing would ever stop.
+                if not self.stop:
+                    self._try_init()
+            else:
+                logger.debug('node-proxy alive, next check in 60sec.')
+                time.sleep(60)
+
+    def shutdown(self) -> None:
+        """Ask ``loop()`` to exit and stop the node-proxy if one was started."""
+        self.stop = True
+        # node_proxy is still None when init() never succeeded; calling
+        # shutdown() on it would fail.
+        if self.node_proxy is not None:
+            self.node_proxy.shutdown()
+
@register_daemon_form
class CephadmAgent(DaemonForm):
self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
self.recent_iteration_index: int = 0
self.cached_ls_values: Dict[str, Dict[str, str]] = {}
- self.t_node_proxy: Optional["NodeProxy"] = None
+ self.ssl_ctx = ssl.create_default_context()
+ self.ssl_ctx.check_hostname = True
+ self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
+ self.node_proxy_mgr_event = Event()
+ self.node_proxy_mgr = NodeProxyManager(self, self.node_proxy_mgr_event)
def validate(self, config: Dict[str, str] = {}) -> None:
# check for the required files
def shutdown(self) -> None:
self.stop = True
+ if self.node_proxy_mgr.is_alive():
+ self.node_proxy_mgr.shutdown()
if self.mgr_listener.is_alive():
self.mgr_listener.shutdown()
if self.ls_gatherer.is_alive():
self.device_enhanced_scan = True
self.volume_gatherer.update_func(lambda: self._ceph_volume(enhanced=self.device_enhanced_scan))
- def query_endpoint(self,
- addr: str = '',
- port: str = '',
- data: Optional[Union[Dict[str, str], str]] = None,
- endpoint: str = '',
- ssl_ctx: Optional[Any] = None,
- timeout: Optional[int] = 10) -> Tuple[int, str]:
- _addr = addr if addr else self.target_ip
- _port = port if port else self.target_port
- url = f'https://{_addr}:{_port}{endpoint}'
- logger.info(f"sending query to {url}")
- try:
- req = Request(url, data, {'Content-Type': 'application/json'})
- send_time = time.monotonic()
- with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
- response_str = response.read()
- response_json = json.loads(response_str)
- total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds()
- logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds after sending request.')
- response_status = response.status
- except HTTPError as e:
- logger.debug(f"{e.code} {e.reason}")
- response_status = e.code
- response_str = e.reason
- except URLError as e:
- logger.debug(f"{e.reason}")
- response_status = -1
- response_str = e.reason
- except Exception:
- raise
- return (response_status, response_str)
-
- def node_proxy_loop_check(self, ssl_ctx: Any) -> None:
- while True:
- try:
- if isinstance(self.t_node_proxy, NodeProxy):
- status = self.t_node_proxy.check_status()
- label = 'Ok' if status else 'Critical'
- logger.debug(f'node-proxy status: {label}')
- else:
- raise NodeProxyInitialization("starting node-proxy...")
- except Exception as e:
- logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}')
- try:
- self.init_node_proxy(ssl_ctx)
- except NodeProxyFetchOobError:
- logger.info("No oob details could be loaded. "
- "Aborting node-proxy initialization. "
- "Will retry in 120s.")
- time.sleep(120)
-
- def init_node_proxy(self, ssl_ctx: Any) -> None:
- node_proxy_meta = {
- 'cephx': {
- 'name': self.host,
- 'secret': self.keyring
- }
- }
- status, result = self.query_endpoint(data=json.dumps(node_proxy_meta).encode('ascii'),
- endpoint='/node-proxy/oob',
- ssl_ctx=ssl_ctx)
- if status != 200:
- msg = f"Couldn't load oob details: {status}, {result}"
- logger.debug(msg)
- raise NodeProxyFetchOobError(msg)
- result_json = json.loads(result)
- kwargs = {
- 'host': result_json['result']['addr'],
- 'username': result_json['result']['username'],
- 'password': result_json['result']['password'],
- 'cephx': node_proxy_meta['cephx'],
- 'mgr_target_ip': self.target_ip,
- 'mgr_target_port': self.target_port
- }
- if result_json['result'].get('port'):
- kwargs['port'] = result_json['result']['port']
-
- self.t_node_proxy = NodeProxy(**kwargs)
- self.t_node_proxy.start()
-
def run(self) -> None:
self.pull_conf_settings()
- ssl_ctx = ssl.create_default_context()
- ssl_ctx.check_hostname = True
- ssl_ctx.verify_mode = ssl.CERT_REQUIRED
- ssl_ctx.load_verify_locations(self.ca_path)
+ self.ssl_ctx.load_verify_locations(self.ca_path)
+ # only after self.pull_conf_settings() was called we can actually start
+ # node-proxy
+ self.node_proxy_mgr_event.set()
try:
for _ in range(1001):
if not self.volume_gatherer.is_alive():
self.volume_gatherer.start()
- # initiate node-proxy thread
- node_proxy_loop_thread = Thread(target=self.node_proxy_loop_check, args=(ssl_ctx,))
- node_proxy_loop_thread.start()
+ if not self.node_proxy_mgr.is_alive():
+ self.node_proxy_mgr.start()
while not self.stop:
start_time = time.monotonic()
data = data.encode('ascii')
try:
- self.query_endpoint(data=data,
- endpoint='/data',
- ssl_ctx=ssl_ctx)
+ send_time = time.monotonic()
+ status, response = http_query(addr=self.target_ip,
+ port=self.target_port,
+ data=data,
+ endpoint='/data',
+ ssl_ctx=self.ssl_ctx)
+ response_json = json.loads(response)
+ if status != 200:
+ logger.error(f'HTTP error {status} while querying agent endpoint: {response}')
+ raise RuntimeError
+ total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds()
+ logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds after sending request.')
except Exception as e:
logger.error(f'Failed to send metadata to mgr: {e}')