Added below CEPH_NODES_LIST block in ganesha.conf and enable_cluster_qos in cluster level QoS block
CEPH_NODES_LIST {
Ceph_Nodes = 192.168.100.100, 192.168.100.101, 192.168.100.102;
}
Fixes: https://tracker.ceph.com/issues/69861
Signed-off-by: Shweta Bhosale <Shweta.Bhosale1@ibm.com>
mgr/cephadm: Changes to add NFS cluster qos inter node communication port in spec
mgr/nfs: Addressed review comments for cluster level qos support
mgr/nfs: add enable_cluster_qos = true while enabling qos
from orchestrator import DaemonDescription, OrchestratorError
from cephadm import utils
from cephadm.services.cephadmservice import AuthEntity, CephadmDaemonDeploySpec, CephService
+from cephadm.schedule import get_placement_hosts
if TYPE_CHECKING:
from ..module import CephadmOrchestrator
if monitoring_ip:
daemon_spec.port_ips.update({str(monitoring_port): monitoring_ip})
+ ceph_nodes = []
+ hosts = get_placement_hosts(spec, self.mgr.cache.get_schedulable_hosts(), self.mgr.cache.get_draining_hosts())
+ for host in hosts:
+ host_ip = self.mgr.inventory.get_addr(host.hostname)
+ ceph_nodes.append(host_ip)
+
# generate the ganesha config
rdma_port = None
if spec.enable_rdma and daemon_spec.ports and len(daemon_spec.ports) > 2:
"port": port,
"monitoring_addr": monitoring_ip,
"monitoring_port": monitoring_port,
+ "cqos_port": spec.cluster_qos_port,
"bind_addr": bind_addr,
"haproxy_hosts": [],
"nfs_idmap_conf": nfs_idmap_conf,
"tls_min_version": spec.tls_min_version,
"tls_ktls": spec.tls_ktls,
"tls_debug": spec.tls_debug,
+ "ceph_nodes": ceph_nodes
}
if spec.enable_haproxy_protocol:
context["haproxy_hosts"] = self._haproxy_hosts()
{% if enable_rdma and rdma_port %}
NFS_RDMA_Port = {{ rdma_port }};
{% endif %}
+{% if cqos_port %}
+ Cqos_Port = {{ cqos_port }};
+{% endif %}
}
NFSv4 {
namespace = "{{ namespace }}";
}
+CEPH_NODES_LIST {
+ Ceph_Nodes = {{ ceph_nodes|join(", ") }};
+}
+
RADOS_URLS {
UserId = "{{ user }}";
watch_url = "{{ url }}";
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
def fake_resolve_ip(hostname: str) -> str:
- if hostname in ('host1', "192.168.122.111"):
+ if hostname in ('host1', '192.168.122.111'):
return '192.168.122.111'
elif hostname in ('host2', '192.168.122.222'):
return '192.168.122.222'
' namespace = "foo";\n'
'}\n'
'\n'
+ 'CEPH_NODES_LIST {\n'
+ ' Ceph_Nodes = 192.168.122.111, 192.168.122.222;\n'
+ '}\n'
+ '\n'
'RADOS_URLS {\n'
' UserId = "nfs.foo.test.0.0";\n'
' watch_url = '
[],
{},
{0: {0: None}, 1: {0: None}, 2: {0: None}},
- ['nfs:host3(rank=0.0 *:2049,9587)', 'nfs:host2(rank=1.0 *:2049,9587)', 'nfs:host1(rank=2.0 *:2049,9587)'],
- ['nfs:host3(rank=0.0 *:2049,9587)', 'nfs:host2(rank=1.0 *:2049,9587)', 'nfs:host1(rank=2.0 *:2049,9587)'],
+ ['nfs:host3(rank=0.0 *:2049,9587,31311)', 'nfs:host2(rank=1.0 *:2049,9587,31311)', 'nfs:host1(rank=2.0 *:2049,9587,31311)'],
+ ['nfs:host3(rank=0.0 *:2049,9587,31311)', 'nfs:host2(rank=1.0 *:2049,9587,31311)', 'nfs:host1(rank=2.0 *:2049,9587,31311)'],
[]
),
# 21: ranked, exist
],
{0: {1: '0.1'}},
{0: {1: '0.1'}, 1: {0: None}, 2: {0: None}},
- ['nfs:host1(rank=0.1 *:2049,9587)', 'nfs:host3(rank=1.0 *:2049,9587)', 'nfs:host2(rank=2.0 *:2049,9587)'],
- ['nfs:host3(rank=1.0 *:2049,9587)', 'nfs:host2(rank=2.0 *:2049,9587)'],
+ ['nfs:host1(rank=0.1 *:2049,9587,31311)', 'nfs:host3(rank=1.0 *:2049,9587,31311)', 'nfs:host2(rank=2.0 *:2049,9587,31311)'],
+ ['nfs:host3(rank=1.0 *:2049,9587,31311)', 'nfs:host2(rank=2.0 *:2049,9587,31311)'],
[]
),
# ranked, exist, different ranks
],
{0: {1: '0.1'}, 1: {1: '1.1'}},
{0: {1: '0.1'}, 1: {1: '1.1'}, 2: {0: None}},
- ['nfs:host1(rank=0.1 *:2049,9587)', 'nfs:host2(rank=1.1 *:2049,9587)', 'nfs:host3(rank=2.0 *:2049,9587)'],
- ['nfs:host3(rank=2.0 *:2049,9587)'],
+ ['nfs:host1(rank=0.1 *:2049,9587,31311)', 'nfs:host2(rank=1.1 *:2049,9587,31311)', 'nfs:host3(rank=2.0 *:2049,9587,31311)'],
+ ['nfs:host3(rank=2.0 *:2049,9587,31311)'],
[]
),
# ranked, exist, different ranks (2)
],
{0: {1: '0.1'}, 1: {1: '1.1'}},
{0: {1: '0.1'}, 1: {1: '1.1'}, 2: {0: None}},
- ['nfs:host1(rank=0.1 *:2049,9587)', 'nfs:host3(rank=1.1 *:2049,9587)', 'nfs:host2(rank=2.0 *:2049,9587)'],
- ['nfs:host2(rank=2.0 *:2049,9587)'],
+ ['nfs:host1(rank=0.1 *:2049,9587,31311)', 'nfs:host3(rank=1.1 *:2049,9587,31311)', 'nfs:host2(rank=2.0 *:2049,9587,31311)'],
+ ['nfs:host2(rank=2.0 *:2049,9587,31311)'],
[]
),
# ranked, exist, extra ranks
],
{0: {5: '0.5'}, 1: {5: '1.5'}},
{0: {5: '0.5'}, 1: {5: '1.5'}, 2: {0: None}},
- ['nfs:host1(rank=0.5 *:2049,9587)', 'nfs:host2(rank=1.5 *:2049,9587)', 'nfs:host3(rank=2.0 *:2049,9587)'],
- ['nfs:host3(rank=2.0 *:2049,9587)'],
+ ['nfs:host1(rank=0.5 *:2049,9587,31311)', 'nfs:host2(rank=1.5 *:2049,9587,31311)', 'nfs:host3(rank=2.0 *:2049,9587,31311)'],
+ ['nfs:host3(rank=2.0 *:2049,9587,31311)'],
['nfs.4.5']
),
# 25: ranked, exist, extra ranks (scale down: kill off high rank)
],
{0: {5: '0.5'}, 1: {5: '1.5'}, 2: {5: '2.5'}},
{0: {5: '0.5'}, 1: {5: '1.5'}, 2: {5: '2.5'}},
- ['nfs:host1(rank=0.5 *:2049,9587)', 'nfs:host2(rank=1.5 *:2049,9587)'],
+ ['nfs:host1(rank=0.5 *:2049,9587,31311)', 'nfs:host2(rank=1.5 *:2049,9587,31311)'],
[],
['nfs.2.5']
),
],
{0: {5: '0.5'}, 1: {5: '1.5'}, 2: {5: '2.5'}},
{0: {5: '0.5'}, 1: {5: '1.5', 6: None}, 2: {5: '2.5'}},
- ['nfs:host1(rank=0.5 *:2049,9587)', 'nfs:host3(rank=1.6 *:2049,9587)'],
- ['nfs:host3(rank=1.6 *:2049,9587)'],
+ ['nfs:host1(rank=0.5 *:2049,9587,31311)', 'nfs:host3(rank=1.6 *:2049,9587,31311)'],
+ ['nfs:host3(rank=1.6 *:2049,9587,31311)'],
['nfs.2.5', 'nfs.1.5']
),
# ranked, exist, duplicate rank
],
{0: {0: '0.0'}, 1: {2: '1.2'}},
{0: {0: '0.0'}, 1: {2: '1.2'}, 2: {0: None}},
- ['nfs:host1(rank=0.0 *:2049,9587)', 'nfs:host3(rank=1.2 *:2049,9587)', 'nfs:host2(rank=2.0 *:2049,9587)'],
- ['nfs:host2(rank=2.0 *:2049,9587)'],
+ ['nfs:host1(rank=0.0 *:2049,9587,31311)', 'nfs:host3(rank=1.2 *:2049,9587,31311)', 'nfs:host2(rank=2.0 *:2049,9587,31311)'],
+ ['nfs:host2(rank=2.0 *:2049,9587,31311)'],
['nfs.1.1']
),
# 28: ranked, all gens stale (failure during update cycle)
],
{0: {2: '0.2'}, 1: {2: '1.2', 3: '1.3'}},
{0: {2: '0.2'}, 1: {2: '1.2', 3: '1.3', 4: None}},
- ['nfs:host1(rank=0.2 *:2049,9587)', 'nfs:host3(rank=1.4 *:2049,9587)'],
- ['nfs:host3(rank=1.4 *:2049,9587)'],
+ ['nfs:host1(rank=0.2 *:2049,9587,31311)', 'nfs:host3(rank=1.4 *:2049,9587,31311)'],
+ ['nfs:host3(rank=1.4 *:2049,9587,31311)'],
['nfs.1.2']
),
# ranked, not enough hosts (with colocation, 4th daemon can be placed)
],
{0: {2: '0.2'}, 1: {2: '1.2'}},
{0: {2: '0.2'}, 1: {2: '1.2'}, 2: {0: None}, 3: {0: None}},
- ['nfs:host1(rank=0.2 *:2049,9587)', 'nfs:host2(rank=1.2 *:2049,9587)', 'nfs:host3(rank=2.0 *:2049,9587)', 'nfs:host3(rank=3.0 *:2050,9588)'],
- ['nfs:host3(rank=2.0 *:2049,9587)', 'nfs:host3(rank=3.0 *:2050,9588)'],
+ ['nfs:host1(rank=0.2 *:2049,9587,31311)', 'nfs:host2(rank=1.2 *:2049,9587,31311)', 'nfs:host3(rank=2.0 *:2049,9587,31311)', 'nfs:host3(rank=3.0 *:2050,9588,31311)'],
+ ['nfs:host3(rank=2.0 *:2049,9587,31311)', 'nfs:host3(rank=3.0 *:2050,9588,31311)'],
[]
),
# ranked, scale down
],
{0: {2: '0.2'}, 1: {2: '1.2'}, 2: {2: '2.2'}},
{0: {2: '0.2', 3: None}, 1: {2: '1.2'}, 2: {2: '2.2'}},
- ['nfs:host2(rank=0.3 *:2049,9587)'],
- ['nfs:host2(rank=0.3 *:2049,9587)'],
+ ['nfs:host2(rank=0.3 *:2049,9587,31311)'],
+ ['nfs:host2(rank=0.3 *:2049,9587,31311)'],
['nfs.0.2', 'nfs.1.2', 'nfs.2.2']
),
# NFS colocation - count > hosts, ports should increment
import re
import socket
from typing import cast, Dict, List, Any, Union, Optional, TYPE_CHECKING
+from enum import Enum
from mgr_module import NFS_POOL_NAME as POOL_NAME
from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec
QOSType,
QOSBandwidthControl,
QOSOpsControl,
- QOSParams)
+ QOSParams,
+ validate_clust_qos_msg_interval)
if TYPE_CHECKING:
from nfs.module import Module
log = logging.getLogger(__name__)
+class ClusterQosAction(Enum):
+ enable = 'enable'
+ disable = 'disable'
+
+
def resolve_ip(hostname: str) -> str:
try:
r = socket.getaddrinfo(hostname, None, flags=socket.AI_CANONNAME,
if not qos_type:
raise NFSInvalidOperation('qos_type is not specified in qos dict')
qos_type = QOSType[str(qos_type)]
+ enable_cluster_qos = qos_dict.get(QOSParams.enable_cluster_qos.value, True)
+ clust_qos_msg_interval = int(qos_dict.get(QOSParams.clust_qos_msg_interval.value, 0))
+ assert isinstance(enable_cluster_qos, (bool, type(None)))
enable_bw_ctrl = qos_dict.get(QOSParams.enable_bw_ctrl.value)
combined_bw_ctrl = qos_dict.get(QOSParams.combined_bw_ctrl.value)
enable_iops_ctrl = qos_dict.get(QOSParams.enable_iops_ctrl.value)
cluster_id=cluster_id,
qos_obj=None,
enable_qos=True,
+ enable_cluster_qos=enable_cluster_qos,
+ clust_qos_msg_interval=clust_qos_msg_interval,
qos_type=qos_type,
bw_obj=bw_obj,
ops_obj=ops_obj,
cluster_id: str,
qos_obj: Optional[QOS],
enable_qos: bool,
+ enable_cluster_qos: Optional[bool] = None,
+ clust_qos_msg_interval: int = 0,
qos_type: Optional[QOSType] = None,
bw_obj: Optional[QOSBandwidthControl] = None,
ops_obj: Optional[QOSOpsControl] = None,
qos_obj_exists = False
if not qos_obj:
log.debug(f"Creating new QoS block for cluster {cluster_id}")
- qos_obj = QOS(True, enable_qos, qos_type, bw_obj, ops_obj)
+ qos_obj = QOS(True, enable_qos, enable_cluster_qos, clust_qos_msg_interval, qos_type, bw_obj, ops_obj)
else:
log.debug(f"Updating existing QoS block for cluster {cluster_id}")
qos_obj_exists = True
qos_obj.enable_qos = enable_qos
+ qos_obj.enable_cluster_qos = enable_cluster_qos
+ qos_obj.clust_qos_msg_interval = validate_clust_qos_msg_interval(clust_qos_msg_interval)
qos_obj.qos_type = qos_type
if bw_obj:
qos_obj.bw_obj = bw_obj
cluster_id: str,
qos_obj: Optional[QOS],
enable_qos: bool,
+ enable_cluster_qos: Optional[bool] = None,
+ clust_qos_msg_interval: int = 0,
qos_type: Optional[QOSType] = None,
bw_obj: Optional[QOSBandwidthControl] = None,
ops_obj: Optional[QOSOpsControl] = None) -> None:
cluster_id=cluster_id,
qos_obj=qos_obj,
enable_qos=enable_qos,
+ enable_cluster_qos=enable_cluster_qos,
+ clust_qos_msg_interval=clust_qos_msg_interval,
qos_type=qos_type,
bw_obj=bw_obj,
ops_obj=ops_obj
cluster_id: str,
qos_obj: Optional[QOS],
enable_qos: bool,
+ enable_cluster_qos: Optional[bool] = None,
+ clust_qos_msg_interval: int = 0,
qos_type: Optional[QOSType] = None,
bw_obj: Optional[QOSBandwidthControl] = None,
ops_obj: Optional[QOSOpsControl] = None) -> None:
try:
if cluster_id in available_clusters(self.mgr):
- self.update_cluster_qos_obj(cluster_id, qos_obj, enable_qos, qos_type, bw_obj, ops_obj)
+ self.update_cluster_qos_obj(cluster_id, qos_obj, enable_qos, enable_cluster_qos,
+ clust_qos_msg_interval, qos_type, bw_obj, ops_obj)
restart_nfs_service(self.mgr, cluster_id)
return
raise ClusterNotFound()
if qos_obj:
self.validate_qos_type(qos_obj, qos_type, bw_obj=bw_obj)
bw_obj.qos_bandwidth_checks(qos_type)
- self.update_cluster_qos(cluster_id, qos_obj, True, qos_type=qos_type, bw_obj=bw_obj)
+ self.update_cluster_qos(
+ cluster_id,
+ qos_obj,
+ True,
+ enable_cluster_qos=True,
+ qos_type=qos_type,
+ bw_obj=bw_obj
+ )
log.info(f"QoS bandwidth control has been successfully enabled for cluster {cluster_id}. "
"If the qos_type is changed during this process, ensure that the bandwidth "
"values for all exports are updated accordingly.")
qos_obj = self.get_cluster_qos_config(cluster_id)
status = False
qos_type = None
+ enable_cluster_qos = None
+ clust_qos_msg_interval = 0
if qos_obj:
status = qos_obj.get_enable_qos_val(disable_bw=True)
if status:
qos_type = qos_obj.qos_type
- self.update_cluster_qos(cluster_id, qos_obj, status, qos_type, bw_obj=QOSBandwidthControl())
+ enable_cluster_qos = qos_obj.enable_cluster_qos
+ if qos_obj.clust_qos_msg_interval:
+ clust_qos_msg_interval = qos_obj.clust_qos_msg_interval
+ self.update_cluster_qos(cluster_id, qos_obj, status, enable_cluster_qos,
+ clust_qos_msg_interval, qos_type=qos_type, bw_obj=QOSBandwidthControl())
log.info("Cluster-level QoS bandwidth control has been successfully disabled for "
f"cluster {cluster_id}. As a result, export-level bandwidth control will "
"no longer have any effect, even if enabled.")
if qos_obj:
self.validate_qos_type(qos_obj, qos_type, ops_obj=ops_obj)
ops_obj.qos_ops_checks(qos_type)
- self.update_cluster_qos(cluster_id, qos_obj, True, qos_type=qos_type, ops_obj=ops_obj)
+ self.update_cluster_qos(
+ cluster_id,
+ qos_obj,
+ True,
+ enable_cluster_qos=True,
+ qos_type=qos_type,
+ ops_obj=ops_obj
+ )
log.info(f"QOS IOPS control has been successfully enabled for cluster {cluster_id}. "
"If the qos_type is changed during this process, ensure that ops count "
"values for all exports are updated accordingly.")
qos_obj = self.get_cluster_qos_config(cluster_id)
status = False
qos_type = None
+ enable_cluster_qos = None
+ clust_qos_msg_interval = 0
if qos_obj:
status = qos_obj.get_enable_qos_val(disable_ops=True)
if status:
qos_type = qos_obj.qos_type
- self.update_cluster_qos(cluster_id, qos_obj, status, qos_type, ops_obj=QOSOpsControl())
+ enable_cluster_qos = qos_obj.enable_cluster_qos
+ if qos_obj.clust_qos_msg_interval:
+ clust_qos_msg_interval = qos_obj.clust_qos_msg_interval
+ self.update_cluster_qos(cluster_id, qos_obj, status, enable_cluster_qos,
+ clust_qos_msg_interval, qos_type=qos_type, ops_obj=QOSOpsControl())
log.info("Cluster-level QoS IOPS control has been successfully disabled for "
f"cluster {cluster_id}. As a result, export-level ops control will "
"no longer have any effect, even if enabled.")
except Exception as e:
log.exception(f"Setting NFS-Ganesha QoS IOPS control config failed for {cluster_id}")
raise ErrorResponse.wrap(e)
+
+ def global_cluster_qos_action(
+ self,
+ cluster_id: str,
+ action: str,
+ msg_interval: int = 0
+ ) -> None:
+ try:
+ qos_obj = self.get_cluster_qos_config(cluster_id)
+ if not qos_obj:
+ err_msg = f'No existing QoS configuration found for cluster {cluster_id}. Can not {action} cluster-qos'
+ log.error(err_msg)
+ raise Exception(err_msg)
+
+ clust_qos_msg_interval = 0
+ if action == 'enable':
+ if (qos_obj.enable_cluster_qos or qos_obj.enable_cluster_qos is None) and not msg_interval:
+ log.info('Cluster QoS is already enabled')
+ return
+
+ enable_cluster_qos = True
+ clust_qos_msg_interval = msg_interval
+ else: # disable
+ enable_cluster_qos = False
+ self.update_cluster_qos(
+ cluster_id=cluster_id,
+ qos_obj=qos_obj,
+ enable_qos=qos_obj.enable_qos,
+ enable_cluster_qos=enable_cluster_qos,
+ clust_qos_msg_interval=clust_qos_msg_interval,
+ qos_type=qos_obj.qos_type
+ )
+ action_past = "enabled" if action == "enable" else "disabled"
+ log.info(f"Cluster-level QoS has been successfully {action_past} for cluster {cluster_id}")
+ except Exception as e:
+ log.exception(f"Failed to {action} cluster-level QoS for cluster {cluster_id}")
+ raise ErrorResponse.wrap(e)
from mgr_util import CephFSEarmarkResolver
from .export import ExportMgr, AppliedExportResults
-from .cluster import NFSCluster
+from .cluster import NFSCluster, ClusterQosAction
from .utils import available_clusters
from .qos_conf import QOSType, QOSBandwidthControl, UserQoSType, QOSOpsControl
"""Disable QOS bandwidth control for NFS cluster"""
return self.nfs.disable_cluster_qos_bw(cluster_id)
+ @CLICommand('nfs cluster cluster_qos', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_nfs_cluster_global_qos(self,
+ cluster_id: str,
+ action: ClusterQosAction,
+ msg_interval: int = 0) -> None:
+ """Enable or disable cluster-wide QoS. If disabled, QoS remains enabled,
+ but the configured values apply on a per-host basis"""
+ return self.nfs.global_cluster_qos_action(cluster_id, action.name, msg_interval)
+
@CLICommand('nfs cluster qos get', perm='r')
@object_format.Responder()
def _cmd_cluster_qos_get(self, cluster_id: str) -> Dict[str, Any]:
clust_block = "QOS_DEFAULT_CONFIG"
export_block = "QOS_BLOCK"
enable_qos = "enable_qos"
+ enable_cluster_qos = "enable_cluster_qos"
+ clust_qos_msg_interval = "cqos_msg_interval"
qos_type = "qos_type"
# bandwidth control
enable_bw_ctrl = "enable_bw_control"
return count
+def validate_clust_qos_msg_interval(msg_interval: int = 0) -> int:
+ min_interval = 100 # ms
+ max_interval = 300 # ms
+
+ if not msg_interval:
+ # msg_interval of 0 is treated as unset
+ return 0
+ if msg_interval < min_interval or msg_interval > max_interval:
+ raise Exception(
+ f'Provided message interval {msg_interval} is not in range, Please '
+ f'enter a value between {min_interval}ms and {max_interval}ms.'
+ )
+ return msg_interval
+
+
QOS_REQ_BW_PARAMS = {
"combined_bw_disabled": {
"PerShare": ["max_export_write_bw", "max_export_read_bw"],
self,
cluster_op: bool = False,
enable_qos: bool = False,
+ enable_cluster_qos: Optional[bool] = None,
+ clust_qos_msg_interval: int = 0,
qos_type: Optional[QOSType] = None,
bw_obj: Optional[QOSBandwidthControl] = None,
ops_obj: Optional[QOSOpsControl] = None,
) -> None:
self.cluster_op = cluster_op
self.enable_qos = enable_qos
+ self.enable_cluster_qos = enable_cluster_qos
+ self.clust_qos_msg_interval: int = validate_clust_qos_msg_interval(clust_qos_msg_interval)
self.qos_type = qos_type
self.bw_obj = bw_obj
self.ops_obj = ops_obj
qos_type = qos_dict.get(QOSParams.qos_type.value)
if qos_type:
kwargs["qos_type"] = QOSType[qos_type]
+ kwargs["enable_cluster_qos"] = qos_dict.get(QOSParams.enable_cluster_qos.value)
+ kwargs['clust_qos_msg_interval'] = qos_dict.get(QOSParams.clust_qos_msg_interval.value)
kwargs["enable_qos"] = qos_dict.get(QOSParams.enable_qos.value)
kwargs["bw_obj"] = QOSBandwidthControl.from_dict(qos_dict)
kwargs["ops_obj"] = QOSOpsControl.from_dict(qos_dict)
qos_type = qos_block.values.get(QOSParams.qos_type.value)
if qos_type:
kwargs["qos_type"] = QOSType(qos_type)
+ kwargs["enable_cluster_qos"] = qos_block.values.get(QOSParams.enable_cluster_qos.value)
+ kwargs['clust_qos_msg_interval'] = qos_block.values.get(QOSParams.clust_qos_msg_interval.value)
kwargs["enable_qos"] = qos_block.values.get(QOSParams.enable_qos.value)
kwargs["bw_obj"] = QOSBandwidthControl.from_qos_block(qos_block)
kwargs["ops_obj"] = QOSOpsControl.from_qos_block(qos_block)
else:
result = RawBlock(QOSParams.export_block.value)
result.values[QOSParams.enable_qos.value] = self.enable_qos
- if self.cluster_op and self.qos_type:
- result.values[QOSParams.qos_type.value] = self.qos_type.value
+ if self.cluster_op:
+ if self.qos_type:
+ result.values[QOSParams.qos_type.value] = self.qos_type.value
+ if self.enable_cluster_qos is not None:
+ result.values[QOSParams.enable_cluster_qos.value] = self.enable_cluster_qos
+ if self.clust_qos_msg_interval:
+ result.values[QOSParams.clust_qos_msg_interval.value] = self.clust_qos_msg_interval
if self.bw_obj and (res := self.bw_obj.to_qos_block()):
result.values.update(res.values)
if self.ops_obj and (res := self.ops_obj.to_qos_block()):
def to_dict(self, ret_bw_in_bytes: bool = False) -> Dict[str, Any]:
r: Dict[str, Any] = {}
r[QOSParams.enable_qos.value] = self.enable_qos
- if self.cluster_op and self.qos_type:
- r[QOSParams.qos_type.value] = self.qos_type.name
+ if self.cluster_op:
+ if self.qos_type:
+ r[QOSParams.qos_type.value] = self.qos_type.name
+ if self.enable_cluster_qos is not None:
+ r[QOSParams.enable_cluster_qos.value] = self.enable_cluster_qos
+ if self.clust_qos_msg_interval:
+ r[QOSParams.clust_qos_msg_interval.value] = self.clust_qos_msg_interval
if self.bw_obj and (res := self.bw_obj.to_dict(ret_bw_in_bytes)):
r.update(res)
if self.ops_obj and (res := self.ops_obj.to_dict()):
qos_cluster_block = """
QOS {
enable_qos = true;
+ enable_cluster_qos = true;
enable_bw_control = true;
combined_rw_bw_control = false;
qos_type = 3;
qos_cluster_dict = {
"enable_bw_control": True,
"enable_qos": True,
+ "enable_cluster_qos": True,
"combined_rw_bw_control": False,
"max_client_read_bw": bytes_to_human(4000000, mode='binary'),
"max_client_write_bw": bytes_to_human(3000000, mode='binary'),
qos_cluster_dict_bw_in_bytes = {
"enable_bw_control": True,
"enable_qos": True,
+ "enable_cluster_qos": True,
"combined_rw_bw_control": False,
"max_client_read_bw": "4000000",
"max_client_write_bw": "3000000",
qos = QOS.from_dict(self.qos_export_dict)
assert qos.to_dict() == self.qos_export_dict
- @pytest.mark.parametrize("qos_block, qos_dict, qos_dict_bw_in_bytes", [
- (qos_cluster_block, qos_cluster_dict, qos_cluster_dict_bw_in_bytes),
- (qos_export_block, qos_export_dict, qos_export_dict_bw_in_bytes)
+ @pytest.mark.parametrize("qos_block, qos_dict, qos_dict_bw_in_bytes, clust_op", [
+ (qos_cluster_block, qos_cluster_dict, qos_cluster_dict_bw_in_bytes, True),
+ (qos_export_block, qos_export_dict, qos_export_dict_bw_in_bytes, False)
])
- def test_qos_from_block(self, qos_block, qos_dict, qos_dict_bw_in_bytes):
+ def test_qos_from_block(self, qos_block, qos_dict, qos_dict_bw_in_bytes, clust_op):
blocks = GaneshaConfParser(qos_block).parse()
assert isinstance(blocks, list)
- assert len(blocks) == 1
- qos = QOS.from_qos_block(blocks[0], True)
+ qos = QOS.from_qos_block(blocks[0], clust_op)
assert qos.to_dict() == qos_dict
assert qos.to_dict(ret_bw_in_bytes=True) == qos_dict_bw_in_bytes
if not positive_tc:
raise Exception("This TC was supposed to fail")
out = cluster.get_cluster_qos(self.cluster_id)
- expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": combined_bw_ctrl, "qos_type": qos_type.name, "enable_iops_control": False}
+ expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": combined_bw_ctrl, "qos_type": qos_type.name, "enable_iops_control": False, "enable_cluster_qos": True}
for key in params:
expected_out[QOSParams[key].value] = bytes_to_human(with_units_to_int(params[key]), mode='binary')
assert out == expected_out
+ cluster.global_cluster_qos_action(self.cluster_id, 'enable', 200)
+ expected_out.update({'enable_cluster_qos': True, 'cqos_msg_interval': 200})
+ assert cluster.get_cluster_qos(self.cluster_id) == expected_out
+ cluster.global_cluster_qos_action(self.cluster_id, 'disable')
+ expected_out.update({'enable_cluster_qos': False})
+ del expected_out['cqos_msg_interval']
+ assert cluster.get_cluster_qos(self.cluster_id) == expected_out
cluster.disable_cluster_qos_bw(self.cluster_id)
out = cluster.get_cluster_qos(self.cluster_id)
assert out == {"enable_bw_control": False, "enable_qos": False, "combined_rw_bw_control": False, "enable_iops_control": False}
if not positive_tc:
raise Exception("This TC was supposed to fail")
out = cluster.get_cluster_qos(self.cluster_id)
- expected_out = {"enable_bw_control": False, "enable_qos": True, "combined_rw_bw_control": False, "qos_type": qos_type.name, "enable_iops_control": True}
+ expected_out = {"enable_bw_control": False, "enable_qos": True, "combined_rw_bw_control": False, "qos_type": qos_type.name, "enable_iops_control": True, "enable_cluster_qos": True}
for key in params:
expected_out[QOSParams[key].value] = params[key]
assert out == expected_out
+ cluster.global_cluster_qos_action(self.cluster_id, 'enable', 200)
+ expected_out.update({'enable_cluster_qos': True, 'cqos_msg_interval': 200})
+ assert cluster.get_cluster_qos(self.cluster_id) == expected_out
+ cluster.global_cluster_qos_action(self.cluster_id, 'disable')
+ expected_out.update({'enable_cluster_qos': False})
+ del expected_out['cqos_msg_interval']
cluster.disable_cluster_qos_ops(self.cluster_id)
out = cluster.get_cluster_qos(self.cluster_id)
assert out == {"enable_bw_control": False, "enable_qos": False, "combined_rw_bw_control": False, "enable_iops_control": False}
if not positive_tc:
raise Exception("This TC passed but it was supposed to fail")
out = cluster.get_cluster_qos(self.cluster_id)
- expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": False, "qos_type": ops_qos_type.name, "enable_iops_control": True}
+ expected_out = {"enable_bw_control": True, "enable_qos": True, "combined_rw_bw_control": False, "qos_type": ops_qos_type.name, "enable_iops_control": True, "enable_cluster_qos":True}
bw_out = {}
ops_out = {}
for key in bw_params:
# disable bandwidth control
cluster.disable_cluster_qos_bw(self.cluster_id)
out = cluster.get_cluster_qos(self.cluster_id)
- ops_out.update({"enable_bw_control": False, "enable_qos": True, "combined_rw_bw_control": False, "enable_iops_control": True, "qos_type": ops_qos_type.name})
+ ops_out.update({"enable_bw_control": False, "enable_qos": True, "combined_rw_bw_control": False, "enable_iops_control": True, "qos_type": ops_qos_type.name, "enable_cluster_qos": True})
assert out == ops_out
# disable ops control
cluster.disable_cluster_qos_ops(self.cluster_id)
ns.append(obj.nspace)
except ObjectNotFound:
log.debug("Failed to open pool %s", nfs_pool)
- finally:
- return ns
+ return ns
def restart_nfs_service(mgr: 'Module', cluster_id: str) -> None:
idmap_conf: Optional[Dict[str, Dict[str, str]]] = None,
custom_configs: Optional[List[CustomConfig]] = None,
cluster_qos_config: Optional[Dict[str, Union[str, bool, int]]] = None,
+ cluster_qos_port: Optional[int] = None,
ssl: bool = False,
ssl_cert: Optional[str] = None,
ssl_key: Optional[str] = None,
self.enable_rdma = enable_rdma
self.rdma_port = rdma_port
self.cluster_qos_config = cluster_qos_config
+ self.cluster_qos_port = cluster_qos_port
# colocation_ports is a list of port dicts for ADDITIONAL colocated daemons
# The first daemon always uses port and monitoring_port from the spec
return self.COLOCATION_PORT_FIELDS
def get_port_start(self) -> List[int]:
- ports = [self.port or 2049, self.monitoring_port or 9587]
+ ports = [self.port or 2049, self.monitoring_port or 9587, self.cluster_qos_port or 31311]
if self.enable_rdma:
ports.append(self.rdma_port or 20049)
return ports
qos_type = self.cluster_qos_config.get('qos_type')
valid_qos_types = ['PerShare', 'PerClient', 'PerShare_PerClient']
if not qos_type:
- raise SpecValidationError('Invalid NFS spec: to set cluster-level QoS, "qos_type" must be provided.')
+ raise SpecValidationError(
+ 'Invalid NFS spec: to set cluster-level QoS, "qos_type" must be provided.'
+ )
if qos_type not in valid_qos_types:
raise SpecValidationError(
- f'Invalid NFS spec: "{qos_type}" is not a valid qos_type. Valid types are: {"|".join(valid_qos_types)}.'
+ f'Invalid NFS spec: "{qos_type}" is not a valid qos_type. '
+ f'Valid types are: {"|".join(valid_qos_types)}.'
)
# Verify bandwidth and IOPS types
for key, value in self.cluster_qos_config.items():
if key.endswith('bw') and not isinstance(value, str):
- raise SpecValidationError(f"Invalid NFS spec: bandwidth '{key}' should be a string")
+ raise SpecValidationError(
+ f"Invalid NFS spec: bandwidth '{key}' should be a string"
+ )
if key.endswith('iops') and not isinstance(value, int):
- raise SpecValidationError(f"Invalid NFS spec: IOPS '{key}' should be an integer")
+ raise SpecValidationError(
+ f"Invalid NFS spec: IOPS '{key}' should be an integer")
# TLS certificate validation
if self.ssl and not self.certificate_source: