]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/nfs: NFS commands to enable, disable and get QOS config for cluster and export
authorShweta Bhosale <Shweta.Bhosale1@ibm.com>
Thu, 9 Jan 2025 13:53:37 +0000 (19:23 +0530)
committerShweta Bhosale <Shweta.Bhosale1@ibm.com>
Mon, 27 Apr 2026 12:49:14 +0000 (18:19 +0530)
fixes: https://tracker.ceph.com/issues/69458
Signed-off-by: Shweta Bhosale <Shweta.Bhosale1@ibm.com>
14 files changed:
src/cephadm/cephadmlib/container_engines.py
src/cephadm/cephadmlib/data_utils.py
src/cephadm/cephadmlib/host_facts.py
src/cephadm/tests/test_agent.py
src/pybind/mgr/nfs/cluster.py
src/pybind/mgr/nfs/export.py
src/pybind/mgr/nfs/export_utils.py [new file with mode: 0644]
src/pybind/mgr/nfs/ganesha_conf.py
src/pybind/mgr/nfs/module.py
src/pybind/mgr/nfs/qos_conf.py [new file with mode: 0644]
src/pybind/mgr/nfs/rados_utils.py [new file with mode: 0644]
src/pybind/mgr/nfs/tests/test_nfs.py
src/pybind/mgr/nfs/utils.py
src/python-common/ceph/utils.py

index e46661c1b1b12cabea7dab9650a8a61f3b6da26f..aca74bc1086ad08350930c6259d2b05caf0b4a33 100644 (file)
@@ -14,7 +14,7 @@ from .constants import (
     MIN_PODMAN_VERSION,
     PIDS_LIMIT_UNLIMITED_PODMAN_VERSION,
 )
-from .data_utils import with_units_to_int
+from ceph.utils import with_units_to_int
 from .exceptions import Error
 
 
index 9920c399913038f40ed8e538c57d8a5f9875d56b..5b3e9cec43a72d6f756fb6fa3d48b02bad2738bf 100644 (file)
@@ -56,51 +56,6 @@ def dict_get_join(d: Dict[str, Any], key: str) -> Any:
     return value
 
 
-def bytes_to_human(num, mode='decimal'):
-    # type: (float, str) -> str
-    """Convert a bytes value into it's human-readable form.
-
-    :param num: number, in bytes, to convert
-    :param mode: Either decimal (default) or binary to determine divisor
-    :returns: string representing the bytes value in a more readable format
-    """
-    unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
-    divisor = 1000.0
-    yotta = 'YB'
-
-    if mode == 'binary':
-        unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']
-        divisor = 1024.0
-        yotta = 'YiB'
-
-    for unit in unit_list:
-        if abs(num) < divisor:
-            return '%3.1f%s' % (num, unit)
-        num /= divisor
-    return '%.1f%s' % (num, yotta)
-
-
-def with_units_to_int(v: str) -> int:
-    if v.endswith('iB'):
-        v = v[:-2]
-    elif v.endswith('B'):
-        v = v[:-1]
-    mult = 1
-    if v[-1].upper() == 'K':
-        mult = 1024
-        v = v[:-1]
-    elif v[-1].upper() == 'M':
-        mult = 1024 * 1024
-        v = v[:-1]
-    elif v[-1].upper() == 'G':
-        mult = 1024 * 1024 * 1024
-        v = v[:-1]
-    elif v[-1].upper() == 'T':
-        mult = 1024 * 1024 * 1024 * 1024
-        v = v[:-1]
-    return int(float(v) * mult)
-
-
 def read_config(fn):
     # type: (Optional[str]) -> ConfigParser
     cp = ConfigParser()
index ba05b5434c59b51fb7d819c78650370693f136a2..ec85430d2aa4b91bb1a57d513f0d095c465a239c 100644 (file)
@@ -16,7 +16,7 @@ from typing import Any, cast, Dict, List, Optional, Set, Union
 
 from cephadmlib.call_wrappers import call, call_throws, CallVerbosity
 from cephadmlib.context import CephadmContext
-from cephadmlib.data_utils import bytes_to_human
+from ceph.utils import bytes_to_human
 from cephadmlib.exe_utils import find_executable
 from cephadmlib.file_utils import read_file
 from cephadmlib.net_utils import get_fqdn, get_ipv4_address, get_ipv6_address
index 801f62c92167b6f5cd2b1549d3f24e1fd4d513a5..c2a7efdc7d900b9e07975c7d77a83b7d7dec8fd8 100644 (file)
@@ -254,8 +254,8 @@ def test_agent_daemon_ls_subset(cephadm_fs, funkypatch):
         assert daemons['mgr.host1.pntmho']['container_id'] == mgr_cid
         assert daemons['crash.host1']['container_id'] == crash_cid
 
-        assert daemons['mgr.host1.pntmho']['memory_usage'] == 478570086  # 456.4 MB
-        assert daemons['crash.host1']['memory_usage'] == 7426015  # 7.082 MB
+        assert daemons['mgr.host1.pntmho']['memory_usage'] == 456400000  # 456.4 MB
+        assert daemons['crash.host1']['memory_usage'] == 7082000  # 7.082 MB
 
 
 @mock.patch("cephadm.list_daemons")
index 1b3ce8213f558d4787fde4d758dfafb2f58d4a0f..97138792ac35eeef3e7fb7c70bba2da44a9b495c 100644 (file)
@@ -18,8 +18,15 @@ from .utils import (
     available_clusters,
     conf_obj_name,
     restart_nfs_service,
-    user_conf_obj_name)
-from .export import NFSRados
+    user_conf_obj_name,
+    USER_CONF_PREFIX,
+    qos_conf_obj_name)
+from .rados_utils import NFSRados
+from .ganesha_conf import format_block, GaneshaConfParser
+from .qos_conf import (
+    QOS,
+    QOSType,
+    QOSBandwidthControl)
 
 if TYPE_CHECKING:
     from nfs.module import Module
@@ -324,7 +331,7 @@ class NFSCluster:
         try:
             if cluster_id in available_clusters(self.mgr):
                 rados_obj = self._rados(cluster_id)
-                if rados_obj.check_user_config():
+                if rados_obj.check_config(USER_CONF_PREFIX):
                     raise NonFatalError("NFS-Ganesha User Config already exists")
                 rados_obj.write_obj(nfs_config, user_conf_obj_name(cluster_id),
                                     conf_obj_name(cluster_id))
@@ -342,7 +349,7 @@ class NFSCluster:
         try:
             if cluster_id in available_clusters(self.mgr):
                 rados_obj = self._rados(cluster_id)
-                if not rados_obj.check_user_config():
+                if not rados_obj.check_config(USER_CONF_PREFIX):
                     raise NonFatalError("NFS-Ganesha User Config does not exist")
                 rados_obj.remove_obj(user_conf_obj_name(cluster_id),
                                      conf_obj_name(cluster_id))
@@ -358,3 +365,100 @@ class NFSCluster:
     def _rados(self, cluster_id: str) -> NFSRados:
         """Return a new NFSRados object for the given cluster id."""
         return NFSRados(self.mgr.rados, cluster_id)
