MIN_PODMAN_VERSION,
PIDS_LIMIT_UNLIMITED_PODMAN_VERSION,
)
-from .data_utils import with_units_to_int
+from ceph.utils import with_units_to_int
from .exceptions import Error
return value
-def bytes_to_human(num, mode='decimal'):
- # type: (float, str) -> str
- """Convert a bytes value into it's human-readable form.
-
- :param num: number, in bytes, to convert
- :param mode: Either decimal (default) or binary to determine divisor
- :returns: string representing the bytes value in a more readable format
- """
- unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
- divisor = 1000.0
- yotta = 'YB'
-
- if mode == 'binary':
- unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']
- divisor = 1024.0
- yotta = 'YiB'
-
- for unit in unit_list:
- if abs(num) < divisor:
- return '%3.1f%s' % (num, unit)
- num /= divisor
- return '%.1f%s' % (num, yotta)
-
-
-def with_units_to_int(v: str) -> int:
- if v.endswith('iB'):
- v = v[:-2]
- elif v.endswith('B'):
- v = v[:-1]
- mult = 1
- if v[-1].upper() == 'K':
- mult = 1024
- v = v[:-1]
- elif v[-1].upper() == 'M':
- mult = 1024 * 1024
- v = v[:-1]
- elif v[-1].upper() == 'G':
- mult = 1024 * 1024 * 1024
- v = v[:-1]
- elif v[-1].upper() == 'T':
- mult = 1024 * 1024 * 1024 * 1024
- v = v[:-1]
- return int(float(v) * mult)
-
-
def read_config(fn):
# type: (Optional[str]) -> ConfigParser
cp = ConfigParser()
from cephadmlib.call_wrappers import call, call_throws, CallVerbosity
from cephadmlib.context import CephadmContext
-from cephadmlib.data_utils import bytes_to_human
+from ceph.utils import bytes_to_human
from cephadmlib.exe_utils import find_executable
from cephadmlib.file_utils import read_file
from cephadmlib.net_utils import get_fqdn, get_ipv4_address, get_ipv6_address
assert daemons['mgr.host1.pntmho']['container_id'] == mgr_cid
assert daemons['crash.host1']['container_id'] == crash_cid
- assert daemons['mgr.host1.pntmho']['memory_usage'] == 478570086 # 456.4 MB
- assert daemons['crash.host1']['memory_usage'] == 7426015 # 7.082 MB
+ assert daemons['mgr.host1.pntmho']['memory_usage'] == 456400000 # 456.4 MB
+ assert daemons['crash.host1']['memory_usage'] == 7082000 # 7.082 MB
@mock.patch("cephadm.list_daemons")
available_clusters,
conf_obj_name,
restart_nfs_service,
- user_conf_obj_name)
-from .export import NFSRados
+ user_conf_obj_name,
+ USER_CONF_PREFIX,
+ qos_conf_obj_name)
+from .rados_utils import NFSRados
+from .ganesha_conf import format_block, GaneshaConfParser
+from .qos_conf import (
+ QOS,
+ QOSType,
+ QOSBandwidthControl)
if TYPE_CHECKING:
from nfs.module import Module
try:
if cluster_id in available_clusters(self.mgr):
rados_obj = self._rados(cluster_id)
- if rados_obj.check_user_config():
+ if rados_obj.check_config(USER_CONF_PREFIX):
raise NonFatalError("NFS-Ganesha User Config already exists")
rados_obj.write_obj(nfs_config, user_conf_obj_name(cluster_id),
conf_obj_name(cluster_id))
try:
if cluster_id in available_clusters(self.mgr):
rados_obj = self._rados(cluster_id)
- if not rados_obj.check_user_config():
+ if not rados_obj.check_config(USER_CONF_PREFIX):
raise NonFatalError("NFS-Ganesha User Config does not exist")
rados_obj.remove_obj(user_conf_obj_name(cluster_id),
conf_obj_name(cluster_id))
def _rados(self, cluster_id: str) -> NFSRados:
"""Return a new NFSRados object for the given cluster id."""
return NFSRados(self.mgr.rados, cluster_id)
+
+ def get_cluster_qos_config(self, cluster_id: str) -> Optional[QOS]:
+ """Return QOS object for the given cluster id."""
+ rados_obj = self._rados(cluster_id)
+ conf = rados_obj.read_obj(qos_conf_obj_name(cluster_id))
+ if conf:
+ qos_block = GaneshaConfParser(conf).parse()
+ qos_obj = QOS.from_qos_block(qos_block[0], True)
+ return qos_obj
+ return None
+
+ def update_cluster_qos_bw(self,
+ cluster_id: str,
+ enable_qos: bool,
+ bw_obj: QOSBandwidthControl,
+ qos_type: Optional[QOSType] = None) -> None:
+ """Update cluster QOS config"""
+ qos_obj_exists = False
+ qos_obj = self.get_cluster_qos_config(cluster_id)
+ if not qos_obj:
+ log.debug(f"Creating new QOS block for cluster {cluster_id}")
+ qos_obj = QOS(True, enable_qos, qos_type, bw_obj)
+ else:
+ log.debug(f"Updating existing QOS block for cluster {cluster_id}")
+ qos_obj_exists = True
+ qos_obj.enable_qos = enable_qos
+ qos_obj.qos_type = qos_type
+ qos_obj.bw_obj = bw_obj
+
+ qos_config = format_block(qos_obj.to_qos_block())
+ rados_obj = self._rados(cluster_id)
+ if not qos_obj_exists:
+ rados_obj.write_obj(qos_config, qos_conf_obj_name(cluster_id),
+ conf_obj_name(cluster_id))
+ else:
+ rados_obj.update_obj(qos_config, qos_conf_obj_name(cluster_id),
+ conf_obj_name(cluster_id), should_notify=False)
+ log.debug(f"Successfully saved {cluster_id}s QOS bandwidth control config: \n {qos_config}")
+
+ def enable_cluster_qos_bw(self,
+ cluster_id: str,
+ qos_type: QOSType,
+ bw_obj: QOSBandwidthControl
+ ) -> None:
+ """
+ There are 2 cases to consider:
+ 1. If combined bandwith control is disabled
+ a. If qos_type is pershare, then export_writebw and export_readbw parameters are compulsory
+ b. If qos_type is perclient, then client_writebw and client_readbw parameters are compulsory
+ c. If qos_type is pershare_perclient then export_writebw, export_readbw, client_writebw and
+ client_readbw are compulsory parameters
+ 2. If combined bandwidth control is enabled
+ a. If qos_type is pershare, then export_rw_bw parameter is compulsory
+ b. If qos_type is perclient, then client_rw_bw parameter is compulsory
+ c. If qos_type is pershare_perclient, then export_rw_bw and client_rw_bw parameters are compulsory
+ """
+ try:
+ bw_obj.qos_bandwidth_checks(qos_type)
+ if cluster_id in available_clusters(self.mgr):
+ self.update_cluster_qos_bw(cluster_id, True, bw_obj, qos_type)
+ restart_nfs_service(self.mgr, cluster_id)
+ log.info(f"QOS bandwidth control has been successfully enabled for cluster {cluster_id}. "
+ "If the qos_type is changed during this process, ensure that the bandwidth "
+ "values for all exports are updated accordingly.")
+ return
+ raise ClusterNotFound()
+ except NotImplementedError:
+ raise ManualRestartRequired(f"NFS-Ganesha QOS bandwidth control config added Successfully for {cluster_id}")
+ except Exception as e:
+ log.exception(f"Setting NFS-Ganesha QOS bandwidth control config failed for {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def get_cluster_qos(self, cluster_id: str) -> Dict[str, Any]:
+ try:
+ if cluster_id in available_clusters(self.mgr):
+ qos_obj = self.get_cluster_qos_config(cluster_id)
+ return qos_obj.to_dict() if qos_obj else {}
+ raise ClusterNotFound()
+ except Exception as e:
+ log.exception(f"Fetching NFS-Ganesha QOS bandwidth control config failed for {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def disable_cluster_qos_bw(self, cluster_id: str) -> None:
+ try:
+ if cluster_id in available_clusters(self.mgr):
+ self.update_cluster_qos_bw(cluster_id, False, QOSBandwidthControl())
+ restart_nfs_service(self.mgr, cluster_id)
+ log.info("Cluster-level QoS bandwidth control has been successfully disabled for "
+ f"cluster {cluster_id}. As a result, export-level bandwidth control will "
+ "no longer have any effect, even if it's enabled.")
+ return
+ raise ClusterNotFound()
+ except NotImplementedError:
+ raise ManualRestartRequired(f"NFS-Ganesha QOS bandwidth control config added successfully for {cluster_id}")
+ except Exception as e:
+ log.exception(f"Setting NFS-Ganesha QOS bandwidth control config failed for {cluster_id}")
+ raise ErrorResponse.wrap(e)
import cephfs
from mgr_util import CephFSEarmarkResolver
-from rados import TimedOut, ObjectNotFound, Rados
+from rados import TimedOut, ObjectNotFound
from object_format import ErrorResponse
from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS
Export,
GaneshaConfParser,
RGWFSAL,
- RawBlock,
format_block)
+from .qos_conf import QOS, QOSBandwidthControl
+from .export_utils import export_qos_bw_checks, export_dict_qos_checks
from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound
from .utils import (
EXPORT_PREFIX,
NonFatalError,
- USER_CONF_PREFIX,
export_obj_name,
conf_obj_name,
available_clusters,
get_nfs_spec_for_cluster,
restart_nfs_service,
cephfs_path_is_dir)
+from .rados_utils import NFSRados
if TYPE_CHECKING:
from nfs.module import Module
)
-class NFSRados:
- def __init__(self, rados: 'Rados', namespace: str) -> None:
- self.rados = rados
- self.pool = POOL_NAME
- self.namespace = namespace
-
- def _make_rados_url(self, obj: str) -> str:
- return "rados://{}/{}/{}".format(self.pool, self.namespace, obj)
-
- def _create_url_block(self, obj_name: str) -> RawBlock:
- return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})
-
- def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
- with self.rados.open_ioctx(self.pool) as ioctx:
- ioctx.set_namespace(self.namespace)
- ioctx.write_full(obj, conf_block.encode('utf-8'))
- if not config_obj:
- # Return after creating empty common config object
- return
- log.debug("write configuration into rados object %s/%s/%s",
- self.pool, self.namespace, obj)
-
- # Add created obj url to common config obj
- ioctx.append(config_obj, format_block(
- self._create_url_block(obj)).encode('utf-8'))
- _check_rados_notify(ioctx, config_obj)
- log.debug("Added %s url to %s", obj, config_obj)
-
- def read_obj(self, obj: str) -> Optional[str]:
- with self.rados.open_ioctx(self.pool) as ioctx:
- ioctx.set_namespace(self.namespace)
- try:
- return ioctx.read(obj, 1048576).decode()
- except ObjectNotFound:
- return None
-
- def update_obj(self, conf_block: str, obj: str, config_obj: str,
- should_notify: Optional[bool] = True) -> None:
- with self.rados.open_ioctx(self.pool) as ioctx:
- ioctx.set_namespace(self.namespace)
- ioctx.write_full(obj, conf_block.encode('utf-8'))
- log.debug("write configuration into rados object %s/%s/%s",
- self.pool, self.namespace, obj)
- if should_notify:
- _check_rados_notify(ioctx, config_obj)
- log.debug("Update export %s in %s", obj, config_obj)
-
- def remove_obj(self, obj: str, config_obj: str) -> None:
- with self.rados.open_ioctx(self.pool) as ioctx:
- ioctx.set_namespace(self.namespace)
- export_urls = ioctx.read(config_obj)
- url = '%url "{}"\n\n'.format(self._make_rados_url(obj))
- export_urls = export_urls.replace(url.encode('utf-8'), b'')
- ioctx.remove_object(obj)
- ioctx.write_full(config_obj, export_urls)
- _check_rados_notify(ioctx, config_obj)
- log.debug("Object deleted: %s", url)
-
- def remove_all_obj(self) -> None:
- with self.rados.open_ioctx(self.pool) as ioctx:
- ioctx.set_namespace(self.namespace)
- for obj in ioctx.list_objects():
- obj.remove()
-
- def check_user_config(self) -> bool:
- with self.rados.open_ioctx(self.pool) as ioctx:
- ioctx.set_namespace(self.namespace)
- for obj in ioctx.list_objects():
- if obj.key.startswith(USER_CONF_PREFIX):
- return True
- return False
-
-
class AppliedExportResults:
"""Gathers the results of multiple changed exports.
Returned by apply_export.
elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key:
raise NFSInvalidOperation('secret_access_key change is not allowed')
+ # check QOS
+ if new_export_dict.get('qos_block'):
+ export_dict_qos_checks(cluster_id, self.mgr, dict(new_export_dict.get('qos_block', {})))
+
self.exports[cluster_id].remove(old_export)
self._update_export(cluster_id, new_export, need_nfs_service_restart)
exports_count += 1
return exports_count
+ def update_export_qos_bw(self,
+ cluster_id: str,
+ pseudo_path: str,
+ enable_qos: bool,
+ bw_obj: QOSBandwidthControl) -> None:
+ """Update Export QOS block"""
+ export = self._fetch_export(cluster_id, pseudo_path)
+ if not export:
+ raise NFSObjectNotFound(f"Export {pseudo_path} not found in NFS cluster {cluster_id}")
+ # if qos_block does not exists in export create one else update existing block
+ if not export.qos_block:
+ log.debug(f"Creating new QOS block for export {pseudo_path} of cluster {cluster_id}")
+ export.qos_block = QOS(enable_qos=enable_qos, bw_obj=bw_obj)
+ else:
+ log.debug(f"Updating existing QOS block of export {pseudo_path} of cluster {cluster_id}")
+ export.qos_block.enable_qos = enable_qos
+ export.qos_block.bw_obj = bw_obj
+
+ self.exports[cluster_id].remove(export)
+ self._update_export(cluster_id, export, False)
+ log.debug(f"Successfully updated QOS bandwidth control config for export {pseudo_path} of cluster {cluster_id}")
+
+ def enable_export_qos_bw(self,
+ cluster_id: str,
+ pseudo_path: str,
+ bw_obj: QOSBandwidthControl
+ ) -> None:
+ """
+ There are 2 cases to consider, based on QOS type set on cluster level
+ 1. If combined bandwith control is disabled
+ a. If qos_type is pershare, then export_writebw and export_readbw parameters are compulsory
+ b. If qos_type is perclient, then can't enable export level qos
+ c. If qos_type is pershare_perclient then export_writebw, export_readbw, client_writebw and
+ client_readbw are compulsory parameters
+ 2. If combined bandwidth control is enabled
+ a. If qos_type is pershare, then export_rw_bw parameter is compulsory
+ b. If qos_type is perclient, then can't enable export level qos
+ c. If qos_type is pershare_perclient, then export_rw_bw and client_rw_bw parameters are compulsory
+ """
+ try:
+ self._validate_cluster_id(cluster_id)
+ assert pseudo_path
+ export_qos_bw_checks(cluster_id, self.mgr, bw_obj=bw_obj)
+ self.update_export_qos_bw(cluster_id, pseudo_path, True, bw_obj)
+ except Exception as e:
+ log.exception(f"Setting NFS-Ganesha QOS bandwidth control config failed for {pseudo_path} of {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def get_export_qos(self, cluster_id: str, pseudo_path: str) -> Dict[str, int]:
+ try:
+ self._validate_cluster_id(cluster_id)
+ export = self._fetch_export(cluster_id, pseudo_path)
+ if not export:
+ raise NFSObjectNotFound(f"Export {pseudo_path} not found in NFS cluster {cluster_id}")
+ return export.qos_block.to_dict() if export.qos_block else {}
+ except Exception as e:
+ log.exception(f"Failed to get QOS configuration for {pseudo_path} of {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def disable_export_qos_bw(self, cluster_id: str, pseudo_path: str) -> None:
+ try:
+ self._validate_cluster_id(cluster_id)
+ assert pseudo_path
+ self.update_export_qos_bw(cluster_id, pseudo_path, False, QOSBandwidthControl())
+ except Exception as e:
+ log.exception(f"Setting NFS-Ganesha QOS bandwidth control Config failed for {pseudo_path} of {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
def get_user_id(cluster_id: str, fs_name: str, cmount_path: str) -> str:
"""
--- /dev/null
+from typing import Any
+
+from .cluster import NFSCluster
+from .qos_conf import QOSType, QOSParams, QOSBandwidthControl
+
+
+def export_dict_bw_checks(cluster_id: str,
+ mgr_obj: Any,
+ qos_enable: bool,
+ qos_dict: dict) -> None:
+ enable_bw_ctrl = qos_dict.get('enable_bw_control')
+ combined_bw_ctrl = qos_dict.get('combined_rw_bw_control')
+ bandwith_param_exists = any(key.endswith('bw') for key in qos_dict)
+ if enable_bw_ctrl is None:
+ if combined_bw_ctrl and bandwith_param_exists:
+ raise Exception('Bandwidth control is not enabled but associated parameters exists')
+ return
+ if combined_bw_ctrl is None:
+ combined_bw_ctrl = False
+ if not qos_enable and enable_bw_ctrl:
+ raise Exception('To enable bandwidth control, qos_enable should be true.')
+ if not (isinstance(enable_bw_ctrl, bool) and isinstance(combined_bw_ctrl, bool)):
+ raise Exception('Invalid values for the enable_bw_ctrl and combined_bw_ctrl parameters.')
+ # if qos is disabled, then bandwidths should not be set and no need to bandwidth checks
+ if not enable_bw_ctrl:
+ if bandwith_param_exists:
+ raise Exception('Bandwidths should not be passed when enable_bw_control is false.')
+ return
+ if enable_bw_ctrl and not bandwith_param_exists:
+ raise Exception('Bandwidths should be set when enable_bw_control is true.')
+ bw_obj = QOSBandwidthControl(enable_bw_ctrl,
+ combined_bw_ctrl,
+ export_writebw=qos_dict.get(QOSParams.export_writebw.value, '0'),
+ export_readbw=qos_dict.get(QOSParams.export_readbw.value, '0'),
+ client_writebw=qos_dict.get(QOSParams.client_writebw.value, '0'),
+ client_readbw=qos_dict.get(QOSParams.client_readbw.value, '0'),
+ export_rw_bw=qos_dict.get(QOSParams.export_rw_bw.value, '0'),
+ client_rw_bw=qos_dict.get(QOSParams.client_rw_bw.value, '0'))
+ export_qos_bw_checks(cluster_id, mgr_obj, bw_obj)
+
+
+def export_dict_qos_checks(cluster_id: str,
+ mgr_obj: Any,
+ qos_dict: dict) -> None:
+ """Validate the qos block of dict passed to apply_export method"""
+ qos_enable = qos_dict.get('enable_qos')
+ if qos_enable is None:
+ raise Exception('The QOS block requires at least the enable_qos parameter')
+ if not isinstance(qos_enable, bool):
+ raise Exception('Invalid value for the enable_qos parameter')
+ export_dict_bw_checks(cluster_id, mgr_obj, qos_enable, qos_dict)
+
+
+def export_qos_bw_checks(cluster_id: str,
+ mgr_obj: Any,
+ bw_obj: QOSBandwidthControl,
+ nfs_clust_obj: Any = None) -> None:
+ """check cluster level qos is enabled to enable export level qos and validate bandwidths"""
+ if not nfs_clust_obj:
+ nfs_clust_obj = NFSCluster(mgr_obj)
+ clust_qos_obj = nfs_clust_obj.get_cluster_qos_config(cluster_id)
+ if not clust_qos_obj or (clust_qos_obj and not (clust_qos_obj.enable_qos)):
+ raise Exception('To configure bandwidth control for export, you must first enable bandwidth control at the cluster level.')
+ if clust_qos_obj.qos_type:
+ if clust_qos_obj.qos_type == QOSType.PerClient:
+ raise Exception('Export-level QoS bandwidth control cannot be enabled if the QoS type at the cluster level is set to PerClient.')
+ bw_obj.qos_bandwidth_checks(clust_qos_obj.qos_type)
from .exception import NFSInvalidOperation, FSNotFound
from .utils import check_fs
+from .qos_conf import QOS, RawBlock
if TYPE_CHECKING:
from nfs.module import Module
f"XprtSec {xprtsec} invalid, valid types are {valid_xprtsec_types}")
-class RawBlock():
- def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}):
- if not values: # workaround mutable default argument
- values = {}
- if not blocks: # workaround mutable default argument
- blocks = []
- self.block_name = block_name
- self.blocks = blocks
- self.values = values
-
- def __eq__(self, other: Any) -> bool:
- if not isinstance(other, RawBlock):
- return False
- return self.block_name == other.block_name and \
- self.blocks == other.blocks and \
- self.values == other.values
-
- def __repr__(self) -> str:
- return f'RawBlock({self.block_name!r}, {self.blocks!r}, {self.values!r})'
-
-
class GaneshaConfParser:
def __init__(self, raw_config: str):
self.pos = 0
fsal: FSAL,
clients: Optional[List[Client]] = None,
sectype: Optional[List[str]] = None,
- xprtsec: Optional[str] = None) -> None:
+ xprtsec: Optional[str] = None,
+ qos_block: Optional[QOS] = None) -> None:
self.export_id = export_id
self.path = path
self.fsal = fsal
self.clients: List[Client] = clients or []
self.sectype = sectype
self.xprtsec = xprtsec
+ self.qos_block = qos_block
@classmethod
def from_export_block(cls, export_block: RawBlock, cluster_id: str) -> 'Export':
client_blocks = [b for b in export_block.blocks
if b.block_name == "CLIENT"]
+ qos_block = [b for b in export_block.blocks
+ if b.block_name == "qos_block"]
+ qos_block = QOS.from_qos_block(qos_block[0]) if qos_block else None
+
protocols = export_block.values.get('protocols')
if not isinstance(protocols, list):
protocols = [protocols]
[Client.from_client_block(client)
for client in client_blocks],
sectype=sectype,
- xprtsec=xprtsec)
+ xprtsec=xprtsec,
+ qos_block=qos_block
+ )
def to_export_block(self) -> RawBlock:
values = {
client.to_client_block()
for client in self.clients
]
+ if self.qos_block:
+ result.blocks.append(self.qos_block.to_qos_block())
return result
@classmethod
def from_dict(cls, export_id: int, ex_dict: Dict[str, Any]) -> 'Export':
+ if ex_dict.get('qos_block'):
+ qos_block = QOS.from_dict(ex_dict.get('qos_block', {}))
+ else:
+ qos_block = None
return cls(export_id,
ex_dict.get('path', '/'),
ex_dict['cluster_id'],
FSAL.from_dict(ex_dict.get('fsal', {})),
[Client.from_dict(client) for client in ex_dict.get('clients', [])],
sectype=ex_dict.get("sectype"),
- xprtsec=ex_dict.get('XprtSec'))
+ xprtsec=ex_dict.get('XprtSec'),
+ qos_block=qos_block
+ )
def to_dict(self) -> Dict[str, Any]:
values = {
values['sectype'] = self.sectype
if self.xprtsec:
values['XprtSec'] = self.xprtsec
+ if self.qos_block:
+ values['qos_block'] = self.qos_block.to_dict()
return values
def validate(self, mgr: 'Module') -> None:
from .export import ExportMgr, AppliedExportResults
from .cluster import NFSCluster
from .utils import available_clusters
+from .qos_conf import QOSType, QOSBandwidthControl, UserQoSType
log = logging.getLogger(__name__)
def fetch_nfs_cluster_obj(self) -> NFSCluster:
return self.nfs
+
+ @CLICommand('nfs export qos enable bandwidth_control', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_export_qos_bw_enable(self,
+ cluster_id: str,
+ pseudo_path: str,
+ combined_rw_bw_ctrl: bool = False,
+ max_export_write_bw: str = '0',
+ max_export_read_bw: str = '0',
+ max_client_write_bw: str = '0',
+ max_client_read_bw: str = '0',
+ max_export_combined_bw: str = '0',
+ max_client_combined_bw: str = '0'
+ ) -> None:
+ """enable QOS config for NFS export and set different bandwidth"""
+ try:
+ bw_obj = QOSBandwidthControl(enable_bw_ctrl=True,
+ combined_bw_ctrl=combined_rw_bw_ctrl,
+ export_writebw=max_export_write_bw,
+ export_readbw=max_export_read_bw,
+ client_writebw=max_client_write_bw,
+ client_readbw=max_client_read_bw,
+ export_rw_bw=max_export_combined_bw,
+ client_rw_bw=max_client_combined_bw)
+ except Exception as e:
+ raise object_format.ErrorResponse.wrap(e)
+ return self.export_mgr.enable_export_qos_bw(cluster_id=cluster_id,
+ pseudo_path=pseudo_path,
+ bw_obj=bw_obj)
+
+ @CLICommand('nfs export qos get', perm='r')
+ @object_format.Responder()
+ def _cmd_export_qos_get(self, cluster_id: str, pseudo_path: str) -> Dict[str, int]:
+ """Get NFS export QOS config"""
+ return self.export_mgr.get_export_qos(cluster_id, pseudo_path)
+
+ @CLICommand('nfs export qos disable bandwidth_control', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_export_qos_bw_disable(self, cluster_id: str, pseudo_path: str) -> None:
+ """Disable NFS export QOS config"""
+ return self.export_mgr.disable_export_qos_bw(cluster_id, pseudo_path)
+
+ @CLICommand('nfs cluster qos enable bandwidth_control', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_cluster_qos_bw_enable(self,
+ cluster_id: str,
+ qos_type: UserQoSType,
+ combined_rw_bw_ctrl: bool = False,
+ max_export_write_bw: str = '0',
+ max_export_read_bw: str = '0',
+ max_client_write_bw: str = '0',
+ max_client_read_bw: str = '0',
+ max_export_combined_bw: str = '0',
+ max_client_combined_bw: str = '0') -> None:
+ """Enable QOS ratelimiting for NFS cluster and set default export and client max bandwidth"""
+ try:
+ bw_obj = QOSBandwidthControl(enable_bw_ctrl=True,
+ combined_bw_ctrl=combined_rw_bw_ctrl,
+ export_writebw=max_export_write_bw,
+ export_readbw=max_export_read_bw,
+ client_writebw=max_client_write_bw,
+ client_readbw=max_client_read_bw,
+ export_rw_bw=max_export_combined_bw,
+ client_rw_bw=max_client_combined_bw)
+ except Exception as e:
+ raise object_format.ErrorResponse.wrap(e)
+ return self.nfs.enable_cluster_qos_bw(cluster_id=cluster_id,
+ qos_type=QOSType[qos_type.value],
+ bw_obj=bw_obj)
+
+ @CLICommand('nfs cluster qos disable bandwidth_control', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_cluster_qos_bw_disable(self, cluster_id: str) -> None:
+ """Disable QOS for NFS cluster"""
+ return self.nfs.disable_cluster_qos_bw(cluster_id)
+
+ @CLICommand('nfs cluster qos get', perm='r')
+ @object_format.Responder()
+ def _cmd_cluster_qos_get(self, cluster_id: str) -> Dict[str, Any]:
+ """Get QOS configuration of NFS cluster"""
+ return self.nfs.get_cluster_qos(cluster_id)
--- /dev/null
+from typing import List, Dict, Any, Optional
+from enum import Enum
+
+from ceph.utils import bytes_to_human, with_units_to_int
+
+
+class RawBlock():
+ def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}):
+ if not values: # workaround mutable default argument
+ values = {}
+ if not blocks: # workaround mutable default argument
+ blocks = []
+ self.block_name = block_name
+ self.blocks = blocks
+ self.values = values
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, RawBlock):
+ return False
+ return self.block_name == other.block_name and \
+ self.blocks == other.blocks and \
+ self.values == other.values
+
+ def __repr__(self) -> str:
+ return f'RawBlock({self.block_name!r}, {self.blocks!r}, {self.values!r})'
+
+
+class QOSParams(Enum):
+ clust_block = "QOS_DEFAULT_CONFIG"
+ export_block = "QOS_BLOCK"
+ enable_qos = "enable_qos"
+ enable_bw_ctrl = "enable_bw_control"
+ combined_bw_ctrl = "combined_rw_bw_control"
+ qos_type = "qos_type"
+ export_writebw = "max_export_write_bw"
+ export_readbw = "max_export_read_bw"
+ client_writebw = "max_client_write_bw"
+ client_readbw = "max_client_read_bw"
+ export_rw_bw = "max_export_combined_bw"
+ client_rw_bw = "max_client_combined_bw"
+
+
+class UserQoSType(Enum):
+ per_share = 'PerShare'
+ per_client = 'PerClient'
+ per_share_per_client = 'PerShare_PerClient'
+
+
+class QOSType(Enum):
+ PerShare = 1
+ PerClient = 2
+ PerShare_PerClient = 3
+
+
+def _validate_qos_bw(bandwidth: str) -> int:
+ min_bw = 1000000 # 1MB
+ max_bw = 2000000000 # 2GB
+ bw_bytes = with_units_to_int(bandwidth)
+ if bw_bytes != 0 and (bw_bytes < min_bw or bw_bytes > max_bw):
+ raise Exception(f"Provided bandwidth value is not in range, Please enter a value between {min_bw} (1MB) and {max_bw} (2GB) bytes")
+ return bw_bytes
+
+
+QOS_REQ_PARAMS = {
+ 'combined_bw_disabled': {
+ 'PerShare': ['max_export_write_bw', 'max_export_read_bw'],
+ 'PerClient': ['max_client_write_bw', 'max_client_read_bw'],
+ 'PerShare_PerClient': ['max_export_write_bw', 'max_export_read_bw', 'max_client_write_bw', 'max_client_read_bw']
+ },
+ 'combined_bw_enabled': {
+ 'PerShare': ['max_export_combined_bw'],
+ 'PerClient': ['max_client_combined_bw'],
+ 'PerShare_PerClient': ['max_export_combined_bw', 'max_client_combined_bw'],
+ }
+}
+
+
+class QOSBandwidthControl(object):
+ def __init__(self,
+ enable_bw_ctrl: bool = False,
+ combined_bw_ctrl: bool = False,
+ export_writebw: str = '0',
+ export_readbw: str = '0',
+ client_writebw: str = '0',
+ client_readbw: str = '0',
+ export_rw_bw: str = '0',
+ client_rw_bw: str = '0'
+ ) -> None:
+ self.enable_bw_ctrl = enable_bw_ctrl
+ self.combined_bw_ctrl = combined_bw_ctrl
+ try:
+ self.export_writebw: int = _validate_qos_bw(export_writebw)
+ self.export_readbw: int = _validate_qos_bw(export_readbw)
+ self.client_writebw: int = _validate_qos_bw(client_writebw)
+ self.client_readbw: int = _validate_qos_bw(client_readbw)
+ self.export_rw_bw: int = _validate_qos_bw(export_rw_bw)
+ self.client_rw_bw: int = _validate_qos_bw(client_rw_bw)
+ except Exception as e:
+ raise Exception(f"Invalid bandwidth value. {e}")
+
+ @classmethod
+ def from_dict(cls, qos_dict: Dict[str, Any]) -> 'QOSBandwidthControl':
+ # json has bandwidths in human readable format(str)
+ bw_kwargs = {
+ 'enable_bw_ctrl': qos_dict.get(QOSParams.enable_bw_ctrl.value, False),
+ 'combined_bw_ctrl': qos_dict.get(QOSParams.combined_bw_ctrl.value, False),
+ 'export_writebw': qos_dict.get(QOSParams.export_writebw.value, '0'),
+ 'export_readbw': qos_dict.get(QOSParams.export_readbw.value, '0'),
+ 'client_writebw': qos_dict.get(QOSParams.client_writebw.value, '0'),
+ 'client_readbw': qos_dict.get(QOSParams.client_readbw.value, '0'),
+ 'export_rw_bw': qos_dict.get(QOSParams.export_rw_bw.value, '0'),
+ 'client_rw_bw': qos_dict.get(QOSParams.client_rw_bw.value, '0')
+ }
+ return cls(**bw_kwargs)
+
+ @classmethod
+ def from_qos_block(cls, qos_block: RawBlock) -> 'QOSBandwidthControl':
+ # block has bandwidths in bytes(int)
+ bw_kwargs = {
+ 'enable_bw_ctrl': qos_block.values.get(QOSParams.enable_bw_ctrl.value, False),
+ 'combined_bw_ctrl': qos_block.values.get(QOSParams.combined_bw_ctrl.value, False),
+ 'export_writebw': str(qos_block.values.get(QOSParams.export_writebw.value, 0)),
+ 'export_readbw': str(qos_block.values.get(QOSParams.export_readbw.value, 0)),
+ 'client_writebw': str(qos_block.values.get(QOSParams.client_writebw.value, 0)),
+ 'client_readbw': str(qos_block.values.get(QOSParams.client_readbw.value, 0)),
+ 'export_rw_bw': str(qos_block.values.get(QOSParams.export_rw_bw.value, 0)),
+ 'client_rw_bw': str(qos_block.values.get(QOSParams.client_rw_bw.value, 0))
+ }
+ return cls(**bw_kwargs)
+
+ def to_qos_block(self) -> RawBlock:
+ result = RawBlock('qos_bandwidths_control')
+ result.values[QOSParams.enable_bw_ctrl.value] = self.enable_bw_ctrl
+ result.values[QOSParams.combined_bw_ctrl.value] = self.combined_bw_ctrl
+ if self.export_writebw:
+ result.values[QOSParams.export_writebw.value] = self.export_writebw
+ if self.export_readbw:
+ result.values[QOSParams.export_readbw.value] = self.export_readbw
+ if self.client_writebw:
+ result.values[QOSParams.client_writebw.value] = self.client_writebw
+ if self.client_readbw:
+ result.values[QOSParams.client_readbw.value] = self.client_readbw
+ if self.export_rw_bw:
+ result.values[QOSParams.export_rw_bw.value] = self.export_rw_bw
+ if self.client_rw_bw:
+ result.values[QOSParams.client_rw_bw.value] = self.client_rw_bw
+ return result
+
+ def to_dict(self) -> Dict[str, Any]:
+ r: dict[str, Any] = {}
+ r[QOSParams.enable_bw_ctrl.value] = self.enable_bw_ctrl
+ r[QOSParams.combined_bw_ctrl.value] = self.combined_bw_ctrl
+ if self.export_writebw:
+ r[QOSParams.export_writebw.value] = bytes_to_human(self.export_writebw)
+ if self.export_readbw:
+ r[QOSParams.export_readbw.value] = bytes_to_human(self.export_readbw)
+ if self.client_writebw:
+ r[QOSParams.client_writebw.value] = bytes_to_human(self.client_writebw)
+ if self.client_readbw:
+ r[QOSParams.client_readbw.value] = bytes_to_human(self.client_readbw)
+ if self.export_rw_bw:
+ r[QOSParams.export_rw_bw.value] = bytes_to_human(self.export_rw_bw)
+ if self.client_rw_bw:
+ r[QOSParams.client_rw_bw.value] = bytes_to_human(self.client_rw_bw)
+ return r
+
+ def qos_bandwidth_checks(self, qos_type: QOSType) -> None:
+ """Checks for enabling qos"""
+ params = {}
+ d = vars(self)
+ for key in d:
+ if key.endswith('bw'):
+ params[QOSParams[key].value] = d[key]
+ if not self.combined_bw_ctrl:
+ req_params = QOS_REQ_PARAMS['combined_bw_disabled'][qos_type.name]
+ else:
+ req_params = QOS_REQ_PARAMS['combined_bw_enabled'][qos_type.name]
+ allowed_params = []
+ not_allowed_params = []
+ for key in params:
+ if key in req_params and params[key] == 0:
+ allowed_params.append(key)
+ elif key not in req_params and params[key] != 0:
+ not_allowed_params.append(key)
+ if allowed_params or not_allowed_params:
+ raise Exception(f"When combined_rw_bw is {'enabled' if self.combined_bw_ctrl else 'disabled'} "
+ f"and qos_type is {qos_type.name}, "
+ f"{'attributes ' + ', '.join(allowed_params) + ' required' if allowed_params else ''} "
+ f"{'attributes ' + ', '.join(not_allowed_params) + ' are not allowed' if not_allowed_params else ''}.")
+
+
+class QOS(object):
+ def __init__(self,
+ cluster_op: bool = False,
+ enable_qos: bool = False,
+ qos_type: Optional[QOSType] = None,
+ bw_obj: Optional[QOSBandwidthControl] = None
+ ) -> None:
+ self.cluster_op = cluster_op
+ self.enable_qos = enable_qos
+ self.qos_type = qos_type
+ self.bw_obj = bw_obj
+
+ @classmethod
+ def from_dict(cls, qos_dict: Dict[str, Any], cluster_op: bool = False) -> 'QOS':
+ kwargs: dict[str, Any] = {}
+ # qos dict will have qos type as enum name
+ if cluster_op:
+ qos_type = qos_dict.get(QOSParams.qos_type.value)
+ if qos_type:
+ kwargs['qos_type'] = QOSType[qos_type]
+ kwargs['enable_qos'] = qos_dict.get(QOSParams.enable_qos.value)
+ kwargs['bw_obj'] = QOSBandwidthControl.from_dict(qos_dict)
+ return cls(cluster_op, **kwargs)
+
+ @classmethod
+ def from_qos_block(cls, qos_block: RawBlock, cluster_op: bool = False) -> 'QOS':
+ kwargs: dict[str, Any] = {}
+ # qos block will have qos type as enum value
+ if cluster_op:
+ qos_type = qos_block.values.get(QOSParams.qos_type.value)
+ if qos_type:
+ kwargs['qos_type'] = QOSType(qos_type)
+ kwargs['enable_qos'] = qos_block.values.get(QOSParams.enable_qos.value)
+ kwargs['bw_obj'] = QOSBandwidthControl.from_qos_block(qos_block)
+ return cls(cluster_op, **kwargs)
+
+ def to_qos_block(self) -> RawBlock:
+ if self.cluster_op:
+ result = RawBlock(QOSParams.clust_block.value)
+ else:
+ result = RawBlock(QOSParams.export_block.value)
+ result.values[QOSParams.enable_qos.value] = self.enable_qos
+ if self.cluster_op and self.qos_type:
+ result.values[QOSParams.qos_type.value] = self.qos_type.value
+ if self.bw_obj and (res := self.bw_obj.to_qos_block()):
+ result.values.update(res.values)
+ return result
+
+ def to_dict(self) -> Dict[str, Any]:
+ r: Dict[str, Any] = {}
+ r[QOSParams.enable_qos.value] = self.enable_qos
+ if self.cluster_op and self.qos_type:
+ r[QOSParams.qos_type.value] = self.qos_type.name
+ if self.bw_obj and (res := self.bw_obj.to_dict()):
+ r.update(res)
+ return r
--- /dev/null
+import logging
+from typing import Optional, Any
+
+from rados import TimedOut, ObjectNotFound, Rados
+from mgr_module import NFS_POOL_NAME as POOL_NAME
+from .ganesha_conf import RawBlock, format_block
+from .utils import USER_CONF_PREFIX
+
+log = logging.getLogger(__name__)
+
+
+def _check_rados_notify(ioctx: Any, obj: str) -> None:
+ try:
+ ioctx.notify(obj)
+ except TimedOut:
+ log.exception("Ganesha timed out")
+
+
+class NFSRados:
+ def __init__(self, rados: 'Rados', namespace: str) -> None:
+ self.rados = rados
+ self.pool = POOL_NAME
+ self.namespace = namespace
+
+ def _make_rados_url(self, obj: str) -> str:
+ return "rados://{}/{}/{}".format(self.pool, self.namespace, obj)
+
+ def _create_url_block(self, obj_name: str) -> RawBlock:
+ return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})
+
+ def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ ioctx.write_full(obj, conf_block.encode('utf-8'))
+ if not config_obj:
+ # Return after creating empty common config object
+ return
+ log.debug("write configuration into rados object %s/%s/%s",
+ self.pool, self.namespace, obj)
+
+ # Add created obj url to common config obj
+ ioctx.append(config_obj, format_block(
+ self._create_url_block(obj)).encode('utf-8'))
+ _check_rados_notify(ioctx, config_obj)
+ log.debug("Added %s url to %s", obj, config_obj)
+
+ def read_obj(self, obj: str) -> Optional[str]:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ try:
+ return ioctx.read(obj, 1048576).decode()
+ except ObjectNotFound:
+ return None
+
+ def update_obj(self, conf_block: str, obj: str, config_obj: str,
+ should_notify: Optional[bool] = True) -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ ioctx.write_full(obj, conf_block.encode('utf-8'))
+ log.debug("write configuration into rados object %s/%s/%s",
+ self.pool, self.namespace, obj)
+ if should_notify:
+ _check_rados_notify(ioctx, config_obj)
+ log.debug("Update export %s in %s", obj, config_obj)
+
+ def remove_obj(self, obj: str, config_obj: str) -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ export_urls = ioctx.read(config_obj)
+ url = '%url "{}"\n\n'.format(self._make_rados_url(obj))
+ export_urls = export_urls.replace(url.encode('utf-8'), b'')
+ ioctx.remove_object(obj)
+ ioctx.write_full(config_obj, export_urls)
+ _check_rados_notify(ioctx, config_obj)
+ log.debug("Object deleted: %s", url)
+
+ def remove_all_obj(self) -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ for obj in ioctx.list_objects():
+ obj.remove()
+
+ def check_config(self, config: str = USER_CONF_PREFIX) -> bool:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ for obj in ioctx.list_objects():
+ if obj.key.startswith(config):
+ return True
+ return False
from rados import ObjectNotFound
from ceph.deployment.service_spec import NFSServiceSpec
+from ceph.utils import with_units_to_int, bytes_to_human
from nfs import Module
from nfs.export import ExportMgr, normalize_path
-from nfs.ganesha_conf import GaneshaConfParser, Export, RawBlock
+from nfs.ganesha_conf import GaneshaConfParser, Export
+from nfs.qos_conf import RawBlock, QOS, QOSType, QOSParams, QOS_REQ_PARAMS, QOSBandwidthControl
from nfs.cluster import NFSCluster
from orchestrator import ServiceDescription, DaemonDescription, OrchResult
%url "rados://{NFS_POOL_NAME}/{cluster_id}/export-2"'''
+ qos_cluster_block = """
+QOS {
+ enable_qos = true;
+ enable_bw_control = true;
+ combined_rw_bw_control = false;
+ qos_type = 3;
+ max_export_write_bw = 1000000;
+ max_export_read_bw = 2000000;
+ max_client_write_bw = 3000000;
+ max_client_read_bw = 4000000;
+ max_export_combined_bw = 0;
+ max_client_combined_bw = 0;
+}
+"""
+
+ qos_export_block = """
+QOS_BLOCK {
+ enable_qos = true;
+ enable_bw_control = true;
+ combined_rw_bw_control = false;
+ max_export_write_bw = 1000000;
+ max_export_read_bw = 2000000;
+ max_client_write_bw = 3000000;
+ max_client_read_bw = 4000000;
+ max_export_combined_bw = 0;
+ max_client_combined_bw = 0;
+
+}
+"""
+
+ qos_cluster_dict = {
+ "enable_bw_control": True,
+ "enable_qos": True,
+ "combined_rw_bw_control": False,
+ "max_client_read_bw": "4.0MB",
+ "max_client_write_bw": "3.0MB",
+ "max_export_read_bw": "2.0MB",
+ "max_export_write_bw": "1.0MB",
+ "qos_type": "PerShare_PerClient"
+ }
+
+ qos_export_dict = {
+ "enable_bw_control": True,
+ "enable_qos": True,
+ "combined_rw_bw_control": False,
+ "max_client_read_bw": "4.0MB",
+ "max_client_write_bw": "3.0MB",
+ "max_export_read_bw": "2.0MB",
+ "max_export_write_bw": "1.0MB"
+ }
+
class RObject(object):
def __init__(self, key: str, raw: str) -> None:
self.key = key
assert export.clients[0].access_type is None
assert export.cluster_id == self.cluster_id
- # again, but without export_id
+ # again, but without export_id and qos_block
+ cluster = NFSCluster(nfs_mod)
+ bw_obj = QOSBandwidthControl(True, export_writebw='100MB', export_readbw='200MB')
+ cluster.enable_cluster_qos_bw(self.cluster_id, QOSType['PerShare'], bw_obj)
+
r = conf.apply_export(self.cluster_id, json.dumps({
'path': 'newestbucket',
'pseudo': '/rgw/bucket',
'user_id': 'nfs.foo.newestbucket',
'access_key_id': 'the_access_key',
'secret_access_key': 'the_secret_key',
+ },
+ 'qos_block': {
+ 'combined_rw_bw_control': False,
+ 'enable_bw_control': True,
+ 'enable_qos': True,
+ 'max_export_read_bw': '3000000',
+ 'max_export_write_bw': '2000000'
}
}))
assert len(r.changes) == 1
assert export.clients[0].squash is None
assert export.clients[0].access_type is None
assert export.cluster_id == self.cluster_id
+ assert export.qos_block.enable_qos == True
+ assert export.qos_block.bw_obj.enable_bw_ctrl == True
+ assert export.qos_block.bw_obj.combined_bw_ctrl == False
+ assert export.qos_block.bw_obj.export_writebw == 2000000
+ assert export.qos_block.bw_obj.export_readbw == 3000000
def test_update_export_sectype(self):
self._do_mock_test(self._test_update_export_sectype)
def test_cluster_config(self):
self._do_mock_test(self._do_test_cluster_config)
+ def test_qos_from_dict(self):
+ qos = QOS.from_dict(self.qos_cluster_dict, True)
+ assert qos.enable_qos == True
+ assert qos.bw_obj.enable_bw_ctrl == True
+ assert isinstance(qos.qos_type, QOSType)
+ assert qos.bw_obj.export_writebw == 1000000
+ assert qos.bw_obj.export_readbw == 2000000
+ assert qos.bw_obj.client_writebw == 3000000
+ assert qos.bw_obj.client_readbw == 4000000
+
+ qos = QOS.from_dict(self.qos_export_dict)
+ assert qos.enable_qos == True
+ assert qos.bw_obj.enable_bw_ctrl == True
+ assert qos.qos_type is None
+ assert qos.bw_obj.export_writebw == 1000000
+ assert qos.bw_obj.export_readbw == 2000000
+ assert qos.bw_obj.client_writebw == 3000000
+ assert qos.bw_obj.client_readbw == 4000000
+
+ @pytest.mark.parametrize("qos_block, qos_dict", [
+ (qos_cluster_block, qos_cluster_dict),
+ (qos_export_block, qos_export_dict)
+ ])
+ def test_qos_from_block(self, qos_block, qos_dict):
+ blocks = GaneshaConfParser(qos_block).parse()
+ assert isinstance(blocks, list)
+ assert len(blocks) == 1
+ qos = QOS.from_qos_block(blocks[0], True)
+ assert qos.to_dict() == qos_dict
+
+ def _do_test_cluster_qos(self, qos_type, combined_bw_ctrl, params, positive_tc):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+ try:
+ bw_obj = QOSBandwidthControl(True, combined_bw_ctrl, **params)
+ cluster.enable_cluster_qos_bw(self.cluster_id, qos_type, bw_obj)
+ except Exception:
+ if not positive_tc:
+ return
+ out = cluster.get_cluster_qos(self.cluster_id)
+ expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": combined_bw_ctrl, "qos_type": qos_type.name}
+ for key in params:
+ expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(params[key]))
+ assert out == expected_out
+ cluster.disable_cluster_qos_bw(self.cluster_id)
+ out = cluster.get_cluster_qos(self.cluster_id)
+ assert out == {"enable_bw_control": False, "enable_qos": False, "combined_rw_bw_control": False}
+
+ @pytest.mark.parametrize("qos_type, combined_bw_ctrl, params, positive_tc", [
+ (QOSType['PerShare'], False, {'export_writebw': '100MB', 'export_readbw': '200MB'}, True),
+ (QOSType['PerClient'], False, {'client_writebw': '300MB', 'client_readbw': '400MB'}, True),
+ (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB', 'client_readbw': '400MB'}, True),
+ (QOSType['PerShare'], True, {'export_rw_bw': '100MB'}, True),
+ (QOSType['PerClient'], True, {'client_rw_bw': '200MB'}, True),
+ (QOSType['PerShare_PerClient'], True, {'export_rw_bw': '100MB', 'client_rw_bw': '200MB'}, True),
+ # negative testing
+ (QOSType['PerShare'], False, {'export_writebw': '100MB', 'client_readbw': '200MB'}, False),
+ (QOSType['PerShare'], False, {'export_writebw': '100MB'}, False),
+ (QOSType['PerClient'], False, {'client_writebw': '300MB'}, False),
+ (QOSType['PerClient'], False, {'client_writebw': '300MB', 'export_readbw': '400MB'}, False),
+ (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB'}, False),
+ (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB'}, False),
+ (QOSType['PerShare'], True, {'client_rw_bw': '100MB'}, False),
+ (QOSType['PerShare'], True, {}, False),
+ (QOSType['PerClient'], True, {'client_rw_bw': '200MB', 'export_rw_bw': '100MB'}, False),
+ (QOSType['PerShare_PerClient'], True, {'export_rw_bw': '100MB'}, False)
+ ])
+ def test_cluster_qos(self, qos_type, combined_bw_ctrl, params, positive_tc):
+ self._do_mock_test(self._do_test_cluster_qos, qos_type, combined_bw_ctrl, params, positive_tc)
+
+ def _do_test_export_qos(self, qos_type, clust_combined_bw_ctrl, clust_params, export_combined_bw_ctrl, export_params):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+ export_mgr = ExportMgr(nfs_mod)
+ # try enabling export level qos before enabling cluster level qos
+ try:
+ bw_obj = QOSBandwidthControl(True, export_combined_bw_ctrl, **export_params)
+ export_mgr.enable_export_qos_bw(self.cluster_id, '/cephfs_a/', bw_obj)
+ except Exception as e:
+ assert str(e) == 'To configure bandwidth control for export, you must first enable bandwidth control at the cluster level.'
+ bw_obj = QOSBandwidthControl(True, clust_combined_bw_ctrl, **clust_params)
+ cluster.enable_cluster_qos_bw(self.cluster_id, qos_type, bw_obj)
+
+ # set export qos
+ try:
+ bw_obj = QOSBandwidthControl(True, export_combined_bw_ctrl, **export_params)
+ export_mgr.enable_export_qos_bw(self.cluster_id, '/cephfs_a/', bw_obj)
+ except Exception:
+ if export_combined_bw_ctrl:
+ req = QOS_REQ_PARAMS['combined_bw_enabled'][qos_type.name]
+ else:
+ req = QOS_REQ_PARAMS['combined_bw_enabled'][qos_type.name]
+ if sorted(export_params.keys()) != sorted(req):
+ return
+ if qos_type.name == 'PerClient':
+ return
+ out = export_mgr.get_export_qos(self.cluster_id, '/cephfs_a/')
+ expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": export_combined_bw_ctrl}
+ for key in export_params:
+ expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(export_params[key]))
+ assert out == expected_out
+ export_mgr.disable_export_qos_bw(self.cluster_id, '/cephfs_a/')
+ out = export_mgr.get_export_qos(self.cluster_id, '/cephfs_a/')
+ assert out == {"enable_bw_control": False, "enable_qos": False, "combined_rw_bw_control": False}
+
+
+ @pytest.mark.parametrize("qos_type, clust_combined_bw_ctrl, clust_params", [
+ (QOSType['PerShare'], False, {'export_writebw': '100MB', 'export_readbw': '200MB'}),
+ (QOSType['PerClient'], False, {'client_writebw': '300MB', 'client_readbw': '400MB'}),
+ (QOSType['PerShare_PerClient'], False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB', 'client_readbw': '400MB'}),
+ (QOSType['PerShare'], True, {'export_rw_bw': '100MB'}),
+ (QOSType['PerClient'], True, {'client_rw_bw': '200MB'}),
+ (QOSType['PerShare_PerClient'], True, {'export_rw_bw': '100MB', 'client_rw_bw': '200MB'})
+ ])
+ @pytest.mark.parametrize("export_combined_bw_ctrl, export_params", [
+ (False, {'export_writebw': '100MB', 'export_readbw': '200MB'}),
+ (False, {'client_writebw': '300MB', 'client_readbw': '400MB'}),
+ (False, {'export_writebw': '100MB', 'export_readbw': '200MB', 'client_writebw': '300MB', 'client_readbw': '400MB'}),
+ (True, {'export_rw_bw': '100MB'}),
+ (True, {'client_rw_bw': '200MB'}),
+ (True, {'export_rw_bw': '100MB', 'client_rw_bw': '200MB'})
+ ])
+ def test_export_qos(self, qos_type, clust_combined_bw_ctrl, clust_params,
+ export_combined_bw_ctrl, export_params):
+ self._do_mock_test(self._do_test_export_qos, qos_type, clust_combined_bw_ctrl,
+ clust_params, export_combined_bw_ctrl, export_params)
@pytest.mark.parametrize(
"path,expected",
EXPORT_PREFIX: str = "export-"
CONF_PREFIX: str = "conf-nfs."
USER_CONF_PREFIX: str = "userconf-nfs."
+QOS_CONF_PREFIX: str = "qosconf-nfs."
log = logging.getLogger(__name__)
def user_conf_obj_name(cluster_id: str) -> str:
- """Returna a rados object name for the user config."""
+ """Return a rados object name for the user config."""
return f"{USER_CONF_PREFIX}{cluster_id}"
+def qos_conf_obj_name(cluster_id: str) -> str:
+ """Return a rados object name for the qos config."""
+ return f"{QOS_CONF_PREFIX}{cluster_id}"
+
+
def available_clusters(mgr: 'Module') -> List[str]:
'''
This method returns list of available cluster ids.
}[unit]
return num * mult
raise ValueError(f'invalid size type {type(v)} (expected int or str)')
+
+
+def bytes_to_human(num: float, mode: str = 'decimal') -> str:
+ """Convert a bytes value into it's human-readable form.
+
+ :param num: number, in bytes, to convert
+ :param mode: Either decimal (default) or binary to determine divisor
+ :returns: string representing the bytes value in a more readable format
+ """
+ unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
+ divisor = 1000.0
+ yotta = 'YB'
+
+ if mode == 'binary':
+ unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']
+ divisor = 1024.0
+ yotta = 'YiB'
+
+ for unit in unit_list:
+ if abs(num) < divisor:
+ return '%3.1f%s' % (num, unit)
+ num /= divisor
+ return '%.1f%s' % (num, yotta)
+
+
+def with_units_to_int(v: str) -> int:
+ if not v:
+ return 0
+ if v.endswith('iB'):
+ v = v[:-2]
+ bytes_mult = 1024
+ elif v.endswith('B'):
+ v = v[:-1]
+ bytes_mult = 1000
+ mult = 1
+ if v[-1].upper() == 'K':
+ mult = bytes_mult
+ v = v[:-1]
+ elif v[-1].upper() == 'M':
+ mult = bytes_mult * bytes_mult
+ v = v[:-1]
+ elif v[-1].upper() == 'G':
+ mult = bytes_mult * bytes_mult * bytes_mult
+ v = v[:-1]
+ elif v[-1].upper() == 'T':
+ mult = bytes_mult * bytes_mult * bytes_mult * bytes_mult
+ v = v[:-1]
+ return int(float(v) * mult)