]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
node-proxy: make it a separate daemon
authorGuillaume Abrioux <gabrioux@ibm.com>
Mon, 15 Jan 2024 12:38:39 +0000 (12:38 +0000)
committerGuillaume Abrioux <gabrioux@ibm.com>
Thu, 25 Jan 2024 16:10:56 +0000 (16:10 +0000)
The current implementation requires the inclusion of all the recent
modifications in the cephadm binary, which won't be backported.

Since we need the node-proxy code backported to reef, let's move the
code and make it a separate daemon.

Signed-off-by: Guillaume Abrioux <gabrioux@ibm.com>
Co-authored-by: Adam King <adking@redhat.com>
(cherry picked from commit 7e6bc179ae7e0d633bd63086775002182c861d3f)

33 files changed:
src/ceph-node-proxy/ceph_node_proxy/__init__.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/api.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/baseclient.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/basesystem.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/main.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/redfish_client.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/reporter.py [new file with mode: 0644]
src/ceph-node-proxy/ceph_node_proxy/util.py [new file with mode: 0644]
src/cephadm/cephadm.py
src/cephadm/cephadmlib/node_proxy/__init__.py [deleted file]
src/cephadm/cephadmlib/node_proxy/baseclient.py [deleted file]
src/cephadm/cephadmlib/node_proxy/baseredfishsystem.py [deleted file]
src/cephadm/cephadmlib/node_proxy/basesystem.py [deleted file]
src/cephadm/cephadmlib/node_proxy/main.py [deleted file]
src/cephadm/cephadmlib/node_proxy/redfish_client.py [deleted file]
src/cephadm/cephadmlib/node_proxy/redfishdellsystem.py [deleted file]
src/cephadm/cephadmlib/node_proxy/reporter.py [deleted file]
src/cephadm/cephadmlib/node_proxy/util.py [deleted file]
src/mypy.ini
src/pybind/mgr/cephadm/agent.py
src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/cephadmservice.py
src/pybind/mgr/cephadm/services/node_proxy.py [new file with mode: 0644]
src/pybind/mgr/cephadm/tests/test_node_proxy.py
src/pybind/mgr/cephadm/utils.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py
src/python-common/ceph/deployment/service_spec.py
src/python-common/ceph/utils.py

diff --git a/src/ceph-node-proxy/ceph_node_proxy/__init__.py b/src/ceph-node-proxy/ceph_node_proxy/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/ceph-node-proxy/ceph_node_proxy/api.py b/src/ceph-node-proxy/ceph_node_proxy/api.py
new file mode 100644 (file)
index 0000000..93e41de
--- /dev/null
@@ -0,0 +1,291 @@
+import cherrypy
+from urllib.error import HTTPError
+from cherrypy._cpserver import Server
+from threading import Thread, Event
+from typing import Dict, Any, List
+from ceph_node_proxy.util import Config, Logger, write_tmp_file
+from ceph_node_proxy.basesystem import BaseSystem
+from ceph_node_proxy.reporter import Reporter
+from typing import TYPE_CHECKING, Optional
+
+if TYPE_CHECKING:
+    from ceph_node_proxy.main import NodeProxy
+
+
+@cherrypy.tools.auth_basic(on=True)
+@cherrypy.tools.allow(methods=['PUT'])
+@cherrypy.tools.json_out()
+class Admin():
+    def __init__(self, api: 'API') -> None:
+        self.api = api
+
+    @cherrypy.expose
+    def start(self) -> Dict[str, str]:
+        self.api.backend.start_client()
+        # self.backend.start_update_loop()
+        self.api.reporter.run()
+        return {'ok': 'node-proxy daemon started'}
+
+    @cherrypy.expose
+    def reload(self) -> Dict[str, str]:
+        self.api.config.reload()
+        return {'ok': 'node-proxy config reloaded'}
+
+    def _stop(self) -> None:
+        self.api.backend.stop_update_loop()
+        self.api.backend.client.logout()
+        self.api.reporter.stop()
+
+    @cherrypy.expose
+    def stop(self) -> Dict[str, str]:
+        self._stop()
+        return {'ok': 'node-proxy daemon stopped'}
+
+    @cherrypy.expose
+    def shutdown(self) -> Dict[str, str]:
+        self._stop()
+        cherrypy.engine.exit()
+        return {'ok': 'Server shutdown.'}
+
+    @cherrypy.expose
+    def flush(self) -> Dict[str, str]:
+        self.api.backend.flush()
+        return {'ok': 'node-proxy data flushed'}
+
+
+class API(Server):
+    def __init__(self,
+                 backend: 'BaseSystem',
+                 reporter: 'Reporter',
+                 config: 'Config',
+                 addr: str = '0.0.0.0',
+                 port: int = 0) -> None:
+        super().__init__()
+        self.log = Logger(__name__)
+        self.backend = backend
+        self.reporter = reporter
+        self.config = config
+        self.socket_port = self.config.__dict__['server']['port'] if not port else port
+        self.socket_host = addr
+        self.subscribe()
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['GET'])
+    @cherrypy.tools.json_out()
+    def memory(self) -> Dict[str, Any]:
+        return {'memory': self.backend.get_memory()}
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['GET'])
+    @cherrypy.tools.json_out()
+    def network(self) -> Dict[str, Any]:
+        return {'network': self.backend.get_network()}
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['GET'])
+    @cherrypy.tools.json_out()
+    def processors(self) -> Dict[str, Any]:
+        return {'processors': self.backend.get_processors()}
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['GET'])
+    @cherrypy.tools.json_out()
+    def storage(self) -> Dict[str, Any]:
+        return {'storage': self.backend.get_storage()}
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['GET'])
+    @cherrypy.tools.json_out()
+    def power(self) -> Dict[str, Any]:
+        return {'power': self.backend.get_power()}
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['GET'])
+    @cherrypy.tools.json_out()
+    def fans(self) -> Dict[str, Any]:
+        return {'fans': self.backend.get_fans()}
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['GET'])
+    @cherrypy.tools.json_out()
+    def firmwares(self) -> Dict[str, Any]:
+        return {'firmwares': self.backend.get_firmwares()}
+
+    def _cp_dispatch(self, vpath: List[str]) -> 'API':
+        if vpath[0] == 'led' and len(vpath) > 1:  # /led/{type}/{id}
+            _type = vpath[1]
+            cherrypy.request.params['type'] = _type
+            vpath.pop(1)  # /led/{id} or # /led
+            if _type == 'drive' and len(vpath) > 1:  # /led/{id}
+                _id = vpath[1]
+                vpath.pop(1)  # /led
+                cherrypy.request.params['id'] = _id
+            vpath[0] = '_led'
+        # /<endpoint>
+        return self
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['POST'])
+    @cherrypy.tools.json_in()
+    @cherrypy.tools.json_out()
+    @cherrypy.tools.auth_basic(on=True)
+    def shutdown(self, **kw: Any) -> int:
+        data: Dict[str, bool] = cherrypy.request.json
+
+        if 'force' not in data.keys():
+            msg = "The key 'force' wasn't passed."
+            self.log.logger.debug(msg)
+            raise cherrypy.HTTPError(400, msg)
+        try:
+            result: int = self.backend.shutdown(force=data['force'])
+        except HTTPError as e:
+            raise cherrypy.HTTPError(e.code, e.reason)
+        return result
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['POST'])
+    @cherrypy.tools.json_in()
+    @cherrypy.tools.json_out()
+    @cherrypy.tools.auth_basic(on=True)
+    def powercycle(self, **kw: Any) -> int:
+        try:
+            result: int = self.backend.powercycle()
+        except HTTPError as e:
+            raise cherrypy.HTTPError(e.code, e.reason)
+        return result
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['GET', 'PATCH'])
+    @cherrypy.tools.json_in()
+    @cherrypy.tools.json_out()
+    @cherrypy.tools.auth_basic(on=True)
+    def _led(self, **kw: Any) -> Dict[str, Any]:
+        method: str = cherrypy.request.method
+        led_type: Optional[str] = kw.get('type')
+        id_drive: Optional[str] = kw.get('id')
+        result: Dict[str, Any] = dict()
+
+        if not led_type:
+            msg = "the led type must be provided (either 'chassis' or 'drive')."
+            self.log.logger.debug(msg)
+            raise cherrypy.HTTPError(400, msg)
+
+        if led_type == 'drive':
+            id_drive_required = not id_drive
+            if id_drive_required or id_drive not in self.backend.get_storage():
+                msg = 'A valid device ID must be provided.'
+                self.log.logger.debug(msg)
+                raise cherrypy.HTTPError(400, msg)
+
+        try:
+            if method == 'PATCH':
+                data: Dict[str, Any] = cherrypy.request.json
+
+                if 'state' not in data or data['state'] not in ['on', 'off']:
+                    msg = "Invalid data. 'state' must be provided and have a valid value (on|off)."
+                    self.log.logger.error(msg)
+                    raise cherrypy.HTTPError(400, msg)
+
+                func: Any = (self.backend.device_led_on if led_type == 'drive' and data['state'] == 'on' else
+                             self.backend.device_led_off if led_type == 'drive' and data['state'] == 'off' else
+                             self.backend.chassis_led_on if led_type != 'drive' and data['state'] == 'on' else
+                             self.backend.chassis_led_off if led_type != 'drive' and data['state'] == 'off' else None)
+
+            else:
+                func = self.backend.get_device_led if led_type == 'drive' else self.backend.get_chassis_led
+
+            result = func(id_drive) if led_type == 'drive' else func()
+
+        except HTTPError as e:
+            raise cherrypy.HTTPError(e.code, e.reason)
+        return result
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['GET'])
+    @cherrypy.tools.json_out()
+    def get_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]:
+        return self.backend.get_led()
+
+    @cherrypy.expose
+    @cherrypy.tools.allow(methods=['PATCH'])
+    @cherrypy.tools.json_in()
+    @cherrypy.tools.json_out()
+    @cherrypy.tools.auth_basic(on=True)
+    def set_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]:
+        data = cherrypy.request.json
+        rc = self.backend.set_led(data)
+
+        if rc != 200:
+            cherrypy.response.status = rc
+            result = {'state': 'error: please, verify the data you sent.'}
+        else:
+            result = {'state': data['state'].lower()}
+        return result
+
+    def stop(self) -> None:
+        self.unsubscribe()
+        super().stop()
+
+
+class NodeProxyApi(Thread):
+    def __init__(self,
+                 node_proxy: 'NodeProxy',
+                 username: str,
+                 password: str,
+                 ssl_crt: str,
+                 ssl_key: str) -> None:
+        super().__init__()
+        self.log = Logger(__name__)
+        self.cp_shutdown_event = Event()
+        self.node_proxy = node_proxy
+        self.username = username
+        self.password = password
+        self.ssl_crt = ssl_crt
+        self.ssl_key = ssl_key
+        self.api = API(self.node_proxy.system,
+                       self.node_proxy.reporter_agent,
+                       self.node_proxy.config)
+
+    def check_auth(self, realm: str, username: str, password: str) -> bool:
+        return self.username == username and \
+            self.password == password
+
+    def shutdown(self) -> None:
+        self.log.logger.info('Stopping node-proxy API...')
+        self.cp_shutdown_event.set()
+
+    def run(self) -> None:
+        self.log.logger.info('node-proxy API configuration...')
+        cherrypy.config.update({
+            'environment': 'production',
+            'engine.autoreload.on': False,
+            'log.screen': True,
+        })
+        config = {'/': {
+            'request.methods_with_bodies': ('POST', 'PUT', 'PATCH'),
+            'tools.trailing_slash.on': False,
+            'tools.auth_basic.realm': 'localhost',
+            'tools.auth_basic.checkpassword': self.check_auth
+        }}
+        cherrypy.tree.mount(self.api, '/', config=config)
+        # cherrypy.tree.mount(admin, '/admin', config=config)
+
+        ssl_crt = write_tmp_file(self.ssl_crt,
+                                 prefix_name='listener-crt-')
+        ssl_key = write_tmp_file(self.ssl_key,
+                                 prefix_name='listener-key-')
+
+        self.api.ssl_certificate = ssl_crt.name
+        self.api.ssl_private_key = ssl_key.name
+
+        cherrypy.server.unsubscribe()
+        try:
+            cherrypy.engine.start()
+            self.log.logger.info('node-proxy API started.')
+            self.cp_shutdown_event.wait()
+            self.cp_shutdown_event.clear()
+            cherrypy.engine.stop()
+            cherrypy.server.httpserver = None
+            self.log.logger.info('node-proxy API shutdown.')
+        except Exception as e:
+            self.log.logger.error(f'node-proxy API error: {e}')
diff --git a/src/ceph-node-proxy/ceph_node_proxy/baseclient.py b/src/ceph-node-proxy/ceph_node_proxy/baseclient.py
new file mode 100644 (file)
index 0000000..6b46561
--- /dev/null
@@ -0,0 +1,20 @@
+from typing import Dict, Any
+
+
+class BaseClient:
+    def __init__(self,
+                 host: str,
+                 username: str,
+                 password: str) -> None:
+        self.host = host
+        self.username = username
+        self.password = password
+
+    def login(self) -> None:
+        raise NotImplementedError()
+
+    def logout(self) -> Dict[str, Any]:
+        raise NotImplementedError()
+
+    def get_path(self, path: str) -> Dict:
+        raise NotImplementedError()
diff --git a/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py
new file mode 100644 (file)
index 0000000..98f1171
--- /dev/null
@@ -0,0 +1,289 @@
+import concurrent.futures
+import json
+from ceph_node_proxy.basesystem import BaseSystem
+from ceph_node_proxy.redfish_client import RedFishClient
+from threading import Thread, Lock
+from time import sleep
+from ceph_node_proxy.util import Logger, retry
+from typing import Dict, Any, List, Callable, Union
+from urllib.error import HTTPError, URLError
+
+
+class BaseRedfishSystem(BaseSystem):
+    def __init__(self, **kw: Any) -> None:
+        super().__init__(**kw)
+        self.common_endpoints: List[str] = kw.get('common_endpoints', ['/Systems/System.Embedded.1',
+                                                                       '/UpdateService'])
+        self.chassis_endpoint: str = kw.get('chassis_endpoint', '/Chassis/System.Embedded.1')
+        self.log = Logger(__name__)
+        self.host: str = kw['host']
+        self.port: str = kw['port']
+        self.username: str = kw['username']
+        self.password: str = kw['password']
+        # move the following line (class attribute?)
+        self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password)
+        self.log.logger.info(f'redfish system initialization, host: {self.host}, user: {self.username}')
+
+        self.run: bool = False
+        self.thread: Thread
+        self.data_ready: bool = False
+        self.previous_data: Dict = {}
+        self.lock: Lock = Lock()
+        self.data: Dict[str, Dict[str, Any]] = {}
+        self._system: Dict[str, Dict[str, Any]] = {}
+        self._sys: Dict[str, Any] = {}
+        self.job_service_endpoint: str = ''
+        self.create_reboot_job_endpoint: str = ''
+        self.setup_job_queue_endpoint: str = ''
+
+        self.start_client()
+
+    def start_client(self) -> None:
+        self.client.login()
+        self.start_update_loop()
+
+    def start_update_loop(self) -> None:
+        self.run = True
+        self.thread = Thread(target=self.update)
+        self.thread.start()
+
+    def stop_update_loop(self) -> None:
+        self.run = False
+        self.thread.join()
+
+    def update(self) -> None:
+        #  this loop can have:
+        #  - caching logic
+        while self.run:
+            self.log.logger.debug('waiting for a lock in the update loop.')
+            self.lock.acquire()
+            self.log.logger.debug('lock acquired in the update loop.')
+            try:
+                self._update_system()
+                self._update_sn()
+                update_funcs = [self._update_memory,
+                                self._update_power,
+                                self._update_fans,
+                                self._update_network,
+                                self._update_processors,
+                                self._update_storage,
+                                self._update_firmwares]
+
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    executor.map(lambda f: f(), update_funcs)
+
+                self.data_ready = True
+                sleep(5)
+            except RuntimeError as e:
+                self.run = False
+                self.log.logger.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}')
+                self.client.logout()
+            finally:
+                self.lock.release()
+                self.log.logger.debug('lock released in the update loop.')
+
+    def flush(self) -> None:
+        self.log.logger.debug('Acquiring lock to flush data.')
+        self.lock.acquire()
+        self.log.logger.debug('Lock acquired, flushing data.')
+        self._system = {}
+        self.previous_data = {}
+        self.log.logger.info('Data flushed.')
+        self.data_ready = False
+        self.log.logger.debug('Data marked as not ready.')
+        self.lock.release()
+        self.log.logger.debug('Released the lock after flushing data.')
+
+    @retry(retries=10, delay=2)
+    def _get_path(self, path: str) -> Dict:
+        try:
+            result = self.client.get_path(path)
+        except RuntimeError:
+            raise
+        if result is None:
+            self.log.logger.error(f'The client reported an error when getting path: {path}')
+            raise RuntimeError(f'Could not get path: {path}')
+        return result
+
+    def get_members(self, data: Dict[str, Any], path: str) -> List:
+        _path = data[path]['@odata.id']
+        _data = self._get_path(_path)
+        return [self._get_path(member['@odata.id']) for member in _data['Members']]
+
+    def get_system(self) -> Dict[str, Any]:
+        result = {
+            'host': self.get_host(),
+            'sn': self.get_sn(),
+            'status': {
+                'storage': self.get_storage(),
+                'processors': self.get_processors(),
+                'network': self.get_network(),
+                'memory': self.get_memory(),
+                'power': self.get_power(),
+                'fans': self.get_fans()
+            },
+            'firmwares': self.get_firmwares(),
+            'chassis': {'redfish_endpoint': f'/redfish/v1{self.chassis_endpoint}'}  # TODO(guits): not ideal
+        }
+        return result
+
+    def _update_system(self) -> None:
+        for endpoint in self.common_endpoints:
+            result = self.client.get_path(endpoint)
+            _endpoint = endpoint.strip('/').split('/')[0]
+            self._system[_endpoint] = result
+
+    def _update_sn(self) -> None:
+        raise NotImplementedError()
+
+    def _update_memory(self) -> None:
+        raise NotImplementedError()
+
+    def _update_power(self) -> None:
+        raise NotImplementedError()
+
+    def _update_fans(self) -> None:
+        raise NotImplementedError()
+
+    def _update_network(self) -> None:
+        raise NotImplementedError()
+
+    def _update_processors(self) -> None:
+        raise NotImplementedError()
+
+    def _update_storage(self) -> None:
+        raise NotImplementedError()
+
+    def _update_firmwares(self) -> None:
+        raise NotImplementedError()
+
+    def device_led_on(self, device: str) -> int:
+        data: Dict[str, bool] = {'LocationIndicatorActive': True}
+        try:
+            result = self.set_device_led(device, data)
+        except (HTTPError, KeyError):
+            return 0
+        return result
+
+    def device_led_off(self, device: str) -> int:
+        data: Dict[str, bool] = {'LocationIndicatorActive': False}
+        try:
+            result = self.set_device_led(device, data)
+        except (HTTPError, KeyError):
+            return 0
+        return result
+
+    def chassis_led_on(self) -> int:
+        data: Dict[str, str] = {'IndicatorLED': 'Blinking'}
+        result = self.set_chassis_led(data)
+        return result
+
+    def chassis_led_off(self) -> int:
+        data: Dict[str, str] = {'IndicatorLED': 'Lit'}
+        result = self.set_chassis_led(data)
+        return result
+
+    def get_device_led(self, device: str) -> Dict[str, Any]:
+        endpoint = self._sys['storage'][device]['redfish_endpoint']
+        try:
+            result = self.client.query(method='GET',
+                                       endpoint=endpoint,
+                                       timeout=10)
+        except HTTPError as e:
+            self.log.logger.error(f"Couldn't get the ident device LED status for device '{device}': {e}")
+            raise
+        response_json = json.loads(result[1])
+        _result: Dict[str, Any] = {'http_code': result[2]}
+        if result[2] == 200:
+            _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
+        else:
+            _result['LocationIndicatorActive'] = None
+        return _result
+
+    def set_device_led(self, device: str, data: Dict[str, bool]) -> int:
+        try:
+            _, response, status = self.client.query(
+                data=json.dumps(data),
+                method='PATCH',
+                endpoint=self._sys['storage'][device]['redfish_endpoint']
+            )
+        except (HTTPError, KeyError) as e:
+            self.log.logger.error(f"Couldn't set the ident device LED for device '{device}': {e}")
+            raise
+        return status
+
+    def get_chassis_led(self) -> Dict[str, Any]:
+        endpoint = f'/redfish/v1/{self.chassis_endpoint}'
+        try:
+            result = self.client.query(method='GET',
+                                       endpoint=endpoint,
+                                       timeout=10)
+        except HTTPError as e:
+            self.log.logger.error(f"Couldn't get the ident chassis LED status: {e}")
+            raise
+        response_json = json.loads(result[1])
+        _result: Dict[str, Any] = {'http_code': result[2]}
+        if result[2] == 200:
+            _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
+        else:
+            _result['LocationIndicatorActive'] = None
+        return _result
+
+    def set_chassis_led(self, data: Dict[str, str]) -> int:
+        # '{"IndicatorLED": "Lit"}'      -> LocationIndicatorActive = false
+        # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true
+        try:
+            _, response, status = self.client.query(
+                data=json.dumps(data),
+                method='PATCH',
+                endpoint=f'/redfish/v1{self.chassis_endpoint}'
+            )
+        except HTTPError as e:
+            self.log.logger.error(f"Couldn't set the ident chassis LED: {e}")
+            raise
+        return status
+
+    def shutdown(self, force: bool = False) -> int:
+        reboot_type: str = 'GracefulRebootWithForcedShutdown' if force else 'GracefulRebootWithoutForcedShutdown'
+
+        try:
+            job_id: str = self.create_reboot_job(reboot_type)
+            status = self.schedule_reboot_job(job_id)
+        except (HTTPError, KeyError) as e:
+            self.log.logger.error(f"Couldn't create the reboot job: {e}")
+            raise
+        return status
+
+    def powercycle(self) -> int:
+        try:
+            job_id: str = self.create_reboot_job('PowerCycle')
+            status = self.schedule_reboot_job(job_id)
+        except (HTTPError, URLError) as e:
+            self.log.logger.error(f"Couldn't perform power cycle: {e}")
+            raise
+        return status
+
+    def create_reboot_job(self, reboot_type: str) -> str:
+        data: Dict[str, str] = dict(RebootJobType=reboot_type)
+        try:
+            headers, response, status = self.client.query(
+                data=json.dumps(data),
+                endpoint=self.create_reboot_job_endpoint
+            )
+            job_id: str = headers['Location'].split('/')[-1]
+        except (HTTPError, URLError) as e:
+            self.log.logger.error(f"Couldn't create the reboot job: {e}")
+            raise
+        return job_id
+
+    def schedule_reboot_job(self, job_id: str) -> int:
+        data: Dict[str, Union[List[str], str]] = dict(JobArray=[job_id], StartTimeInterval='TIME_NOW')
+        try:
+            headers, response, status = self.client.query(
+                data=json.dumps(data),
+                endpoint=self.setup_job_queue_endpoint
+            )
+        except (HTTPError, KeyError) as e:
+            self.log.logger.error(f"Couldn't schedule the reboot job: {e}")
+            raise
+        return status
diff --git a/src/ceph-node-proxy/ceph_node_proxy/basesystem.py b/src/ceph-node-proxy/ceph_node_proxy/basesystem.py
new file mode 100644 (file)
index 0000000..c2389d8
--- /dev/null
@@ -0,0 +1,95 @@
+import socket
+from ceph_node_proxy.util import Config
+from typing import Dict, Any
+from ceph_node_proxy.baseclient import BaseClient
+
+
+class BaseSystem:
+    def __init__(self, **kw: Any) -> None:
+        self._system: Dict = {}
+        self.config: Config = kw['config']
+        self.client: BaseClient
+
+    def get_system(self) -> Dict[str, Any]:
+        raise NotImplementedError()
+
+    def get_status(self) -> Dict[str, Dict[str, Dict]]:
+        raise NotImplementedError()
+
+    def get_metadata(self) -> Dict[str, Dict[str, Dict]]:
+        raise NotImplementedError()
+
+    def get_processors(self) -> Dict[str, Dict[str, Dict]]:
+        raise NotImplementedError()
+
+    def get_memory(self) -> Dict[str, Dict[str, Dict]]:
+        raise NotImplementedError()
+
+    def get_fans(self) -> Dict[str, Dict[str, Dict]]:
+        raise NotImplementedError()
+
+    def get_power(self) -> Dict[str, Dict[str, Dict]]:
+        raise NotImplementedError()
+
+    def get_network(self) -> Dict[str, Dict[str, Dict]]:
+        raise NotImplementedError()
+
+    def get_storage(self) -> Dict[str, Dict[str, Dict]]:
+        raise NotImplementedError()
+
+    def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
+        raise NotImplementedError()
+
+    def get_sn(self) -> str:
+        raise NotImplementedError()
+
+    def get_led(self) -> Dict[str, Any]:
+        raise NotImplementedError()
+
+    def set_led(self, data: Dict[str, str]) -> int:
+        raise NotImplementedError()
+
+    def get_chassis_led(self) -> Dict[str, Any]:
+        raise NotImplementedError()
+
+    def set_chassis_led(self, data: Dict[str, str]) -> int:
+        raise NotImplementedError()
+
+    def device_led_on(self, device: str) -> int:
+        raise NotImplementedError()
+
+    def device_led_off(self, device: str) -> int:
+        raise NotImplementedError()
+
+    def get_device_led(self, device: str) -> Dict[str, Any]:
+        raise NotImplementedError()
+
+    def set_device_led(self, device: str, data: Dict[str, bool]) -> int:
+        raise NotImplementedError()
+
+    def chassis_led_on(self) -> int:
+        raise NotImplementedError()
+
+    def chassis_led_off(self) -> int:
+        raise NotImplementedError()
+
+    def get_host(self) -> str:
+        return socket.gethostname()
+
+    def start_update_loop(self) -> None:
+        raise NotImplementedError()
+
+    def stop_update_loop(self) -> None:
+        raise NotImplementedError()
+
+    def start_client(self) -> None:
+        raise NotImplementedError()
+
+    def flush(self) -> None:
+        raise NotImplementedError()
+
+    def shutdown(self, force: bool = False) -> int:
+        raise NotImplementedError()
+
+    def powercycle(self) -> int:
+        raise NotImplementedError()
diff --git a/src/ceph-node-proxy/ceph_node_proxy/main.py b/src/ceph-node-proxy/ceph_node_proxy/main.py
new file mode 100644 (file)
index 0000000..689089a
--- /dev/null
@@ -0,0 +1,244 @@
+from threading import Thread
+from ceph_node_proxy.redfishdellsystem import RedfishDellSystem
+from ceph_node_proxy.api import NodeProxyApi
+from ceph_node_proxy.reporter import Reporter
+from ceph_node_proxy.util import Config, Logger, http_req, write_tmp_file
+from typing import Dict, Any, Optional
+
+import argparse
+import traceback
+import logging
+import os
+import ssl
+import json
+import time
+
logger = logging.getLogger(__name__)