+
+    def get_cluster_qos_config(self, cluster_id: str) -> Optional[QOS]:
+        """Return QOS object for the given cluster id."""
+        rados_obj = self._rados(cluster_id)
+        conf = rados_obj.read_obj(qos_conf_obj_name(cluster_id))
+        if conf:
+            qos_block = GaneshaConfParser(conf).parse()
+            qos_obj = QOS.from_qos_block(qos_block[0], True)
+            return qos_obj
+        return None
+
+    def update_cluster_qos_bw(self,
+                              cluster_id: str,
+                              enable_qos: bool,
+                              bw_obj: QOSBandwidthControl,
+                              qos_type: Optional[QOSType] = None) -> None:
+        """Update cluster QOS config"""
+        qos_obj_exists = False
+        qos_obj = self.get_cluster_qos_config(cluster_id)
+        if not qos_obj:
+            log.debug(f"Creating new QOS block for cluster {cluster_id}")
+            qos_obj = QOS(True, enable_qos, qos_type, bw_obj)
+        else:
+            log.debug(f"Updating existing QOS block for cluster {cluster_id}")
+            qos_obj_exists = True
+            qos_obj.enable_qos = enable_qos
+            qos_obj.qos_type = qos_type
+            qos_obj.bw_obj = bw_obj
+
+        qos_config = format_block(qos_obj.to_qos_block())
+        rados_obj = self._rados(cluster_id)
+        if not qos_obj_exists:
+            rados_obj.write_obj(qos_config, qos_conf_obj_name(cluster_id),
+                                conf_obj_name(cluster_id))
+        else:
+            rados_obj.update_obj(qos_config, qos_conf_obj_name(cluster_id),
+                                 conf_obj_name(cluster_id), should_notify=False)
+        log.debug(f"Successfully saved {cluster_id}s QOS bandwidth control config: \n {qos_config}")
+
+    def enable_cluster_qos_bw(self,
+                              cluster_id: str,
+                              qos_type: QOSType,
+                              bw_obj: QOSBandwidthControl
+                              ) -> None:
+        """
+        There are 2 cases to consider:
+        1. If combined bandwith control is disabled
+            a. If qos_type is pershare, then export_writebw and export_readbw parameters are compulsory
+            b. If qos_type is perclient, then client_writebw and client_readbw parameters are compulsory
+            c. If qos_type is pershare_perclient then export_writebw, export_readbw, client_writebw and
+               client_readbw are compulsory parameters
+        2. If combined bandwidth control is enabled
+            a. If qos_type is pershare, then export_rw_bw parameter is compulsory
+            b. If qos_type is perclient, then client_rw_bw parameter is compulsory
+            c. If qos_type is pershare_perclient, then export_rw_bw and client_rw_bw parameters are compulsory
+        """
+        try:
+            bw_obj.qos_bandwidth_checks(qos_type)
+            if cluster_id in available_clusters(self.mgr):
+                self.update_cluster_qos_bw(cluster_id, True, bw_obj, qos_type)
+                restart_nfs_service(self.mgr, cluster_id)
+                log.info(f"QOS bandwidth control has been successfully enabled for cluster {cluster_id}. "
+                         "If the qos_type is changed during this process, ensure that the bandwidth "
+                         "values for all exports are updated accordingly.")
+                return
+            raise ClusterNotFound()
+        except NotImplementedError:
+            raise ManualRestartRequired(f"NFS-Ganesha QOS bandwidth control config added Successfully for {cluster_id}")
+        except Exception as e:
+            log.exception(f"Setting NFS-Ganesha QOS bandwidth control config failed for {cluster_id}")
+            raise ErrorResponse.wrap(e)
+
+    def get_cluster_qos(self, cluster_id: str) -> Dict[str, Any]:
+        try:
+            if cluster_id in available_clusters(self.mgr):
+                qos_obj = self.get_cluster_qos_config(cluster_id)
+                return qos_obj.to_dict() if qos_obj else {}
+            raise ClusterNotFound()
+        except Exception as e:
+            log.exception(f"Fetching NFS-Ganesha QOS bandwidth control config failed for {cluster_id}")
+            raise ErrorResponse.wrap(e)
+
+    def disable_cluster_qos_bw(self, cluster_id: str) -> None:
+        try:
+            if cluster_id in available_clusters(self.mgr):
+                self.update_cluster_qos_bw(cluster_id, False, QOSBandwidthControl())
+                restart_nfs_service(self.mgr, cluster_id)
+                log.info("Cluster-level QoS bandwidth control has been successfully disabled for "
+                         f"cluster {cluster_id}. As a result, export-level bandwidth control will "
+                         "no longer have any effect, even if it's enabled.")
+                return
+            raise ClusterNotFound()
+        except NotImplementedError:
+            raise ManualRestartRequired(f"NFS-Ganesha QOS bandwidth control config added successfully for {cluster_id}")
+        except Exception as e:
+            log.exception(f"Setting NFS-Ganesha QOS bandwidth control config failed for {cluster_id}")
+            raise ErrorResponse.wrap(e)
index f3fe29405aab38770035aa5ff0e1ff157ac6a9c4..724ea969d91ec987543e55826e0199dae9e45657 100644 (file)
@@ -17,7 +17,7 @@ from ceph.fs.earmarking import EarmarkTopScope
 import cephfs
 
 from mgr_util import CephFSEarmarkResolver
-from rados import TimedOut, ObjectNotFound, Rados
+from rados import TimedOut, ObjectNotFound
 
 from object_format import ErrorResponse
 from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS
@@ -27,13 +27,13 @@ from .ganesha_conf import (
     Export,
     GaneshaConfParser,
     RGWFSAL,
-    RawBlock,
     format_block)
+from .qos_conf import QOS, QOSBandwidthControl
+from .export_utils import export_qos_bw_checks, export_dict_qos_checks
 from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound
 from .utils import (
     EXPORT_PREFIX,
     NonFatalError,
-    USER_CONF_PREFIX,
     export_obj_name,
     conf_obj_name,
     available_clusters,
@@ -41,6 +41,7 @@ from .utils import (
     get_nfs_spec_for_cluster,
     restart_nfs_service,
     cephfs_path_is_dir)
+from .rados_utils import NFSRados
 
 if TYPE_CHECKING:
     from nfs.module import Module
@@ -90,79 +91,6 @@ def _validate_cmount_path(cmount_path: str, path: str) -> None:
         )
 
 
