log.exception(f"Setting NFS-Ganesha QOS bandwidth control config failed for {cluster_id}")
raise ErrorResponse.wrap(e)
- def get_cluster_qos(self, cluster_id: str) -> Dict[str, Any]:
+ def get_cluster_qos(self, cluster_id: str, ret_bw_in_bytes: bool = False) -> Dict[str, Any]:
try:
if cluster_id in available_clusters(self.mgr):
qos_obj = self.get_cluster_qos_config(cluster_id)
- return qos_obj.to_dict() if qos_obj else {}
+ return qos_obj.to_dict(ret_bw_in_bytes) if qos_obj else {}
raise ClusterNotFound()
except Exception as e:
log.exception(f"Fetching NFS-Ganesha QOS bandwidth control config failed for {cluster_id}")
log.exception(f"Setting NFS-Ganesha QOS bandwidth control failed for {pseudo_path} of {cluster_id}")
raise ErrorResponse.wrap(e)
- def get_export_qos(self, cluster_id: str, pseudo_path: str) -> Dict[str, int]:
+ def get_export_qos(self, cluster_id: str, pseudo_path: str, ret_bw_in_bytes: bool = False) -> Dict[str, int]:
try:
export_obj = self.get_export_obj(cluster_id, pseudo_path)
- return export_obj.qos_block.to_dict() if export_obj.qos_block else {}
+ return export_obj.qos_block.to_dict(ret_bw_in_bytes) if export_obj.qos_block else {}
except Exception as e:
log.exception(f"Failed to get QOS configuration for {pseudo_path} of {cluster_id}")
raise ErrorResponse.wrap(e)
result.values[QOSParams.client_rw_bw.value] = self.client_rw_bw
return result
- def to_dict(self) -> Dict[str, Any]:
+ @staticmethod
+ def bw_for_to_dict(bandwidth: int, ret_bw_in_bytes: bool = False) -> str:
+ return bytes_to_human(bandwidth) if not ret_bw_in_bytes else str(bandwidth)
+
+ def to_dict(self, ret_bw_in_bytes: bool = False) -> Dict[str, Any]:
r: dict[str, Any] = {}
r[QOSParams.enable_bw_ctrl.value] = self.enable_bw_ctrl
r[QOSParams.combined_bw_ctrl.value] = self.combined_bw_ctrl
if self.export_writebw:
- r[QOSParams.export_writebw.value] = bytes_to_human(self.export_writebw)
+ r[QOSParams.export_writebw.value] = self.bw_for_to_dict(self.export_writebw, ret_bw_in_bytes)
if self.export_readbw:
- r[QOSParams.export_readbw.value] = bytes_to_human(self.export_readbw)
+ r[QOSParams.export_readbw.value] = self.bw_for_to_dict(self.export_readbw, ret_bw_in_bytes)
if self.client_writebw:
- r[QOSParams.client_writebw.value] = bytes_to_human(self.client_writebw)
+ r[QOSParams.client_writebw.value] = self.bw_for_to_dict(self.client_writebw, ret_bw_in_bytes)
if self.client_readbw:
- r[QOSParams.client_readbw.value] = bytes_to_human(self.client_readbw)
+ r[QOSParams.client_readbw.value] = self.bw_for_to_dict(self.client_readbw, ret_bw_in_bytes)
if self.export_rw_bw:
- r[QOSParams.export_rw_bw.value] = bytes_to_human(self.export_rw_bw)
+ r[QOSParams.export_rw_bw.value] = self.bw_for_to_dict(self.export_rw_bw, ret_bw_in_bytes)
if self.client_rw_bw:
- r[QOSParams.client_rw_bw.value] = bytes_to_human(self.client_rw_bw)
+ r[QOSParams.client_rw_bw.value] = self.bw_for_to_dict(self.client_rw_bw, ret_bw_in_bytes)
return r
def qos_bandwidth_checks(self, qos_type: QOSType) -> None:
result.values.update(res.values)
return result
- def to_dict(self) -> Dict[str, Any]:
+ def to_dict(self, ret_bw_in_bytes: bool = False) -> Dict[str, Any]:
r: Dict[str, Any] = {}
r[QOSParams.enable_qos.value] = self.enable_qos
if self.cluster_op and self.qos_type:
r[QOSParams.qos_type.value] = self.qos_type.name
- if self.bw_obj and (res := self.bw_obj.to_dict()):
+ if self.bw_obj and (res := self.bw_obj.to_dict(ret_bw_in_bytes)):
r.update(res)
if self.ops_obj and (res := self.ops_obj.to_dict()):
r.update(res)
"enable_iops_control": False
}
+ qos_cluster_dict_bw_in_bytes = {
+ "enable_bw_control": True,
+ "enable_qos": True,
+ "combined_rw_bw_control": False,
+ "max_client_read_bw": "4000000",
+ "max_client_write_bw": "3000000",
+ "max_export_read_bw": "2000000",
+ "max_export_write_bw": "1000000",
+ "qos_type": "PerShare_PerClient",
+ "enable_iops_control": False
+ }
+
qos_export_dict = {
"enable_bw_control": True,
"enable_qos": True,
"max_export_write_bw": "1.0MB",
"enable_iops_control": False
}
+ qos_export_dict_bw_in_bytes = {
+ "enable_bw_control": True,
+ "enable_qos": True,
+ "combined_rw_bw_control": False,
+ "max_client_read_bw": "4000000",
+ "max_client_write_bw": "3000000",
+ "max_export_read_bw": "2000000",
+ "max_export_write_bw": "1000000",
+ "enable_iops_control": False
+ }
class RObject(object):
def __init__(self, key: str, raw: str) -> None:
assert qos.bw_obj.client_writebw == 3000000
assert qos.bw_obj.client_readbw == 4000000
- @pytest.mark.parametrize("qos_block, qos_dict", [
- (qos_cluster_block, qos_cluster_dict),
- (qos_export_block, qos_export_dict)
+ @pytest.mark.parametrize("qos_block, qos_dict, qos_dict_bw_in_bytes", [
+ (qos_cluster_block, qos_cluster_dict, qos_cluster_dict_bw_in_bytes),
+ (qos_export_block, qos_export_dict, qos_export_dict_bw_in_bytes)
])
- def test_qos_from_block(self, qos_block, qos_dict):
+ def test_qos_from_block(self, qos_block, qos_dict, qos_dict_bw_in_bytes):
blocks = GaneshaConfParser(qos_block).parse()
assert isinstance(blocks, list)
assert len(blocks) == 1
qos = QOS.from_qos_block(blocks[0], True)
assert qos.to_dict() == qos_dict
+ assert qos.to_dict(ret_bw_in_bytes=True) == qos_dict_bw_in_bytes
def _do_test_cluster_qos_bw(self, qos_type, combined_bw_ctrl, params, positive_tc):
nfs_mod = Module('nfs', '', '')