# Fallback settings, overridden by /etc/ceph/node-proxy.yml when present
# (loaded in NodeProxy.main() via Config).
DEFAULT_CONFIG = {
    'reporter': {
        'check_interval': 5,  # presumably seconds between report pushes — confirm against Reporter
        'push_data_max_retries': 30,
        'endpoint': 'https://127.0.0.1:7150/node-proxy/data',
    },
    'system': {
        'refresh_interval': 5  # presumably seconds between system refreshes — confirm
    },
    'server': {
        'port': 8080,
    },
    'logging': {
        'level': 20,  # 20 == logging.INFO
    }
}
+
+
class NodeProxyManager(Thread):
    """Supervisor thread for the node-proxy worker.

    Registers the node with the mgr agent endpoint (over TLS, verified
    against ``ca_path``), spawns a ``NodeProxy`` worker thread and keeps
    polling it, re-initializing the worker when it reports a failure.
    """

    def __init__(self,
                 mgr_host: str,
                 cephx_name: str,
                 cephx_secret: str,
                 ca_path: str,
                 api_ssl_crt: str,
                 api_ssl_key: str,
                 mgr_agent_port: int = 7150):
        super().__init__()
        self.mgr_host = mgr_host
        self.cephx_name = cephx_name
        self.cephx_secret = cephx_secret
        self.ca_path = ca_path
        self.api_ssl_crt = api_ssl_crt
        self.api_ssl_key = api_ssl_key
        self.mgr_agent_port = str(mgr_agent_port)
        self.stop = False
        # TLS context verifying the mgr agent endpoint against the given CA
        self.ssl_ctx = ssl.create_default_context()
        self.ssl_ctx.check_hostname = True
        self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
        self.ssl_ctx.load_verify_locations(self.ca_path)

    def run(self) -> None:
        self.init()
        self.loop()

    def init(self) -> None:
        """Fetch OOB credentials from the mgr and start the worker thread."""
        creds = {
            'cephx': {
                'name': self.cephx_name,
                'secret': self.cephx_secret
            }
        }
        headers, result, status = http_req(hostname=self.mgr_host,
                                           port=self.mgr_agent_port,
                                           data=json.dumps(creds),
                                           endpoint='/node-proxy/oob',
                                           ssl_ctx=self.ssl_ctx)
        if status != 200:
            msg = f'No out of band tool details could be loaded: {status}, {result}'
            logger.debug(msg)
            raise RuntimeError(msg)

        oob = json.loads(result)['result']
        kwargs: Dict[str, Any] = {
            'host': oob['addr'],
            'username': oob['username'],
            'password': oob['password'],
            'cephx': creds['cephx'],
            'mgr_host': self.mgr_host,
            'mgr_agent_port': self.mgr_agent_port,
            'api_ssl_crt': self.api_ssl_crt,
            'api_ssl_key': self.api_ssl_key
        }
        if oob.get('port'):
            kwargs['port'] = oob['port']

        self.node_proxy: NodeProxy = NodeProxy(**kwargs)
        self.node_proxy.start()

    def loop(self) -> None:
        """Poll the worker; on failure wait two minutes and re-init it."""
        while not self.stop:
            try:
                ok = self.node_proxy.check_status()
            except Exception as e:
                logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}')
                time.sleep(120)
                self.init()
            else:
                logger.debug(f"node-proxy status: {'Ok' if ok else 'Critical'}")
                logger.debug('node-proxy alive, next check in 60sec.')
                time.sleep(60)

    def shutdown(self) -> None:
        self.stop = True
        # the worker attribute only exists once init() has run; calling
        # node_proxy.shutdown() before that would fail
        if hasattr(self, 'node_proxy'):
            self.node_proxy.shutdown()
+
+
class NodeProxy(Thread):
    """Worker thread: queries the RedFish endpoint, reports the collected
    data to the mgr and serves the node-proxy API."""

    def __init__(self, **kw: Any) -> None:
        super().__init__()
        self.username: str = kw.get('username', '')
        self.password: str = kw.get('password', '')
        self.host: str = kw.get('host', '')
        self.port: int = kw.get('port', 443)
        self.cephx: Dict[str, Any] = kw.get('cephx', {})
        self.reporter_scheme: str = kw.get('reporter_scheme', 'https')
        self.mgr_host: str = kw.get('mgr_host', '')
        self.mgr_agent_port: str = kw.get('mgr_agent_port', '')
        self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data')
        self.api_ssl_crt: str = kw.get('api_ssl_crt', '')
        self.api_ssl_key: str = kw.get('api_ssl_key', '')
        # last exception raised by main(), surfaced via check_status()
        self.exc: Optional[Exception] = None
        self.log = Logger(__name__)

    def run(self) -> None:
        try:
            self.main()
        except Exception as e:
            # keep the failure around so the supervisor can see it
            self.exc = e

    def shutdown(self) -> None:
        self.log.logger.info('Shutting down node-proxy...')
        self.system.client.logout()
        self.system.stop_update_loop()
        self.reporter_agent.stop()

    def check_status(self) -> bool:
        """Return True when healthy; re-raise whatever broke otherwise."""
        if getattr(self, 'system', None) and not self.system.run:
            raise RuntimeError('node-proxy encountered an error.')
        if self.exc is not None:
            traceback.print_tb(self.exc.__traceback__)
            self.log.logger.error(f'{self.exc.__class__.__name__}: {self.exc}')
            raise self.exc
        return True

    def main(self) -> None:
        # TODO: add a check and fail if host/username/password/data aren't passed
        self.config = Config('/etc/ceph/node-proxy.yml', default_config=DEFAULT_CONFIG)
        self.log = Logger(__name__, level=self.config.__dict__['logging']['level'])

        # create the redfish system and the observer
        self.log.logger.info('Server initialization...')
        try:
            self.system = RedfishDellSystem(host=self.host,
                                            port=self.port,
                                            username=self.username,
                                            password=self.password,
                                            config=self.config)
        except RuntimeError:
            self.log.logger.error("Can't initialize the redfish system.")
            raise

        try:
            self.reporter_agent = Reporter(self.system,
                                           self.cephx,
                                           reporter_scheme=self.reporter_scheme,
                                           reporter_hostname=self.mgr_host,
                                           reporter_port=self.mgr_agent_port,
                                           reporter_endpoint=self.reporter_endpoint)
            self.reporter_agent.run()
        except RuntimeError:
            self.log.logger.error("Can't initialize the reporter.")
            raise

        try:
            self.log.logger.info('Starting node-proxy API...')
            self.api = NodeProxyApi(self,
                                    username=self.username,
                                    password=self.password,
                                    ssl_crt=self.api_ssl_crt,
                                    ssl_key=self.api_ssl_key)
            self.api.start()
        except Exception as e:
            self.log.logger.error(f"Can't start node-proxy API: {e}")
            raise
+
+
def main() -> None:
    """Entry point: parse CLI args, load the JSON config file and start the
    NodeProxyManager supervisor thread."""
    parser = argparse.ArgumentParser(
        description='Ceph Node-Proxy for HW Monitoring',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--config',
        help='path of config file in json format',
        required=True
    )

    args = parser.parse_args()

    if not os.path.exists(args.config):
        raise Exception(f'No config file found at provided config path: {args.config}')

    with open(args.config, 'r') as f:
        try:
            cfg = json.loads(f.read())
        except Exception as e:
            raise Exception(f'Failed to load json config: {str(e)}')

    # the mgr root certificate has to live on disk so the TLS context
    # in NodeProxyManager can load it
    ca_file = write_tmp_file(cfg['root_cert.pem'],
                             prefix_name='cephadm-endpoint-root-cert')

    node_proxy_mgr = NodeProxyManager(mgr_host=cfg['target_ip'],
                                      cephx_name=cfg['name'],
                                      cephx_secret=cfg['keyring'],
                                      mgr_agent_port=cfg['target_port'],
                                      ca_path=ca_file.name,
                                      api_ssl_crt=cfg['listener.crt'],
                                      api_ssl_key=cfg['listener.key'])
    if not node_proxy_mgr.is_alive():
        node_proxy_mgr.start()