-class NFSRados:
-    def __init__(self, rados: 'Rados', namespace: str) -> None:
-        self.rados = rados
-        self.pool = POOL_NAME
-        self.namespace = namespace
-
-    def _make_rados_url(self, obj: str) -> str:
-        return "rados://{}/{}/{}".format(self.pool, self.namespace, obj)
-
-    def _create_url_block(self, obj_name: str) -> RawBlock:
-        return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})
-
-    def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
-        with self.rados.open_ioctx(self.pool) as ioctx:
-            ioctx.set_namespace(self.namespace)
-            ioctx.write_full(obj, conf_block.encode('utf-8'))
-            if not config_obj:
-                # Return after creating empty common config object
-                return
-            log.debug("write configuration into rados object %s/%s/%s",
-                      self.pool, self.namespace, obj)
-
-            # Add created obj url to common config obj
-            ioctx.append(config_obj, format_block(
-                         self._create_url_block(obj)).encode('utf-8'))
-            _check_rados_notify(ioctx, config_obj)
-            log.debug("Added %s url to %s", obj, config_obj)
-
-    def read_obj(self, obj: str) -> Optional[str]:
-        with self.rados.open_ioctx(self.pool) as ioctx:
-            ioctx.set_namespace(self.namespace)
-            try:
-                return ioctx.read(obj, 1048576).decode()
-            except ObjectNotFound:
-                return None
-
-    def update_obj(self, conf_block: str, obj: str, config_obj: str,
-                   should_notify: Optional[bool] = True) -> None:
-        with self.rados.open_ioctx(self.pool) as ioctx:
-            ioctx.set_namespace(self.namespace)
-            ioctx.write_full(obj, conf_block.encode('utf-8'))
-            log.debug("write configuration into rados object %s/%s/%s",
-                      self.pool, self.namespace, obj)
-            if should_notify:
-                _check_rados_notify(ioctx, config_obj)
-            log.debug("Update export %s in %s", obj, config_obj)
-
-    def remove_obj(self, obj: str, config_obj: str) -> None:
-        with self.rados.open_ioctx(self.pool) as ioctx:
-            ioctx.set_namespace(self.namespace)
-            export_urls = ioctx.read(config_obj)
-            url = '%url "{}"\n\n'.format(self._make_rados_url(obj))
-            export_urls = export_urls.replace(url.encode('utf-8'), b'')
-            ioctx.remove_object(obj)
-            ioctx.write_full(config_obj, export_urls)
-            _check_rados_notify(ioctx, config_obj)
-            log.debug("Object deleted: %s", url)
-
-    def remove_all_obj(self) -> None:
-        with self.rados.open_ioctx(self.pool) as ioctx:
-            ioctx.set_namespace(self.namespace)
-            for obj in ioctx.list_objects():
-                obj.remove()
-
-    def check_user_config(self) -> bool:
-        with self.rados.open_ioctx(self.pool) as ioctx:
-            ioctx.set_namespace(self.namespace)
-            for obj in ioctx.list_objects():
-                if obj.key.startswith(USER_CONF_PREFIX):
-                    return True
-        return False
-
-
 class AppliedExportResults:
     """Gathers the results of multiple changed exports.
     Returned by apply_export.
@@ -930,6 +858,10 @@ class ExportMgr:
             elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key:
                 raise NFSInvalidOperation('secret_access_key change is not allowed')
 
+        # check QOS
+        if new_export_dict.get('qos_block'):
+            export_dict_qos_checks(cluster_id, self.mgr, dict(new_export_dict.get('qos_block', {})))
+
         self.exports[cluster_id].remove(old_export)
 
         self._update_export(cluster_id, new_export, need_nfs_service_restart)
@@ -948,6 +880,74 @@ class ExportMgr:
                 exports_count += 1
         return exports_count
 
+    def update_export_qos_bw(self,
+                             cluster_id: str,
+                             pseudo_path: str,
+                             enable_qos: bool,
+                             bw_obj: QOSBandwidthControl) -> None:
+        """Update Export QOS block"""
+        export = self._fetch_export(cluster_id, pseudo_path)
+        if not export:
+            raise NFSObjectNotFound(f"Export {pseudo_path} not found in NFS cluster {cluster_id}")
+        # if qos_block does not exists in export create one else update existing block
+        if not export.qos_block:
+            log.debug(f"Creating new QOS block for export {pseudo_path} of cluster {cluster_id}")
+            export.qos_block = QOS(enable_qos=enable_qos, bw_obj=bw_obj)
+        else:
+            log.debug(f"Updating existing QOS block of export {pseudo_path} of cluster {cluster_id}")
+            export.qos_block.enable_qos = enable_qos
+            export.qos_block.bw_obj = bw_obj
+
+        self.exports[cluster_id].remove(export)
+        self._update_export(cluster_id, export, False)
+        log.debug(f"Successfully updated QOS bandwidth control config for export {pseudo_path} of cluster {cluster_id}")
+
+    def enable_export_qos_bw(self,
+                             cluster_id: str,
+                             pseudo_path: str,
+                             bw_obj: QOSBandwidthControl
+                             ) -> None:
+        """
+        There are 2 cases to consider, based on QOS type set on cluster level
+        1. If combined bandwith control is disabled
+            a. If qos_type is pershare, then export_writebw and export_readbw parameters are compulsory
+            b. If qos_type is perclient, then can't enable export level qos
+            c. If qos_type is pershare_perclient then export_writebw, export_readbw, client_writebw and
+               client_readbw are compulsory parameters
+        2. If combined bandwidth control is enabled
+            a. If qos_type is pershare, then export_rw_bw parameter is compulsory
+            b. If qos_type is perclient, then can't enable export level qos
+            c. If qos_type is pershare_perclient, then export_rw_bw and client_rw_bw parameters are compulsory
+        """
+        try:
+            self._validate_cluster_id(cluster_id)
+            assert pseudo_path
+            export_qos_bw_checks(cluster_id, self.mgr, bw_obj=bw_obj)
+            self.update_export_qos_bw(cluster_id, pseudo_path, True, bw_obj)
+        except Exception as e:
+            log.exception(f"Setting NFS-Ganesha QOS bandwidth control config failed for {pseudo_path} of {cluster_id}")
+            raise ErrorResponse.wrap(e)
+
+    def get_export_qos(self, cluster_id: str, pseudo_path: str) -> Dict[str, int]:
+        try:
+            self._validate_cluster_id(cluster_id)
+            export = self._fetch_export(cluster_id, pseudo_path)
+            if not export:
+                raise NFSObjectNotFound(f"Export {pseudo_path} not found in NFS cluster {cluster_id}")
+            return export.qos_block.to_dict() if export.qos_block else {}
+        except Exception as e:
+            log.exception(f"Failed to get QOS configuration for {pseudo_path} of {cluster_id}")
+            raise ErrorResponse.wrap(e)
+
+    def disable_export_qos_bw(self, cluster_id: str, pseudo_path: str) -> None:
+        try:
+            self._validate_cluster_id(cluster_id)
+            assert pseudo_path
+            self.update_export_qos_bw(cluster_id, pseudo_path, False, QOSBandwidthControl())
+        except Exception as e:
+            log.exception(f"Setting NFS-Ganesha QOS bandwidth control Config failed for {pseudo_path} of {cluster_id}")
+            raise ErrorResponse.wrap(e)
+
 
 def get_user_id(cluster_id: str, fs_name: str, cmount_path: str) -> str:
     """
diff --git a/src/pybind/mgr/nfs/export_utils.py b/src/pybind/mgr/nfs/export_utils.py
new file mode 100644 (file)
index 0000000..86f560b
--- /dev/null
@@ -0,0 +1,67 @@
+from typing import Any
+
+from .cluster import NFSCluster
+from .qos_conf import QOSType, QOSParams, QOSBandwidthControl
+
+
+def export_dict_bw_checks(cluster_id: str,
+                          mgr_obj: Any,
+                          qos_enable: bool,
+                          qos_dict: dict) -> None:
+    enable_bw_ctrl = qos_dict.get('enable_bw_control')
+    combined_bw_ctrl = qos_dict.get('combined_rw_bw_control')
+    bandwith_param_exists = any(key.endswith('bw') for key in qos_dict)
+    if enable_bw_ctrl is None:
+        if combined_bw_ctrl and bandwith_param_exists:
+            raise Exception('Bandwidth control is not enabled but associated parameters exists')
+        return
+    if combined_bw_ctrl is None:
+        combined_bw_ctrl = False
+    if not qos_enable and enable_bw_ctrl:
+        raise Exception('To enable bandwidth control, qos_enable should be true.')
+    if not (isinstance(enable_bw_ctrl, bool) and isinstance(combined_bw_ctrl, bool)):
+        raise Exception('Invalid values for the enable_bw_ctrl and combined_bw_ctrl parameters.')
+    # if qos is disabled, then bandwidths should not be set and no need to bandwidth checks
+    if not enable_bw_ctrl:
+        if bandwith_param_exists:
+            raise Exception('Bandwidths should not be passed when enable_bw_control is false.')
+        return
+    if enable_bw_ctrl and not bandwith_param_exists:
+        raise Exception('Bandwidths should be set when enable_bw_control is true.')
+    bw_obj = QOSBandwidthControl(enable_bw_ctrl,
+                                 combined_bw_ctrl,
+                                 export_writebw=qos_dict.get(QOSParams.export_writebw.value, '0'),
+                                 export_readbw=qos_dict.get(QOSParams.export_readbw.value, '0'),
+                                 client_writebw=qos_dict.get(QOSParams.client_writebw.value, '0'),
+                                 client_readbw=qos_dict.get(QOSParams.client_readbw.value, '0'),
+                                 export_rw_bw=qos_dict.get(QOSParams.export_rw_bw.value, '0'),
+                                 client_rw_bw=qos_dict.get(QOSParams.client_rw_bw.value, '0'))
+    export_qos_bw_checks(cluster_id, mgr_obj, bw_obj)
+
+
+def export_dict_qos_checks(cluster_id: str,
+                           mgr_obj: Any,
+                           qos_dict: dict) -> None:
+    """Validate the qos block of dict passed to apply_export method"""
+    qos_enable = qos_dict.get('enable_qos')
+    if qos_enable is None:
+        raise Exception('The QOS block requires at least the enable_qos parameter')
+    if not isinstance(qos_enable, bool):
+        raise Exception('Invalid value for the enable_qos parameter')
+    export_dict_bw_checks(cluster_id, mgr_obj, qos_enable, qos_dict)
+
+
+def export_qos_bw_checks(cluster_id: str,
+                         mgr_obj: Any,
+                         bw_obj: QOSBandwidthControl,
+                         nfs_clust_obj: Any = None) -> None:
+    """check cluster level qos is enabled to enable export level qos and validate bandwidths"""
+    if not nfs_clust_obj:
+        nfs_clust_obj = NFSCluster(mgr_obj)
+    clust_qos_obj = nfs_clust_obj.get_cluster_qos_config(cluster_id)
+    if not clust_qos_obj or (clust_qos_obj and not (clust_qos_obj.enable_qos)):
+        raise Exception('To configure bandwidth control for export, you must first enable bandwidth control at the cluster level.')
+    if clust_qos_obj.qos_type:
+        if clust_qos_obj.qos_type == QOSType.PerClient:
+            raise Exception('Export-level QoS bandwidth control cannot be enabled if the QoS type at the cluster level is set to PerClient.')
+        bw_obj.qos_bandwidth_checks(clust_qos_obj.qos_type)
index 759b4dcdc98d7fa28198db2b6d1f0b6ef87b750f..3945aa95574014b074c3c1b28521e373193917e2 100644 (file)
@@ -5,6 +5,7 @@ from mgr_module import NFS_GANESHA_SUPPORTED_FSALS
 
 from .exception import NFSInvalidOperation, FSNotFound
 from .utils import check_fs
