import concurrent.futures
+import dataclasses
import json
+from dataclasses import dataclass
from ceph_node_proxy.basesystem import BaseSystem
from ceph_node_proxy.redfish_client import RedFishClient
from time import sleep
-from ceph_node_proxy.util import get_logger, to_snake_case
-from typing import Dict, Any, List, Callable, Union
+from ceph_node_proxy.util import get_logger, to_snake_case, normalize_dict
+from typing import Dict, Any, List, Callable, Optional
from urllib.error import HTTPError, URLError
+@dataclass
+class ComponentUpdateSpec:
+ collection: str
+ path: str
+ fields: List[str]
+ attribute: Optional[str] = None
+
+
class EndpointMgr:
NAME: str = 'EndpointMgr'
class BaseRedfishSystem(BaseSystem):
+ NETWORK_FIELDS: List[str] = ['Description', 'Name', 'SpeedMbps', 'Status']
+ PROCESSORS_FIELDS: List[str] = [
+ 'Description', 'TotalCores', 'TotalThreads', 'ProcessorType', 'Model', 'Status', 'Manufacturer',
+ ]
+ MEMORY_FIELDS: List[str] = ['Description', 'MemoryDeviceType', 'CapacityMiB', 'Status']
+ POWER_FIELDS: List[str] = ['Name', 'Model', 'Manufacturer', 'Status']
+ FANS_FIELDS: List[str] = ['Name', 'PhysicalContext', 'Status']
+ FIRMWARES_FIELDS: List[str] = [
+ 'Name', 'Description', 'ReleaseDate', 'Version', 'Updateable', 'Status',
+ ]
+
+ COMPONENT_SPECS: Dict[str, ComponentUpdateSpec] = {
+ 'network': ComponentUpdateSpec('systems', 'EthernetInterfaces', NETWORK_FIELDS, None),
+ 'processors': ComponentUpdateSpec('systems', 'Processors', PROCESSORS_FIELDS, None),
+ 'memory': ComponentUpdateSpec('systems', 'Memory', MEMORY_FIELDS, None),
+ 'power': ComponentUpdateSpec('chassis', 'Power', POWER_FIELDS, 'PowerSupplies'),
+ 'fans': ComponentUpdateSpec('chassis', 'Thermal', FANS_FIELDS, 'Fans'),
+ 'firmwares': ComponentUpdateSpec('update_service', 'FirmwareInventory', FIRMWARES_FIELDS, None),
+ }
+
def __init__(self, **kw: Any) -> None:
super().__init__(**kw)
self.log = get_logger(__name__)
f = getattr(self, func)
self.update_funcs.append(f)
+ def build_data(self,
+ data: Dict[str, Any],
+ fields: List[str],
+ attribute: Optional[str] = None) -> Dict[str, Dict[str, Dict]]:
+ result: Dict[str, Dict[str, Optional[Dict]]] = dict()
+ member_id: str = ''
+
+ def process_data(m_id: str, fields: List[str], data: Dict[str, Any]) -> Dict[str, Any]:
+ result: Dict[str, Any] = {}
+ for field in fields:
+ try:
+ result[to_snake_case(field)] = data[field]
+ except KeyError:
+ self.log.debug(f'Could not find field: {field} in data: {data}')
+ result[to_snake_case(field)] = None
+ return result
+
+ try:
+ if attribute is not None:
+ data_items = data[attribute]
+ else:
+ # The following is a hack to re-inject the key to the dict
+ # as we have the following structure when `attribute` is passed:
+ # "PowerSupplies": [ {"MemberId": "0", ...}, {"MemberId": "1", ...} ]
+ # vs. this structure in the opposite case:
+ # { "CPU.Socket.2": { "Id": "CPU.Socket.2", "Manufacturer": "Intel" }, "CPU.Socket.1": {} }
+ # With the first case, we clearly use the field "MemberId".
+ # With the second case, we use the key of the dict.
+ # This is mostly for avoiding code duplication.
+ data_items = [{'MemberId': k, **v} for k, v in data.items()]
+ self.log.error(f"GUITS_DEBUG: data_items= {data_items}")
+ for d in data_items:
+ member_id = d.get('MemberId')
+ result[member_id] = {}
+ result[member_id] = process_data(member_id, fields, d)
+ except (KeyError, TypeError, AttributeError) as e:
+ self.log.error(f"Can't build data: {e}")
+ raise
+ return normalize_dict(result)
+
+ def update(self,
+ collection: str,
+ component: str,
+ path: str,
+ fields: List[str],
+ attribute: Optional[str] = None) -> None:
+ members: List[str] = self.endpoints[collection].get_members_names()
+ result: Dict[str, Any] = {}
+ data: Dict[str, Any] = {}
+ data_built: Dict[str, Any] = {}
+ if not members:
+ data = self.endpoints[collection][path].get_members_data()
+ data_built = self.build_data(data=data, fields=fields, attribute=attribute)
+ result = data_built
+ else:
+ for member in members:
+ data_built = {}
+ try:
+ if attribute is None:
+ data = self.endpoints[collection][member][path].get_members_data()
+ else:
+ data = self.endpoints[collection][member][path].data
+ except HTTPError as e:
+ self.log.error(f'Error while updating {component}: {e}')
+ else:
+ data_built = self.build_data(data=data, fields=fields, attribute=attribute)
+ result[member] = data_built
+ self._sys[component] = result
+
def main(self) -> None:
self.stop = False
self.client.login()
self.log.debug(f'Pending shutdown, aborting query to {path}')
except RuntimeError:
raise
- if result is None:
- self.log.error(f'The client reported an error when getting path: {path}')
- raise RuntimeError(f'Could not get path: {path}')
+
return result
def get_members(self, data: Dict[str, Any], path: str) -> List:
self._system[update_service_members.id] = update_service_members.data
- def _update_sn(self) -> None:
- raise NotImplementedError()
+ def get_sn(self) -> str:
+ return self._sys.get('SKU', '')
- def _update_memory(self) -> None:
- raise NotImplementedError()
+ def get_status(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys.get('status', {})
- def _update_power(self) -> None:
- raise NotImplementedError()
+ def get_memory(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys.get('memory', {})
- def _update_fans(self) -> None:
- raise NotImplementedError()
+ def get_processors(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys.get('processors', {})
+
+ def get_network(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys.get('network', {})
+
+ def get_storage(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys.get('storage', {})
+
+ def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys.get('firmwares', {})
+
+ def get_power(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys.get('power', {})
+
+ def get_fans(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys.get('fans', {})
+
+ def get_component_spec_overrides(self) -> Dict[str, Dict[str, Any]]:
+ return {}
+
+ def get_update_spec(self, component: str) -> ComponentUpdateSpec:
+ spec = self.COMPONENT_SPECS[component]
+ overrides = self.get_component_spec_overrides().get(component)
+ if not overrides:
+ return spec
+ return dataclasses.replace(spec, **overrides)
+
+ def _run_update(self, component: str) -> None:
+ self.log.debug(f'Updating {component}')
+ spec = self.get_update_spec(component)
+ self.update(spec.collection, component, spec.path, spec.fields, attribute=spec.attribute)
def _update_network(self) -> None:
- raise NotImplementedError()
+ self._run_update('network')
def _update_processors(self) -> None:
- raise NotImplementedError()
+ self._run_update('processors')
def _update_storage(self) -> None:
- raise NotImplementedError()
+ fields = ['Description',
+ 'CapacityBytes',
+ 'Model', 'Protocol',
+ 'LocationIndicatorActive',
+ 'SerialNumber', 'Status',
+ 'PhysicalLocation']
+ result: Dict[str, Dict[str, Dict]] = dict()
+ self.log.debug('Updating storage')
+ members_names = self.endpoints['systems'].get_members_names()
+ for member in members_names:
+ result[member] = {}
+ members_data = self.endpoints['systems'][member]['Storage'].get_members_data()
+ for entity in members_data:
+ for drive in members_data[entity]['Drives']:
+ data: Dict[str, Any] = Endpoint(drive['@odata.id'], self.endpoints.client).data
+ drive_id = data['Id']
+ result[member][drive_id] = dict()
+ result[member][drive_id]['redfish_endpoint'] = data['@odata.id']
+ for field in fields:
+ result[member][drive_id][to_snake_case(field)] = data.get(field)
+ result[member][drive_id]['entity'] = entity
+ self._sys['storage'] = normalize_dict(result)
+
+ def _update_sn(self) -> None:
+ serials: List[str] = []
+ self.log.debug('Updating serial number')
+ data: Dict[str, Any] = self.endpoints['systems'].get_members_data()
+ for sys in data.keys():
+ serials.append(data[sys]['SKU'])
+ self._sys['SKU'] = ','.join(serials)
+
+ def _update_memory(self) -> None:
+ self._run_update('memory')
+
+ def _update_power(self) -> None:
+ self._run_update('power')
+
+ def _update_fans(self) -> None:
+ self._run_update('fans')
def _update_firmwares(self) -> None:
- raise NotImplementedError()
+ self._run_update('firmwares')
def device_led_on(self, device: str) -> int:
- data: Dict[str, bool] = {'LocationIndicatorActive': True}
- try:
- result = self.set_device_led(device, data)
- except (HTTPError, KeyError):
- return 0
- return result
+ raise NotImplementedError()
def device_led_off(self, device: str) -> int:
- data: Dict[str, bool] = {'LocationIndicatorActive': False}
- try:
- result = self.set_device_led(device, data)
- except (HTTPError, KeyError):
- return 0
- return result
+ raise NotImplementedError()
def chassis_led_on(self) -> int:
- data: Dict[str, str] = {'IndicatorLED': 'Blinking'}
- result = self.set_chassis_led(data)
- return result
+ raise NotImplementedError()
def chassis_led_off(self) -> int:
- data: Dict[str, str] = {'IndicatorLED': 'Lit'}
- result = self.set_chassis_led(data)
- return result
+ raise NotImplementedError()
def get_device_led(self, device: str) -> Dict[str, Any]:
- endpoint = self._sys['storage'][device]['redfish_endpoint']
- try:
- result = self.client.query(method='GET',
- endpoint=endpoint,
- timeout=10)
- except HTTPError as e:
- self.log.error(f"Couldn't get the ident device LED status for device '{device}': {e}")
- raise
- response_json = json.loads(result[1])
- _result: Dict[str, Any] = {'http_code': result[2]}
- if result[2] == 200:
- _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
- else:
- _result['LocationIndicatorActive'] = None
- return _result
+ raise NotImplementedError()
def set_device_led(self, device: str, data: Dict[str, bool]) -> int:
- try:
- _, _, status = self.client.query(
- data=json.dumps(data),
- method='PATCH',
- endpoint=self._sys['storage'][device]['redfish_endpoint']
- )
- except (HTTPError, KeyError) as e:
- self.log.error(f"Couldn't set the ident device LED for device '{device}': {e}")
- raise
- return status
+ raise NotImplementedError()
def get_chassis_led(self) -> Dict[str, Any]:
- endpoint = list(self.endpoints['chassis'].get_members_endpoints().values())[0]
- try:
- result = self.client.query(method='GET',
- endpoint=endpoint,
- timeout=10)
- except HTTPError as e:
- self.log.error(f"Couldn't get the ident chassis LED status: {e}")
- raise
- response_json = json.loads(result[1])
- _result: Dict[str, Any] = {'http_code': result[2]}
- if result[2] == 200:
- _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
- else:
- _result['LocationIndicatorActive'] = None
- return _result
+ raise NotImplementedError()
def set_chassis_led(self, data: Dict[str, str]) -> int:
- # '{"IndicatorLED": "Lit"}' -> LocationIndicatorActive = false
- # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true
- try:
- _, _, status = self.client.query(
- data=json.dumps(data),
- method='PATCH',
- endpoint=list(self.endpoints['chassis'].get_members_endpoints().values())[0]
- )
- except HTTPError as e:
- self.log.error(f"Couldn't set the ident chassis LED: {e}")
- raise
- return status
+ raise NotImplementedError()
def shutdown_host(self, force: bool = False) -> int:
- reboot_type: str = 'GracefulRebootWithForcedShutdown' if force else 'GracefulRebootWithoutForcedShutdown'
-
- try:
- job_id: str = self.create_reboot_job(reboot_type)
- status = self.schedule_reboot_job(job_id)
- except (HTTPError, KeyError) as e:
- self.log.error(f"Couldn't create the reboot job: {e}")
- raise
- return status
+ raise NotImplementedError()
def powercycle(self) -> int:
- try:
- job_id: str = self.create_reboot_job('PowerCycle')
- status = self.schedule_reboot_job(job_id)
- except (HTTPError, URLError) as e:
- self.log.error(f"Couldn't perform power cycle: {e}")
- raise
- return status
+ raise NotImplementedError()
def create_reboot_job(self, reboot_type: str) -> str:
- data: Dict[str, str] = dict(RebootJobType=reboot_type)
- try:
- headers, _, _ = self.client.query(
- data=json.dumps(data),
- endpoint=self.create_reboot_job_endpoint
- )
- job_id: str = headers['Location'].split('/')[-1]
- except (HTTPError, URLError) as e:
- self.log.error(f"Couldn't create the reboot job: {e}")
- raise
- return job_id
+ raise NotImplementedError()
def schedule_reboot_job(self, job_id: str) -> int:
- data: Dict[str, Union[List[str], str]] = dict(JobArray=[job_id], StartTimeInterval='TIME_NOW')
- try:
- _, _, status = self.client.query(
- data=json.dumps(data),
- endpoint=self.setup_job_queue_endpoint
- )
- except (HTTPError, KeyError) as e:
- self.log.error(f"Couldn't schedule the reboot job: {e}")
- raise
- return status
+ raise NotImplementedError()
\ No newline at end of file
-from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem, Endpoint
-from ceph_node_proxy.util import get_logger, normalize_dict, to_snake_case
-from typing import Dict, Any, List, Optional
-from urllib.error import HTTPError
+import json
+from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
+from ceph_node_proxy.util import get_logger
+from typing import Dict, Any, List, Union
+from urllib.error import HTTPError, URLError
class RedfishDellSystem(BaseRedfishSystem):
self.create_reboot_job_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.CreateRebootJob'
self.setup_job_queue_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.SetupJobQueue'
- def build_data(self,
- data: Dict[str, Any],
- fields: List[str],
- attribute: Optional[str] = None) -> Dict[str, Dict[str, Dict]]:
- result: Dict[str, Dict[str, Optional[Dict]]] = dict()
- member_id: str = ''
-
- def process_data(m_id: str, fields: List[str], data: Dict[str, Any]) -> Dict[str, Any]:
- result: Dict[str, Any] = {}
- for field in fields:
- try:
- result[to_snake_case(field)] = data[field]
- except KeyError:
- self.log.debug(f'Could not find field: {field} in data: {data}')
- result[to_snake_case(field)] = None
- return result
+ def device_led_on(self, device: str) -> int:
+ data: Dict[str, bool] = {'LocationIndicatorActive': True}
+ try:
+ result = self.set_device_led(device, data)
+ except (HTTPError, KeyError):
+ return 0
+ return result
+ def device_led_off(self, device: str) -> int:
+ data: Dict[str, bool] = {'LocationIndicatorActive': False}
try:
- if attribute is not None:
- data_items = data[attribute]
- else:
- # The following is a hack to re-inject the key to the dict
- # as we have the following structure when `attribute` is passed:
- # "PowerSupplies": [ {"MemberId": "0", ...}, {"MemberId": "1", ...} ]
- # vs. this structure in the opposite case:
- # { "CPU.Socket.2": { "Id": "CPU.Socket.2", "Manufacturer": "Intel" }, "CPU.Socket.1": {} }
- # With the first case, we clearly use the field "MemberId".
- # With the second case, we use the key of the dict.
- # This is mostly for avoiding code duplication.
- data_items = [{'MemberId': k, **v} for k, v in data.items()]
- for d in data_items:
- member_id = d.get('MemberId')
- result[member_id] = {}
- result[member_id] = process_data(member_id, fields, d)
- except (KeyError, TypeError, AttributeError) as e:
- self.log.error(f"Can't build data: {e}")
+ result = self.set_device_led(device, data)
+ except (HTTPError, KeyError):
+ return 0
+ return result
+
+ def chassis_led_on(self) -> int:
+ data: Dict[str, str] = {'IndicatorLED': 'Blinking'}
+ result = self.set_chassis_led(data)
+ return result
+
+ def chassis_led_off(self) -> int:
+ data: Dict[str, str] = {'IndicatorLED': 'Lit'}
+ result = self.set_chassis_led(data)
+ return result
+
+ def get_device_led(self, device: str) -> Dict[str, Any]:
+ endpoint = self._sys['storage'][device]['redfish_endpoint']
+ try:
+ result = self.client.query(method='GET',
+ endpoint=endpoint,
+ timeout=10)
+ except HTTPError as e:
+ self.log.error(f"Couldn't get the ident device LED status for device '{device}': {e}")
raise
- return normalize_dict(result)
-
- def get_sn(self) -> str:
- return self._sys.get('SKU', '')
-
- def get_status(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys.get('status', {})
-
- def get_memory(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys.get('memory', {})
-
- def get_processors(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys.get('processors', {})
-
- def get_network(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys.get('network', {})
-
- def get_storage(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys.get('storage', {})
-
- def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys.get('firmwares', {})
-
- def get_power(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys.get('power', {})
-
- def get_fans(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys.get('fans', {})
+ response_json = json.loads(result[1])
+ _result: Dict[str, Any] = {'http_code': result[2]}
+ if result[2] == 200:
+ _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
+ else:
+ _result['LocationIndicatorActive'] = None
+ return _result
- def _update_network(self) -> None:
- fields = ['Description', 'Name', 'SpeedMbps', 'Status']
- self.log.debug('Updating network')
- self.update('systems', 'network', 'EthernetInterfaces', fields)
+ def set_device_led(self, device: str, data: Dict[str, bool]) -> int:
+ try:
+ _, _, status = self.client.query(
+ data=json.dumps(data),
+ method='PATCH',
+ endpoint=self._sys['storage'][device]['redfish_endpoint']
+ )
+ except (HTTPError, KeyError) as e:
+ self.log.error(f"Couldn't set the ident device LED for device '{device}': {e}")
+ raise
+ return status
- def update(self,
- collection: str,
- component: str,
- path: str,
- fields: List[str],
- attribute: Optional[str] = None) -> None:
- members: List[str] = self.endpoints[collection].get_members_names()
- result: Dict[str, Any] = {}
- data: Dict[str, Any] = {}
- data_built: Dict[str, Any] = {}
- if not members:
- data = self.endpoints[collection][path].get_members_data()
- data_built = self.build_data(data=data, fields=fields, attribute=attribute)
- result = data_built
+ def get_chassis_led(self) -> Dict[str, Any]:
+ endpoint = list(self.endpoints['chassis'].get_members_endpoints().values())[0]
+ try:
+ result = self.client.query(method='GET',
+ endpoint=endpoint,
+ timeout=10)
+ except HTTPError as e:
+ self.log.error(f"Couldn't get the ident chassis LED status: {e}")
+ raise
+ response_json = json.loads(result[1])
+ _result: Dict[str, Any] = {'http_code': result[2]}
+ if result[2] == 200:
+ _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
else:
- for member in members:
- data_built = {}
- try:
- if attribute is None:
- data = self.endpoints[collection][member][path].get_members_data()
- else:
- data = self.endpoints[collection][member][path].data
- except HTTPError as e:
- self.log.error(f'Error while updating {component}: {e}')
- else:
- data_built = self.build_data(data=data, fields=fields, attribute=attribute)
- result[member] = data_built
- self._sys[component] = result
-
- def _update_processors(self) -> None:
- fields = ['Description',
- 'TotalCores',
- 'TotalThreads',
- 'ProcessorType',
- 'Model',
- 'Status',
- 'Manufacturer']
- self.log.debug('Updating processors')
- self.update('systems', 'processors', 'Processors', fields)
+ _result['LocationIndicatorActive'] = None
+ return _result
- def _update_storage(self) -> None:
- fields = ['Description',
- 'CapacityBytes',
- 'Model', 'Protocol',
- 'LocationIndicatorActive',
- 'SerialNumber', 'Status',
- 'PhysicalLocation']
- result: Dict[str, Dict[str, Dict]] = dict()
- self.log.debug('Updating storage')
- members_names = self.endpoints['systems'].get_members_names()
- for member in members_names:
- result[member] = {}
- members_data = self.endpoints['systems'][member]['Storage'].get_members_data()
- for entity in members_data:
- for drive in members_data[entity]['Drives']:
- data: Dict[str, Any] = Endpoint(drive['@odata.id'], self.endpoints.client).data
- drive_id = data['Id']
- result[member][drive_id] = dict()
- result[member][drive_id]['redfish_endpoint'] = data['@odata.id']
- for field in fields:
- result[member][drive_id][to_snake_case(field)] = data.get(field)
- result[member][drive_id]['entity'] = entity
- self._sys['storage'] = normalize_dict(result)
+ def set_chassis_led(self, data: Dict[str, str]) -> int:
+ # '{"IndicatorLED": "Lit"}' -> LocationIndicatorActive = false
+ # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true
+ try:
+ _, _, status = self.client.query(
+ data=json.dumps(data),
+ method='PATCH',
+ endpoint=list(self.endpoints['chassis'].get_members_endpoints().values())[0]
+ )
+ except HTTPError as e:
+ self.log.error(f"Couldn't set the ident chassis LED: {e}")
+ raise
+ return status
- def _update_sn(self) -> None:
- serials: List[str] = []
- self.log.debug('Updating serial number')
- data: Dict[str, Any] = self.endpoints['systems'].get_members_data()
- for sys in data.keys():
- serials.append(data[sys]['SKU'])
- self._sys['SKU'] = ','.join(serials)
+ def shutdown_host(self, force: bool = False) -> int:
+ reboot_type: str = 'GracefulRebootWithForcedShutdown' if force else 'GracefulRebootWithoutForcedShutdown'
- def _update_memory(self) -> None:
- fields = ['Description',
- 'MemoryDeviceType',
- 'CapacityMiB',
- 'Status']
- self.log.debug('Updating memory')
- self.update('systems', 'memory', 'Memory', fields)
+ try:
+ job_id: str = self.create_reboot_job(reboot_type)
+ status = self.schedule_reboot_job(job_id)
+ except (HTTPError, KeyError) as e:
+ self.log.error(f"Couldn't create the reboot job: {e}")
+ raise
+ return status
- def _update_power(self) -> None:
- fields = [
- 'Name',
- 'Model',
- 'Manufacturer',
- 'Status'
- ]
- self.log.debug('Updating powersupplies')
- self.update('chassis', 'power', 'Power', fields, attribute='PowerSupplies')
+ def powercycle(self) -> int:
+ try:
+ job_id: str = self.create_reboot_job('PowerCycle')
+ status = self.schedule_reboot_job(job_id)
+ except (HTTPError, URLError) as e:
+ self.log.error(f"Couldn't perform power cycle: {e}")
+ raise
+ return status
- def _update_fans(self) -> None:
- fields = [
- 'Name',
- 'PhysicalContext',
- 'Status'
- ]
- self.log.debug('Updating fans')
- self.update('chassis', 'fans', 'Thermal', fields, attribute='Fans')
+ def create_reboot_job(self, reboot_type: str) -> str:
+ data: Dict[str, str] = dict(RebootJobType=reboot_type)
+ try:
+ headers, _, _ = self.client.query(
+ data=json.dumps(data),
+ endpoint=self.create_reboot_job_endpoint
+ )
+ job_id: str = headers['Location'].split('/')[-1]
+ except (HTTPError, URLError) as e:
+ self.log.error(f"Couldn't create the reboot job: {e}")
+ raise
+ return job_id
- def _update_firmwares(self) -> None:
- fields = [
- 'Name',
- 'Description',
- 'ReleaseDate',
- 'Version',
- 'Updateable',
- 'Status',
- ]
- self.log.debug('Updating firmwares')
- self.update('update_service', 'firmwares', 'FirmwareInventory', fields)
+ def schedule_reboot_job(self, job_id: str) -> int:
+ data: Dict[str, Union[List[str], str]] = dict(JobArray=[job_id], StartTimeInterval='TIME_NOW')
+ try:
+ _, _, status = self.client.query(
+ data=json.dumps(data),
+ endpoint=self.setup_job_queue_endpoint
+ )
+ except (HTTPError, KeyError) as e:
+ self.log.error(f"Couldn't schedule the reboot job: {e}")
+ raise
+ return status