+
+
# script entry point
if __name__ == '__main__':
    main()
diff --git a/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py b/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py
new file mode 100644 (file)
index 0000000..eeca2e5
--- /dev/null
@@ -0,0 +1,115 @@
+import json
+from urllib.error import HTTPError, URLError
+from ceph_node_proxy.baseclient import BaseClient
+from ceph_node_proxy.util import Logger, http_req
+from typing import Dict, Any, Tuple, Optional
+from http.client import HTTPMessage
+
+
class RedFishClient(BaseClient):
    """HTTP client for a RedFish-capable BMC.

    Handles session-based authentication (login/logout) and wraps
    ``http_req`` so that every request carries the session token.
    """

    PREFIX = '/redfish/v1/'

    def __init__(self,
                 host: str = '',
                 port: str = '443',
                 username: str = '',
                 password: str = ''):
        super().__init__(host, username, password)
        self.log: Logger = Logger(__name__)
        self.log.logger.info(f'Initializing redfish client {__name__}')
        self.host: str = host
        self.port: str = port
        self.url: str = f'https://{self.host}:{self.port}'
        # session state, populated by login()
        self.token: str = ''
        self.location: str = ''

    def login(self) -> None:
        """Open a RedFish session and store its X-Auth-Token and URL.

        Raises RuntimeError (with the reason attached) on failure.
        """
        if not self.is_logged_in():
            self.log.logger.info('Logging in to '
                                 f"{self.url} as '{self.username}'")
            oob_credentials = json.dumps({'UserName': self.username,
                                          'Password': self.password})
            headers = {'Content-Type': 'application/json'}

            try:
                _headers, _data, _status_code = self.query(data=oob_credentials,
                                                           headers=headers,
                                                           endpoint='/redfish/v1/SessionService/Sessions/')
                if _status_code != 201:
                    # attach the reason to the exception instead of raising a
                    # bare RuntimeError that discards it
                    msg = f"Can't log in to {self.url} as '{self.username}': {_status_code}"
                    self.log.logger.error(msg)
                    raise RuntimeError(msg)
            except URLError as e:
                msg = f"Can't log in to {self.url} as '{self.username}': {e}"
                self.log.logger.error(msg)
                raise RuntimeError(msg)
            self.token = _headers['X-Auth-Token']
            self.location = _headers['Location']

    def is_logged_in(self) -> bool:
        """Return True when the stored session token is still valid."""
        self.log.logger.debug(f'Checking token validity for {self.url}')
        if not self.location or not self.token:
            self.log.logger.debug(f'No token found for {self.url}.')
            return False
        headers = {'X-Auth-Token': self.token}
        try:
            _headers, _data, _status_code = self.query(headers=headers,
                                                       endpoint=self.location)
        except URLError as e:
            self.log.logger.error("Can't check token "
                                  f'validity for {self.url}: {e}')
            raise
        return _status_code == 200

    def logout(self) -> Dict[str, Any]:
        """Close the session (best effort) and clear the local session state."""
        result: Dict[str, Any] = {}
        try:
            if self.is_logged_in():
                _, _data, _status_code = self.query(method='DELETE',
                                                    headers={'X-Auth-Token': self.token},
                                                    endpoint=self.location)
                result = json.loads(_data)
        except URLError:
            self.log.logger.error(f"Can't log out from {self.url}")

        self.location = ''
        self.token = ''

        return result

    def get_path(self, path: str) -> Dict[str, Any]:
        """GET ``path`` (PREFIX is prepended when missing) and return the JSON body."""
        if self.PREFIX not in path:
            path = f'{self.PREFIX}{path}'
        try:
            _, result, _status_code = self.query(endpoint=path)
            return json.loads(result)
        except URLError as e:
            msg = f"Can't get path {path}:\n{e}"
            self.log.logger.error(msg)
            # carry the failed path in the exception
            raise RuntimeError(msg)

    def query(self,
              data: Optional[str] = None,
              headers: Dict[str, str] = {},
              method: Optional[str] = None,
              endpoint: str = '',
              timeout: int = 10) -> Tuple[HTTPMessage, str, int]:
        """Send one request to the BMC; returns (headers, body, status).

        ``headers`` is copied immediately, so the mutable default is never
        mutated.
        """
        _headers = headers.copy() if headers else {}
        if self.token:
            _headers['X-Auth-Token'] = self.token
        if not _headers.get('Content-Type') and method in ['POST', 'PUT', 'PATCH']:
            _headers['Content-Type'] = 'application/json'
        try:
            (response_headers,
             response_str,
             response_status) = http_req(hostname=self.host,
                                         port=self.port,
                                         endpoint=endpoint,
                                         headers=_headers,
                                         method=method,
                                         data=data,
                                         timeout=timeout)

            return response_headers, response_str, response_status
        except (HTTPError, URLError) as e:
            self.log.logger.debug(f'{e}')
            raise
diff --git a/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py b/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py
new file mode 100644 (file)
index 0000000..0424bb3
--- /dev/null
@@ -0,0 +1,165 @@
+from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
+from ceph_node_proxy.util import Logger, normalize_dict, to_snake_case
+from typing import Dict, Any, List
+
+
class RedfishDellSystem(BaseRedfishSystem):
    """Dell (iDRAC) flavour of the RedFish system.

    Maps the vendor-specific RedFish endpoints into the normalized
    ``self._sys`` cache that the getters below expose.
    """

    def __init__(self, **kw: Any) -> None:
        super().__init__(**kw)
        self.log = Logger(__name__)
        # iDRAC-specific job-service endpoints
        self.job_service_endpoint: str = '/redfish/v1/Managers/iDRAC.Embedded.1/Oem/Dell/DellJobService'
        self.create_reboot_job_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.CreateRebootJob'
        self.setup_job_queue_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.SetupJobQueue'

    def build_common_data(self,
                          data: Dict[str, Any],
                          fields: List,
                          path: str) -> Dict[str, Dict[str, Dict]]:
        """Collect ``fields`` from every member under ``path``.

        Missing fields are logged and skipped; keys are snake_cased and the
        whole result is lowercased via normalize_dict().
        """
        result: Dict[str, Dict[str, Dict]] = dict()
        for member_info in self.get_members(data, path):
            member_id = member_info['Id']
            result[member_id] = dict()
            for field in fields:
                try:
                    result[member_id][to_snake_case(field)] = member_info[field]
                except KeyError:
                    self.log.logger.warning(f'Could not find field: {field} in member_info: {member_info}')

        return normalize_dict(result)

    def build_chassis_data(self,
                           fields: Dict[str, List[str]],
                           path: str) -> Dict[str, Dict[str, Dict]]:
        """Collect per-element ``fields`` from a chassis sub-resource.

        ``fields`` maps a chassis collection name (e.g. 'Fans') to the list
        of attributes to extract from each of its members.
        """
        result: Dict[str, Dict[str, Dict]] = dict()
        data = self._get_path(f'{self.chassis_endpoint}/{path}')

        for elt, _fields in fields.items():
            for member_elt in data[elt]:
                _id = member_elt['MemberId']
                result[_id] = dict()
                for field in _fields:
                    try:
                        result[_id][to_snake_case(field)] = member_elt[field]
                    except KeyError:
                        self.log.logger.warning(f'Could not find field: {field} in data: {data[elt]}')
        return normalize_dict(result)

    def get_sn(self) -> str:
        """Serial number (Dell reports it in the 'SKU' field)."""
        return self._sys['SKU']

    def get_status(self) -> Dict[str, Dict[str, Dict]]:
        return self._sys['status']

    def get_memory(self) -> Dict[str, Dict[str, Dict]]:
        return self._sys['memory']

    def get_processors(self) -> Dict[str, Dict[str, Dict]]:
        return self._sys['processors']

    def get_network(self) -> Dict[str, Dict[str, Dict]]:
        return self._sys['network']

    def get_storage(self) -> Dict[str, Dict[str, Dict]]:
        return self._sys['storage']

    def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
        return self._sys['firmwares']

    def get_power(self) -> Dict[str, Dict[str, Dict]]:
        return self._sys['power']

    def get_fans(self) -> Dict[str, Dict[str, Dict]]:
        return self._sys['fans']

    def _update_network(self) -> None:
        fields = ['Description', 'Name', 'SpeedMbps', 'Status']
        self.log.logger.debug('Updating network')
        self._sys['network'] = self.build_common_data(data=self._system['Systems'],
                                                      fields=fields,
                                                      path='EthernetInterfaces')

    def _update_processors(self) -> None:
        fields = ['Description',
                  'TotalCores',
                  'TotalThreads',
                  'ProcessorType',
                  'Model',
                  'Status',
                  'Manufacturer']
        self.log.logger.debug('Updating processors')
        self._sys['processors'] = self.build_common_data(data=self._system['Systems'],
                                                         fields=fields,
                                                         path='Processors')

    def _update_storage(self) -> None:
        fields = ['Description',
                  'CapacityBytes',
                  'Model', 'Protocol',
                  'SerialNumber', 'Status',
                  'PhysicalLocation']
        entities = self.get_members(data=self._system['Systems'],
                                    path='Storage')
        self.log.logger.debug('Updating storage')
        result: Dict[str, Dict[str, Dict]] = dict()
        for entity in entities:
            for drive in entity['Drives']:
                drive_path = drive['@odata.id']
                drive_info = self._get_path(drive_path)
                drive_id = drive_info['Id']
                result[drive_id] = dict()
                result[drive_id]['redfish_endpoint'] = drive['@odata.id']
                # 'entity' is invariant per drive: set it once rather than
                # re-assigning it on every field iteration
                result[drive_id]['entity'] = entity['Id']
                for field in fields:
                    result[drive_id][to_snake_case(field)] = drive_info[field]
        self._sys['storage'] = normalize_dict(result)

    def _update_sn(self) -> None:
        self.log.logger.debug('Updating serial number')
        self._sys['SKU'] = self._system['Systems']['SKU']

    def _update_memory(self) -> None:
        fields = ['Description',
                  'MemoryDeviceType',
                  'CapacityMiB',
                  'Status']
        self.log.logger.debug('Updating memory')
        self._sys['memory'] = self.build_common_data(data=self._system['Systems'],
                                                     fields=fields,
                                                     path='Memory')

    def _update_power(self) -> None:
        fields = {
            'PowerSupplies': [
                'Name',
                'Model',
                'Manufacturer',
                'Status'
            ]
        }
        self.log.logger.debug('Updating powersupplies')
        self._sys['power'] = self.build_chassis_data(fields, 'Power')

    def _update_fans(self) -> None:
        fields = {
            'Fans': [
                'Name',
                'PhysicalContext',
                'Status'
            ],
        }
        self.log.logger.debug('Updating fans')
        self._sys['fans'] = self.build_chassis_data(fields, 'Thermal')

    def _update_firmwares(self) -> None:
        fields = [
            'Name',
            'Description',
            'ReleaseDate',
            'Version',
            'Updateable',
            'Status',
        ]
        self.log.logger.debug('Updating firmwares')
        self._sys['firmwares'] = self.build_common_data(data=self._system['UpdateService'],
                                                        fields=fields,
                                                        path='FirmwareInventory')
diff --git a/src/ceph-node-proxy/ceph_node_proxy/reporter.py b/src/ceph-node-proxy/ceph_node_proxy/reporter.py
new file mode 100644 (file)
index 0000000..9e5521a
--- /dev/null
@@ -0,0 +1,74 @@
+from threading import Thread
+import time
+import json
+from ceph_node_proxy.util import Logger, http_req
+from urllib.error import HTTPError, URLError
+from typing import Dict, Any
+
+
class Reporter:
    """Periodically pushes the data gathered by ``system`` to the mgr,
    sending only when it changed since the previous iteration."""

    def __init__(self,
                 system: Any,
                 cephx: Dict[str, Any],
                 reporter_scheme: str = 'https',
                 reporter_hostname: str = '',
                 reporter_port: str = '443',
                 reporter_endpoint: str = '/node-proxy/data') -> None:
        self.system = system
        self.data: Dict[str, Any] = {}
        self.finish = False
        self.cephx = cephx
        self.data['cephx'] = self.cephx
        self.reporter_scheme: str = reporter_scheme
        self.reporter_hostname: str = reporter_hostname
        self.reporter_port: str = reporter_port
        self.reporter_endpoint: str = reporter_endpoint
        self.log = Logger(__name__)
        # fix: the URL previously read f'{scheme}:{host}:...' (missing '//'),
        # which logged an invalid URL
        self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:'
                                  f'{reporter_port}{reporter_endpoint}')
        self.log.logger.info(f'Reporter url set to {self.reporter_url}')

    def stop(self) -> None:
        """Stop the loop; safe to call even if run() was never invoked."""
        self.finish = True
        # the thread attribute only exists once run() has been called
        if hasattr(self, 'thread'):
            self.thread.join()

    def run(self) -> None:
        """Start the reporting loop in a background thread."""
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def loop(self) -> None:
        while not self.finish:
            # Any logic to avoid sending the all the system
            # information every loop can go here. In a real
            # scenario probably we should just send the sub-parts
            # that have changed to minimize the traffic in
            # dense clusters
            self.log.logger.debug('waiting for a lock in reporter loop.')
            self.system.lock.acquire()
            self.log.logger.debug('lock acquired in reporter loop.')
            if self.system.data_ready:
                self.log.logger.info('data ready to be sent to the mgr.')
                if not self.system.get_system() == self.system.previous_data:
                    self.log.logger.info('data has changed since last iteration.')
                    self.data['patch'] = self.system.get_system()
                    try:
                        # TODO: add a timeout parameter to the reporter in the config file
                        self.log.logger.info(f'sending data to {self.reporter_url}')
                        http_req(hostname=self.reporter_hostname,
                                 port=self.reporter_port,
                                 method='POST',
                                 headers={'Content-Type': 'application/json'},
                                 endpoint=self.reporter_endpoint,
                                 scheme=self.reporter_scheme,
                                 data=json.dumps(self.data))
                    except (HTTPError, URLError) as e:
                        self.log.logger.error(f"The reporter couldn't send data to the mgr: {e}")
                        # Need to add a new parameter 'max_retries' to the reporter if it can't
                        # send the data for more than x times, maybe the daemon should stop altogether
                    else:
                        # only remember the payload once it was accepted
                        self.system.previous_data = self.system.get_system()
                else:
                    self.log.logger.info('no diff, not sending data to the mgr.')
            self.system.lock.release()
            self.log.logger.debug('lock released in reporter loop.')
            time.sleep(5)
diff --git a/src/ceph-node-proxy/ceph_node_proxy/util.py b/src/ceph-node-proxy/ceph_node_proxy/util.py
new file mode 100644 (file)
index 0000000..a94acc9
--- /dev/null
@@ -0,0 +1,151 @@
+import logging
+import yaml
+import os
+import time
+import re
+import ssl
+from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
+from urllib.error import HTTPError, URLError
+from urllib.request import urlopen, Request
+from typing import Dict, List, Callable, Any, Optional, MutableMapping, Tuple
+
+
class Logger:
    """Small wrapper around ``logging``.

    Every instance is recorded in ``_Logger`` so Config.load_config() can
    re-apply the configured level to all of them at once.
    """

    _Logger: List['Logger'] = []

    def __init__(self, name: str, level: int = logging.INFO):
        self.name = name
        self.level = level
        Logger._Logger.append(self)
        self.logger = self.get_logger()

    def get_logger(self) -> logging.Logger:
        """Return a non-propagating stream logger for ``self.name``."""
        configured = logging.getLogger(self.name)
        configured.setLevel(self.level)
        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(self.level)
        stream_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        # drop any handler attached by a previous instantiation with this name
        configured.handlers.clear()
        configured.addHandler(stream_handler)
        configured.propagate = False
        return configured
+
+
class Config:
    """Yaml-backed configuration.

    Loads ``config_file`` when present (falling back to ``default_config``),
    back-fills missing top-level keys from the defaults, exposes every key
    as an attribute, and re-levels all existing Logger instances.
    """

    def __init__(self,
                 config_file: str = '/etc/ceph/node-proxy.yaml',
                 default_config: Dict[str, Any] = {}) -> None:
        self.config_file = config_file
        self.default_config = default_config
        self.load_config()

    def load_config(self) -> None:
        """(Re)read the config file and apply it."""
        if not os.path.exists(self.config_file):
            self.config = self.default_config
        else:
            with open(self.config_file, 'r') as f:
                self.config = yaml.safe_load(f)

        for key, value in self.default_config.items():
            self.config.setdefault(key, value)

        for key, value in self.config.items():
            setattr(self, key, value)

        # TODO: need to be improved
        for _l in Logger._Logger:
            _l.logger.setLevel(self.logging['level'])  # type: ignore
            _l.logger.handlers[0].setLevel(self.logging['level'])  # type: ignore

    def reload(self, config_file: str = '') -> None:
        """Reload, optionally switching to another config file."""
        if config_file:
            self.config_file = config_file
        self.load_config()
+
+
+log = Logger(__name__)
+
+
def to_snake_case(name: str) -> str:
    """Convert a CamelCase identifier to snake_case (e.g. 'SpeedMbps' -> 'speed_mbps')."""
    # first split an uppercase word preceded by anything, then split
    # lower/digit-to-upper transitions, then lowercase the whole thing
    partially_split = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    fully_split = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', partially_split)
    return fully_split.lower()
+
+
def normalize_dict(test_dict: Dict) -> Dict:
    """Return a copy of ``test_dict`` with every key lowercased, recursing
    into nested dicts (values are left untouched)."""
    return {
        key.lower(): normalize_dict(value) if isinstance(value, dict) else value
        for key, value in test_dict.items()
    }