+from .qos_conf import QOS, RawBlock
 
 if TYPE_CHECKING:
     from nfs.module import Module
@@ -60,27 +61,6 @@ def _validate_xprtsec_type(xprtsec: str) -> None:
             f"XprtSec {xprtsec} invalid, valid types are {valid_xprtsec_types}")
 
 
-class RawBlock():
-    def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}):
-        if not values:  # workaround mutable default argument
-            values = {}
-        if not blocks:  # workaround mutable default argument
-            blocks = []
-        self.block_name = block_name
-        self.blocks = blocks
-        self.values = values
-
-    def __eq__(self, other: Any) -> bool:
-        if not isinstance(other, RawBlock):
-            return False
-        return self.block_name == other.block_name and \
-            self.blocks == other.blocks and \
-            self.values == other.values
-
-    def __repr__(self) -> str:
-        return f'RawBlock({self.block_name!r}, {self.blocks!r}, {self.values!r})'
-
-
 class GaneshaConfParser:
     def __init__(self, raw_config: str):
         self.pos = 0
@@ -383,7 +363,8 @@ class Export:
             fsal: FSAL,
             clients: Optional[List[Client]] = None,
             sectype: Optional[List[str]] = None,
-            xprtsec: Optional[str] = None) -> None:
+            xprtsec: Optional[str] = None,
+            qos_block: Optional[QOS] = None) -> None:
         self.export_id = export_id
         self.path = path
         self.fsal = fsal
@@ -398,6 +379,7 @@ class Export:
         self.clients: List[Client] = clients or []
         self.sectype = sectype
         self.xprtsec = xprtsec
+        self.qos_block = qos_block
 
     @classmethod
     def from_export_block(cls, export_block: RawBlock, cluster_id: str) -> 'Export':
@@ -407,6 +389,10 @@ class Export:
         client_blocks = [b for b in export_block.blocks
                          if b.block_name == "CLIENT"]
 
+        qos_block = [b for b in export_block.blocks
+                     if b.block_name == "qos_block"]
+        qos_block = QOS.from_qos_block(qos_block[0]) if qos_block else None
+
         protocols = export_block.values.get('protocols')
         if not isinstance(protocols, list):
             protocols = [protocols]
@@ -443,7 +429,9 @@ class Export:
                    [Client.from_client_block(client)
                     for client in client_blocks],
                    sectype=sectype,
-                   xprtsec=xprtsec)
+                   xprtsec=xprtsec,
+                   qos_block=qos_block
+                   )
 
     def to_export_block(self) -> RawBlock:
         values = {
@@ -468,10 +456,16 @@ class Export:
             client.to_client_block()
             for client in self.clients
         ]
+        if self.qos_block:
+            result.blocks.append(self.qos_block.to_qos_block())
         return result
 
     @classmethod
     def from_dict(cls, export_id: int, ex_dict: Dict[str, Any]) -> 'Export':
