"enable_bw_control": True,
"enable_qos": True,
"combined_rw_bw_control": False,
- "max_client_read_bw": "4.0MB",
- "max_client_write_bw": "3.0MB",
- "max_export_read_bw": "2.0MB",
- "max_export_write_bw": "2.0MB",
+ "max_client_read_bw": bytes_to_human(4000000, mode='binary'),
+ "max_client_write_bw": bytes_to_human(3000000, mode='binary'),
+ "max_export_read_bw": bytes_to_human(2000000, mode='binary'),
+ "max_export_write_bw": bytes_to_human(2000000, mode='binary'),
"qos_type": "PerShare_PerClient",
"enable_iops_control": False
}
"enable_bw_control": True,
"enable_qos": True,
"combined_rw_bw_control": False,
- "max_client_read_bw": "4.0MB",
- "max_client_write_bw": "3.0MB",
- "max_export_read_bw": "2.0MB",
- "max_export_write_bw": "2.0MB",
+ "max_client_read_bw": bytes_to_human(4000000, mode='binary'),
+ "max_client_write_bw": bytes_to_human(3000000, mode='binary'),
+ "max_export_read_bw": bytes_to_human(2000000, mode='binary'),
+ "max_export_write_bw": bytes_to_human(2000000, mode='binary'),
"enable_iops_control": False
}
qos_export_dict_bw_in_bytes = {
def test_qos_from_dict(self):
qos = QOS.from_dict(self.qos_cluster_dict, True)
- assert qos.enable_qos == True
- assert qos.bw_obj.enable_bw_ctrl == True
- assert isinstance(qos.qos_type, QOSType)
- assert qos.bw_obj.export_writebw == 2000000
- assert qos.bw_obj.export_readbw == 2000000
- assert qos.bw_obj.client_writebw == 3000000
- assert qos.bw_obj.client_readbw == 4000000
+ assert qos.to_dict() == self.qos_cluster_dict
qos = QOS.from_dict(self.qos_export_dict)
- assert qos.enable_qos == True
- assert qos.bw_obj.enable_bw_ctrl == True
- assert qos.qos_type is None
- assert qos.bw_obj.export_writebw == 2000000
- assert qos.bw_obj.export_readbw == 2000000
- assert qos.bw_obj.client_writebw == 3000000
- assert qos.bw_obj.client_readbw == 4000000
+ assert qos.to_dict() == self.qos_export_dict
@pytest.mark.parametrize("qos_block, qos_dict, qos_dict_bw_in_bytes", [
(qos_cluster_block, qos_cluster_dict, qos_cluster_dict_bw_in_bytes),
out = cluster.get_cluster_qos(self.cluster_id)
expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": combined_bw_ctrl, "qos_type": qos_type.name, "enable_iops_control": False}
for key in params:
- expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(params[key]))
+ expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(params[key]), mode='binary')
assert out == expected_out
cluster.disable_cluster_qos_bw(self.cluster_id)
out = cluster.get_cluster_qos(self.cluster_id)
out = export_mgr.get_export_qos(self.cluster_id, '/cephfs_a/')
expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": export_combined_bw_ctrl}
for key in export_params:
- expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(export_params[key]))
+ expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(export_params[key]), mode='binary')
expected_out.update(clust_qos_conf)
assert out == expected_out
export_mgr.disable_export_qos_bw(self.cluster_id, '/cephfs_a/')
bw_out = {}
ops_out = {}
for key in bw_params:
- bw_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(bw_params[key]))
+ bw_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(bw_params[key]), mode='binary')
for key in ops_params:
ops_out[QOSParams[key].value] = ops_params[key]
expected_out.update(bw_out)
bw_out = {}
ops_out = {}
for key in export_bw_params:
- bw_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(export_bw_params[key]))
+ bw_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(export_bw_params[key]), mode='binary')
for key in export_ops_params:
ops_out[QOSParams[key].value] = export_ops_params[key]
expected_out.update(bw_out)