+
+
def retry(exceptions: Any = Exception, retries: int = 20, delay: int = 1) -> Callable:
    """Decorator retrying ``f`` up to ``retries`` times, sleeping ``delay``
    seconds between attempts.

    The final attempt runs outside the try/except so its exception
    propagates to the caller.
    """
    def decorator(f: Callable) -> Callable:
        from functools import wraps  # local import: keeps module-level deps untouched

        @wraps(f)  # preserve f's name/docstring on the wrapper
        def _retry(*args: str, **kwargs: Any) -> Callable:
            _tries = retries
            while _tries > 1:
                try:
                    log.logger.debug(f'{f} {_tries - 1} attempt(s) left.')
                    return f(*args, **kwargs)
                except exceptions:
                    time.sleep(delay)
                    _tries -= 1
            # logger.warn() is a deprecated alias of warning()
            log.logger.warning(f'{f} has failed after {retries} tries')
            return f(*args, **kwargs)
        return _retry
    return decorator
+
+
def http_req(hostname: str = '',
             port: str = '443',
             method: Optional[str] = None,
             headers: MutableMapping[str, str] = {},
             data: Optional[str] = None,
             endpoint: str = '/',
             scheme: str = 'https',
             ssl_verify: bool = False,
             timeout: Optional[int] = None,
             ssl_ctx: Optional[Any] = None) -> Tuple[Any, Any, Any]:
    """Perform an HTTP(S) request; return (headers, decoded body, status code).

    Raises HTTPError/URLError on failure.  NOTE: unless a pre-built
    ``ssl_ctx`` is supplied, certificate verification is DISABLED by
    default (``ssl_verify=False``).
    """
    if not ssl_ctx:
        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        if not ssl_verify:
            ssl_ctx.check_hostname = False
            ssl_ctx.verify_mode = ssl.CERT_NONE
        else:
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED

    url: str = f'{scheme}://{hostname}:{port}{endpoint}'
    # utf-8 rather than ascii so non-ascii payloads don't raise UnicodeEncodeError
    _data = data.encode('utf-8') if data else None
    # copy the headers: the previous code aliased the shared mutable default
    # dict and then mutated it, leaking 'Content-Type' into later calls
    _headers = dict(headers)
    if data and not method:
        method = 'POST'
    if not _headers.get('Content-Type') and method in ['POST', 'PATCH']:
        _headers['Content-Type'] = 'application/json'
    try:
        req = Request(url, _data, _headers, method=method)
        with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
            response_str = response.read()
            response_headers = response.headers
            response_code = response.code
        return response_headers, response_str.decode(), response_code
    except (HTTPError, URLError) as e:
        print(f'{e}')
        # handle error here if needed
        raise
+
+
def write_tmp_file(data: str, prefix_name: str = 'node-proxy-') -> _TemporaryFileWrapper:
    """Write ``data`` to a new owner-only (0600) temporary file and return
    the open file object (the file is removed when it is closed/collected)."""
    tmp = NamedTemporaryFile(prefix=prefix_name)
    # tighten permissions before any payload is written
    os.fchmod(tmp.fileno(), 0o600)
    tmp.write(data.encode('utf-8'))
    tmp.flush()
    return tmp
index 9f855fea38b1a0bacbb8cd2c5c3999cf3e6f791b..e882428ccb582efc8e62f0c297b69300037fd072 100755 (executable)
@@ -40,7 +40,6 @@ from threading import Thread, Event
 from pathlib import Path
 from urllib.error import HTTPError, URLError
 from urllib.request import urlopen, Request
-from cephadmlib.node_proxy.main import NodeProxy
 
 
 FuncT = TypeVar('FuncT', bound=Callable)
@@ -701,6 +700,133 @@ class Monitoring(object):
 ##################################
 
 
@register_daemon_form
class NodeProxy(ContainerDaemonForm):
    """Defines a node-proxy container.

    node-proxy is the daemon that talks to a host's out-of-band
    management controller.  This form tells cephadm how to deploy it:
    the configuration is shipped as a single ``node-proxy.json`` file
    (bind-mounted into the container) and the daemon is started by
    running the ceph_node_proxy ``main.py`` script with python3.
    """

    daemon_type = 'node-proxy'
    # TODO: update this if we make node-proxy an executable
    entrypoint = 'python3'
    # files that must be present in the supplied config-json
    required_files = ['node-proxy.json']

    @classmethod
    def for_daemon_type(cls, daemon_type: str) -> bool:
        """Return True if this form handles *daemon_type*."""
        return cls.daemon_type == daemon_type

    def __init__(
        self,
        ctx: CephadmContext,
        ident: DaemonIdentity,
        config_json: Dict,
        image: str = DEFAULT_IMAGE,
    ):
        self.ctx = ctx
        self._identity = ident
        self.image = image

        # config-json options
        config = dict_get(config_json, 'node-proxy.json', {})
        self.files = {'node-proxy.json': config}

        # validate the supplied args
        self.validate()

    @classmethod
    def init(
        cls, ctx: CephadmContext, fsid: str, daemon_id: str
    ) -> 'NodeProxy':
        """Build a NodeProxy from an fsid/daemon_id pair."""
        return cls.create(
            ctx, DaemonIdentity(fsid, cls.daemon_type, daemon_id)
        )

    @classmethod
    def create(
        cls, ctx: CephadmContext, ident: DaemonIdentity
    ) -> 'NodeProxy':
        """Build a NodeProxy for *ident*, pulling config from the context."""
        return cls(ctx, ident, fetch_configs(ctx), ctx.image)

    @property
    def identity(self) -> DaemonIdentity:
        return self._identity

    @property
    def fsid(self) -> str:
        return self._identity.fsid

    @property
    def daemon_id(self) -> str:
        return self._identity.daemon_id

    def customize_container_mounts(
        self, ctx: CephadmContext, mounts: Dict[str, str]
    ) -> None:
        """Bind-mount this daemon's node-proxy.json into the container."""
        data_dir = self.identity.data_dir(ctx.data_dir)
        # TODO: update this when we have the actual location
        # in the ceph container we are going to keep node-proxy
        mounts.update({os.path.join(data_dir, 'node-proxy.json'): '/usr/share/ceph/node-proxy.json:z'})

    def customize_process_args(self, ctx: CephadmContext, args: List[str]) -> None:
        """Append the script path and --config flag to the entrypoint args."""
        # TODO: this corresponds with the mount location of
        # the config in _get_container_mounts above. They
        # will both need to be updated when we have a proper
        # location in the container for node-proxy
        args.extend(['/usr/share/ceph/ceph_node_proxy/main.py', '--config', '/usr/share/ceph/node-proxy.json'])

    def validate(self):
        # type: () -> None
        """Raise Error if the fsid, daemon_id, image or config-json is invalid."""
        if not is_fsid(self.fsid):
            raise Error('not an fsid: %s' % self.fsid)
        if not self.daemon_id:
            raise Error('invalid daemon_id: %s' % self.daemon_id)
        if not self.image:
            raise Error('invalid image: %s' % self.image)
        # check for the required files
        if self.required_files:
            for fname in self.required_files:
                if fname not in self.files:
                    raise Error(
                        'required file missing from config-json: %s' % fname
                    )

    def get_daemon_name(self):
        # type: () -> str
        """Return '<daemon_type>.<daemon_id>'."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def get_container_name(self, desc=None):
        # type: (Optional[str]) -> str
        """Return the container name, optionally suffixed with *desc*."""
        cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name())
        if desc:
            cname = '%s-%s' % (cname, desc)
        return cname

    def create_daemon_dirs(self, data_dir, uid, gid):
        # type: (str, int, int) -> None
        """Create files under the container data dir"""
        if not os.path.isdir(data_dir):
            raise OSError('data_dir is not a directory: %s' % (data_dir))

        logger.info('Writing node-proxy config...')
        # populate files from the config-json
        populate_files(data_dir, self.files, uid, gid)

    def container(self, ctx: CephadmContext) -> CephContainer:
        """Return the deployment container for this daemon."""
        # NOTE(review): the previous comment here mentioned modprobe of
        # iscsi_target_mod and configfs; it was copied from the iscsi
        # daemon and does not apply to node-proxy.  The container is run
        # privileged -- confirm whether that is actually required for
        # out-of-band access.
        ctr = daemon_to_container(ctx, self, privileged=True, envs=['PYTHONPATH=$PYTHONPATH:/usr/share/ceph'])
        return to_deployment_container(ctx, ctr)

    def config_and_keyring(
        self, ctx: CephadmContext
    ) -> Tuple[Optional[str], Optional[str]]:
        """Return the ceph config and keyring to inject into the container."""
        return get_config_and_keyring(ctx)

    def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
        """Return the uid/gid the container should run as."""
        return extract_uid_gid(ctx)

    def default_entrypoint(self) -> str:
        return self.entrypoint
+
 @contextmanager
 def write_new(
     destination: Union[str, Path],
@@ -1649,6 +1775,7 @@ def get_supported_daemons():
     supported_daemons.append(CephadmAgent.daemon_type)
     supported_daemons.append(SNMPGateway.daemon_type)
     supported_daemons.extend(Tracing.components)
+    supported_daemons.append(NodeProxy.daemon_type)
     assert len(supported_daemons) == len(set(supported_daemons))
     return supported_daemons
 
@@ -3202,6 +3329,10 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
         sg = SNMPGateway.init(ctx, fsid, daemon_id)
         sg.create_daemon_conf()
 
+    elif daemon_type == NodeProxy.daemon_type:
+        node_proxy = NodeProxy.init(ctx, fsid, daemon_id)
+        node_proxy.create_daemon_dirs(data_dir, uid, gid)
+
     _write_custom_conf_files(ctx, daemon_type, str(daemon_id), fsid, uid, gid)
 
 
@@ -4754,28 +4885,13 @@ class MgrListener(Thread):
                             self.agent.ls_gatherer.wakeup()
                             self.agent.volume_gatherer.wakeup()
                             logger.debug(f'Got mgr message {data}')
-                        if 'node_proxy_oob_cmd' in data:
-                            if data['node_proxy_oob_cmd']['action'] in ['get_led', 'set_led']:
-                                conn.send(bytes(json.dumps(self.node_proxy_oob_cmd_result), 'utf-8'))
             except Exception as e:
                 logger.error(f'Mgr Listener encountered exception: {e}')
 
    def shutdown(self) -> None:
        """Flag this listener thread to stop."""
        self.stop = True
 
-    def validate_node_proxy_payload(self, data: Dict[str, Any]) -> None:
-        if 'action' not in data.keys():
-            raise RuntimeError('node-proxy oob command needs an action.')
-        if data['action'] in ['get_led', 'set_led']:
-            fields = ['type', 'id']
-            if data['type'] not in ['chassis', 'drive']:
-                raise RuntimeError('the LED type must be either "chassis" or "drive".')
-            for field in fields:
-                if field not in data.keys():
-                    raise RuntimeError('Received invalid node-proxy cmd.')
-
     def handle_json_payload(self, data: Dict[Any, Any]) -> None:
-        self.node_proxy_oob_cmd_result: Dict[str, Any] = {}
         if 'counter' in data:
             self.agent.ack = int(data['counter'])
             if 'config' in data:
@@ -4788,95 +4904,10 @@ class MgrListener(Thread):
                             f.write(config[filename])
                 self.agent.pull_conf_settings()
                 self.agent.wakeup()
-        elif 'node_proxy_shutdown' in data:
-            logger.info('Received node_proxy_shutdown command.')
-            self.agent.shutdown()
-        elif 'node_proxy_oob_cmd' in data:
-            node_proxy_cmd: Dict[str, Any] = data['node_proxy_oob_cmd']
-            try:
-                self.validate_node_proxy_payload(node_proxy_cmd)
-            except RuntimeError as e:
-                logger.error(f"Couldn't validate node-proxy payload:\n{node_proxy_cmd}\n{e}")
-                raise
-            logger.info(f'Received node_proxy_oob_cmd command: {node_proxy_cmd}')
-            if node_proxy_cmd['action'] == 'get_led':
-                if node_proxy_cmd['type'] == 'chassis':
-                    self.node_proxy_oob_cmd_result = self.agent.node_proxy_mgr.node_proxy.system.get_chassis_led()
-            if node_proxy_cmd['action'] == 'set_led':
-                if node_proxy_cmd['type'] == 'chassis':
-                    _data: Dict[str, Any] = json.loads(node_proxy_cmd['data'])
-                    _result: int = self.agent.node_proxy_mgr.node_proxy.system.set_chassis_led(_data)
-                    self.node_proxy_oob_cmd_result = {'http_code': _result}
         else:
             raise RuntimeError('No valid data received.')
 
 
class NodeProxyManager(Thread):
    """Thread that bootstraps and supervises the embedded NodeProxy.

    It waits for the agent to finish pulling its configuration (signalled
    through *event*), fetches the out-of-band credentials from the mgr,
    starts a NodeProxy thread, then polls its health forever.
    """

    def __init__(self, agent: 'CephadmAgent', event: Event):
        super().__init__()
        self.agent = agent
        # set by the agent once pull_conf_settings() has run
        self.event = event
        self.stop = False

    def run(self) -> None:
        # block until the agent signals that its config is ready
        self.event.wait()
        self.ssl_ctx = self.agent.ssl_ctx
        self.init()
        self.loop()

    def init(self) -> None:
        """Fetch OOB credentials from the mgr and start the NodeProxy thread.

        Raises RuntimeError when the mgr endpoint does not answer 200.
        """
        node_proxy_meta = {
            'cephx': {
                'name': self.agent.host,
                'secret': self.agent.keyring
            }
        }
        status, result = http_query(addr=self.agent.target_ip,
                                    port=self.agent.target_port,
                                    data=json.dumps(node_proxy_meta).encode('ascii'),
                                    endpoint='/node-proxy/oob',
                                    ssl_ctx=self.ssl_ctx)
        if status != 200:
            msg = f'No out of band tool details could be loaded: {status}, {result}'
            logger.debug(msg)
            raise RuntimeError(msg)

        result_json = json.loads(result)
        kwargs = {
            'host': result_json['result']['addr'],
            'username': result_json['result']['username'],
            'password': result_json['result']['password'],
            'cephx': node_proxy_meta['cephx'],
            'mgr_target_ip': self.agent.target_ip,
            'mgr_target_port': self.agent.target_port
        }
        # 'port' is optional in the mgr's reply; NodeProxy has a default
        if result_json['result'].get('port'):
            kwargs['port'] = result_json['result']['port']

        self.node_proxy: NodeProxy = NodeProxy(**kwargs)
        self.node_proxy.start()

    def loop(self) -> None:
        """Poll NodeProxy health; on failure retry init() every 120s."""
        while not self.stop:
            try:
                status = self.node_proxy.check_status()
                label = 'Ok' if status else 'Critical'
                logger.debug(f'node-proxy status: {label}')
            except Exception as e:
                logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}')
                time.sleep(120)
                self.init()
            else:
                logger.debug('node-proxy alive, next check in 60sec.')
                time.sleep(60)

    def shutdown(self) -> None:
        self.stop = True
        # if `self.node_proxy.shutdown()` is called before self.start(), it will fail.
        if self.__dict__.get('node_proxy'):
            self.node_proxy.shutdown()
-
-
 class CephadmAgent():
 
     daemon_type = 'agent'
@@ -4919,8 +4950,6 @@ class CephadmAgent():
         self.ssl_ctx = ssl.create_default_context()
         self.ssl_ctx.check_hostname = True
         self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
-        self.node_proxy_mgr_event = Event()
-        self.node_proxy_mgr = NodeProxyManager(self, self.node_proxy_mgr_event)
 
     def validate(self, config: Dict[str, str] = {}) -> None:
         # check for the required files
@@ -4992,8 +5021,6 @@ WantedBy=ceph-{fsid}.target
 
     def shutdown(self) -> None:
         self.stop = True
-        if self.node_proxy_mgr.is_alive():
-            self.node_proxy_mgr.shutdown()
         if self.mgr_listener.is_alive():
             self.mgr_listener.shutdown()
         if self.ls_gatherer.is_alive():
@@ -5035,9 +5062,6 @@ WantedBy=ceph-{fsid}.target
     def run(self) -> None:
         self.pull_conf_settings()
         self.ssl_ctx.load_verify_locations(self.ca_path)
-        # only after self.pull_conf_settings() was called we can actually start
-        # node-proxy
-        self.node_proxy_mgr_event.set()
 
         try:
             for _ in range(1001):
@@ -5059,9 +5083,6 @@ WantedBy=ceph-{fsid}.target
         if not self.volume_gatherer.is_alive():
             self.volume_gatherer.start()
 
-        if not self.node_proxy_mgr.is_alive():
-            self.node_proxy_mgr.start()
-
         while not self.stop:
             start_time = time.monotonic()
             ack = self.ack
diff --git a/src/cephadm/cephadmlib/node_proxy/__init__.py b/src/cephadm/cephadmlib/node_proxy/__init__.py
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/src/cephadm/cephadmlib/node_proxy/baseclient.py b/src/cephadm/cephadmlib/node_proxy/baseclient.py
deleted file mode 100644 (file)
index 6b46561..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-from typing import Dict, Any
-
-
class BaseClient:
    """Minimal interface for an out-of-band management API client.

    Concrete clients implement session handling (login/logout) and path
    retrieval; this base only stores the connection details.
    """

    def __init__(self,
                 host: str,
                 username: str,
                 password: str) -> None:
        self.host, self.username, self.password = host, username, password

    def login(self) -> None:
        """Open a session against the remote endpoint."""
        raise NotImplementedError()

    def logout(self) -> Dict[str, Any]:
        """Close the session and return the endpoint's response."""
        raise NotImplementedError()

    def get_path(self, path: str) -> Dict:
        """Fetch *path* from the remote API and return the decoded payload."""
        raise NotImplementedError()