+        if ex_dict.get('qos_block'):
+            qos_block = QOS.from_dict(ex_dict.get('qos_block', {}))
+        else:
+            qos_block = None
         return cls(export_id,
                    ex_dict.get('path', '/'),
                    ex_dict['cluster_id'],
@@ -484,7 +478,9 @@ class Export:
                    FSAL.from_dict(ex_dict.get('fsal', {})),
                    [Client.from_dict(client) for client in ex_dict.get('clients', [])],
                    sectype=ex_dict.get("sectype"),
-                   xprtsec=ex_dict.get('XprtSec'))
+                   xprtsec=ex_dict.get('XprtSec'),
+                   qos_block=qos_block
+                   )
 
     def to_dict(self) -> Dict[str, Any]:
         values = {
@@ -504,6 +500,8 @@ class Export:
             values['sectype'] = self.sectype
         if self.xprtsec:
             values['XprtSec'] = self.xprtsec
+        if self.qos_block:
+            values['qos_block'] = self.qos_block.to_dict()
         return values
 
     def validate(self, mgr: 'Module') -> None:
index 762a2fab9c5ab460df06f083ca35323860723161..747c198b4d6a99a83b6bac44b19d5f402f6f0081 100644 (file)
@@ -14,6 +14,7 @@ from mgr_util import CephFSEarmarkResolver
 from .export import ExportMgr, AppliedExportResults
 from .cluster import NFSCluster
 from .utils import available_clusters
+from .qos_conf import QOSType, QOSBandwidthControl, UserQoSType
 
 log = logging.getLogger(__name__)
 
@@ -243,3 +244,84 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
 
     def fetch_nfs_cluster_obj(self) -> NFSCluster:
         return self.nfs
+
+    @CLICommand('nfs export qos enable bandwidth_control', perm='rw')
+    @object_format.EmptyResponder()
+    def _cmd_export_qos_bw_enable(self,
+                                  cluster_id: str,
+                                  pseudo_path: str,
+                                  combined_rw_bw_ctrl: bool = False,
+                                  max_export_write_bw: str = '0',
+                                  max_export_read_bw: str = '0',
+                                  max_client_write_bw: str = '0',
+                                  max_client_read_bw: str = '0',
+                                  max_export_combined_bw: str = '0',
+                                  max_client_combined_bw: str = '0'
+                                  ) -> None:
+        """enable QOS config for NFS export and set different bandwidth"""
+        try:
+            bw_obj = QOSBandwidthControl(enable_bw_ctrl=True,
+                                         combined_bw_ctrl=combined_rw_bw_ctrl,
+                                         export_writebw=max_export_write_bw,
+                                         export_readbw=max_export_read_bw,
+                                         client_writebw=max_client_write_bw,
+                                         client_readbw=max_client_read_bw,
+                                         export_rw_bw=max_export_combined_bw,
+                                         client_rw_bw=max_client_combined_bw)
+        except Exception as e:
+            raise object_format.ErrorResponse.wrap(e)
+        return self.export_mgr.enable_export_qos_bw(cluster_id=cluster_id,
+                                                    pseudo_path=pseudo_path,
+                                                    bw_obj=bw_obj)
+
+    @CLICommand('nfs export qos get', perm='r')
+    @object_format.Responder()
+    def _cmd_export_qos_get(self, cluster_id: str, pseudo_path: str) -> Dict[str, int]:
+        """Get NFS export QOS config"""
+        return self.export_mgr.get_export_qos(cluster_id, pseudo_path)
+
+    @CLICommand('nfs export qos disable bandwidth_control', perm='rw')
+    @object_format.EmptyResponder()
+    def _cmd_export_qos_bw_disable(self, cluster_id: str, pseudo_path: str) -> None:
+        """Disable NFS export QOS config"""
+        return self.export_mgr.disable_export_qos_bw(cluster_id, pseudo_path)
+
+    @CLICommand('nfs cluster qos enable bandwidth_control', perm='rw')
+    @object_format.EmptyResponder()
+    def _cmd_cluster_qos_bw_enable(self,
+                                   cluster_id: str,
+                                   qos_type: UserQoSType,
+                                   combined_rw_bw_ctrl: bool = False,
+                                   max_export_write_bw: str = '0',
+                                   max_export_read_bw: str = '0',
+                                   max_client_write_bw: str = '0',
+                                   max_client_read_bw: str = '0',
+                                   max_export_combined_bw: str = '0',
+                                   max_client_combined_bw: str = '0') -> None:
+        """Enable QOS ratelimiting for NFS cluster and set default export and client max bandwidth"""
+        try:
+            bw_obj = QOSBandwidthControl(enable_bw_ctrl=True,
+                                         combined_bw_ctrl=combined_rw_bw_ctrl,
+                                         export_writebw=max_export_write_bw,
+                                         export_readbw=max_export_read_bw,
+                                         client_writebw=max_client_write_bw,
+                                         client_readbw=max_client_read_bw,
+                                         export_rw_bw=max_export_combined_bw,
+                                         client_rw_bw=max_client_combined_bw)
+        except Exception as e:
+            raise object_format.ErrorResponse.wrap(e)
+        return self.nfs.enable_cluster_qos_bw(cluster_id=cluster_id,
+                                              qos_type=QOSType[qos_type.value],
+                                              bw_obj=bw_obj)
+
+    @CLICommand('nfs cluster qos disable bandwidth_control', perm='rw')
+    @object_format.EmptyResponder()
+    def _cmd_cluster_qos_bw_disable(self, cluster_id: str) -> None:
+        """Disable QOS for NFS cluster"""
+        return self.nfs.disable_cluster_qos_bw(cluster_id)
+
+    @CLICommand('nfs cluster qos get', perm='r')
+    @object_format.Responder()
+    def _cmd_cluster_qos_get(self, cluster_id: str) -> Dict[str, Any]:
+        """Get QOS configuration of NFS cluster"""
+        return self.nfs.get_cluster_qos(cluster_id)
diff --git a/src/pybind/mgr/nfs/qos_conf.py b/src/pybind/mgr/nfs/qos_conf.py
new file mode 100644 (file)
index 0000000..91c1ca3
--- /dev/null
@@ -0,0 +1,247 @@
+from typing import List, Dict, Any, Optional
+from enum import Enum
+
+from ceph.utils import bytes_to_human, with_units_to_int
+
+
+class RawBlock():
+    def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}):
+        if not values:  # workaround mutable default argument
+            values = {}
+        if not blocks:  # workaround mutable default argument
+            blocks = []
+        self.block_name = block_name
+        self.blocks = blocks
+        self.values = values
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, RawBlock):
+            return False
+        return self.block_name == other.block_name and \
+            self.blocks == other.blocks and \
+            self.values == other.values
+
+    def __repr__(self) -> str:
+        return f'RawBlock({self.block_name!r}, {self.blocks!r}, {self.values!r})'
+
+
+class QOSParams(Enum):
+    clust_block = "QOS_DEFAULT_CONFIG"
+    export_block = "QOS_BLOCK"
+    enable_qos = "enable_qos"
+    enable_bw_ctrl = "enable_bw_control"
+    combined_bw_ctrl = "combined_rw_bw_control"
+    qos_type = "qos_type"
+    export_writebw = "max_export_write_bw"
+    export_readbw = "max_export_read_bw"
+    client_writebw = "max_client_write_bw"
+    client_readbw = "max_client_read_bw"
+    export_rw_bw = "max_export_combined_bw"
+    client_rw_bw = "max_client_combined_bw"
+
+
+class UserQoSType(Enum):
+    per_share = 'PerShare'
+    per_client = 'PerClient'
+    per_share_per_client = 'PerShare_PerClient'
+
+
+class QOSType(Enum):
+    PerShare = 1
+    PerClient = 2
+    PerShare_PerClient = 3
+
+
+def _validate_qos_bw(bandwidth: str) -> int:
+    min_bw = 1000000  # 1MB
+    max_bw = 2000000000  # 2GB
+    bw_bytes = with_units_to_int(bandwidth)
+    if bw_bytes != 0 and (bw_bytes < min_bw or bw_bytes > max_bw):
+        raise Exception(f"Provided bandwidth value is not in range, Please enter a value between {min_bw} (1MB) and {max_bw} (2GB) bytes")
+    return bw_bytes
+
+
+QOS_REQ_PARAMS = {
+    'combined_bw_disabled': {
+        'PerShare': ['max_export_write_bw', 'max_export_read_bw'],
+        'PerClient': ['max_client_write_bw', 'max_client_read_bw'],
+        'PerShare_PerClient': ['max_export_write_bw', 'max_export_read_bw', 'max_client_write_bw', 'max_client_read_bw']
+    },
+    'combined_bw_enabled': {
+        'PerShare': ['max_export_combined_bw'],
+        'PerClient': ['max_client_combined_bw'],
+        'PerShare_PerClient': ['max_export_combined_bw', 'max_client_combined_bw'],
+    }
+}
+
+
+class QOSBandwidthControl(object):
+    def __init__(self,
+                 enable_bw_ctrl: bool = False,
+                 combined_bw_ctrl: bool = False,
+                 export_writebw: str = '0',
+                 export_readbw: str = '0',
+                 client_writebw: str = '0',
+                 client_readbw: str = '0',
+                 export_rw_bw: str = '0',
+                 client_rw_bw: str = '0'
+                 ) -> None:
+        self.enable_bw_ctrl = enable_bw_ctrl
+        self.combined_bw_ctrl = combined_bw_ctrl
+        try:
+            self.export_writebw: int = _validate_qos_bw(export_writebw)
+            self.export_readbw: int = _validate_qos_bw(export_readbw)
+            self.client_writebw: int = _validate_qos_bw(client_writebw)
+            self.client_readbw: int = _validate_qos_bw(client_readbw)
+            self.export_rw_bw: int = _validate_qos_bw(export_rw_bw)
+            self.client_rw_bw: int = _validate_qos_bw(client_rw_bw)
+        except Exception as e:
+            raise Exception(f"Invalid bandwidth value. {e}")
+
+    @classmethod
+    def from_dict(cls, qos_dict: Dict[str, Any]) -> 'QOSBandwidthControl':
+        # json has bandwidths in human readable format(str)
+        bw_kwargs = {
+            'enable_bw_ctrl': qos_dict.get(QOSParams.enable_bw_ctrl.value, False),
+            'combined_bw_ctrl': qos_dict.get(QOSParams.combined_bw_ctrl.value, False),
+            'export_writebw': qos_dict.get(QOSParams.export_writebw.value, '0'),
+            'export_readbw': qos_dict.get(QOSParams.export_readbw.value, '0'),
+            'client_writebw': qos_dict.get(QOSParams.client_writebw.value, '0'),
+            'client_readbw': qos_dict.get(QOSParams.client_readbw.value, '0'),
+            'export_rw_bw': qos_dict.get(QOSParams.export_rw_bw.value, '0'),
+            'client_rw_bw': qos_dict.get(QOSParams.client_rw_bw.value, '0')
+        }
+        return cls(**bw_kwargs)
+
+    @classmethod
+    def from_qos_block(cls, qos_block: RawBlock) -> 'QOSBandwidthControl':
+        # block has bandwidths in bytes(int)
+        bw_kwargs = {
+            'enable_bw_ctrl': qos_block.values.get(QOSParams.enable_bw_ctrl.value, False),
+            'combined_bw_ctrl': qos_block.values.get(QOSParams.combined_bw_ctrl.value, False),
+            'export_writebw': str(qos_block.values.get(QOSParams.export_writebw.value, 0)),
+            'export_readbw': str(qos_block.values.get(QOSParams.export_readbw.value, 0)),
+            'client_writebw': str(qos_block.values.get(QOSParams.client_writebw.value, 0)),
+            'client_readbw': str(qos_block.values.get(QOSParams.client_readbw.value, 0)),
+            'export_rw_bw': str(qos_block.values.get(QOSParams.export_rw_bw.value, 0)),
+            'client_rw_bw': str(qos_block.values.get(QOSParams.client_rw_bw.value, 0))
+        }
+        return cls(**bw_kwargs)
+
+    def to_qos_block(self) -> RawBlock:
+        result = RawBlock('qos_bandwidths_control')
+        result.values[QOSParams.enable_bw_ctrl.value] = self.enable_bw_ctrl
+        result.values[QOSParams.combined_bw_ctrl.value] = self.combined_bw_ctrl
+        if self.export_writebw:
+            result.values[QOSParams.export_writebw.value] = self.export_writebw
+        if self.export_readbw:
+            result.values[QOSParams.export_readbw.value] = self.export_readbw
+        if self.client_writebw:
+            result.values[QOSParams.client_writebw.value] = self.client_writebw
+        if self.client_readbw:
+            result.values[QOSParams.client_readbw.value] = self.client_readbw
+        if self.export_rw_bw:
+            result.values[QOSParams.export_rw_bw.value] = self.export_rw_bw
+        if self.client_rw_bw:
+            result.values[QOSParams.client_rw_bw.value] = self.client_rw_bw
+        return result
+
+    def to_dict(self) -> Dict[str, Any]:
+        r: dict[str, Any] = {}
+        r[QOSParams.enable_bw_ctrl.value] = self.enable_bw_ctrl
+        r[QOSParams.combined_bw_ctrl.value] = self.combined_bw_ctrl
+        if self.export_writebw:
+            r[QOSParams.export_writebw.value] = bytes_to_human(self.export_writebw)
+        if self.export_readbw:
+            r[QOSParams.export_readbw.value] = bytes_to_human(self.export_readbw)
+        if self.client_writebw:
+            r[QOSParams.client_writebw.value] = bytes_to_human(self.client_writebw)
+        if self.client_readbw:
+            r[QOSParams.client_readbw.value] = bytes_to_human(self.client_readbw)
+        if self.export_rw_bw:
+            r[QOSParams.export_rw_bw.value] = bytes_to_human(self.export_rw_bw)
+        if self.client_rw_bw:
+            r[QOSParams.client_rw_bw.value] = bytes_to_human(self.client_rw_bw)
+        return r
+
+    def qos_bandwidth_checks(self, qos_type: QOSType) -> None:
+        """Checks for enabling qos"""
+        params = {}
+        d = vars(self)
+        for key in d:
+            if key.endswith('bw'):
+                params[QOSParams[key].value] = d[key]
+        if not self.combined_bw_ctrl:
+            req_params = QOS_REQ_PARAMS['combined_bw_disabled'][qos_type.name]
+        else:
+            req_params = QOS_REQ_PARAMS['combined_bw_enabled'][qos_type.name]
+        allowed_params = []
+        not_allowed_params = []
+        for key in params:
+            if key in req_params and params[key] == 0:
+                allowed_params.append(key)
+            elif key not in req_params and params[key] != 0:
+                not_allowed_params.append(key)
+        if allowed_params or not_allowed_params:
+            raise Exception(f"When combined_rw_bw is {'enabled' if self.combined_bw_ctrl else 'disabled'} "
+                            f"and qos_type is {qos_type.name}, "
+                            f"{'attributes ' + ', '.join(allowed_params) + ' required' if allowed_params else ''} "
+                            f"{'attributes ' + ', '.join(not_allowed_params) + ' are not allowed' if not_allowed_params else ''}.")
+
+
+class QOS(object):
+    def __init__(self,
+                 cluster_op: bool = False,
+                 enable_qos: bool = False,
+                 qos_type: Optional[QOSType] = None,
+                 bw_obj: Optional[QOSBandwidthControl] = None
+                 ) -> None:
+        self.cluster_op = cluster_op
+        self.enable_qos = enable_qos
+        self.qos_type = qos_type
+        self.bw_obj = bw_obj
+
+    @classmethod
+    def from_dict(cls, qos_dict: Dict[str, Any], cluster_op: bool = False) -> 'QOS':
+        kwargs: dict[str, Any] = {}
+        # qos dict will have qos type as enum name
+        if cluster_op:
+            qos_type = qos_dict.get(QOSParams.qos_type.value)
+            if qos_type:
+                kwargs['qos_type'] = QOSType[qos_type]
+        kwargs['enable_qos'] = qos_dict.get(QOSParams.enable_qos.value)
+        kwargs['bw_obj'] = QOSBandwidthControl.from_dict(qos_dict)
+        return cls(cluster_op, **kwargs)
+
+    @classmethod
+    def from_qos_block(cls, qos_block: RawBlock, cluster_op: bool = False) -> 'QOS':
+        kwargs: dict[str, Any] = {}
+        # qos block will have qos type as enum value
+        if cluster_op:
+            qos_type = qos_block.values.get(QOSParams.qos_type.value)
+            if qos_type:
+                kwargs['qos_type'] = QOSType(qos_type)
+        kwargs['enable_qos'] = qos_block.values.get(QOSParams.enable_qos.value)
+        kwargs['bw_obj'] = QOSBandwidthControl.from_qos_block(qos_block)
+        return cls(cluster_op, **kwargs)
+
+    def to_qos_block(self) -> RawBlock:
+        if self.cluster_op:
+            result = RawBlock(QOSParams.clust_block.value)
+        else:
+            result = RawBlock(QOSParams.export_block.value)
+        result.values[QOSParams.enable_qos.value] = self.enable_qos
+        if self.cluster_op and self.qos_type:
+            result.values[QOSParams.qos_type.value] = self.qos_type.value
+        if self.bw_obj and (res := self.bw_obj.to_qos_block()):
+            result.values.update(res.values)
+        return result
+
+    def to_dict(self) -> Dict[str, Any]:
+        r: Dict[str, Any] = {}
+        r[QOSParams.enable_qos.value] = self.enable_qos
+        if self.cluster_op and self.qos_type:
+            r[QOSParams.qos_type.value] = self.qos_type.name
+        if self.bw_obj and (res := self.bw_obj.to_dict()):
+            r.update(res)
+        return r
diff --git a/src/pybind/mgr/nfs/rados_utils.py b/src/pybind/mgr/nfs/rados_utils.py
new file mode 100644 (file)
index 0000000..5ce699f
--- /dev/null
@@ -0,0 +1,89 @@
+import logging
+from typing import Optional, Any
+
+from rados import TimedOut, ObjectNotFound, Rados
+from mgr_module import NFS_POOL_NAME as POOL_NAME
+from .ganesha_conf import RawBlock, format_block
+from .utils import USER_CONF_PREFIX
+
+log = logging.getLogger(__name__)
+
+
+def _check_rados_notify(ioctx: Any, obj: str) -> None:
+    try:
+        ioctx.notify(obj)
+    except TimedOut:
+        log.exception("Ganesha timed out")
+
+
+class NFSRados:
+    def __init__(self, rados: 'Rados', namespace: str) -> None:
+        self.rados = rados
+        self.pool = POOL_NAME
+        self.namespace = namespace
+
+    def _make_rados_url(self, obj: str) -> str:
+        return "rados://{}/{}/{}".format(self.pool, self.namespace, obj)
+
+    def _create_url_block(self, obj_name: str) -> RawBlock:
+        return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})
+
+    def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
+        with self.rados.open_ioctx(self.pool) as ioctx:
+            ioctx.set_namespace(self.namespace)
+            ioctx.write_full(obj, conf_block.encode('utf-8'))
+            if not config_obj:
+                # Return after creating empty common config object
+                return
+            log.debug("write configuration into rados object %s/%s/%s",
+                      self.pool, self.namespace, obj)
+
+            # Add created obj url to common config obj
+            ioctx.append(config_obj, format_block(
+                         self._create_url_block(obj)).encode('utf-8'))
+            _check_rados_notify(ioctx, config_obj)
+            log.debug("Added %s url to %s", obj, config_obj)
+
+    def read_obj(self, obj: str) -> Optional[str]:
+        with self.rados.open_ioctx(self.pool) as ioctx:
+            ioctx.set_namespace(self.namespace)
+            try:
+                return ioctx.read(obj, 1048576).decode()
+            except ObjectNotFound:
+                return None
+
+    def update_obj(self, conf_block: str, obj: str, config_obj: str,
+                   should_notify: Optional[bool] = True) -> None:
+        with self.rados.open_ioctx(self.pool) as ioctx:
+            ioctx.set_namespace(self.namespace)
+            ioctx.write_full(obj, conf_block.encode('utf-8'))
+            log.debug("write configuration into rados object %s/%s/%s",
+                      self.pool, self.namespace, obj)
+            if should_notify:
+                _check_rados_notify(ioctx, config_obj)
+            log.debug("Update export %s in %s", obj, config_obj)
+
+    def remove_obj(self, obj: str, config_obj: str) -> None:
+        with self.rados.open_ioctx(self.pool) as ioctx:
+            ioctx.set_namespace(self.namespace)
+            export_urls = ioctx.read(config_obj)
+            url = '%url "{}"\n\n'.format(self._make_rados_url(obj))
+            export_urls = export_urls.replace(url.encode('utf-8'), b'')
+            ioctx.remove_object(obj)
+            ioctx.write_full(config_obj, export_urls)
+            _check_rados_notify(ioctx, config_obj)
+            log.debug("Object deleted: %s", url)
+
+    def remove_all_obj(self) -> None:
+        with self.rados.open_ioctx(self.pool) as ioctx:
+            ioctx.set_namespace(self.namespace)
+            for obj in ioctx.list_objects():
+                obj.remove()
+
+    def check_config(self, config: str = USER_CONF_PREFIX) -> bool:
+        with self.rados.open_ioctx(self.pool) as ioctx:
+            ioctx.set_namespace(self.namespace)
+            for obj in ioctx.list_objects():
+                if obj.key.startswith(config):
+                    return True
+        return False
index cb9587e6ad1ee66f320016a02ccc204ce00402ef..e3a3f7e6f4c19d95f82aceda1262136261b1089e 100644 (file)
@@ -11,9 +11,11 @@ from mgr_module import MgrModule, NFS_POOL_NAME
 from rados import ObjectNotFound
 
 from ceph.deployment.service_spec import NFSServiceSpec
