From: Shweta Bhosale Date: Thu, 9 Jan 2025 13:53:37 +0000 (+0530) Subject: mgr/nfs: NFS commands to enable, disable and get QOS config for cluster and export X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f43262d8eb6725c8845ddabb26ca0fba045f1bb2;p=ceph.git mgr/nfs: NFS commands to enable, disable and get QOS config for cluster and export fixes: https://tracker.ceph.com/issues/69458 Signed-off-by: Shweta Bhosale --- diff --git a/src/cephadm/cephadmlib/container_engines.py b/src/cephadm/cephadmlib/container_engines.py index e46661c1b1b1..aca74bc1086a 100644 --- a/src/cephadm/cephadmlib/container_engines.py +++ b/src/cephadm/cephadmlib/container_engines.py @@ -14,7 +14,7 @@ from .constants import ( MIN_PODMAN_VERSION, PIDS_LIMIT_UNLIMITED_PODMAN_VERSION, ) -from .data_utils import with_units_to_int +from ceph.utils import with_units_to_int from .exceptions import Error diff --git a/src/cephadm/cephadmlib/data_utils.py b/src/cephadm/cephadmlib/data_utils.py index 9920c3999130..5b3e9cec43a7 100644 --- a/src/cephadm/cephadmlib/data_utils.py +++ b/src/cephadm/cephadmlib/data_utils.py @@ -56,51 +56,6 @@ def dict_get_join(d: Dict[str, Any], key: str) -> Any: return value -def bytes_to_human(num, mode='decimal'): - # type: (float, str) -> str - """Convert a bytes value into it's human-readable form. - - :param num: number, in bytes, to convert - :param mode: Either decimal (default) or binary to determine divisor - :returns: string representing the bytes value in a more readable format - """ - unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB'] - divisor = 1000.0 - yotta = 'YB' - - if mode == 'binary': - unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB'] - divisor = 1024.0 - yotta = 'YiB' - - for unit in unit_list: - if abs(num) < divisor: - return '%3.1f%s' % (num, unit) - num /= divisor - return '%.1f%s' % (num, yotta) - - -def with_units_to_int(v: str) -> int: - if v.endswith('iB'): - v = v[:-2] - elif v.endswith('B'): - v = v[:-1] - mult = 1 - if v[-1].upper() == 'K': - mult = 1024 - v = v[:-1] - elif v[-1].upper() == 'M': - mult = 1024 * 1024 - v = v[:-1] - elif v[-1].upper() == 'G': - mult = 1024 * 1024 * 1024 - v = v[:-1] - elif v[-1].upper() == 'T': - mult = 1024 * 1024 * 1024 * 1024 - v = v[:-1] - return int(float(v) * mult) - - def read_config(fn): # type: (Optional[str]) -> ConfigParser cp = ConfigParser() diff --git a/src/cephadm/cephadmlib/host_facts.py b/src/cephadm/cephadmlib/host_facts.py index ba05b5434c59..ec85430d2aa4 100644 --- a/src/cephadm/cephadmlib/host_facts.py +++ b/src/cephadm/cephadmlib/host_facts.py @@ -16,7 +16,7 @@ from typing import Any, cast, Dict, List, Optional, Set, Union from cephadmlib.call_wrappers import call, call_throws, CallVerbosity from cephadmlib.context import CephadmContext -from cephadmlib.data_utils import bytes_to_human +from ceph.utils import bytes_to_human from cephadmlib.exe_utils import find_executable from cephadmlib.file_utils import read_file from cephadmlib.net_utils import get_fqdn, get_ipv4_address, get_ipv6_address diff --git a/src/cephadm/tests/test_agent.py b/src/cephadm/tests/test_agent.py index 801f62c92167..c2a7efdc7d90 100644 --- a/src/cephadm/tests/test_agent.py +++ b/src/cephadm/tests/test_agent.py @@ -254,8 +254,8 @@ def test_agent_daemon_ls_subset(cephadm_fs, funkypatch): assert daemons['mgr.host1.pntmho']['container_id'] == mgr_cid assert daemons['crash.host1']['container_id'] == crash_cid - assert daemons['mgr.host1.pntmho']['memory_usage'] == 478570086 # 456.4 MB - assert daemons['crash.host1']['memory_usage'] == 7426015 # 7.082 MB + assert daemons['mgr.host1.pntmho']['memory_usage'] == 456400000 # 456.4 MB + assert daemons['crash.host1']['memory_usage'] == 7082000 # 7.082 MB @mock.patch("cephadm.list_daemons") diff --git a/src/pybind/mgr/nfs/cluster.py b/src/pybind/mgr/nfs/cluster.py index 1b3ce8213f55..97138792ac35 100644 --- a/src/pybind/mgr/nfs/cluster.py +++ b/src/pybind/mgr/nfs/cluster.py @@ -18,8 +18,15 @@ from .utils import ( available_clusters, conf_obj_name, restart_nfs_service, - user_conf_obj_name) -from .export import NFSRados + user_conf_obj_name, + USER_CONF_PREFIX, + qos_conf_obj_name) +from .rados_utils import NFSRados +from .ganesha_conf import format_block, GaneshaConfParser +from .qos_conf import ( + QOS, + QOSType, + QOSBandwidthControl) if TYPE_CHECKING: from nfs.module import Module @@ -324,7 +331,7 @@ class NFSCluster: try: if cluster_id in available_clusters(self.mgr): rados_obj = self._rados(cluster_id) - if rados_obj.check_user_config(): + if rados_obj.check_config(USER_CONF_PREFIX): raise NonFatalError("NFS-Ganesha User Config already exists") rados_obj.write_obj(nfs_config, user_conf_obj_name(cluster_id), conf_obj_name(cluster_id)) @@ -342,7 +349,7 @@ class NFSCluster: try: if cluster_id in available_clusters(self.mgr): rados_obj = self._rados(cluster_id) - if not rados_obj.check_user_config(): + if not rados_obj.check_config(USER_CONF_PREFIX): raise NonFatalError("NFS-Ganesha User Config does not exist") rados_obj.remove_obj(user_conf_obj_name(cluster_id), conf_obj_name(cluster_id)) @@ -358,3 +365,100 @@ class NFSCluster: def _rados(self, cluster_id: str) -> NFSRados: """Return a new NFSRados object for the given cluster id.""" return NFSRados(self.mgr.rados, cluster_id) + + def get_cluster_qos_config(self, cluster_id: str) -> Optional[QOS]: + """Return QOS object for the given cluster id.""" + rados_obj = self._rados(cluster_id) + conf = rados_obj.read_obj(qos_conf_obj_name(cluster_id)) + if conf: + qos_block = GaneshaConfParser(conf).parse() + qos_obj = QOS.from_qos_block(qos_block[0], True) + return qos_obj + return None + + def update_cluster_qos_bw(self, + cluster_id: str, + enable_qos: bool, + bw_obj: QOSBandwidthControl, + qos_type: Optional[QOSType] = None) -> None: + """Update cluster QOS config""" + qos_obj_exists = False + qos_obj = self.get_cluster_qos_config(cluster_id) + if not qos_obj: + log.debug(f"Creating new QOS block for cluster {cluster_id}") + qos_obj = QOS(True, enable_qos, qos_type, bw_obj) + else: + log.debug(f"Updating existing QOS block for cluster {cluster_id}") + qos_obj_exists = True + qos_obj.enable_qos = enable_qos + qos_obj.qos_type = qos_type + qos_obj.bw_obj = bw_obj + + qos_config = format_block(qos_obj.to_qos_block()) + rados_obj = self._rados(cluster_id) + if not qos_obj_exists: + rados_obj.write_obj(qos_config, qos_conf_obj_name(cluster_id), + conf_obj_name(cluster_id)) + else: + rados_obj.update_obj(qos_config, qos_conf_obj_name(cluster_id), + conf_obj_name(cluster_id), should_notify=False) + log.debug(f"Successfully saved {cluster_id}s QOS bandwidth control config: \n {qos_config}") + + def enable_cluster_qos_bw(self, + cluster_id: str, + qos_type: QOSType, + bw_obj: QOSBandwidthControl + ) -> None: + """ + There are 2 cases to consider: + 1. If combined bandwith control is disabled + a. If qos_type is pershare, then export_writebw and export_readbw parameters are compulsory + b. If qos_type is perclient, then client_writebw and client_readbw parameters are compulsory + c. If qos_type is pershare_perclient then export_writebw, export_readbw, client_writebw and + client_readbw are compulsory parameters + 2. If combined bandwidth control is enabled + a. If qos_type is pershare, then export_rw_bw parameter is compulsory + b. If qos_type is perclient, then client_rw_bw parameter is compulsory + c. If qos_type is pershare_perclient, then export_rw_bw and client_rw_bw parameters are compulsory + """ + try: + bw_obj.qos_bandwidth_checks(qos_type) + if cluster_id in available_clusters(self.mgr): + self.update_cluster_qos_bw(cluster_id, True, bw_obj, qos_type) + restart_nfs_service(self.mgr, cluster_id) + log.info(f"QOS bandwidth control has been successfully enabled for cluster {cluster_id}. " + "If the qos_type is changed during this process, ensure that the bandwidth " + "values for all exports are updated accordingly.") + return + raise ClusterNotFound() + except NotImplementedError: + raise ManualRestartRequired(f"NFS-Ganesha QOS bandwidth control config added Successfully for {cluster_id}") + except Exception as e: + log.exception(f"Setting NFS-Ganesha QOS bandwidth control config failed for {cluster_id}") + raise ErrorResponse.wrap(e) + + def get_cluster_qos(self, cluster_id: str) -> Dict[str, Any]: + try: + if cluster_id in available_clusters(self.mgr): + qos_obj = self.get_cluster_qos_config(cluster_id) + return qos_obj.to_dict() if qos_obj else {} + raise ClusterNotFound() + except Exception as e: + log.exception(f"Fetching NFS-Ganesha QOS bandwidth control config failed for {cluster_id}") + raise ErrorResponse.wrap(e) + + def disable_cluster_qos_bw(self, cluster_id: str) -> None: + try: + if cluster_id in available_clusters(self.mgr): + self.update_cluster_qos_bw(cluster_id, False, QOSBandwidthControl()) + restart_nfs_service(self.mgr, cluster_id) + log.info("Cluster-level QoS bandwidth control has been successfully disabled for " + f"cluster {cluster_id}. As a result, export-level bandwidth control will " + "no longer have any effect, even if it's enabled.") + return + raise ClusterNotFound() + except NotImplementedError: + raise ManualRestartRequired(f"NFS-Ganesha QOS bandwidth control config added successfully for {cluster_id}") + except Exception as e: + log.exception(f"Setting NFS-Ganesha QOS bandwidth control config failed for {cluster_id}") + raise ErrorResponse.wrap(e) diff --git a/src/pybind/mgr/nfs/export.py b/src/pybind/mgr/nfs/export.py index f3fe29405aab..724ea969d91e 100644 --- a/src/pybind/mgr/nfs/export.py +++ b/src/pybind/mgr/nfs/export.py @@ -17,7 +17,7 @@ from ceph.fs.earmarking import EarmarkTopScope import cephfs from mgr_util import CephFSEarmarkResolver -from rados import TimedOut, ObjectNotFound, Rados +from rados import TimedOut, ObjectNotFound from object_format import ErrorResponse from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS @@ -27,13 +27,13 @@ from .ganesha_conf import ( Export, GaneshaConfParser, RGWFSAL, - RawBlock, format_block) +from .qos_conf import QOS, QOSBandwidthControl +from .export_utils import export_qos_bw_checks, export_dict_qos_checks from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound from .utils import ( EXPORT_PREFIX, NonFatalError, - USER_CONF_PREFIX, export_obj_name, conf_obj_name, available_clusters, @@ -41,6 +41,7 @@ from .utils import ( get_nfs_spec_for_cluster, restart_nfs_service, cephfs_path_is_dir) +from .rados_utils import NFSRados if TYPE_CHECKING: from nfs.module import Module @@ -90,79 +91,6 @@ def _validate_cmount_path(cmount_path: str, path: str) -> None: ) -class NFSRados: - def __init__(self, rados: 'Rados', namespace: str) -> None: - self.rados = rados - self.pool = POOL_NAME - self.namespace = namespace - - def _make_rados_url(self, obj: str) -> str: - return "rados://{}/{}/{}".format(self.pool, self.namespace, obj) - - def _create_url_block(self, obj_name: str) -> RawBlock: - return RawBlock('%url', values={'value': self._make_rados_url(obj_name)}) - - def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None: - with self.rados.open_ioctx(self.pool) as ioctx: - ioctx.set_namespace(self.namespace) - ioctx.write_full(obj, conf_block.encode('utf-8')) - if not config_obj: - # Return after creating empty common config object - return - log.debug("write configuration into rados object %s/%s/%s", - self.pool, self.namespace, obj) - - # Add created obj url to common config obj - ioctx.append(config_obj, format_block( - self._create_url_block(obj)).encode('utf-8')) - _check_rados_notify(ioctx, config_obj) - log.debug("Added %s url to %s", obj, config_obj) - - def read_obj(self, obj: str) -> Optional[str]: - with self.rados.open_ioctx(self.pool) as ioctx: - ioctx.set_namespace(self.namespace) - try: - return ioctx.read(obj, 1048576).decode() - except ObjectNotFound: - return None - - def update_obj(self, conf_block: str, obj: str, config_obj: str, - should_notify: Optional[bool] = True) -> None: - with self.rados.open_ioctx(self.pool) as ioctx: - ioctx.set_namespace(self.namespace) - ioctx.write_full(obj, conf_block.encode('utf-8')) - log.debug("write configuration into rados object %s/%s/%s", - self.pool, self.namespace, obj) - if should_notify: - _check_rados_notify(ioctx, config_obj) - log.debug("Update export %s in %s", obj, config_obj) - - def remove_obj(self, obj: str, config_obj: str) -> None: - with self.rados.open_ioctx(self.pool) as ioctx: - ioctx.set_namespace(self.namespace) - export_urls = ioctx.read(config_obj) - url = '%url "{}"\n\n'.format(self._make_rados_url(obj)) - export_urls = export_urls.replace(url.encode('utf-8'), b'') - ioctx.remove_object(obj) - ioctx.write_full(config_obj, export_urls) - _check_rados_notify(ioctx, config_obj) - log.debug("Object deleted: %s", url) - - def remove_all_obj(self) -> None: - with self.rados.open_ioctx(self.pool) as ioctx: - ioctx.set_namespace(self.namespace) - for obj in ioctx.list_objects(): - obj.remove() - - def check_user_config(self) -> bool: - with self.rados.open_ioctx(self.pool) as ioctx: - ioctx.set_namespace(self.namespace) - for obj in ioctx.list_objects(): - if obj.key.startswith(USER_CONF_PREFIX): - return True - return False - - class AppliedExportResults: """Gathers the results of multiple changed exports. Returned by apply_export. @@ -930,6 +858,10 @@ class ExportMgr: elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key: raise NFSInvalidOperation('secret_access_key change is not allowed') + # check QOS + if new_export_dict.get('qos_block'): + export_dict_qos_checks(cluster_id, self.mgr, dict(new_export_dict.get('qos_block', {}))) + self.exports[cluster_id].remove(old_export) self._update_export(cluster_id, new_export, need_nfs_service_restart) @@ -948,6 +880,74 @@ class ExportMgr: exports_count += 1 return exports_count + def update_export_qos_bw(self, + cluster_id: str, + pseudo_path: str, + enable_qos: bool, + bw_obj: QOSBandwidthControl) -> None: + """Update Export QOS block""" + export = self._fetch_export(cluster_id, pseudo_path) + if not export: + raise NFSObjectNotFound(f"Export {pseudo_path} not found in NFS cluster {cluster_id}") + # if qos_block does not exists in export create one else update existing block + if not export.qos_block: + log.debug(f"Creating new QOS block for export {pseudo_path} of cluster {cluster_id}") + export.qos_block = QOS(enable_qos=enable_qos, bw_obj=bw_obj) + else: + log.debug(f"Updating existing QOS block of export {pseudo_path} of cluster {cluster_id}") + export.qos_block.enable_qos = enable_qos + export.qos_block.bw_obj = bw_obj + + self.exports[cluster_id].remove(export) + self._update_export(cluster_id, export, False) + log.debug(f"Successfully updated QOS bandwidth control config for export {pseudo_path} of cluster {cluster_id}") + + def enable_export_qos_bw(self, + cluster_id: str, + pseudo_path: str, + bw_obj: QOSBandwidthControl + ) -> None: + """ + There are 2 cases to consider, based on QOS type set on cluster level + 1. If combined bandwith control is disabled + a. If qos_type is pershare, then export_writebw and export_readbw parameters are compulsory + b. If qos_type is perclient, then can't enable export level qos + c. If qos_type is pershare_perclient then export_writebw, export_readbw, client_writebw and + client_readbw are compulsory parameters + 2. If combined bandwidth control is enabled + a. If qos_type is pershare, then export_rw_bw parameter is compulsory + b. If qos_type is perclient, then can't enable export level qos + c. If qos_type is pershare_perclient, then export_rw_bw and client_rw_bw parameters are compulsory + """ + try: + self._validate_cluster_id(cluster_id) + assert pseudo_path + export_qos_bw_checks(cluster_id, self.mgr, bw_obj=bw_obj) + self.update_export_qos_bw(cluster_id, pseudo_path, True, bw_obj) + except Exception as e: + log.exception(f"Setting NFS-Ganesha QOS bandwidth control config failed for {pseudo_path} of {cluster_id}") + raise ErrorResponse.wrap(e) + + def get_export_qos(self, cluster_id: str, pseudo_path: str) -> Dict[str, int]: + try: + self._validate_cluster_id(cluster_id) + export = self._fetch_export(cluster_id, pseudo_path) + if not export: + raise NFSObjectNotFound(f"Export {pseudo_path} not found in NFS cluster {cluster_id}") + return export.qos_block.to_dict() if export.qos_block else {} + except Exception as e: + log.exception(f"Failed to get QOS configuration for {pseudo_path} of {cluster_id}") + raise ErrorResponse.wrap(e) + + def disable_export_qos_bw(self, cluster_id: str, pseudo_path: str) -> None: + try: + self._validate_cluster_id(cluster_id) + assert pseudo_path + self.update_export_qos_bw(cluster_id, pseudo_path, False, QOSBandwidthControl()) + except Exception as e: + log.exception(f"Setting NFS-Ganesha QOS bandwidth control Config failed for {pseudo_path} of {cluster_id}") + raise ErrorResponse.wrap(e) + def get_user_id(cluster_id: str, fs_name: str, cmount_path: str) -> str: """ diff --git a/src/pybind/mgr/nfs/export_utils.py b/src/pybind/mgr/nfs/export_utils.py new file mode 100644 index 000000000000..86f560b84f45 --- /dev/null +++ b/src/pybind/mgr/nfs/export_utils.py @@ -0,0 +1,67 @@ +from typing import Any + +from .cluster import NFSCluster +from .qos_conf import QOSType, QOSParams, QOSBandwidthControl + + +def export_dict_bw_checks(cluster_id: str, + mgr_obj: Any, + qos_enable: bool, + qos_dict: dict) -> None: + enable_bw_ctrl = qos_dict.get('enable_bw_control') + combined_bw_ctrl = qos_dict.get('combined_rw_bw_control') + bandwith_param_exists = any(key.endswith('bw') for key in qos_dict) + if enable_bw_ctrl is None: + if combined_bw_ctrl and bandwith_param_exists: + raise Exception('Bandwidth control is not enabled but associated parameters exists') + return + if combined_bw_ctrl is None: + combined_bw_ctrl = False + if not qos_enable and enable_bw_ctrl: + raise Exception('To enable bandwidth control, qos_enable should be true.') + if not (isinstance(enable_bw_ctrl, bool) and isinstance(combined_bw_ctrl, bool)): + raise Exception('Invalid values for the enable_bw_ctrl and combined_bw_ctrl parameters.') + # if qos is disabled, then bandwidths should not be set and no need to bandwidth checks + if not enable_bw_ctrl: + if bandwith_param_exists: + raise Exception('Bandwidths should not be passed when enable_bw_control is false.') + return + if enable_bw_ctrl and not bandwith_param_exists: + raise Exception('Bandwidths should be set when enable_bw_control is true.') + bw_obj = QOSBandwidthControl(enable_bw_ctrl, + combined_bw_ctrl, + export_writebw=qos_dict.get(QOSParams.export_writebw.value, '0'), + export_readbw=qos_dict.get(QOSParams.export_readbw.value, '0'), + client_writebw=qos_dict.get(QOSParams.client_writebw.value, '0'), + client_readbw=qos_dict.get(QOSParams.client_readbw.value, '0'), + export_rw_bw=qos_dict.get(QOSParams.export_rw_bw.value, '0'), + client_rw_bw=qos_dict.get(QOSParams.client_rw_bw.value, '0')) + export_qos_bw_checks(cluster_id, mgr_obj, bw_obj) + + +def export_dict_qos_checks(cluster_id: str, + mgr_obj: Any, + qos_dict: dict) -> None: + """Validate the qos block of dict passed to apply_export method""" + qos_enable = qos_dict.get('enable_qos') + if qos_enable is None: + raise Exception('The QOS block requires at least the enable_qos parameter') + if not isinstance(qos_enable, bool): + raise Exception('Invalid value for the enable_qos parameter') + export_dict_bw_checks(cluster_id, mgr_obj, qos_enable, qos_dict) + + +def export_qos_bw_checks(cluster_id: str, + mgr_obj: Any, + bw_obj: QOSBandwidthControl, + nfs_clust_obj: Any = None) -> None: + """check cluster level qos is enabled to enable export level qos and validate bandwidths""" + if not nfs_clust_obj: + nfs_clust_obj = NFSCluster(mgr_obj) + clust_qos_obj = nfs_clust_obj.get_cluster_qos_config(cluster_id) + if not clust_qos_obj or (clust_qos_obj and not (clust_qos_obj.enable_qos)): + raise Exception('To configure bandwidth control for export, you must first enable bandwidth control at the cluster level.') + if clust_qos_obj.qos_type: + if clust_qos_obj.qos_type == QOSType.PerClient: + raise Exception('Export-level QoS bandwidth control cannot be enabled if the QoS type at the cluster level is set to PerClient.') + bw_obj.qos_bandwidth_checks(clust_qos_obj.qos_type) diff --git a/src/pybind/mgr/nfs/ganesha_conf.py b/src/pybind/mgr/nfs/ganesha_conf.py index 759b4dcdc98d..3945aa955740 100644 --- a/src/pybind/mgr/nfs/ganesha_conf.py +++ b/src/pybind/mgr/nfs/ganesha_conf.py @@ -5,6 +5,7 @@ from mgr_module import NFS_GANESHA_SUPPORTED_FSALS from .exception import NFSInvalidOperation, FSNotFound from .utils import check_fs +from .qos_conf import QOS, RawBlock if TYPE_CHECKING: from nfs.module import Module @@ -60,27 +61,6 @@ def _validate_xprtsec_type(xprtsec: str) -> None: f"XprtSec {xprtsec} invalid, valid types are {valid_xprtsec_types}") -class RawBlock(): - def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}): - if not values: # workaround mutable default argument - values = {} - if not blocks: # workaround mutable default argument - blocks = [] - self.block_name = block_name - self.blocks = blocks - self.values = values - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, RawBlock): - return False - return self.block_name == other.block_name and \ - self.blocks == other.blocks and \ - self.values == other.values - - def __repr__(self) -> str: - return f'RawBlock({self.block_name!r}, {self.blocks!r}, {self.values!r})' - - class GaneshaConfParser: def __init__(self, raw_config: str): self.pos = 0 @@ -383,7 +363,8 @@ class Export: fsal: FSAL, clients: Optional[List[Client]] = None, sectype: Optional[List[str]] = None, - xprtsec: Optional[str] = None) -> None: + xprtsec: Optional[str] = None, + qos_block: Optional[QOS] = None) -> None: self.export_id = export_id self.path = path self.fsal = fsal @@ -398,6 +379,7 @@ class Export: self.clients: List[Client] = clients or [] self.sectype = sectype self.xprtsec = xprtsec + self.qos_block = qos_block @classmethod def from_export_block(cls, export_block: RawBlock, cluster_id: str) -> 'Export': @@ -407,6 +389,10 @@ class Export: client_blocks = [b for b in export_block.blocks if b.block_name == "CLIENT"] + qos_block = [b for b in export_block.blocks + if b.block_name == "qos_block"] + qos_block = QOS.from_qos_block(qos_block[0]) if qos_block else None + protocols = export_block.values.get('protocols') if not isinstance(protocols, list): protocols = [protocols] @@ -443,7 +429,9 @@ class Export: [Client.from_client_block(client) for client in client_blocks], sectype=sectype, - xprtsec=xprtsec) + xprtsec=xprtsec, + qos_block=qos_block + ) def to_export_block(self) -> RawBlock: values = { @@ -468,10 +456,16 @@ class Export: client.to_client_block() for client in self.clients ] + if self.qos_block: + result.blocks.append(self.qos_block.to_qos_block()) return result @classmethod def from_dict(cls, export_id: int, ex_dict: Dict[str, Any]) -> 'Export': + if ex_dict.get('qos_block'): + qos_block = QOS.from_dict(ex_dict.get('qos_block', {})) + else: + qos_block = None return cls(export_id, ex_dict.get('path', '/'), ex_dict['cluster_id'], @@ -484,7 +478,9 @@ class Export: FSAL.from_dict(ex_dict.get('fsal', {})), [Client.from_dict(client) for client in ex_dict.get('clients', [])], sectype=ex_dict.get("sectype"), - xprtsec=ex_dict.get('XprtSec')) + xprtsec=ex_dict.get('XprtSec'), + qos_block=qos_block + ) def to_dict(self) -> Dict[str, Any]: values = { @@ -504,6 +500,8 @@ class Export: values['sectype'] = self.sectype if self.xprtsec: values['XprtSec'] = self.xprtsec + if self.qos_block: + values['qos_block'] = self.qos_block.to_dict() return values def validate(self, mgr: 'Module') -> None: diff --git a/src/pybind/mgr/nfs/module.py b/src/pybind/mgr/nfs/module.py index 762a2fab9c5a..747c198b4d6a 100644 --- a/src/pybind/mgr/nfs/module.py +++ b/src/pybind/mgr/nfs/module.py @@ -14,6 +14,7 @@ from mgr_util import CephFSEarmarkResolver from .export import ExportMgr, AppliedExportResults from .cluster import NFSCluster from .utils import available_clusters +from .qos_conf import QOSType, QOSBandwidthControl, UserQoSType log = logging.getLogger(__name__) @@ -243,3 +244,84 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): def fetch_nfs_cluster_obj(self) -> NFSCluster: return self.nfs + + @CLICommand('nfs export qos enable bandwidth_control', perm='rw') + @object_format.EmptyResponder() + def _cmd_export_qos_bw_enable(self, + cluster_id: str, + pseudo_path: str, + combined_rw_bw_ctrl: bool = False, + max_export_write_bw: str = '0', + max_export_read_bw: str = '0', + max_client_write_bw: str = '0', + max_client_read_bw: str = '0', + max_export_combined_bw: str = '0', + max_client_combined_bw: str = '0' + ) -> None: + """enable QOS config for NFS export and set different bandwidth""" + try: + bw_obj = QOSBandwidthControl(enable_bw_ctrl=True, + combined_bw_ctrl=combined_rw_bw_ctrl, + export_writebw=max_export_write_bw, + export_readbw=max_export_read_bw, + client_writebw=max_client_write_bw, + client_readbw=max_client_read_bw, + export_rw_bw=max_export_combined_bw, + client_rw_bw=max_client_combined_bw) + except Exception as e: + raise object_format.ErrorResponse.wrap(e) + return self.export_mgr.enable_export_qos_bw(cluster_id=cluster_id, + pseudo_path=pseudo_path, + bw_obj=bw_obj) + + @CLICommand('nfs export qos get', perm='r') + @object_format.Responder() + def _cmd_export_qos_get(self, cluster_id: str, pseudo_path: str) -> Dict[str, int]: + """Get NFS export QOS config""" + return self.export_mgr.get_export_qos(cluster_id, pseudo_path) + + @CLICommand('nfs export qos disable bandwidth_control', perm='rw') + @object_format.EmptyResponder() + def _cmd_export_qos_bw_disable(self, cluster_id: str, pseudo_path: str) -> None: + """Disable NFS export QOS config""" + return self.export_mgr.disable_export_qos_bw(cluster_id, pseudo_path) + + @CLICommand('nfs cluster qos enable bandwidth_control', perm='rw') + @object_format.EmptyResponder() + def _cmd_cluster_qos_bw_enable(self, + cluster_id: str, + qos_type: UserQoSType, + combined_rw_bw_ctrl: bool = False, + max_export_write_bw: str = '0', + max_export_read_bw: str = '0', + max_client_write_bw: str = '0', + max_client_read_bw: str = '0', + max_export_combined_bw: str = '0', + max_client_combined_bw: str = '0') -> None: + """Enable QOS ratelimiting for NFS cluster and set default export and client max bandwidth""" + try: + bw_obj = QOSBandwidthControl(enable_bw_ctrl=True, + combined_bw_ctrl=combined_rw_bw_ctrl, + export_writebw=max_export_write_bw, + export_readbw=max_export_read_bw, + client_writebw=max_client_write_bw, + client_readbw=max_client_read_bw, + export_rw_bw=max_export_combined_bw, + client_rw_bw=max_client_combined_bw) + except Exception as e: + raise object_format.ErrorResponse.wrap(e) + return self.nfs.enable_cluster_qos_bw(cluster_id=cluster_id, + qos_type=QOSType[qos_type.value], + bw_obj=bw_obj) + + @CLICommand('nfs cluster qos disable bandwidth_control', perm='rw') + @object_format.EmptyResponder() + def _cmd_cluster_qos_bw_disable(self, cluster_id: str) -> None: + """Disable QOS for NFS cluster""" + return self.nfs.disable_cluster_qos_bw(cluster_id) + + @CLICommand('nfs cluster qos get', perm='r') + @object_format.Responder() + def _cmd_cluster_qos_get(self, cluster_id: str) -> Dict[str, Any]: + """Get QOS configuration of NFS cluster""" + return self.nfs.get_cluster_qos(cluster_id) diff --git a/src/pybind/mgr/nfs/qos_conf.py b/src/pybind/mgr/nfs/qos_conf.py new file mode 100644 index 000000000000..91c1ca35e701 --- /dev/null +++ b/src/pybind/mgr/nfs/qos_conf.py @@ -0,0 +1,247 @@ +from typing import List, Dict, Any, Optional +from enum import Enum + +from ceph.utils import bytes_to_human, with_units_to_int + + +class RawBlock(): + def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}): + if not values: # workaround mutable default argument + values = {} + if not blocks: # workaround mutable default argument + blocks = [] + self.block_name = block_name + self.blocks = blocks + self.values = values + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, RawBlock): + return False + return self.block_name == other.block_name and \ + self.blocks == other.blocks and \ + self.values == other.values + + def __repr__(self) -> str: + return f'RawBlock({self.block_name!r}, {self.blocks!r}, {self.values!r})' + + +class QOSParams(Enum): + clust_block = "QOS_DEFAULT_CONFIG" + export_block = "QOS_BLOCK" + enable_qos = "enable_qos" + enable_bw_ctrl = "enable_bw_control" + combined_bw_ctrl = "combined_rw_bw_control" + qos_type = "qos_type" + export_writebw = "max_export_write_bw" + export_readbw = "max_export_read_bw" + client_writebw = "max_client_write_bw" + client_readbw = "max_client_read_bw" + export_rw_bw = "max_export_combined_bw" + client_rw_bw = "max_client_combined_bw" + + +class UserQoSType(Enum): + per_share = 'PerShare' + per_client = 'PerClient' + per_share_per_client = 'PerShare_PerClient' + + +class QOSType(Enum): + PerShare = 1 + PerClient = 2 + PerShare_PerClient = 3 + + +def _validate_qos_bw(bandwidth: str) -> int: + min_bw = 1000000 # 1MB + max_bw = 2000000000 # 2GB + bw_bytes = with_units_to_int(bandwidth) + if bw_bytes != 0 and (bw_bytes < min_bw or bw_bytes > max_bw): + raise Exception(f"Provided bandwidth value is not in range, Please enter a value between {min_bw} (1MB) and {max_bw} (2GB) bytes") + return bw_bytes + + +QOS_REQ_PARAMS = { + 'combined_bw_disabled': { + 'PerShare': ['max_export_write_bw', 'max_export_read_bw'], + 'PerClient': ['max_client_write_bw', 'max_client_read_bw'], + 'PerShare_PerClient': ['max_export_write_bw', 'max_export_read_bw', 'max_client_write_bw', 'max_client_read_bw'] + }, + 'combined_bw_enabled': { + 'PerShare': ['max_export_combined_bw'], + 'PerClient': ['max_client_combined_bw'], + 'PerShare_PerClient': ['max_export_combined_bw', 'max_client_combined_bw'], + } +} + + +class QOSBandwidthControl(object): + def __init__(self, + enable_bw_ctrl: bool = False, + combined_bw_ctrl: bool = False, + export_writebw: str = '0', + export_readbw: str = '0', + client_writebw: str = '0', + client_readbw: str = '0', + export_rw_bw: str = '0', + client_rw_bw: str = '0' + ) -> None: + self.enable_bw_ctrl = enable_bw_ctrl + self.combined_bw_ctrl = combined_bw_ctrl + try: + self.export_writebw: int = _validate_qos_bw(export_writebw) + self.export_readbw: int = _validate_qos_bw(export_readbw) + self.client_writebw: int = _validate_qos_bw(client_writebw) + self.client_readbw: int = _validate_qos_bw(client_readbw) + self.export_rw_bw: int = _validate_qos_bw(export_rw_bw) + self.client_rw_bw: int = _validate_qos_bw(client_rw_bw) + except Exception as e: + raise Exception(f"Invalid bandwidth value. {e}") + + @classmethod + def from_dict(cls, qos_dict: Dict[str, Any]) -> 'QOSBandwidthControl': + # json has bandwidths in human readable format(str) + bw_kwargs = { + 'enable_bw_ctrl': qos_dict.get(QOSParams.enable_bw_ctrl.value, False), + 'combined_bw_ctrl': qos_dict.get(QOSParams.combined_bw_ctrl.value, False), + 'export_writebw': qos_dict.get(QOSParams.export_writebw.value, '0'), + 'export_readbw': qos_dict.get(QOSParams.export_readbw.value, '0'), + 'client_writebw': qos_dict.get(QOSParams.client_writebw.value, '0'), + 'client_readbw': qos_dict.get(QOSParams.client_readbw.value, '0'), + 'export_rw_bw': qos_dict.get(QOSParams.export_rw_bw.value, '0'), + 'client_rw_bw': qos_dict.get(QOSParams.client_rw_bw.value, '0') + } + return cls(**bw_kwargs) + + @classmethod + def from_qos_block(cls, qos_block: RawBlock) -> 'QOSBandwidthControl': + # block has bandwidths in bytes(int) + bw_kwargs = { + 'enable_bw_ctrl': qos_block.values.get(QOSParams.enable_bw_ctrl.value, False), + 'combined_bw_ctrl': qos_block.values.get(QOSParams.combined_bw_ctrl.value, False), + 'export_writebw': str(qos_block.values.get(QOSParams.export_writebw.value, 0)), + 'export_readbw': str(qos_block.values.get(QOSParams.export_readbw.value, 0)), + 'client_writebw': str(qos_block.values.get(QOSParams.client_writebw.value, 0)), + 'client_readbw': str(qos_block.values.get(QOSParams.client_readbw.value, 0)), + 'export_rw_bw': str(qos_block.values.get(QOSParams.export_rw_bw.value, 0)), + 'client_rw_bw': str(qos_block.values.get(QOSParams.client_rw_bw.value, 0)) + } + return cls(**bw_kwargs) + + def to_qos_block(self) -> RawBlock: + result = RawBlock('qos_bandwidths_control') + result.values[QOSParams.enable_bw_ctrl.value] = self.enable_bw_ctrl + result.values[QOSParams.combined_bw_ctrl.value] = self.combined_bw_ctrl + if self.export_writebw: + result.values[QOSParams.export_writebw.value] = self.export_writebw + if self.export_readbw: + result.values[QOSParams.export_readbw.value] = self.export_readbw + if self.client_writebw: + result.values[QOSParams.client_writebw.value] = self.client_writebw + if self.client_readbw: + result.values[QOSParams.client_readbw.value] = self.client_readbw + if self.export_rw_bw: + result.values[QOSParams.export_rw_bw.value] = self.export_rw_bw + if self.client_rw_bw: + result.values[QOSParams.client_rw_bw.value] = self.client_rw_bw + return result + + def to_dict(self) -> Dict[str, Any]: + r: dict[str, Any] = {} + r[QOSParams.enable_bw_ctrl.value] = self.enable_bw_ctrl + r[QOSParams.combined_bw_ctrl.value] = self.combined_bw_ctrl + if self.export_writebw: + r[QOSParams.export_writebw.value] = bytes_to_human(self.export_writebw) + if self.export_readbw: + r[QOSParams.export_readbw.value] = bytes_to_human(self.export_readbw) + if self.client_writebw: + r[QOSParams.client_writebw.value] = bytes_to_human(self.client_writebw) + if self.client_readbw: + r[QOSParams.client_readbw.value] = bytes_to_human(self.client_readbw) + if self.export_rw_bw: + r[QOSParams.export_rw_bw.value] = bytes_to_human(self.export_rw_bw) + if self.client_rw_bw: + r[QOSParams.client_rw_bw.value] = bytes_to_human(self.client_rw_bw) + return r + + def qos_bandwidth_checks(self, qos_type: QOSType) -> None: + """Checks for enabling qos""" + params = {} + d = vars(self) + for key in d: + if key.endswith('bw'): + params[QOSParams[key].value] = d[key] + if not self.combined_bw_ctrl: + req_params = QOS_REQ_PARAMS['combined_bw_disabled'][qos_type.name] + else: + req_params = QOS_REQ_PARAMS['combined_bw_enabled'][qos_type.name] + allowed_params = [] + not_allowed_params = [] + for key in params: + if key in req_params and params[key] == 0: + allowed_params.append(key) + elif key not in req_params and params[key] != 0: + not_allowed_params.append(key) + if allowed_params or not_allowed_params: + raise Exception(f"When combined_rw_bw is {'enabled' if self.combined_bw_ctrl else 'disabled'} " + f"and qos_type is {qos_type.name}, " + f"{'attributes ' + ', '.join(allowed_params) + ' required' if allowed_params else ''} " + f"{'attributes ' + ', '.join(not_allowed_params) + ' are not allowed' if not_allowed_params else ''}.") + + +class QOS(object): + def __init__(self, + cluster_op: bool = False, + enable_qos: bool = False, + qos_type: Optional[QOSType] = None, + bw_obj: Optional[QOSBandwidthControl] = None + ) -> None: + self.cluster_op = cluster_op + self.enable_qos = enable_qos + self.qos_type = qos_type + self.bw_obj = bw_obj + + @classmethod + def from_dict(cls, qos_dict: Dict[str, Any], cluster_op: bool = False) -> 'QOS': + kwargs: dict[str, Any] = {} + # qos dict will have qos type as enum name + if cluster_op: + qos_type = qos_dict.get(QOSParams.qos_type.value) + if qos_type: + kwargs['qos_type'] = QOSType[qos_type] + kwargs['enable_qos'] = qos_dict.get(QOSParams.enable_qos.value) + kwargs['bw_obj'] = QOSBandwidthControl.from_dict(qos_dict) + return cls(cluster_op, **kwargs) + + @classmethod + def from_qos_block(cls, qos_block: RawBlock, cluster_op: bool = False) -> 'QOS': + kwargs: dict[str, Any] = {} + # qos block will have qos type as enum value + if cluster_op: + qos_type = qos_block.values.get(QOSParams.qos_type.value) + if qos_type: + kwargs['qos_type'] = QOSType(qos_type) + kwargs['enable_qos'] = qos_block.values.get(QOSParams.enable_qos.value) + kwargs['bw_obj'] = QOSBandwidthControl.from_qos_block(qos_block) + return cls(cluster_op, **kwargs) + + def to_qos_block(self) -> RawBlock: + if self.cluster_op: + result = RawBlock(QOSParams.clust_block.value) + else: + result = RawBlock(QOSParams.export_block.value) + result.values[QOSParams.enable_qos.value] = self.enable_qos + if self.cluster_op and self.qos_type: + result.values[QOSParams.qos_type.value] = self.qos_type.value + if self.bw_obj and (res := self.bw_obj.to_qos_block()): + result.values.update(res.values) + return result + + def to_dict(self) -> Dict[str, Any]: + r: Dict[str, Any] = {} + r[QOSParams.enable_qos.value] = self.enable_qos + if self.cluster_op and self.qos_type: + r[QOSParams.qos_type.value] = self.qos_type.name + if self.bw_obj and (res := self.bw_obj.to_dict()): + r.update(res) + return r diff --git a/src/pybind/mgr/nfs/rados_utils.py b/src/pybind/mgr/nfs/rados_utils.py new file mode 100644 index 000000000000..5ce699fd3ff2 --- /dev/null +++ b/src/pybind/mgr/nfs/rados_utils.py @@ -0,0 +1,89 @@ +import logging +from typing import Optional, Any + +from rados import TimedOut, ObjectNotFound, Rados +from mgr_module import NFS_POOL_NAME as POOL_NAME +from .ganesha_conf import RawBlock, format_block +from .utils import USER_CONF_PREFIX + +log = logging.getLogger(__name__) + + +def _check_rados_notify(ioctx: Any, obj: str) -> None: + try: + ioctx.notify(obj) + except TimedOut: + log.exception("Ganesha timed out") + + +class NFSRados: + def __init__(self, rados: 'Rados', namespace: str) -> None: + self.rados = rados + self.pool = POOL_NAME + self.namespace = namespace + + def _make_rados_url(self, obj: str) -> str: + return "rados://{}/{}/{}".format(self.pool, self.namespace, obj) + + def _create_url_block(self, obj_name: str) -> RawBlock: + return RawBlock('%url', values={'value': self._make_rados_url(obj_name)}) + + def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + ioctx.write_full(obj, conf_block.encode('utf-8')) + if not config_obj: + # Return after creating empty common config object + return + log.debug("write configuration into rados object %s/%s/%s", + self.pool, self.namespace, obj) + + # Add created obj url to common config obj + ioctx.append(config_obj, format_block( + self._create_url_block(obj)).encode('utf-8')) + _check_rados_notify(ioctx, config_obj) + log.debug("Added %s url to %s", obj, config_obj) + + def read_obj(self, obj: str) -> Optional[str]: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + try: + return ioctx.read(obj, 1048576).decode() + except ObjectNotFound: + return None + + def update_obj(self, conf_block: str, obj: str, config_obj: str, + should_notify: Optional[bool] = True) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + ioctx.write_full(obj, conf_block.encode('utf-8')) + log.debug("write configuration into rados object %s/%s/%s", + self.pool, self.namespace, obj) + if should_notify: + _check_rados_notify(ioctx, config_obj) + log.debug("Update export %s in %s", obj, config_obj) + + def remove_obj(self, obj: str, config_obj: str) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + export_urls = ioctx.read(config_obj) + url = '%url "{}"\n\n'.format(self._make_rados_url(obj)) + export_urls = export_urls.replace(url.encode('utf-8'), b'') + ioctx.remove_object(obj) + ioctx.write_full(config_obj, export_urls) + _check_rados_notify(ioctx, config_obj) + log.debug("Object deleted: %s", url) + + def remove_all_obj(self) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + for obj in ioctx.list_objects(): + obj.remove() + + def check_config(self, config: str = USER_CONF_PREFIX) -> bool: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + for obj in ioctx.list_objects(): + if obj.key.startswith(config): + return True + return False diff --git a/src/pybind/mgr/nfs/tests/test_nfs.py b/src/pybind/mgr/nfs/tests/test_nfs.py index cb9587e6ad1e..e3a3f7e6f4c1 100644 --- a/src/pybind/mgr/nfs/tests/test_nfs.py +++ b/src/pybind/mgr/nfs/tests/test_nfs.py @@ -11,9 +11,11 @@ from mgr_module import MgrModule, NFS_POOL_NAME from rados import ObjectNotFound from ceph.deployment.service_spec import NFSServiceSpec +from ceph.utils import with_units_to_int, bytes_to_human from nfs import Module from nfs.export import ExportMgr, normalize_path -from nfs.ganesha_conf import GaneshaConfParser, Export, RawBlock +from nfs.ganesha_conf import GaneshaConfParser, Export +from nfs.qos_conf import RawBlock, QOS, QOSType, QOSParams, QOS_REQ_PARAMS, QOSBandwidthControl from nfs.cluster import NFSCluster from orchestrator import ServiceDescription, DaemonDescription, OrchResult @@ -134,6 +136,57 @@ EXPORT { %url "rados://{NFS_POOL_NAME}/{cluster_id}/export-2"''' + qos_cluster_block = """ +QOS { + enable_qos = true; + enable_bw_control = true; + combined_rw_bw_control = false; + qos_type = 3; + max_export_write_bw = 1000000; + max_export_read_bw = 2000000; + max_client_write_bw = 3000000; + max_client_read_bw = 4000000; + max_export_combined_bw = 0; + max_client_combined_bw = 0; +} +""" + + qos_export_block = """ +QOS_BLOCK { + enable_qos = true; + enable_bw_control = true; + combined_rw_bw_control = false; + max_export_write_bw = 1000000; + max_export_read_bw = 2000000; + max_client_write_bw = 3000000; + max_client_read_bw = 4000000; + max_export_combined_bw = 0; + max_client_combined_bw = 0; + +} +""" + + qos_cluster_dict = { + "enable_bw_control": True, + "enable_qos": True, + "combined_rw_bw_control": False, + "max_client_read_bw": "4.0MB", + "max_client_write_bw": "3.0MB", + "max_export_read_bw": "2.0MB", + "max_export_write_bw": "1.0MB", + "qos_type": "PerShare_PerClient" + } + + qos_export_dict = { + "enable_bw_control": True, + "enable_qos": True, + "combined_rw_bw_control": False, + "max_client_read_bw": "4.0MB", + "max_client_write_bw": "3.0MB", + "max_export_read_bw": "2.0MB", + "max_export_write_bw": "1.0MB" + } + class RObject(object): def __init__(self, key: str, raw: str) -> None: self.key = key @@ -712,7 +765,11 @@ NFS_CORE_PARAM { assert export.clients[0].access_type is None assert export.cluster_id == self.cluster_id - # again, but without export_id + # again, but without export_id and qos_block + cluster = NFSCluster(nfs_mod) + bw_obj = QOSBandwidthControl(True, export_writebw='100MB', export_readbw='200MB') + cluster.enable_cluster_qos_bw(self.cluster_id, QOSType['PerShare'], bw_obj) + r = conf.apply_export(self.cluster_id, json.dumps({ 'path': 'newestbucket', 'pseudo': '/rgw/bucket', @@ -732,6 +789,13 @@ NFS_CORE_PARAM { 'user_id': 'nfs.foo.newestbucket', 'access_key_id': 'the_access_key', 'secret_access_key': 'the_secret_key', + }, + 'qos_block': { + 'combined_rw_bw_control': False, + 'enable_bw_control': True, + 'enable_qos': True, + 'max_export_read_bw': '3000000', + 'max_export_write_bw': '2000000' } })) assert len(r.changes) == 1 @@ -751,6 +815,11 @@ NFS_CORE_PARAM { assert export.clients[0].squash is None assert export.clients[0].access_type is None assert export.cluster_id == self.cluster_id + assert export.qos_block.enable_qos == True + assert export.qos_block.bw_obj.enable_bw_ctrl == True + assert export.qos_block.bw_obj.combined_bw_ctrl == False + assert export.qos_block.bw_obj.export_writebw == 2000000 + assert export.qos_block.bw_obj.export_readbw == 3000000 def test_update_export_sectype(self): self._do_mock_test(self._test_update_export_sectype) @@ -1354,6 +1423,132 @@ EXPORT { def test_cluster_config(self): self._do_mock_test(self._do_test_cluster_config) + def test_qos_from_dict(self): + qos = QOS.from_dict(self.qos_cluster_dict, True) + assert qos.enable_qos == True + assert qos.bw_obj.enable_bw_ctrl == True + assert isinstance(qos.qos_type, QOSType) + assert qos.bw_obj.export_writebw == 1000000 + assert qos.bw_obj.export_readbw == 2000000 + assert qos.bw_obj.client_writebw == 3000000 + assert qos.bw_obj.client_readbw == 4000000 + + qos = QOS.from_dict(self.qos_export_dict) + assert qos.enable_qos == True + assert qos.bw_obj.enable_bw_ctrl == True + assert qos.qos_type is None + assert qos.bw_obj.export_writebw == 1000000 + assert qos.bw_obj.export_readbw == 2000000 + assert qos.bw_obj.client_writebw == 3000000 + assert qos.bw_obj.client_readbw == 4000000 + + @pytest.mark.parametrize("qos_block, qos_dict", [ + (qos_cluster_block, qos_cluster_dict), + (qos_export_block, qos_export_dict) + ]) + def test_qos_from_block(self, qos_block, qos_dict): + blocks = GaneshaConfParser(qos_block).parse() + assert isinstance(blocks, list) + assert len(blocks) == 1 + qos = QOS.from_qos_block(blocks[0], True) + assert qos.to_dict() == qos_dict + + def _do_test_cluster_qos(self, qos_type, combined_bw_ctrl, params, positive_tc): + nfs_mod = Module('nfs', '', '') + cluster = NFSCluster(nfs_mod) + try: + bw_obj = QOSBandwidthControl(True, combined_bw_ctrl, **params) + cluster.enable_cluster_qos_bw(self.cluster_id, qos_type, bw_obj) + except Exception: + if not positive_tc: + return + out = cluster.get_cluster_qos(self.cluster_id) + expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": combined_bw_ctrl, "qos_type": qos_type.name} + for key in params: + expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(params[key])) + assert out == expected_out + cluster.disable_cluster_qos_bw(self.cluster_id) + out = cluster.get_cluster_qos(self.cluster_id) + assert out == {"enable_bw_control": False, "enable_qos": False, "combined_rw_bw_control": False} + + @pytest.mark.parametrize("qos_type, combined_bw_ctrl, params, positive_tc", [ + (QOSType['PerShare'], False, {'export_writebw': '100MB', 'export_readbw': '200MB'}, True), + (QOSType['PerClient'], False, {'client_writebw': '300MB', 'client_readbw': '400MB'}, True), + (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB', 'client_readbw': '400MB'}, True), + (QOSType['PerShare'], True, {'export_rw_bw': '100MB'}, True), + (QOSType['PerClient'], True, {'client_rw_bw': '200MB'}, True), + (QOSType['PerShare_PerClient'], True, {'export_rw_bw': '100MB', 'client_rw_bw': '200MB'}, True), + # negative testing + (QOSType['PerShare'], False, {'export_writebw': '100MB', 'client_readbw': '200MB'}, False), + (QOSType['PerShare'], False, {'export_writebw': '100MB'}, False), + (QOSType['PerClient'], False, {'client_writebw': '300MB'}, False), + (QOSType['PerClient'], False, {'client_writebw': '300MB', 'export_readbw': '400MB'}, False), + (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB'}, False), + (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB'}, False), + (QOSType['PerShare'], True, {'client_rw_bw': '100MB'}, False), + (QOSType['PerShare'], True, {}, False), + (QOSType['PerClient'], True, {'client_rw_bw': '200MB', 'export_rw_bw': '100MB'}, False), + (QOSType['PerShare_PerClient'], True, {'export_rw_bw': '100MB'}, False) + ]) + def test_cluster_qos(self, qos_type, combined_bw_ctrl, params, positive_tc): + self._do_mock_test(self._do_test_cluster_qos, qos_type, combined_bw_ctrl, params, positive_tc) + + def _do_test_export_qos(self, qos_type, clust_combined_bw_ctrl, clust_params, export_combined_bw_ctrl, export_params): + nfs_mod = Module('nfs', '', '') + cluster = NFSCluster(nfs_mod) + export_mgr = ExportMgr(nfs_mod) + # try enabling export level qos before enabling cluster level qos + try: + bw_obj = QOSBandwidthControl(True, export_combined_bw_ctrl, **export_params) + export_mgr.enable_export_qos_bw(self.cluster_id, '/cephfs_a/', bw_obj) + except Exception as e: + assert str(e) == 'To configure bandwidth control for export, you must first enable bandwidth control at the cluster level.' + bw_obj = QOSBandwidthControl(True, clust_combined_bw_ctrl, **clust_params) + cluster.enable_cluster_qos_bw(self.cluster_id, qos_type, bw_obj) + + # set export qos + try: + bw_obj = QOSBandwidthControl(True, export_combined_bw_ctrl, **export_params) + export_mgr.enable_export_qos_bw(self.cluster_id, '/cephfs_a/', bw_obj) + except Exception: + if export_combined_bw_ctrl: + req = QOS_REQ_PARAMS['combined_bw_enabled'][qos_type.name] + else: + req = QOS_REQ_PARAMS['combined_bw_enabled'][qos_type.name] + if sorted(export_params.keys()) != sorted(req): + return + if qos_type.name == 'PerClient': + return + out = export_mgr.get_export_qos(self.cluster_id, '/cephfs_a/') + expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": export_combined_bw_ctrl} + for key in export_params: + expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(export_params[key])) + assert out == expected_out + export_mgr.disable_export_qos_bw(self.cluster_id, '/cephfs_a/') + out = export_mgr.get_export_qos(self.cluster_id, '/cephfs_a/') + assert out == {"enable_bw_control": False, "enable_qos": False, "combined_rw_bw_control": False} + + + @pytest.mark.parametrize("qos_type, clust_combined_bw_ctrl, clust_params", [ + (QOSType['PerShare'], False, {'export_writebw': '100MB', 'export_readbw': '200MB'}), + (QOSType['PerClient'], False, {'client_writebw': '300MB', 'client_readbw': '400MB'}), + (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB', 'client_readbw': '400MB'}), + (QOSType['PerShare'], True, {'export_rw_bw': '100MB'}), + (QOSType['PerClient'], True, {'client_rw_bw': '200MB'}), + (QOSType['PerShare_PerClient'], True, {'export_rw_bw': '100MB', 'client_rw_bw': '200MB'}) + ]) + @pytest.mark.parametrize("export_combined_bw_ctrl, export_params", [ + (False, {'export_writebw': '100MB', 'export_readbw': '200MB'}), + (False, {'client_writebw': '300MB', 'client_readbw': '400MB'}), + (False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB', 'client_readbw': '400MB'}), + (True, {'export_rw_bw': '100MB'}), + (True, {'client_rw_bw': '200MB'}), + (True, {'export_rw_bw': '100MB', 'client_rw_bw': '200MB'}) + ]) + def test_export_qos(self, qos_type, clust_combined_bw_ctrl, clust_params, + export_combined_bw_ctrl, export_params): + self._do_mock_test(self._do_test_export_qos, qos_type, clust_combined_bw_ctrl, + clust_params, export_combined_bw_ctrl, export_params) @pytest.mark.parametrize( "path,expected", diff --git a/src/pybind/mgr/nfs/utils.py b/src/pybind/mgr/nfs/utils.py index fe928007a6d9..96fe4eb19f35 100644 --- a/src/pybind/mgr/nfs/utils.py +++ b/src/pybind/mgr/nfs/utils.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: EXPORT_PREFIX: str = "export-" CONF_PREFIX: str = "conf-nfs." USER_CONF_PREFIX: str = "userconf-nfs." +QOS_CONF_PREFIX: str = "qosconf-nfs." log = logging.getLogger(__name__) @@ -57,10 +58,15 @@ def conf_obj_name(cluster_id: str) -> str: def user_conf_obj_name(cluster_id: str) -> str: - """Returna a rados object name for the user config.""" + """Return a rados object name for the user config.""" return f"{USER_CONF_PREFIX}{cluster_id}" +def qos_conf_obj_name(cluster_id: str) -> str: + """Return a rados object name for the qos config.""" + return f"{QOS_CONF_PREFIX}{cluster_id}" + + def available_clusters(mgr: 'Module') -> List[str]: ''' This method returns list of available cluster ids. diff --git a/src/python-common/ceph/utils.py b/src/python-common/ceph/utils.py index 4bd3c2c329f1..303bf55ee371 100644 --- a/src/python-common/ceph/utils.py +++ b/src/python-common/ceph/utils.py @@ -215,3 +215,51 @@ def size_to_bytes(v: Union[str, int]) -> int: }[unit] return num * mult raise ValueError(f'invalid size type {type(v)} (expected int or str)') + + +def bytes_to_human(num: float, mode: str = 'decimal') -> str: + """Convert a bytes value into it's human-readable form. + + :param num: number, in bytes, to convert + :param mode: Either decimal (default) or binary to determine divisor + :returns: string representing the bytes value in a more readable format + """ + unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB'] + divisor = 1000.0 + yotta = 'YB' + + if mode == 'binary': + unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB'] + divisor = 1024.0 + yotta = 'YiB' + + for unit in unit_list: + if abs(num) < divisor: + return '%3.1f%s' % (num, unit) + num /= divisor + return '%.1f%s' % (num, yotta) + + +def with_units_to_int(v: str) -> int: + if not v: + return 0 + if v.endswith('iB'): + v = v[:-2] + bytes_mult = 1024 + elif v.endswith('B'): + v = v[:-1] + bytes_mult = 1000 + mult = 1 + if v[-1].upper() == 'K': + mult = bytes_mult + v = v[:-1] + elif v[-1].upper() == 'M': + mult = bytes_mult * bytes_mult + v = v[:-1] + elif v[-1].upper() == 'G': + mult = bytes_mult * bytes_mult * bytes_mult + v = v[:-1] + elif v[-1].upper() == 'T': + mult = bytes_mult * bytes_mult * bytes_mult * bytes_mult + v = v[:-1] + return int(float(v) * mult)