diff --git a/src/cephadm/cephadmlib/node_proxy/baseredfishsystem.py b/src/cephadm/cephadmlib/node_proxy/baseredfishsystem.py
deleted file mode 100644 (file)
index 45c80e0..0000000
+++ /dev/null
@@ -1,152 +0,0 @@
-import concurrent.futures
-from .basesystem import BaseSystem
-from .redfish_client import RedFishClient
-from threading import Thread, Lock
-from time import sleep
-from .util import Logger, retry
-from typing import Dict, Any, List
-
-
class BaseRedfishSystem(BaseSystem):
    """Common logic for systems reachable over the Redfish REST API.

    On construction it logs in through a RedFishClient and starts a
    background thread that periodically refreshes the cached hardware
    data; vendor-specific subclasses implement the _update_* hooks.
    """

    def __init__(self, **kw: Any) -> None:
        super().__init__(**kw)
        self.common_endpoints: List[str] = kw.get('common_endpoints', ['/Systems/System.Embedded.1',
                                                                       '/UpdateService'])
        self.chassis_endpoint: str = kw.get('chassis_endpoint', '/Chassis/System.Embedded.1')
        self.log = Logger(__name__)
        self.host: str = kw['host']
        self.port: str = kw['port']
        self.username: str = kw['username']
        self.password: str = kw['password']
        # move the following line (class attribute?)
        self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password)
        self.log.logger.info(f'redfish system initialization, host: {self.host}, user: {self.username}')

        self.run: bool = False
        self.thread: Thread
        self.data_ready: bool = False
        self.previous_data: Dict = {}
        # serializes the refresh cycle in update() against flush()
        self.lock: Lock = Lock()
        self.data: Dict[str, Dict[str, Any]] = {}
        self._system: Dict[str, Dict[str, Any]] = {}
        self._sys: Dict[str, Any] = {}
        self.start_client()

    def start_client(self) -> None:
        """Log in to the Redfish endpoint and launch the update loop."""
        self.client.login()
        self.start_update_loop()

    def start_update_loop(self) -> None:
        """Start the background thread running update()."""
        self.run = True
        self.thread = Thread(target=self.update)
        self.thread.start()

    def stop_update_loop(self) -> None:
        """Ask the update thread to exit and wait for it."""
        self.run = False
        self.thread.join()

    def update(self) -> None:
        """Refresh loop: re-pull all hardware data every ~5s under the lock.

        On RuntimeError the loop stops and logs out from the Redfish API.
        """
        #  this loop can have:
        #  - caching logic
        while self.run:
            self.log.logger.debug('waiting for a lock in the update loop.')
            self.lock.acquire()
            self.log.logger.debug('lock acquired in the update loop.')
            try:
                self._update_system()
                self._update_sn()
                update_funcs = [self._update_memory,
                                self._update_power,
                                self._update_fans,
                                self._update_network,
                                self._update_processors,
                                self._update_storage,
                                self._update_firmwares]

                # the individual refreshes are independent, so run them in parallel
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    executor.map(lambda f: f(), update_funcs)

                self.data_ready = True
                sleep(5)
            except RuntimeError as e:
                self.run = False
                self.log.logger.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}')
                self.client.logout()
            finally:
                self.lock.release()
                self.log.logger.debug('lock released in the update loop.')

    def flush(self) -> None:
        """Drop all cached data and mark it as not ready (under the lock)."""
        self.log.logger.debug('Acquiring lock to flush data.')
        self.lock.acquire()
        self.log.logger.debug('Lock acquired, flushing data.')
        self._system = {}
        self.previous_data = {}
        self.log.logger.info('Data flushed.')
        self.data_ready = False
        self.log.logger.debug('Data marked as not ready.')
        self.lock.release()
        self.log.logger.debug('Released the lock after flushing data.')

    @retry(retries=10, delay=2)
    def _get_path(self, path: str) -> Dict:
        """GET *path* from the client; retried up to 10 times (2s delay)."""
        try:
            result = self.client.get_path(path)
        except RuntimeError:
            raise
        if result is None:
            self.log.logger.error(f'The client reported an error when getting path: {path}')
            raise RuntimeError(f'Could not get path: {path}')
        return result

    def get_members(self, data: Dict[str, Any], path: str) -> List:
        """Resolve and fetch every entry of a Redfish 'Members' collection."""
        _path = data[path]['@odata.id']
        _data = self._get_path(_path)
        return [self._get_path(member['@odata.id']) for member in _data['Members']]

    def get_system(self) -> Dict[str, Any]:
        """Assemble the full inventory/state report for this host."""
        result = {
            'host': self.get_host(),
            'sn': self.get_sn(),
            'status': {
                'storage': self.get_storage(),
                'processors': self.get_processors(),
                'network': self.get_network(),
                'memory': self.get_memory(),
                'power': self.get_power(),
                'fans': self.get_fans()
            },
            'firmwares': self.get_firmwares(),
            'chassis': {'redfish_endpoint': f'/redfish/v1{self.chassis_endpoint}'}  # TODO(guits): not ideal
        }
        return result

    def _update_system(self) -> None:
        """Cache each common endpoint's payload, keyed by its first segment."""
        for endpoint in self.common_endpoints:
            result = self.client.get_path(endpoint)
            _endpoint = endpoint.strip('/').split('/')[0]
            self._system[_endpoint] = result

    # The hooks below are implemented by vendor-specific subclasses.

    def _update_sn(self) -> None:
        raise NotImplementedError()

    def _update_memory(self) -> None:
        raise NotImplementedError()

    def _update_power(self) -> None:
        raise NotImplementedError()

    def _update_fans(self) -> None:
        raise NotImplementedError()

    def _update_network(self) -> None:
        raise NotImplementedError()

    def _update_processors(self) -> None:
        raise NotImplementedError()

    def _update_storage(self) -> None:
        raise NotImplementedError()

    def _update_firmwares(self) -> None:
        raise NotImplementedError()