+from ceph.utils import with_units_to_int, bytes_to_human
 from nfs import Module
 from nfs.export import ExportMgr, normalize_path
-from nfs.ganesha_conf import GaneshaConfParser, Export, RawBlock
+from nfs.ganesha_conf import GaneshaConfParser, Export
+from nfs.qos_conf import RawBlock, QOS, QOSType, QOSParams, QOS_REQ_PARAMS, QOSBandwidthControl
 from nfs.cluster import NFSCluster
 from orchestrator import ServiceDescription, DaemonDescription, OrchResult
 
@@ -134,6 +136,57 @@ EXPORT {
 
 %url "rados://{NFS_POOL_NAME}/{cluster_id}/export-2"'''
 
+    qos_cluster_block = """
+QOS {
+    enable_qos = true;
+    enable_bw_control = true;
+    combined_rw_bw_control = false;
+    qos_type = 3;
+    max_export_write_bw = 1000000;
+    max_export_read_bw = 2000000;
+    max_client_write_bw = 3000000;
+    max_client_read_bw = 4000000;
+    max_export_combined_bw = 0;
+    max_client_combined_bw = 0;
+}
+"""
+
+    qos_export_block = """
+QOS_BLOCK {
+    enable_qos = true;
+    enable_bw_control = true;
+    combined_rw_bw_control = false;
+    max_export_write_bw = 1000000;
+    max_export_read_bw = 2000000;
+    max_client_write_bw = 3000000;
+    max_client_read_bw = 4000000;
+    max_export_combined_bw = 0;
+    max_client_combined_bw = 0;
+
+}
+"""
+
+    qos_cluster_dict = {
+        "enable_bw_control": True,
+        "enable_qos": True,
+        "combined_rw_bw_control": False,
+        "max_client_read_bw": "4.0MB",
+        "max_client_write_bw": "3.0MB",
+        "max_export_read_bw": "2.0MB",
+        "max_export_write_bw": "1.0MB",
+        "qos_type": "PerShare_PerClient"
+    }
+
+    qos_export_dict = {
+        "enable_bw_control": True,
+        "enable_qos": True,
+        "combined_rw_bw_control": False,
+        "max_client_read_bw": "4.0MB",
+        "max_client_write_bw": "3.0MB",
+        "max_export_read_bw": "2.0MB",
+        "max_export_write_bw": "1.0MB"
+    }
+
     class RObject(object):
         def __init__(self, key: str, raw: str) -> None:
             self.key = key
@@ -712,7 +765,11 @@ NFS_CORE_PARAM {
         assert export.clients[0].access_type is None
         assert export.cluster_id == self.cluster_id
 
-        # again, but without export_id
+        # again, but without export_id and qos_block
+        cluster = NFSCluster(nfs_mod)
+        bw_obj = QOSBandwidthControl(True, export_writebw='100MB', export_readbw='200MB')
+        cluster.enable_cluster_qos_bw(self.cluster_id, QOSType['PerShare'], bw_obj)
+
         r = conf.apply_export(self.cluster_id, json.dumps({
             'path': 'newestbucket',
             'pseudo': '/rgw/bucket',
@@ -732,6 +789,13 @@ NFS_CORE_PARAM {
                 'user_id': 'nfs.foo.newestbucket',
                 'access_key_id': 'the_access_key',
                 'secret_access_key': 'the_secret_key',
+            },
+            'qos_block': {
+               'combined_rw_bw_control': False,
+               'enable_bw_control': True,
+               'enable_qos': True,
+               'max_export_read_bw': '3000000',
+               'max_export_write_bw': '2000000'
             }
         }))
         assert len(r.changes) == 1
@@ -751,6 +815,11 @@ NFS_CORE_PARAM {
         assert export.clients[0].squash is None
         assert export.clients[0].access_type is None
         assert export.cluster_id == self.cluster_id
+        assert export.qos_block.enable_qos == True
+        assert export.qos_block.bw_obj.enable_bw_ctrl == True
+        assert export.qos_block.bw_obj.combined_bw_ctrl == False
+        assert export.qos_block.bw_obj.export_writebw == 2000000
+        assert export.qos_block.bw_obj.export_readbw == 3000000
 
     def test_update_export_sectype(self):
         self._do_mock_test(self._test_update_export_sectype)
@@ -1354,6 +1423,132 @@ EXPORT {
     def test_cluster_config(self):
         self._do_mock_test(self._do_test_cluster_config)
 
+    def test_qos_from_dict(self):
+        qos = QOS.from_dict(self.qos_cluster_dict, True)
+        assert qos.enable_qos == True
+        assert qos.bw_obj.enable_bw_ctrl == True
+        assert isinstance(qos.qos_type, QOSType)
+        assert qos.bw_obj.export_writebw == 1000000
+        assert qos.bw_obj.export_readbw == 2000000
+        assert qos.bw_obj.client_writebw == 3000000
+        assert qos.bw_obj.client_readbw == 4000000
+
+        qos = QOS.from_dict(self.qos_export_dict)
+        assert qos.enable_qos == True
+        assert qos.bw_obj.enable_bw_ctrl == True
+        assert qos.qos_type is None
+        assert qos.bw_obj.export_writebw == 1000000
+        assert qos.bw_obj.export_readbw == 2000000
+        assert qos.bw_obj.client_writebw == 3000000
+        assert qos.bw_obj.client_readbw == 4000000
+
+    @pytest.mark.parametrize("qos_block, qos_dict", [
+        (qos_cluster_block, qos_cluster_dict),
+        (qos_export_block, qos_export_dict)
+        ])
+    def test_qos_from_block(self, qos_block, qos_dict):
+        blocks = GaneshaConfParser(qos_block).parse()
+        assert isinstance(blocks, list)
+        assert len(blocks) == 1
+        qos = QOS.from_qos_block(blocks[0], True)
+        assert qos.to_dict() == qos_dict
+
+    def _do_test_cluster_qos(self, qos_type, combined_bw_ctrl, params, positive_tc):
+        nfs_mod = Module('nfs', '', '')
+        cluster = NFSCluster(nfs_mod)
+        try:
+            bw_obj = QOSBandwidthControl(True, combined_bw_ctrl, **params)
+            cluster.enable_cluster_qos_bw(self.cluster_id, qos_type, bw_obj)
+        except Exception:
+            if not positive_tc:
+                return
+        out = cluster.get_cluster_qos(self.cluster_id)
+        expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": combined_bw_ctrl, "qos_type": qos_type.name}
+        for key in params:
+            expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(params[key]))
+        assert out == expected_out
+        cluster.disable_cluster_qos_bw(self.cluster_id)
+        out = cluster.get_cluster_qos(self.cluster_id)
+        assert out == {"enable_bw_control": False, "enable_qos": False, "combined_rw_bw_control": False}
+
+    @pytest.mark.parametrize("qos_type, combined_bw_ctrl, params, positive_tc", [
+        (QOSType['PerShare'], False, {'export_writebw': '100MB', 'export_readbw': '200MB'}, True),
+        (QOSType['PerClient'], False, {'client_writebw': '300MB', 'client_readbw': '400MB'}, True),
+        (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB', 'client_readbw': '400MB'}, True),
+        (QOSType['PerShare'], True, {'export_rw_bw': '100MB'}, True),
+        (QOSType['PerClient'], True, {'client_rw_bw': '200MB'}, True),
+        (QOSType['PerShare_PerClient'], True, {'export_rw_bw': '100MB', 'client_rw_bw': '200MB'}, True),
+        # negative testing
+        (QOSType['PerShare'], False, {'export_writebw': '100MB', 'client_readbw': '200MB'}, False),
+        (QOSType['PerShare'], False, {'export_writebw': '100MB'}, False),
+        (QOSType['PerClient'], False, {'client_writebw': '300MB'}, False),
+        (QOSType['PerClient'], False, {'client_writebw': '300MB', 'export_readbw': '400MB'}, False),
+        (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB'}, False),
+        (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB'}, False),
+        (QOSType['PerShare'], True, {'client_rw_bw': '100MB'}, False),
+        (QOSType['PerShare'], True, {}, False),
+        (QOSType['PerClient'], True, {'client_rw_bw': '200MB', 'export_rw_bw': '100MB'}, False),
+        (QOSType['PerShare_PerClient'], True, {'export_rw_bw': '100MB'}, False)
+        ])
+    def test_cluster_qos(self, qos_type, combined_bw_ctrl, params, positive_tc):
+        self._do_mock_test(self._do_test_cluster_qos, qos_type, combined_bw_ctrl, params, positive_tc)
+
+    def _do_test_export_qos(self, qos_type, clust_combined_bw_ctrl, clust_params, export_combined_bw_ctrl, export_params):
+        nfs_mod = Module('nfs', '', '')
+        cluster = NFSCluster(nfs_mod)
+        export_mgr = ExportMgr(nfs_mod)
+        # try enabling export level qos before enabling cluster level qos
+        try:
+            bw_obj = QOSBandwidthControl(True, export_combined_bw_ctrl, **export_params)
+            export_mgr.enable_export_qos_bw(self.cluster_id, '/cephfs_a/', bw_obj)
+        except Exception as e:
+            assert str(e) == 'To configure bandwidth control for export, you must first enable bandwidth control at the cluster level.'
+        bw_obj = QOSBandwidthControl(True, clust_combined_bw_ctrl, **clust_params)
+        cluster.enable_cluster_qos_bw(self.cluster_id, qos_type, bw_obj)
+
+        # set export qos
+        try:
+            bw_obj = QOSBandwidthControl(True, export_combined_bw_ctrl, **export_params)
+            export_mgr.enable_export_qos_bw(self.cluster_id, '/cephfs_a/', bw_obj)
+        except Exception:
+            if export_combined_bw_ctrl:
+                req = QOS_REQ_PARAMS['combined_bw_enabled'][qos_type.name]
+            else:
+                req = QOS_REQ_PARAMS['combined_bw_enabled'][qos_type.name]
+            if sorted(export_params.keys()) != sorted(req):
+                return
+            if qos_type.name == 'PerClient':
+                return
+        out = export_mgr.get_export_qos(self.cluster_id, '/cephfs_a/')
+        expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": export_combined_bw_ctrl}
+        for key in export_params:
+            expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(export_params[key]))
+        assert out == expected_out
+        export_mgr.disable_export_qos_bw(self.cluster_id, '/cephfs_a/')
+        out = export_mgr.get_export_qos(self.cluster_id, '/cephfs_a/')
+        assert out == {"enable_bw_control": False, "enable_qos": False, "combined_rw_bw_control": False}
+
+
+    @pytest.mark.parametrize("qos_type, clust_combined_bw_ctrl, clust_params", [
+        (QOSType['PerShare'], False, {'export_writebw': '100MB', 'export_readbw': '200MB'}),
+        (QOSType['PerClient'], False, {'client_writebw': '300MB', 'client_readbw': '400MB'}),
+        (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB', 'client_readbw': '400MB'}),
+        (QOSType['PerShare'], True, {'export_rw_bw': '100MB'}),
+        (QOSType['PerClient'], True, {'client_rw_bw': '200MB'}),
+        (QOSType['PerShare_PerClient'], True, {'export_rw_bw': '100MB', 'client_rw_bw': '200MB'})
+        ])
+    @pytest.mark.parametrize("export_combined_bw_ctrl, export_params", [
+        (False, {'export_writebw': '100MB', 'export_readbw': '200MB'}),
+        (False, {'client_writebw': '300MB', 'client_readbw': '400MB'}),
+        (False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB', 'client_readbw': '400MB'}),
+        (True, {'export_rw_bw': '100MB'}),
+        (True, {'client_rw_bw': '200MB'}),
+        (True, {'export_rw_bw': '100MB', 'client_rw_bw': '200MB'})
+        ])
+    def test_export_qos(self, qos_type, clust_combined_bw_ctrl, clust_params,
+                        export_combined_bw_ctrl, export_params):
+        self._do_mock_test(self._do_test_export_qos, qos_type, clust_combined_bw_ctrl,
+                           clust_params, export_combined_bw_ctrl, export_params)
 
 @pytest.mark.parametrize(
     "path,expected",
index fe928007a6d9cb103cc695f7c07d75f74d8a8983..96fe4eb19f35ca194341e70537c64da0a6e05c13 100644 (file)
@@ -18,6 +18,7 @@ if TYPE_CHECKING:
 EXPORT_PREFIX: str = "export-"
 CONF_PREFIX: str = "conf-nfs."
 USER_CONF_PREFIX: str = "userconf-nfs."
+QOS_CONF_PREFIX: str = "qosconf-nfs."
 
 log = logging.getLogger(__name__)
 
@@ -57,10 +58,15 @@ def conf_obj_name(cluster_id: str) -> str:
 
 
 def user_conf_obj_name(cluster_id: str) -> str:
-    """Returna a rados object name for the user config."""
+    """Return a rados object name for the user config."""
     return f"{USER_CONF_PREFIX}{cluster_id}"
 
 
+def qos_conf_obj_name(cluster_id: str) -> str:
+    """Return a rados object name for the qos config."""
+    return f"{QOS_CONF_PREFIX}{cluster_id}"
+
+
 def available_clusters(mgr: 'Module') -> List[str]:
     '''
     This method returns list of available cluster ids.
index 4bd3c2c329f1f8969a32894463007b0a9bda828a..303bf55ee371cc15b1e8c1a3c0c215c7afd0e621 100644 (file)
@@ -215,3 +215,51 @@ def size_to_bytes(v: Union[str, int]) -> int:
         }[unit]
         return num * mult
     raise ValueError(f'invalid size type {type(v)} (expected int or str)')
+
+
+def bytes_to_human(num: float, mode: str = 'decimal') -> str:
+    """Convert a bytes value into it's human-readable form.
+
+    :param num: number, in bytes, to convert
+    :param mode: Either decimal (default) or binary to determine divisor
+    :returns: string representing the bytes value in a more readable format
+    """
+    unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
+    divisor = 1000.0
+    yotta = 'YB'
+
+    if mode == 'binary':
+        unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']
+        divisor = 1024.0
+        yotta = 'YiB'
+
+    for unit in unit_list:
+        if abs(num) < divisor:
+            return '%3.1f%s' % (num, unit)
+        num /= divisor
+    return '%.1f%s' % (num, yotta)
+
+
+def with_units_to_int(v: str) -> int:
+    if not v:
+        return 0
+    if v.endswith('iB'):
+        v = v[:-2]
+        bytes_mult = 1024
+    elif v.endswith('B'):
+        v = v[:-1]
+        bytes_mult = 1000
+    mult = 1
+    if v[-1].upper() == 'K':
+        mult = bytes_mult
+        v = v[:-1]
+    elif v[-1].upper() == 'M':
+        mult = bytes_mult * bytes_mult
+        v = v[:-1]
+    elif v[-1].upper() == 'G':
+        mult = bytes_mult * bytes_mult * bytes_mult
+        v = v[:-1]
+    elif v[-1].upper() == 'T':
+        mult = bytes_mult * bytes_mult * bytes_mult * bytes_mult
+        v = v[:-1]
+    return int(float(v) * mult)