diff --git a/src/cephadm/cephadmlib/node_proxy/basesystem.py b/src/cephadm/cephadmlib/node_proxy/basesystem.py
deleted file mode 100644 (file)
index 4fb1b7b..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-import socket
-from .util import Config
-from typing import Dict, Any
-from .baseclient import BaseClient
-
-
class BaseSystem:
    """Abstract interface for a hardware system monitored by node-proxy.

    Concrete implementations (e.g. Redfish-based ones) override the
    inventory getters and lifecycle hooks below; only the hostname
    lookup is provided here.
    """

    def __init__(self, **kw: Any) -> None:
        # cached inventory/state, filled in by concrete implementations
        self._system: Dict = {}
        self.config: Config = kw['config']
        # concrete subclasses are expected to assign a real client
        self.client: BaseClient

    # --- inventory getters: implemented by subclasses ---

    def get_system(self) -> Dict[str, Any]:
        raise NotImplementedError()

    def get_status(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_metadata(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_processors(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_memory(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_fans(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_power(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_network(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_storage(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_sn(self) -> str:
        raise NotImplementedError()

    def get_led(self) -> Dict[str, Any]:
        raise NotImplementedError()

    def set_led(self, data: Dict[str, str]) -> int:
        raise NotImplementedError()

    def get_host(self) -> str:
        """Return this machine's hostname (same for every implementation)."""
        return socket.gethostname()

    # --- lifecycle hooks: implemented by subclasses ---

    def start_update_loop(self) -> None:
        raise NotImplementedError()

    def stop_update_loop(self) -> None:
        raise NotImplementedError()

    def start_client(self) -> None:
        raise NotImplementedError()

    def flush(self) -> None:
        raise NotImplementedError()
diff --git a/src/cephadm/cephadmlib/node_proxy/main.py b/src/cephadm/cephadmlib/node_proxy/main.py
deleted file mode 100644 (file)
index 968dfd3..0000000
+++ /dev/null
@@ -1,94 +0,0 @@
-from threading import Thread
-from .redfishdellsystem import RedfishDellSystem
-from .reporter import Reporter
-from .util import Config, Logger
-from typing import Dict, Any, Optional
-import traceback
-
# Fallback settings used when /etc/ceph/node-proxy.yml is missing or
# incomplete (passed to Config as default_config in NodeProxy.main()).
# 'level' is a stdlib logging level (20 == INFO).
DEFAULT_CONFIG = {
    'reporter': {
        'check_interval': 5,
        'push_data_max_retries': 30,
        'endpoint': 'https://127.0.0.1:7150/node-proxy/data',
    },
    'system': {
        'refresh_interval': 5
    },
    'server': {
        'port': 8080,
    },
    'logging': {
        'level': 20,
    }
}
-
-
class NodeProxy(Thread):
    """Thread driving the node-proxy service for one host.

    It connects to the host's out-of-band endpoint (RedfishDellSystem)
    and pushes the collected hardware state to the mgr via a Reporter.
    Exceptions escaping main() are stored in ``self.exc`` so the owner
    can surface them through check_status().
    """

    def __init__(self, **kw: Any) -> None:
        super().__init__()
        self.username: str = kw.get('username', '')
        self.password: str = kw.get('password', '')
        self.host: str = kw.get('host', '')
        self.port: int = kw.get('port', 443)
        self.cephx: Dict[str, Any] = kw.get('cephx', {})
        self.reporter_scheme: str = kw.get('reporter_scheme', 'https')
        self.mgr_target_ip: str = kw.get('mgr_target_ip', '')
        self.mgr_target_port: str = kw.get('mgr_target_port', '')
        self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data')
        # holds any exception raised inside run()/main()
        self.exc: Optional[Exception] = None
        self.log = Logger(__name__)

    def run(self) -> None:
        # capture rather than propagate: a bare Thread would swallow it
        try:
            self.main()
        except Exception as e:
            self.exc = e
            return

    def shutdown(self) -> None:
        """Log out from the remote API and stop the update/reporter loops."""
        self.log.logger.info('Shutting down node-proxy...')
        self.system.client.logout()
        self.system.stop_update_loop()
        self.reporter_agent.stop()

    def check_auth(self, realm: str, username: str, password: str) -> bool:
        """Credential check for the HTTP server; *realm* is unused."""
        return self.username == username and \
            self.password == password

    def check_status(self) -> bool:
        """Return True when healthy; re-raise any stored error otherwise."""
        if self.__dict__.get('system') and not self.system.run:
            raise RuntimeError('node-proxy encountered an error.')
        if self.exc:
            traceback.print_tb(self.exc.__traceback__)
            self.log.logger.error(f'{self.exc.__class__.__name__}: {self.exc}')
            raise self.exc
        return True

    def main(self) -> None:
        """Load config, build the Redfish system and start the reporter."""
        # TODO: add a check and fail if host/username/password/data aren't passed
        self.config = Config('/etc/ceph/node-proxy.yml', default_config=DEFAULT_CONFIG)
        self.log = Logger(__name__, level=self.config.__dict__['logging']['level'])

        # create the redfish system and the observer
        self.log.logger.info('Server initialization...')
        try:
            self.system = RedfishDellSystem(host=self.host,
                                            port=self.port,
                                            username=self.username,
                                            password=self.password,
                                            config=self.config)
        except RuntimeError:
            self.log.logger.error("Can't initialize the redfish system.")
            raise

        try:
            self.reporter_agent = Reporter(self.system,
                                           self.cephx,
                                           reporter_scheme=self.reporter_scheme,
                                           reporter_hostname=self.mgr_target_ip,
                                           reporter_port=self.mgr_target_port,
                                           reporter_endpoint=self.reporter_endpoint)
            self.reporter_agent.run()
        except RuntimeError:
            self.log.logger.error("Can't initialize the reporter.")
            raise
diff --git a/src/cephadm/cephadmlib/node_proxy/redfish_client.py b/src/cephadm/cephadmlib/node_proxy/redfish_client.py
deleted file mode 100644 (file)
index 040db8c..0000000
+++ /dev/null
@@ -1,112 +0,0 @@
-import json
-from urllib.error import HTTPError, URLError
-from .baseclient import BaseClient
-from .util import Logger, http_req
-from typing import Dict, Any, Tuple, Optional
-from http.client import HTTPMessage
-
-
class RedFishClient(BaseClient):
    """HTTP client for a Redfish API using session-token authentication."""

    PREFIX = '/redfish/v1/'

    def __init__(self,
                 host: str = '',
                 port: str = '443',
                 username: str = '',
                 password: str = ''):
        super().__init__(host, username, password)
        self.log: Logger = Logger(__name__)
        self.log.logger.info(f'Initializing redfish client {__name__}')
        self.host: str = host
        self.port: str = port
        self.url: str = f'https://{self.host}:{self.port}'
        # session token and session URL, both populated by login()
        self.token: str = ''
        self.location: str = ''

    def login(self) -> None:
        """Open a Redfish session unless a valid one already exists.

        Raises RuntimeError on any failure (rejected credentials,
        unreachable endpoint, unexpected status code).
        """
        if not self.is_logged_in():
            self.log.logger.info('Logging in to '
                                 f"{self.url} as '{self.username}'")
            oob_credentials = json.dumps({'UserName': self.username,
                                          'Password': self.password})
            headers = {'Content-Type': 'application/json'}

            try:
                _headers, _data, _status_code = self.query(data=oob_credentials,
                                                           headers=headers,
                                                           endpoint='/redfish/v1/SessionService/Sessions/')
                # 201 Created is the expected reply for a new session
                if _status_code != 201:
                    self.log.logger.error(f"Can't log in to {self.url} as '{self.username}': {_status_code}")
                    raise RuntimeError
            except URLError as e:
                msg = f"Can't log in to {self.url} as '{self.username}': {e}"
                self.log.logger.error(msg)
                raise RuntimeError
            self.token = _headers['X-Auth-Token']
            self.location = _headers['Location']

    def is_logged_in(self) -> bool:
        """Return True if the stored session token is still accepted."""
        self.log.logger.debug(f'Checking token validity for {self.url}')
        if not self.location or not self.token:
            self.log.logger.debug(f'No token found for {self.url}.')
            return False
        headers = {'X-Auth-Token': self.token}
        try:
            _headers, _data, _status_code = self.query(headers=headers,
                                                       endpoint=self.location)
        except URLError as e:
            self.log.logger.error("Can't check token "
                                  f'validity for {self.url}: {e}')
            raise RuntimeError
        return _status_code == 200

    def logout(self) -> Dict[str, Any]:
        """Delete the current session; returns {} when the call fails."""
        try:
            _, _data, _status_code = self.query(method='DELETE',
                                                headers={'X-Auth-Token': self.token},
                                                endpoint=self.location)
        except URLError:
            self.log.logger.error(f"Can't log out from {self.url}")
            return {}

        response_str = _data

        return json.loads(response_str)

    def get_path(self, path: str) -> Dict[str, Any]:
        """GET *path* (prefixed with /redfish/v1/ if needed) as JSON.

        Raises RuntimeError when the request fails.
        """
        if self.PREFIX not in path:
            path = f'{self.PREFIX}{path}'
        try:
            _, result, _status_code = self.query(endpoint=path)
            result_json = json.loads(result)
            return result_json
        except URLError as e:
            self.log.logger.error(f"Can't get path {path}:\n{e}")
            raise RuntimeError

    def query(self,
              data: Optional[str] = None,
              headers: Dict[str, str] = {},
              method: Optional[str] = None,
              endpoint: str = '',
              timeout: int = 10) -> Tuple[HTTPMessage, str, int]:
        """Low-level request helper: injects the session token and a JSON
        Content-Type where appropriate, then delegates to http_req.
        Returns (headers, body, status); re-raises HTTPError/URLError.
        """
        _headers = headers.copy() if headers else {}
        if self.token:
            _headers['X-Auth-Token'] = self.token
        if not _headers.get('Content-Type') and method in ['POST', 'PUT', 'PATCH']:
            _headers['Content-Type'] = 'application/json'
        try:
            (response_headers,
             response_str,
             response_status) = http_req(hostname=self.host,
                                         port=self.port,
                                         endpoint=endpoint,
                                         headers=_headers,
                                         method=method,
                                         data=data,
                                         timeout=timeout)

            return response_headers, response_str, response_status
        except (HTTPError, URLError) as e:
            self.log.logger.debug(f'{e}')
            raise
diff --git a/src/cephadm/cephadmlib/node_proxy/redfishdellsystem.py b/src/cephadm/cephadmlib/node_proxy/redfishdellsystem.py
deleted file mode 100644 (file)
index 71006c4..0000000
+++ /dev/null
@@ -1,186 +0,0 @@
-import json
-from .baseredfishsystem import BaseRedfishSystem
-from .util import Logger, normalize_dict, to_snake_case
-from typing import Dict, Any, List
-
-
class RedfishDellSystem(BaseRedfishSystem):
    """Dell-specific Redfish system.

    The ``_update_*()`` callbacks refresh the in-memory cache (``self._sys``)
    from the Redfish API; the ``get_*()`` accessors only read that cache.
    """
    def __init__(self, **kw: Any) -> None:
        super().__init__(**kw)
        self.log = Logger(__name__)

    def build_common_data(self,
                          data: Dict[str, Any],
                          fields: List,
                          path: str) -> Dict[str, Dict[str, Dict]]:
        """Extract *fields* from every member of the collection at *path*.

        Members are keyed by their ``Id``; a missing field is logged and
        skipped rather than raised.
        """
        result: Dict[str, Dict[str, Dict]] = dict()
        for member_info in self.get_members(data, path):
            member_id = member_info['Id']
            result[member_id] = dict()
            for field in fields:
                try:
                    result[member_id][to_snake_case(field)] = member_info[field]
                except KeyError:
                    self.log.logger.warning(f'Could not find field: {field} in member_info: {member_info}')

        return normalize_dict(result)

    def build_chassis_data(self,
                           fields: Dict[str, List[str]],
                           path: str) -> Dict[str, Dict[str, Dict]]:
        """Extract *fields* from a chassis sub-resource (e.g. 'Power' or
        'Thermal'); entries are keyed by their ``MemberId``."""
        result: Dict[str, Dict[str, Dict]] = dict()
        data = self._get_path(f'{self.chassis_endpoint}/{path}')

        for elt, _fields in fields.items():
            for member_elt in data[elt]:
                _id = member_elt['MemberId']
                result[_id] = dict()
                for field in _fields:
                    try:
                        result[_id][to_snake_case(field)] = member_elt[field]
                    except KeyError:
                        self.log.logger.warning(f'Could not find field: {field} in data: {data[elt]}')
        return normalize_dict(result)

    def get_sn(self) -> str:
        """Cached serial number (Redfish 'SKU' field)."""
        return self._sys['SKU']

    def get_status(self) -> Dict[str, Dict[str, Dict]]:
        """Cached overall status."""
        return self._sys['status']

    def get_memory(self) -> Dict[str, Dict[str, Dict]]:
        """Cached memory (DIMM) inventory."""
        return self._sys['memory']

    def get_processors(self) -> Dict[str, Dict[str, Dict]]:
        """Cached CPU inventory."""
        return self._sys['processors']

    def get_network(self) -> Dict[str, Dict[str, Dict]]:
        """Cached network interface inventory."""
        return self._sys['network']

    def get_storage(self) -> Dict[str, Dict[str, Dict]]:
        """Cached drive inventory."""
        return self._sys['storage']

    def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
        """Cached firmware inventory."""
        return self._sys['firmwares']

    def get_power(self) -> Dict[str, Dict[str, Dict]]:
        """Cached power-supply inventory."""
        return self._sys['power']

    def get_fans(self) -> Dict[str, Dict[str, Dict]]:
        """Cached fan inventory."""
        return self._sys['fans']

    def _update_network(self) -> None:
        """Refresh the network interface cache."""
        fields = ['Description', 'Name', 'SpeedMbps', 'Status']
        self.log.logger.debug('Updating network')
        self._sys['network'] = self.build_common_data(data=self._system['Systems'],
                                                      fields=fields,
                                                      path='EthernetInterfaces')

    def _update_processors(self) -> None:
        """Refresh the CPU cache."""
        fields = ['Description',
                  'TotalCores',
                  'TotalThreads',
                  'ProcessorType',
                  'Model',
                  'Status',
                  'Manufacturer']
        self.log.logger.debug('Updating processors')
        self._sys['processors'] = self.build_common_data(data=self._system['Systems'],
                                                         fields=fields,
                                                         path='Processors')

    def _update_storage(self) -> None:
        """Refresh the drive cache.

        Each drive needs an extra GET on its own ``@odata.id`` endpoint,
        so this is the most expensive updater.
        """
        fields = ['Description',
                  'CapacityBytes',
                  'Model', 'Protocol',
                  'SerialNumber', 'Status',
                  'PhysicalLocation']
        entities = self.get_members(data=self._system['Systems'],
                                    path='Storage')
        self.log.logger.debug('Updating storage')
        result: Dict[str, Dict[str, Dict]] = dict()
        for entity in entities:
            for drive in entity['Drives']:
                drive_path = drive['@odata.id']
                drive_info = self._get_path(drive_path)
                drive_id = drive_info['Id']
                result[drive_id] = dict()
                result[drive_id]['redfish_endpoint'] = drive['@odata.id']
                for field in fields:
                    result[drive_id][to_snake_case(field)] = drive_info[field]
                # loop-invariant: set once per drive, not once per field
                # as the previous version did
                result[drive_id]['entity'] = entity['Id']
        self._sys['storage'] = normalize_dict(result)

    def _update_sn(self) -> None:
        """Refresh the serial number cache."""
        self.log.logger.debug('Updating serial number')
        self._sys['SKU'] = self._system['Systems']['SKU']

    def _update_memory(self) -> None:
        """Refresh the DIMM cache."""
        fields = ['Description',
                  'MemoryDeviceType',
                  'CapacityMiB',
                  'Status']
        self.log.logger.debug('Updating memory')
        self._sys['memory'] = self.build_common_data(data=self._system['Systems'],
                                                     fields=fields,
                                                     path='Memory')

    def _update_power(self) -> None:
        """Refresh the power-supply cache."""
        fields = {
            'PowerSupplies': [
                'Name',
                'Model',
                'Manufacturer',
                'Status'
            ]
        }
        self.log.logger.debug('Updating powersupplies')
        self._sys['power'] = self.build_chassis_data(fields, 'Power')

    def _update_fans(self) -> None:
        """Refresh the fan cache."""
        fields = {
            'Fans': [
                'Name',
                'PhysicalContext',
                'Status'
            ],
        }
        self.log.logger.debug('Updating fans')
        self._sys['fans'] = self.build_chassis_data(fields, 'Thermal')

    def _update_firmwares(self) -> None:
        """Refresh the firmware inventory cache."""
        fields = [
            'Name',
            'Description',
            'ReleaseDate',
            'Version',
            'Updateable',
            'Status',
        ]
        self.log.logger.debug('Updating firmwares')
        self._sys['firmwares'] = self.build_common_data(data=self._system['UpdateService'],
                                                        fields=fields,
                                                        path='FirmwareInventory')

    def get_chassis_led(self) -> Dict[str, Any]:
        """Query the chassis location-LED state.

        :return: ``{'http_code': int, 'LocationIndicatorActive': bool | None}``
                 (None when the query did not return 200).
        """
        # No separator after '/redfish/v1': chassis_endpoint appears to carry
        # its own leading '/' (set_chassis_led below relies on that), so the
        # previous f'/redfish/v1/{...}' produced a double slash.
        # TODO(review): confirm chassis_endpoint always starts with '/'.
        endpoint = f'/redfish/v1{self.chassis_endpoint}'
        result = self.client.query(method='GET',
                                   endpoint=endpoint,
                                   timeout=10)
        response_json = json.loads(result[1])
        _result: Dict[str, Any] = {'http_code': result[2]}
        if result[2] == 200:
            _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
        else:
            _result['LocationIndicatorActive'] = None
        return _result

    def set_chassis_led(self, data: Dict[str, str]) -> int:
        """PATCH the chassis location-LED state; returns the HTTP status code."""
        # '{"IndicatorLED": "Lit"}'      -> LocationIndicatorActive = false
        # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true
        _, response, status = self.client.query(
            data=json.dumps(data),
            method='PATCH',
            endpoint=f'/redfish/v1{self.chassis_endpoint}'
        )
        return status
diff --git a/src/cephadm/cephadmlib/node_proxy/reporter.py b/src/cephadm/cephadmlib/node_proxy/reporter.py
deleted file mode 100644 (file)
index fb92a45..0000000
+++ /dev/null
@@ -1,74 +0,0 @@
-from threading import Thread
-import time
-import json
-from .util import Logger, http_req
-from urllib.error import HTTPError, URLError
-from typing import Dict, Any
-
-
class Reporter:
    """Background thread that periodically POSTs the collected system data
    to the ceph-mgr node-proxy endpoint."""

    def __init__(self,
                 system: Any,
                 cephx: Dict[str, Any],
                 reporter_scheme: str = 'https',
                 reporter_hostname: str = '',
                 reporter_port: str = '443',
                 reporter_endpoint: str = '/node-proxy/data') -> None:
        """
        :param system: object providing lock, data_ready, get_system() and
                       previous_data (the hardware-collection backend).
        :param cephx: credentials embedded in every payload so the mgr can
                      authenticate the sender.
        """
        self.system = system
        self.data: Dict[str, Any] = {}
        self.finish = False
        self.cephx = cephx
        self.data['cephx'] = self.cephx
        self.reporter_scheme: str = reporter_scheme
        self.reporter_hostname: str = reporter_hostname
        self.reporter_port: str = reporter_port
        self.reporter_endpoint: str = reporter_endpoint
        self.log = Logger(__name__)
        # '://' was missing after the scheme, which produced an unparseable
        # logged URL ('https:host:443/...').  The URL is informational only:
        # http_req() rebuilds the real one from the individual parts.
        self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:'
                                  f'{reporter_port}{reporter_endpoint}')
        self.log.logger.info(f'Reporter url set to {self.reporter_url}')

    def stop(self) -> None:
        """Ask the loop to exit and wait for the thread to terminate."""
        self.finish = True
        self.thread.join()

    def run(self) -> None:
        """Start the reporting loop in a background thread."""
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def loop(self) -> None:
        """Every 5 seconds, send the system data to the mgr when it changed."""
        while not self.finish:
            # Any logic to avoid sending all the system
            # information every loop can go here. In a real
            # scenario probably we should just send the sub-parts
            # that have changed to minimize the traffic in
            # dense clusters
            self.log.logger.debug('waiting for a lock in reporter loop.')
            self.system.lock.acquire()
            self.log.logger.debug('lock acquired in reporter loop.')
            if self.system.data_ready:
                self.log.logger.info('data ready to be sent to the mgr.')
                if not self.system.get_system() == self.system.previous_data:
                    self.log.logger.info('data has changed since last iteration.')
                    self.data['patch'] = self.system.get_system()
                    try:
                        # TODO: add a timeout parameter to the reporter in the config file
                        self.log.logger.info(f'sending data to {self.reporter_url}')
                        http_req(hostname=self.reporter_hostname,
                                 port=self.reporter_port,
                                 method='POST',
                                 headers={'Content-Type': 'application/json'},
                                 endpoint=self.reporter_endpoint,
                                 scheme=self.reporter_scheme,
                                 data=json.dumps(self.data))
                    except (HTTPError, URLError) as e:
                        self.log.logger.error(f"The reporter couldn't send data to the mgr: {e}")
                        # Need to add a new parameter 'max_retries' to the reporter if it can't
                        # send the data for more than x times, maybe the daemon should stop altogether
                    else:
                        self.system.previous_data = self.system.get_system()
                else:
                    self.log.logger.info('no diff, not sending data to the mgr.')
            self.system.lock.release()
            self.log.logger.debug('lock released in reporter loop.')
            time.sleep(5)
diff --git a/src/cephadm/cephadmlib/node_proxy/util.py b/src/cephadm/cephadmlib/node_proxy/util.py
deleted file mode 100644 (file)
index 31c1c00..0000000
+++ /dev/null
@@ -1,138 +0,0 @@
-import logging
-import yaml
-import os
-import time
-import re
-import ssl
-from urllib.error import HTTPError, URLError
-from urllib.request import urlopen, Request
-from typing import Dict, List, Callable, Any, Optional, MutableMapping, Tuple
-
-
class Logger:
    """Thin wrapper around ``logging.Logger``.

    Every instance is recorded in the class-level ``_Logger`` registry so
    that the configuration loader can retune log levels at runtime.
    """

    # registry of all Logger wrappers created so far
    _Logger: List['Logger'] = []

    def __init__(self, name: str, level: int = logging.INFO):
        self.name = name
        self.level = level

        Logger._Logger.append(self)
        self.logger = self.get_logger()

    def get_logger(self) -> logging.Logger:
        """Build a stream-handler logger named after ``self.name``."""
        wrapped = logging.getLogger(self.name)
        wrapped.setLevel(self.level)

        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(self.level)
        stream_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

        # drop any pre-existing handlers so that creating the same named
        # logger twice does not duplicate output lines
        wrapped.handlers.clear()
        wrapped.addHandler(stream_handler)
        wrapped.propagate = False

        return wrapped
-
-
class Config:
    """Load the node-proxy yaml config file and expose each top-level key
    as an attribute, falling back to *default_config* for missing keys."""

    def __init__(self,
                 config_file: str = '/etc/ceph/node-proxy.yaml',
                 default_config: Optional[Dict[str, Any]] = None) -> None:
        """
        :param config_file: path to the yaml configuration.
        :param default_config: fallback values used when the file is absent
                               or a key is missing.
        """
        self.config_file = config_file
        # default is None rather than a shared mutable {} (the classic
        # mutable-default-argument pitfall)
        self.default_config = default_config if default_config is not None else {}

        self.load_config()

    def load_config(self) -> None:
        """(Re)read the config file, merge in defaults and retune all
        registered loggers."""
        if os.path.exists(self.config_file):
            with open(self.config_file, 'r') as f:
                # an empty yaml file parses to None: fall back to {} so the
                # merge below doesn't blow up on .keys()
                self.config = yaml.safe_load(f) or {}
        else:
            self.config = self.default_config

        for k, v in self.default_config.items():
            if k not in self.config.keys():
                self.config[k] = v

        for k, v in self.config.items():
            setattr(self, k, v)

        # TODO: need to be improved
        # NOTE(review): assumes the effective config always provides a
        # 'logging' section (default_config must contain it) — confirm.
        for _l in Logger._Logger:
            _l.logger.setLevel(self.logging['level'])  # type: ignore
            _l.logger.handlers[0].setLevel(self.logging['level'])  # type: ignore

    def reload(self, config_file: str = '') -> None:
        """Reload the configuration, optionally from a different file."""
        if config_file != '':
            self.config_file = config_file
        self.load_config()
-
-
-log = Logger(__name__)
-
-
def to_snake_case(name: str) -> str:
    """Convert a CamelCase identifier to snake_case.

    e.g. 'SpeedMbps' -> 'speed_mbps'
    """
    # first pass: break before an upper-case letter followed by lower-case
    partially_converted = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    # second pass: break between a lower-case/digit and an upper-case letter
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', partially_converted).lower()
-
-
def normalize_dict(test_dict: Dict) -> Dict:
    """Return a copy of *test_dict* with every key lower-cased,
    recursing into nested dicts."""
    return {
        key.lower(): normalize_dict(value) if isinstance(value, dict) else value
        for key, value in test_dict.items()
    }
-
-
def retry(exceptions: Any = Exception, retries: int = 20, delay: int = 1) -> Callable:
    """Decorator retrying the wrapped callable on *exceptions*.

    The first ``retries - 1`` attempts swallow *exceptions* and sleep
    *delay* seconds between them; the final attempt runs outside the
    try block so the exception propagates to the caller.

    :param exceptions: exception class (or tuple of classes) to retry on.
    :param retries: total number of attempts.
    :param delay: seconds slept between attempts.
    """
    def decorator(f: Callable) -> Callable:
        def _retry(*args: str, **kwargs: Any) -> Callable:
            _tries = retries
            while _tries > 1:
                try:
                    log.logger.debug('{} {} attempt(s) left.'.format(f, _tries - 1))
                    return f(*args, **kwargs)
                except exceptions:
                    time.sleep(delay)
                    _tries -= 1
            # Logger.warn() is a deprecated alias; use warning()
            log.logger.warning('{} has failed after {} tries'.format(f, retries))
            return f(*args, **kwargs)
        return _retry
    return decorator
-
-
def http_req(hostname: str = '',
             port: str = '443',
             method: Optional[str] = None,
             headers: Optional[MutableMapping[str, str]] = None,
             data: Optional[str] = None,
             endpoint: str = '/',
             scheme: str = 'https',
             ssl_verify: bool = False,
             timeout: Optional[int] = None,
             ssl_ctx: Optional[Any] = None) -> Tuple[Any, Any, Any]:
    """Perform an HTTP(S) request and return the response.

    :return: (response headers, decoded body, HTTP status code)
    :raises HTTPError, URLError: propagated from urlopen after being printed.

    SECURITY NOTE: certificate verification is DISABLED unless
    ssl_verify=True or a pre-configured ssl_ctx is supplied — acceptable
    only for the self-signed BMC/mgr endpoints this daemon talks to.
    """
    if not ssl_ctx:
        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        if not ssl_verify:
            ssl_ctx.check_hostname = False
            ssl_ctx.verify_mode = ssl.CERT_NONE
        else:
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED

    url: str = f'{scheme}://{hostname}:{port}{endpoint}'
    # utf-8 rather than ascii: payloads (e.g. json-encoded strings) may
    # legally contain non-ascii characters; utf-8 is a superset of ascii
    # so previously-working callers are unaffected.
    _data = data.encode('utf-8') if data else None
    # default is None rather than a shared mutable {}; Request needs a dict
    _headers = headers if headers is not None else {}

    try:
        req = Request(url, _data, _headers, method=method)
        with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
            response_str = response.read()
            response_headers = response.headers
            response_code = response.code
        return response_headers, response_str.decode(), response_code
    except (HTTPError, URLError) as e:
        print(f'{e}')
        # handle error here if needed
        raise
index bd4b436b492666756134c55df6a0a1c56d8653e6..1b158822b526bc3e5420ffd273d1f72b07c253f6 100755 (executable)
@@ -45,6 +45,8 @@ ignore_missing_imports = True
 [mypy-kubernetes.*]
 ignore_missing_imports = True
 
+[mypy-setuptools]
+ignore_missing_imports = True
 
 # Make dashboard happy:
 [mypy-coverage]
index b589886e566c82c77b5a0a8031297f4bc8b42c28..68495d3bc061f473c5dee37201a397e3801d4317 100644 (file)
@@ -16,14 +16,15 @@ import time
 
 from orchestrator import DaemonDescriptionStatus
 from orchestrator._interface import daemon_type_to_service
-from ceph.utils import datetime_now
+from ceph.utils import datetime_now, http_req
 from ceph.deployment.inventory import Devices
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 from cephadm.ssl_cert_utils import SSLCerts
 from mgr_util import test_port_allocation, PortAlreadyInUse
 
-from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional
+from urllib.error import HTTPError, URLError
+from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional, MutableMapping
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
@@ -138,8 +139,9 @@ class NodeProxyEndpoint:
 
         self.validate_node_proxy_data(data)
 
-        host = data["cephx"]["name"]
-        results['result'] = self.mgr.node_proxy_cache.oob.get(host)
+        # expecting name to be "node-proxy.<hostname>"
+        hostname = data['cephx']['name'][11:]
+        results['result'] = self.mgr.node_proxy_cache.oob.get(hostname, '')
         if not results['result']:
             raise cherrypy.HTTPError(400, 'The provided host has no iDrac details.')
         return results
@@ -160,13 +162,15 @@ class NodeProxyEndpoint:
                 raise cherrypy.HTTPError(400, 'The field \'cephx\' must be provided.')
             elif 'name' not in data['cephx'].keys():
                 cherrypy.response.status = 400
-                raise cherrypy.HTTPError(400, 'The field \'host\' must be provided.')
-            elif 'secret' not in data['cephx'].keys():
-                raise cherrypy.HTTPError(400, 'The agent keyring must be provided.')
-            elif not self.mgr.agent_cache.agent_keys.get(data['cephx']['name']):
-                raise cherrypy.HTTPError(502, f'Make sure the agent is running on {data["cephx"]["name"]}')
-            elif data['cephx']['secret'] != self.mgr.agent_cache.agent_keys[data['cephx']['name']]:
-                raise cherrypy.HTTPError(403, f'Got wrong keyring from agent on host {data["cephx"]["name"]}.')
+                raise cherrypy.HTTPError(400, 'The field \'name\' must be provided.')
+            # expecting name to be "node-proxy.<hostname>"
+            hostname = data['cephx']['name'][11:]
+            if 'secret' not in data['cephx'].keys():
+                raise cherrypy.HTTPError(400, 'The node-proxy keyring must be provided.')
+            elif not self.mgr.node_proxy_cache.keyrings.get(hostname, ''):
+                raise cherrypy.HTTPError(502, f'Make sure the node-proxy is running on {hostname}')
+            elif data['cephx']['secret'] != self.mgr.node_proxy_cache.keyrings[hostname]:
+                raise cherrypy.HTTPError(403, f'Got wrong keyring from agent on host {hostname}.')
         except AttributeError:
             raise cherrypy.HTTPError(400, 'Malformed data received.')
 
@@ -289,12 +293,19 @@ class NodeProxyEndpoint:
         :rtype: dict[str, Any]
         """
         method: str = cherrypy.request.method
+        header: MutableMapping[str, str] = {}
         hostname: Optional[str] = kw.get('hostname')
         led_type: Optional[str] = kw.get('type')
         id_drive: Optional[str] = kw.get('id')
-        data: Optional[str] = None
-        # this method is restricted to 'GET' or 'PATCH'
-        action: str = 'get_led' if method == 'GET' else 'set_led'
+        payload: Optional[Dict[str, str]] = None
+        endpoint: List[Any] = ['led', led_type]
+        device: str = id_drive if id_drive else ''
+
+        ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert()
+        ssl_ctx = ssl.create_default_context()
+        ssl_ctx.check_hostname = True
+        ssl_ctx.verify_mode = ssl.CERT_REQUIRED
+        ssl_ctx.load_verify_locations(cadata=ssl_root_crt)
 
         if not hostname:
             msg: str = "listing enclosure LED status for all nodes is not implemented."
@@ -311,16 +322,32 @@ class NodeProxyEndpoint:
             self.mgr.log.debug(msg)
             raise cherrypy.HTTPError(400, msg)
 
+        if led_type == 'drive':
+            endpoint.append(device)
+
         if hostname not in self.mgr.node_proxy_cache.data.keys():
             # TODO(guits): update unit test for this
             msg = f"'{hostname}' not found."
             self.mgr.log.debug(msg)
             raise cherrypy.HTTPError(400, msg)
 
+        addr: str = self.mgr.inventory.get_addr(hostname)
+
         if method == 'PATCH':
             # TODO(guits): need to check the request is authorized
             # allowing a specific keyring only ? (client.admin or client.agent.. ?)
-            data = json.dumps(cherrypy.request.json)
+            data: Dict[str, Any] = cherrypy.request.json
+            if 'state' not in data.keys():
+                msg = "'state' key not provided."
+                raise cherrypy.HTTPError(400, msg)
+            if 'keyring' not in data.keys():
+                msg = "'keyring' key must be provided."
+                raise cherrypy.HTTPError(400, msg)
+            if data['keyring'] != self.mgr.node_proxy_cache.keyrings.get(hostname):
+                msg = 'wrong keyring provided.'
+                raise cherrypy.HTTPError(401, msg)
+            payload = {}
+            payload['state'] = data['state']
 
             if led_type == 'drive':
                 if id_drive not in self.mgr.node_proxy_cache.data[hostname]['status']['storage'].keys():
@@ -329,28 +356,23 @@ class NodeProxyEndpoint:
                     self.mgr.log.debug(msg)
                     raise cherrypy.HTTPError(400, msg)
 
-        payload: Dict[str, Any] = {"node_proxy_oob_cmd":
-                                   {"action": action,
-                                    "type": led_type,
-                                    "id": id_drive,
-                                    "host": hostname,
-                                    "data": data}}
-        try:
-            message_thread = AgentMessageThread(
-                hostname, self.mgr.agent_cache.agent_ports[hostname], payload, self.mgr)
-            message_thread.start()
-            message_thread.join()  # TODO(guits): Add a timeout?
-        except KeyError:
-            raise cherrypy.HTTPError(502, f"{hostname}'s agent not running, please check.")
-        agent_response = message_thread.get_agent_response()
+        endpoint = f'/{"/".join(endpoint)}'
+        header = self.mgr.node_proxy.generate_auth_header(hostname)
+
         try:
-            response_json: Dict[str, Any] = json.loads(agent_response)
-        except json.decoder.JSONDecodeError:
-            cherrypy.response.status = 503
-        else:
-            cherrypy.response.status = response_json.get('http_code', 503)
-        if cherrypy.response.status != 200:
-            raise cherrypy.HTTPError(cherrypy.response.status, "Couldn't change the LED status.")
+            headers, result, status = http_req(hostname=addr,
+                                               port='8080',
+                                               headers=header,
+                                               method=method,
+                                               data=json.dumps(payload),
+                                               endpoint=endpoint,
+                                               ssl_ctx=ssl_ctx)
+            response_json = json.loads(result)
+        except HTTPError as e:
+            self.mgr.log.debug(e)
+        except URLError:
+            raise cherrypy.HTTPError(502, f'Make sure the node-proxy agent is deployed and running on {hostname}')
+
         return response_json
 
     @cherrypy.expose
@@ -842,16 +864,6 @@ class CephadmAgentHelpers:
                 host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
             message_thread.start()
 
-    def _shutdown_node_proxy(self) -> None:
-        hosts = set([h for h in self.mgr.cache.get_hosts() if
-                     (h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))])
-
-        for host in hosts:
-            payload: Dict[str, Any] = {'node_proxy_shutdown': host}
-            message_thread = AgentMessageThread(
-                host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr)
-            message_thread.start()
-
     def _request_ack_all_not_up_to_date(self) -> None:
         self.mgr.agent_helpers._request_agent_acks(
             set([h for h in self.mgr.cache.get_hosts() if
index a5003d5e657fd12bb87b301cee51391bfd9fb458..53c1b01b017bbf9b0c47ebb814e96bfe74054e1d 100644 (file)
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
 HOST_CACHE_PREFIX = "host."
 SPEC_STORE_PREFIX = "spec."
 AGENT_CACHE_PREFIX = 'agent.'
-NODE_PROXY_CACHE_PREFIX = 'node_proxy/data'
+NODE_PROXY_CACHE_PREFIX = 'node_proxy'
 
 
 class HostCacheStatus(enum.Enum):
@@ -1411,20 +1411,25 @@ class NodeProxyCache:
         self.mgr = mgr
         self.data: Dict[str, Any] = {}
         self.oob: Dict[str, Any] = {}
+        self.keyrings: Dict[str, str] = {}
 
     def load(self) -> None:
-        _oob = self.mgr.get_store('node_proxy/oob', "{}")
+        _oob = self.mgr.get_store(f'{NODE_PROXY_CACHE_PREFIX}/oob', '{}')
         self.oob = json.loads(_oob)
 
-        for k, v in self.mgr.get_store_prefix(NODE_PROXY_CACHE_PREFIX).items():
+        _keyrings = self.mgr.get_store(f'{NODE_PROXY_CACHE_PREFIX}/keyrings', '{}')
+        self.keyrings = json.loads(_keyrings)
+
+        for k, v in self.mgr.get_store_prefix(f'{NODE_PROXY_CACHE_PREFIX}/data').items():
             host = k.split('/')[-1:][0]
 
             if host not in self.mgr.inventory.keys():
                 # remove entry for host that no longer exists
-                self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/{host}", None)
+                self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/data/{host}", None)
                 try:
                     self.oob.pop(host)
                     self.data.pop(host)
+                    self.keyrings.pop(host)
                 except KeyError:
                     pass
                 continue
@@ -1434,7 +1439,15 @@ class NodeProxyCache:
     def save(self,
              host: str = '',
              data: Dict[str, Any] = {}) -> None:
-        self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/{host}", json.dumps(data))
+        self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/data/{host}", json.dumps(data))
+
+    def update_oob(self, host: str, host_oob_info: Dict[str, str]) -> None:
+        self.oob[host] = host_oob_info
+        self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/oob", json.dumps(self.oob))
+
+    def update_keyring(self, host: str, key: str) -> None:
+        self.keyrings[host] = key
+        self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/keyrings", json.dumps(self.keyrings))
 
     def fullreport(self, **kw: Any) -> Dict[str, Any]:
         """
index f27f8ed7b323524b67f99659dd2bee30dd9d24cd..c5a7602f77675d659493f737d475c0741026c43e 100644 (file)
@@ -10,6 +10,7 @@ from configparser import ConfigParser
 from contextlib import contextmanager
 from functools import wraps
 from tempfile import TemporaryDirectory, NamedTemporaryFile
+from urllib.error import HTTPError
 from threading import Event
 
 from cephadm.service_discovery import ServiceDiscovery
@@ -64,6 +65,7 @@ from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
     NodeExporterService, SNMPGatewayService, LokiService, PromtailService
 from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService
+from .services.node_proxy import NodeProxy
 from .schedule import HostAssignment
 from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
     ClientKeyringStore, ClientKeyringSpec, TunedProfileStore, NodeProxyCache
@@ -435,6 +437,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             default=3.0,
             desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down'
         ),
+        Option(
+            'hw_monitoring',
+            type='bool',
+            default=False,
+            desc='Deploy hw monitoring daemon on every host.'
+        ),
         Option(
             'max_osd_draining_count',
             type='int',
@@ -545,6 +553,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self.agent_refresh_rate = 0
             self.agent_down_multiplier = 0.0
             self.agent_starting_port = 0
+            self.hw_monitoring = False
             self.service_discovery_port = 0
             self.secure_monitoring_stack = False
             self.apply_spec_fails: List[Tuple[str, str]] = []
@@ -624,7 +633,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
             IngressService, CustomContainerService, CephfsMirrorService, NvmeofService,
             CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService,
-            JaegerQueryService, JaegerAgentService, JaegerCollectorService
+            JaegerQueryService, JaegerAgentService, JaegerCollectorService, NodeProxy
         ]
 
         # https://github.com/python/mypy/issues/8993
@@ -635,6 +644,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
         self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
         self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])
+        self.node_proxy_service: NodeProxy = cast(NodeProxy, self.cephadm_services['node-proxy'])
 
         self.scheduled_async_actions: List[Callable] = []
 
@@ -647,6 +657,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.http_server = CephadmHttpServer(self)
         self.http_server.start()
+
+        self.node_proxy = NodeProxy(self)
+
         self.agent_helpers = CephadmAgentHelpers(self)
         if self.use_agent:
             self.agent_helpers._apply_agent()
@@ -813,7 +826,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         Generate a unique random service name
         """
         suffix = daemon_type not in [
-            'mon', 'crash', 'ceph-exporter',
+            'mon', 'crash', 'ceph-exporter', 'node-proxy',
             'prometheus', 'node-exporter', 'grafana', 'alertmanager',
             'container', 'agent', 'snmp-gateway', 'loki', 'promtail',
             'elasticsearch', 'jaeger-collector', 'jaeger-agent', 'jaeger-query'
@@ -1605,13 +1618,12 @@ Then run the following:
                 spec.oob['addr'] = spec.hostname
             if not spec.oob.get('port'):
                 spec.oob['port'] = '443'
-            data = json.loads(self.get_store('node_proxy/oob', '{}'))
-            data[spec.hostname] = dict()
-            data[spec.hostname]['addr'] = spec.oob['addr']
-            data[spec.hostname]['port'] = spec.oob['port']
-            data[spec.hostname]['username'] = spec.oob['username']
-            data[spec.hostname]['password'] = spec.oob['password']
-            self.set_store('node_proxy/oob', json.dumps(data))
+            host_oob_info = dict()
+            host_oob_info['addr'] = spec.oob['addr']
+            host_oob_info['port'] = spec.oob['port']
+            host_oob_info['username'] = spec.oob['username']
+            host_oob_info['password'] = spec.oob['password']
+            self.node_proxy_cache.update_oob(spec.hostname, host_oob_info)
 
         # prime crush map?
         if spec.location:
@@ -1636,6 +1648,51 @@ Then run the following:
     def add_host(self, spec: HostSpec) -> str:
         return self._add_host(spec)
 
+    @handle_orch_error
+    def hardware_light(self, light_type: str, action: str, hostname: str, device: Optional[str] = None) -> Dict[str, Any]:
+        try:
+            result = self.node_proxy.led(light_type=light_type,
+                                         action=action,
+                                         hostname=hostname,
+                                         device=device)
+        except RuntimeError as e:
+            self.log.error(e)
+            raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}')
+        except HTTPError as e:
+            self.log.error(e)
+            raise OrchestratorValidationError(f"http error while querying node-proxy API: {e}")
+        return result
+
+    @handle_orch_error
+    def hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> str:
+        if not yes_i_really_mean_it:
+            raise OrchestratorError("you must pass --yes-i-really-mean-it")
+
+        try:
+            self.node_proxy.shutdown(hostname, force)
+        except RuntimeError as e:
+            self.log.error(e)
+            raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}')
+        except HTTPError as e:
+            self.log.error(e)
+            raise OrchestratorValidationError(f"Can't shutdown node {hostname}: {e}")
+        return f'Shutdown scheduled on {hostname}'
+
+    @handle_orch_error
+    def hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> str:
+        if not yes_i_really_mean_it:
+            raise OrchestratorError("you must pass --yes-i-really-mean-it")
+
+        try:
+            self.node_proxy.powercycle(hostname)
+        except RuntimeError as e:
+            self.log.error(e)
+            raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}')
+        except HTTPError as e:
+            self.log.error(e)
+            raise OrchestratorValidationError(f"Can't perform powercycle on node {hostname}: {e}")
+        return f'Powercycle scheduled on {hostname}'
+
     @handle_orch_error
     def node_proxy_summary(self, hostname: Optional[str] = None) -> Dict[str, Any]:
         return self.node_proxy_cache.summary(hostname=hostname)
@@ -2697,6 +2754,15 @@ Then run the following:
                 pass
             deps = sorted([self.get_mgr_ip(), server_port, root_cert,
                            str(self.device_enhanced_scan)])
+        elif daemon_type == 'node-proxy':
+            root_cert = ''
+            server_port = ''
+            try:
+                server_port = str(self.http_server.agent.server_port)
+                root_cert = self.http_server.agent.ssl_certs.get_root_cert()
+            except Exception:
+                pass
+            deps = sorted([self.get_mgr_ip(), server_port, root_cert])
         elif daemon_type == 'iscsi':
             if spec:
                 iscsi_spec = cast(IscsiServiceSpec, spec)
index f16e9f8c7b048db4884533e85d2e033789db3772..e92407ec80a37726381395a5d9263df1926fd115 100644 (file)
@@ -113,6 +113,9 @@ class CephadmServe:
                     if self.mgr.agent_helpers._handle_use_agent_setting():
                         continue
 
+                    if self.mgr.node_proxy_service.handle_hw_monitoring_setting():
+                        continue
+
                     if self.mgr.upgrade.continue_upgrade():
                         continue
 
index d91a3a3f22d3fd3c112adb7dab87b08a49b04a02..f3c45b164a47da7ebd8d10e737784ee07bed10e1 100644 (file)
@@ -41,7 +41,7 @@ def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEnt
     # the CephService class refers to service types, not daemon types
     if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'nvmeof', 'ingress', 'ceph-exporter']:
         return AuthEntity(f'client.{daemon_type}.{daemon_id}')
-    elif daemon_type in ['crash', 'agent']:
+    elif daemon_type in ['crash', 'agent', 'node-proxy']:
         if host == "":
             raise OrchestratorError(
                 f'Host not provided to generate <{daemon_type}> auth entity name')
@@ -1223,16 +1223,6 @@ class CephadmAgent(CephService):
 
         return daemon_spec
 
-    def pre_remove(self, daemon: DaemonDescription) -> None:
-        super().pre_remove(daemon)
-
-        assert daemon.daemon_id is not None
-        daemon_id: str = daemon.daemon_id
-
-        logger.info('Removing agent %s...' % daemon_id)
-
-        self.mgr.agent_helpers._shutdown_node_proxy()
-
     def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
         agent = self.mgr.http_server.agent
         try:
diff --git a/src/pybind/mgr/cephadm/services/node_proxy.py b/src/pybind/mgr/cephadm/services/node_proxy.py
new file mode 100644 (file)
index 0000000..ebbbaf2
--- /dev/null
@@ -0,0 +1,180 @@
+import json
+import ssl
+import base64
+
+from urllib.error import HTTPError, URLError
+from typing import List, Any, Dict, Tuple, Optional, MutableMapping
+
+from .cephadmservice import CephadmDaemonDeploySpec, CephService
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
+from ceph.utils import http_req
+from orchestrator import OrchestratorError
+
+
+class NodeProxy(CephService):
+    TYPE = 'node-proxy'
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+        if not self.mgr.http_server.agent:
+            raise OrchestratorError('Cannot deploy node-proxy before creating cephadm endpoint')
+
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
+        daemon_spec.keyring = keyring
+        self.mgr.node_proxy_cache.update_keyring(host, keyring)
+
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+        return daemon_spec
+
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        # node-proxy is re-using the agent endpoint and therefore
+        # needs similar checks to see if the endpoint is ready.
+        self.agent_endpoint = self.mgr.http_server.agent
+        try:
+            assert self.agent_endpoint
+            assert self.agent_endpoint.ssl_certs.get_root_cert()
+            assert self.agent_endpoint.server_port
+        except Exception:
+            raise OrchestratorError(
+                'Cannot deploy node-proxy daemons until cephadm endpoint has finished generating certs')
+
+        listener_cert, listener_key = self.agent_endpoint.ssl_certs.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
+        cfg = {
+            'target_ip': self.mgr.get_mgr_ip(),
+            'target_port': self.agent_endpoint.server_port,
+            'name': f'node-proxy.{daemon_spec.host}',
+            'keyring': daemon_spec.keyring,
+            'root_cert.pem': self.agent_endpoint.ssl_certs.get_root_cert(),
+            'listener.crt': listener_cert,
+            'listener.key': listener_key,
+        }
+        config = {'node-proxy.json': json.dumps(cfg)}
+
+        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.agent_endpoint.server_port),
+                               self.agent_endpoint.ssl_certs.get_root_cert()])
+
+    def handle_hw_monitoring_setting(self) -> bool:
+        # function to apply or remove node-proxy service spec depending
+        # on whether the hw_monitoring config option is set or not.
+        # It should return True when it either creates or deletes a spec
+        # and False otherwise.
+        if self.mgr.hw_monitoring:
+            if 'node-proxy' not in self.mgr.spec_store:
+                spec = ServiceSpec(
+                    service_type='node-proxy',
+                    placement=PlacementSpec(host_pattern='*')
+                )
+                self.mgr.spec_store.save(spec)
+                return True
+            return False
+        else:
+            if 'node-proxy' in self.mgr.spec_store:
+                self.mgr.spec_store.rm('node-proxy')
+                return True
+            return False
+
+    def get_ssl_ctx(self) -> ssl.SSLContext:
+        ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert()
+        ssl_ctx = ssl.create_default_context()
+        ssl_ctx.check_hostname = True
+        ssl_ctx.verify_mode = ssl.CERT_REQUIRED
+        ssl_ctx.load_verify_locations(cadata=ssl_root_crt)
+        return ssl_ctx
+
+    def led(self, light_type: str, action: str, hostname: str, device: Optional[str] = None) -> Dict[str, Any]:
+        ssl_ctx: ssl.SSLContext = self.get_ssl_ctx()
+        header: MutableMapping[str, str] = {}
+        method: str = 'PATCH' if action in ['on', 'off'] else 'GET'
+        payload: Optional[Dict[str, str]] = None
+        addr: str = self.mgr.inventory.get_addr(hostname)
+        endpoint: List[str] = ['led', light_type]
+        _device: str = device if device else ''
+
+        if light_type == 'drive':
+            endpoint.append(_device)
+
+        if method == 'PATCH':
+            payload = dict(state=action)
+
+        header = self.generate_auth_header(hostname)
+
+        endpoint = f'/{"/".join(endpoint)}'
+
+        try:
+            headers, result, status = http_req(hostname=addr,
+                                               port='8080',
+                                               headers=header,
+                                               method=method,
+                                               data=json.dumps(payload),
+                                               endpoint=endpoint,
+                                               ssl_ctx=ssl_ctx)
+            result_json = json.loads(result)
+        except HTTPError as e:
+            self.mgr.log.error(e)
+            raise
+        except URLError as e:
+            raise RuntimeError(e)
+
+        return result_json
+
+    def generate_auth_header(self, hostname: str) -> Dict[str, str]:
+        try:
+            username = self.mgr.node_proxy_cache.oob[hostname]['username']
+            password = self.mgr.node_proxy_cache.oob[hostname]['password']
+            auth: bytes = f'{username}:{password}'.encode('utf-8')
+            auth_str: str = base64.b64encode(auth).decode('utf-8')
+            header = {'Authorization': f'Basic {auth_str}'}
+        except KeyError as e:
+            self.mgr.log.error(f'Check oob information is provided for {hostname}.')
+            raise RuntimeError(e)
+        return header
+
+    def shutdown(self, hostname: str, force: Optional[bool] = False) -> Dict[str, Any]:
+        ssl_ctx: ssl.SSLContext = self.get_ssl_ctx()
+        header: Dict[str, str] = self.generate_auth_header(hostname)
+        addr: str = self.mgr.inventory.get_addr(hostname)
+
+        endpoint = '/shutdown'
+        payload: Dict[str, Optional[bool]] = dict(force=force)
+
+        try:
+            headers, result, status = http_req(hostname=addr,
+                                               port='8080',
+                                               headers=header,
+                                               data=json.dumps(payload),
+                                               endpoint=endpoint,
+                                               ssl_ctx=ssl_ctx)
+            result_json = json.loads(result)
+        except HTTPError as e:
+            self.mgr.log.error(e)
+            raise
+        except URLError as e:
+            raise RuntimeError(e)
+
+        return result_json
+
+    def powercycle(self, hostname: str) -> Dict[str, Any]:
+        ssl_ctx: ssl.SSLContext = self.get_ssl_ctx()
+        header: Dict[str, str] = self.generate_auth_header(hostname)
+        addr: str = self.mgr.inventory.get_addr(hostname)
+
+        endpoint = '/powercycle'
+
+        try:
+            headers, result, status = http_req(hostname=addr,
+                                               port='8080',
+                                               headers=header,
+                                               data="{}",
+                                               endpoint=endpoint,
+                                               ssl_ctx=ssl_ctx)
+            result_json = json.loads(result)
+        except HTTPError as e:
+            self.mgr.log.error(e)
+            raise
+        except URLError as e:
+            raise RuntimeError(e)
+
+        return result_json
index b713d04cd5975e5768452b3105cdfaf37f666c05..0c9ee127547c42b5d9d9ea45285c873bef7e65eb 100644 (file)
@@ -35,8 +35,8 @@ class FakeMgr:
 class TestNodeProxyEndpoint(helper.CPWebCase):
     mgr = FakeMgr()
     app = NodeProxyEndpoint(mgr)
-    mgr.agent_cache.agent_keys = {"host01": "fake-secret01",
-                                  "host02": "fake-secret02"}
+    mgr.node_proxy.keyrings = {"host01": "fake-secret01",
+                               "host02": "fake-secret02"}
     mgr.node_proxy.oob = {"host01": {"username": "oob-user01",
                                      "password": "oob-pass01"},
                           "host02": {"username": "oob-user02",
@@ -68,38 +68,38 @@ class TestNodeProxyEndpoint(helper.CPWebCase):
         self.assertStatus('400 Bad Request')
 
     def test_oob_data_misses_secret_field(self):
-        data = '{"cephx": {"name": "host01"}}'
+        data = '{"cephx": {"name": "node-proxy.host01"}}'
         self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'),
                                                                 ('Content-Length', str(len(data)))])
         self.assertStatus('400 Bad Request')
 
     def test_oob_agent_not_running(self):
-        data = '{"cephx": {"name": "host03", "secret": "fake-secret03"}}'
+        data = '{"cephx": {"name": "node-proxy.host03", "secret": "fake-secret03"}}'
         self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'),
                                                                 ('Content-Length', str(len(data)))])
         self.assertStatus('502 Bad Gateway')
 
     def test_oob_wrong_keyring(self):
-        data = '{"cephx": {"name": "host01", "secret": "wrong-keyring"}}'
+        data = '{"cephx": {"name": "node-proxy.host01", "secret": "wrong-keyring"}}'
         self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'),
                                                                 ('Content-Length', str(len(data)))])
         self.assertStatus('403 Forbidden')
 
     def test_oob_ok(self):
-        data = '{"cephx": {"name": "host01", "secret": "fake-secret01"}}'
+        data = '{"cephx": {"name": "node-proxy.host01", "secret": "fake-secret01"}}'
         self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'),
                                                                 ('Content-Length', str(len(data)))])
         self.assertStatus('200 OK')
 
     def test_data_missing_patch(self):
-        data = '{"cephx": {"name": "host01", "secret": "fake-secret01"}}'
+        data = '{"cephx": {"name": "node-proxy.host01", "secret": "fake-secret01"}}'
         self.getPage("/data", method="POST", body=data, headers=[('Content-Type', 'application/json'),
                                                                  ('Content-Length', str(len(data)))])
         self.assertStatus('400 Bad Request')
 
     def test_data_raises_alert(self):
         patch = node_proxy_data.full_set_with_critical
-        data = {"cephx": {"name": "host01", "secret": "fake-secret01"}, "patch": patch}
+        data = {"cephx": {"name": "node-proxy.host01", "secret": "fake-secret01"}, "patch": patch}
         data_str = json.dumps(data)
         self.getPage("/data", method="POST", body=data_str, headers=[('Content-Type', 'application/json'),
                                                                      ('Content-Length', str(len(data_str)))])
index 63672936c7cb317f5b8c435cf544dc0fa4551836..3aedfbd86f0084eec57af70ab814ccba96e09819 100644 (file)
@@ -31,7 +31,7 @@ RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['haproxy', 'nfs']
 CEPH_UPGRADE_ORDER = CEPH_TYPES + GATEWAY_TYPES + MONITORING_STACK_TYPES
 
 # these daemon types use the ceph container image
-CEPH_IMAGE_TYPES = CEPH_TYPES + ['iscsi', 'nfs']
+CEPH_IMAGE_TYPES = CEPH_TYPES + ['iscsi', 'nfs', 'node-proxy']
 
 # these daemons do not use the ceph image. There are other daemons
 # that also don't use the ceph image, but we only care about those
index 5878f3ed6488599b5a9f8fff61f66a14d6057515..7892f4946f8a8f4689126348bbd113fd06b481d9 100644 (file)
@@ -359,6 +359,33 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
+    def hardware_light(self, light_type: str, action: str, hostname: str, device: Optional[str] = None) -> OrchResult[Dict[str, Any]]:
+        """
+        Light a chassis or device ident LED.
+
+        :param light_type: led type ('chassis' or 'drive').
+        :param action: the led action to perform ('on', 'off' or 'get').
+        :param hostname: the name of the host.
+        :param device: the device id (when light_type = 'drive')
+        """
+        raise NotImplementedError()
+
+    def hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> OrchResult[str]:
+        """
+        Reboot a host.
+
+        :param hostname: the name of the host being rebooted.
+        """
+        raise NotImplementedError()
+
+    def hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> OrchResult[str]:
+        """
+        Shutdown a host.
+
+        :param hostname: the name of the host to shutdown.
+        """
+        raise NotImplementedError()
+
     def hardware_status(self, hostname: Optional[str] = None, category: Optional[str] = 'summary') -> OrchResult[str]:
         """
         Display hardware status.
@@ -869,6 +896,7 @@ def daemon_type_to_service(dtype: str) -> str:
         'crashcollector': 'crash',  # Specific Rook Daemon
         'container': 'container',
         'agent': 'agent',
+        'node-proxy': 'node-proxy',
         'snmp-gateway': 'snmp-gateway',
         'elasticsearch': 'elasticsearch',
         'jaeger-agent': 'jaeger-agent',
@@ -901,6 +929,7 @@ def service_to_daemon_types(stype: str) -> List[str]:
         'crash': ['crash'],
         'container': ['container'],
         'agent': ['agent'],
+        'node-proxy': ['node-proxy'],
         'snmp-gateway': ['snmp-gateway'],
         'elasticsearch': ['elasticsearch'],
         'jaeger-agent': ['jaeger-agent'],
index 561367195b786b3de6bacaa1ab6f305017b9eb96..0763cdcc5d13dc6695eb244cc21f09da46a5861d 100644 (file)
@@ -590,6 +590,51 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule,
 
         return table.get_string()
 
+    class HardwareLightType(enum.Enum):
+        chassis = 'chassis'
+        device = 'drive'
+
+    class HardwareLightAction(enum.Enum):
+        on = 'on'
+        off = 'off'
+        get = 'get'
+
+    @_cli_write_command('orch hardware light')
+    def _hardware_light(self,
+                        light_type: HardwareLightType, action: HardwareLightAction,
+                        hostname: str, device: Optional[str] = None) -> HandleCommandResult:
+        """Enable or Disable a device or chassis LED"""
+        if light_type == self.HardwareLightType.device and not device:
+            return HandleCommandResult(stderr='you must pass a device ID.',
+                                       retval=-errno.ENOENT)
+
+        completion = self.hardware_light(light_type.value, action.value, hostname, device)
+        data = raise_if_exception(completion)
+        output: str = ''
+        if action == self.HardwareLightAction.get:
+            status = 'on' if data["LocationIndicatorActive"] else 'off'
+            if light_type == self.HardwareLightType.device:
+                output = f'ident LED for {device} on {hostname} is: {status}'
+            else:
+                output = f'ident chassis LED for {hostname} is: {status}'
+        else:
+            pass
+        return HandleCommandResult(stdout=output)
+
+    @_cli_write_command('orch hardware powercycle')
+    def _hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> HandleCommandResult:
+        """Reboot a host"""
+        completion = self.hardware_powercycle(hostname, yes_i_really_mean_it=yes_i_really_mean_it)
+        raise_if_exception(completion)
+        return HandleCommandResult(stdout=completion.result_str())
+
+    @_cli_write_command('orch hardware shutdown')
+    def _hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> HandleCommandResult:
+        """Shutdown a host"""
+        completion = self.hardware_shutdown(hostname, force, yes_i_really_mean_it=yes_i_really_mean_it)
+        raise_if_exception(completion)
+        return HandleCommandResult(stdout=completion.result_str())
+
     @_cli_write_command('orch host rm')
     def _remove_host(self, hostname: str, force: bool = False, offline: bool = False) -> HandleCommandResult:
         """Remove a host"""
index be9f3e8ea584ebb6c2f879db79af6bf551b2f50b..bcebf23c20822b567cbe27c9fef7b096fe1c0d95 100644 (file)
@@ -625,7 +625,8 @@ class ServiceSpec(object):
     KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi nvmeof loki promtail mds mgr mon nfs ' \
                           'node-exporter osd prometheus rbd-mirror rgw agent ceph-exporter ' \
                           'container ingress cephfs-mirror snmp-gateway jaeger-tracing ' \
-                          'elasticsearch jaeger-agent jaeger-collector jaeger-query'.split()
+                          'elasticsearch jaeger-agent jaeger-collector jaeger-query ' \
+                          'node-proxy'.split()
     REQUIRES_SERVICE_ID = 'iscsi nvmeof mds nfs rgw container ingress '.split()
     MANAGED_CONFIG_OPTIONS = [
         'mds_join_fs',
index 643be06580b6194d3c03e3b660569598520bec0e..e92a2d1de7db8613d6e678713b7c0a3cd994020c 100644 (file)
@@ -1,8 +1,15 @@
 import datetime
 import re
 import string
+import ssl
 
-from typing import Optional
+from typing import Optional, MutableMapping, Tuple, Any
+from urllib.error import HTTPError, URLError
+from urllib.request import urlopen, Request
+
+import logging
+
+log = logging.getLogger(__name__)
 
 
 def datetime_now() -> datetime.datetime:
@@ -121,3 +128,42 @@ def is_hex(s: str, strict: bool = True) -> bool:
             return False
 
     return True
+
+
+def http_req(hostname: str = '',
+             port: str = '443',
+             method: Optional[str] = None,
+             headers: MutableMapping[str, str] = {},
+             data: Optional[str] = None,
+             endpoint: str = '/',
+             scheme: str = 'https',
+             ssl_verify: bool = False,
+             timeout: Optional[int] = None,
+             ssl_ctx: Optional[Any] = None) -> Tuple[Any, Any, Any]:
+
+    if not ssl_ctx:
+        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+        if not ssl_verify:
+            ssl_ctx.check_hostname = False
+            ssl_ctx.verify_mode = ssl.CERT_NONE
+        else:
+            ssl_ctx.verify_mode = ssl.CERT_REQUIRED
+
+    url: str = f'{scheme}://{hostname}:{port}{endpoint}'
+    _data = bytes(data, 'ascii') if data else None
+    _headers = headers
+    if data and not method:
+        method = 'POST'
+    if not _headers.get('Content-Type') and method in ['POST', 'PATCH']:
+        _headers['Content-Type'] = 'application/json'
+    try:
+        req = Request(url, _data, _headers, method=method)
+        with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
+            response_str = response.read()
+            response_headers = response.headers
+            response_code = response.code
+        return response_headers, response_str.decode(), response_code
+    except (HTTPError, URLError) as e:
+        log.error(e)
+        # the exception is logged here; handling is left to the caller
+        raise