)
self.run_grace_tool(spec, 'add', nodeid)
- # create the rados config object
- self.create_rados_config_obj(spec)
-
port = daemon_spec.ports[0] if daemon_spec.ports else 2049
monitoring_ip, monitoring_port = self.get_monitoring_details(daemon_spec.service_name, host, daemon_spec)
return get_cephadm_config(), self.get_dependencies(self.mgr, spec)
+ def pre_daemon_service_config(self, spec: ServiceSpec) -> None:
+ nfs_spec = cast(NFSServiceSpec, spec)
+ self.config(nfs_spec)
+ self.create_rados_config_obj(nfs_spec)
+
def create_rados_config_obj(self,
spec: NFSServiceSpec,
clobber: bool = False) -> None:
+ config_file_data = None
objname = spec.rados_config_name()
cmd = [
'rados',
timeout=10)
if not result.returncode and not clobber:
logger.info('Rados config object exists: %s' % objname)
+ config_file_data = result.stdout
else:
logger.info('Creating rados config object: %s' % objname)
result = subprocess.run(
f'Unable to create rados config object {objname}: {result.stderr.decode("utf-8")}'
)
raise RuntimeError(result.stderr.decode("utf-8"))
+ if spec.cluster_qos_config:
+ # set cluster level qos config
+ from nfs.cluster import config_cluster_qos_from_dict
+ assert spec.service_id
+ update_obj = False
+ if config_file_data and 'qosconf-nfs' in config_file_data.decode('utf-8'):
+ update_obj = True
+
+ config_cluster_qos_from_dict(
+ mgr=self.mgr,
+ cluster_id=spec.service_id,
+ qos_dict=spec.cluster_qos_config,
+ update_existing_obj=update_obj
+ )
def create_keyring(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
daemon_id = daemon_spec.daemon_id
QOS,
QOSType,
QOSBandwidthControl,
- QOSOpsControl)
+ QOSOpsControl,
+ QOSParams)
if TYPE_CHECKING:
from nfs.module import Module
log.debug("Successfully created nfs-ganesha pool %s", POOL_NAME)
+def config_cluster_qos_from_dict(
+ mgr: 'MgrModule',
+ cluster_id: str,
+ qos_dict: Dict[str, Union[str, bool, int]],
+ update_existing_obj: bool = False,
+) -> None:
+ qos_type = qos_dict.get(QOSParams.qos_type.value)
+ if not qos_type:
+ raise NFSInvalidOperation('qos_type is not specified in qos dict')
+ qos_type = QOSType[str(qos_type)]
+ enable_bw_ctrl = qos_dict.get(QOSParams.enable_bw_ctrl.value)
+ combined_bw_ctrl = qos_dict.get(QOSParams.combined_bw_ctrl.value)
+ enable_iops_ctrl = qos_dict.get(QOSParams.enable_iops_ctrl.value)
+ bw_obj = ops_obj = None
+ if enable_bw_ctrl:
+ bw_obj = QOSBandwidthControl(
+ bool(enable_bw_ctrl),
+ bool(combined_bw_ctrl),
+ export_writebw=str(qos_dict.get(QOSParams.export_writebw.value, "0")),
+ export_readbw=str(qos_dict.get(QOSParams.export_readbw.value, "0")),
+ client_writebw=str(qos_dict.get(QOSParams.client_writebw.value, "0")),
+ client_readbw=str(qos_dict.get(QOSParams.client_readbw.value, "0")),
+ export_rw_bw=str(qos_dict.get(QOSParams.export_rw_bw.value, "0")),
+ client_rw_bw=str(qos_dict.get(QOSParams.client_rw_bw.value, "0")),
+ )
+ bw_obj.qos_bandwidth_checks(qos_type)
+ if enable_iops_ctrl:
+ ops_obj = QOSOpsControl(
+ bool(enable_iops_ctrl),
+ max_export_iops=int(qos_dict.get(QOSParams.max_export_iops.value, 0)),
+ max_client_iops=int(qos_dict.get(QOSParams.max_client_iops.value, 0)),
+ )
+ ops_obj.qos_ops_checks(qos_type)
+
+ write_cluster_qos_obj(
+ mgr=mgr,
+ cluster_id=cluster_id,
+ qos_obj=None,
+ enable_qos=True,
+ qos_type=qos_type,
+ bw_obj=bw_obj,
+ ops_obj=ops_obj,
+ update_existing_obj=update_existing_obj
+ )
+
+
+def write_cluster_qos_obj(
+ mgr: 'MgrModule',
+ cluster_id: str,
+ qos_obj: Optional[QOS],
+ enable_qos: bool,
+ qos_type: Optional[QOSType] = None,
+ bw_obj: Optional[QOSBandwidthControl] = None,
+ ops_obj: Optional[QOSOpsControl] = None,
+ update_existing_obj: bool = False
+) -> None:
+ qos_obj_exists = False
+ if not qos_obj:
+ log.debug(f"Creating new QoS block for cluster {cluster_id}")
+ qos_obj = QOS(True, enable_qos, qos_type, bw_obj, ops_obj)
+ else:
+ log.debug(f"Updating existing QoS block for cluster {cluster_id}")
+ qos_obj_exists = True
+ qos_obj.enable_qos = enable_qos
+ qos_obj.qos_type = qos_type
+ if bw_obj:
+ qos_obj.bw_obj = bw_obj
+ if ops_obj:
+ qos_obj.ops_obj = ops_obj
+
+ qos_config = format_block(qos_obj.to_qos_block())
+ rados_obj = NFSRados(mgr.rados, cluster_id)
+ if not qos_obj_exists and not update_existing_obj:
+ rados_obj.write_obj(qos_config, qos_conf_obj_name(cluster_id),
+ conf_obj_name(cluster_id))
+ else:
+ rados_obj.update_obj(qos_config, qos_conf_obj_name(cluster_id),
+ conf_obj_name(cluster_id), should_notify=False)
+ log.debug(f"Successfully saved {cluster_id}s QOS bandwidth control config: \n {qos_config}")
+
+
class NFSCluster:
def __init__(self, mgr: 'Module') -> None:
self.mgr = mgr
virtual_ip: Optional[str] = None,
ingress_mode: Optional[IngressType] = None,
port: Optional[int] = None,
+ cluster_qos_config: Optional[Dict[str, Union[str, bool, int]]] = None,
ssl: bool = False,
ssl_cert: Optional[str] = None,
ssl_key: Optional[str] = None,
port=ganesha_port,
virtual_ip=virtual_ip_for_ganesha,
enable_haproxy_protocol=enable_haproxy_protocol,
+ cluster_qos_config=cluster_qos_config,
ssl=ssl,
ssl_cert=ssl_cert,
ssl_key=ssl_key,
spec = NFSServiceSpec(service_type='nfs', service_id=cluster_id,
placement=PlacementSpec.from_string(placement),
port=port,
+ cluster_qos_config=cluster_qos_config,
ssl=ssl,
ssl_cert=ssl_cert,
ssl_key=ssl_key,
ingress: Optional[bool] = None,
ingress_mode: Optional[IngressType] = None,
port: Optional[int] = None,
+ cluster_qos_config: Optional[Dict[str, Union[str, bool, int]]] = None,
ssl: bool = False,
ssl_cert: Optional[str] = None,
ssl_key: Optional[str] = None,
self.create_empty_rados_obj(cluster_id)
if cluster_id not in available_clusters(self.mgr):
- self._call_orch_apply_nfs(cluster_id, placement, virtual_ip, ingress_mode, port,
- ssl, ssl_cert, ssl_key, ssl_ca_cert, tls_ktls, tls_debug,
- tls_min_version, tls_ciphers, enable_rdma, rdma_port)
+ self._call_orch_apply_nfs(
+ cluster_id,
+ placement,
+ virtual_ip,
+ ingress_mode,
+ port,
+ cluster_qos_config=cluster_qos_config,
+ ssl=ssl,
+ ssl_cert=ssl_cert,
+ ssl_key=ssl_key,
+ ssl_ca_cert=ssl_ca_cert,
+ tls_ktls=tls_ktls,
+ tls_debug=tls_debug,
+ tls_min_version=tls_min_version,
+ tls_ciphers=tls_ciphers,
+ enable_rdma=enable_rdma,
+ rdma_port=rdma_port
+ )
return
raise NonFatalError(f"{cluster_id} cluster already exists")
except Exception as e:
bw_obj: Optional[QOSBandwidthControl] = None,
ops_obj: Optional[QOSOpsControl] = None) -> None:
"""Update cluster QOS config"""
- qos_obj_exists = False
- if not qos_obj:
- log.debug(f"Creating new QoS block for cluster {cluster_id}")
- qos_obj = QOS(True, enable_qos, qos_type, bw_obj, ops_obj)
- else:
- log.debug(f"Updating existing QoS block for cluster {cluster_id}")
- qos_obj_exists = True
- qos_obj.enable_qos = enable_qos
- qos_obj.qos_type = qos_type
- if bw_obj:
- qos_obj.bw_obj = bw_obj
- if ops_obj:
- qos_obj.ops_obj = ops_obj
-
- qos_config = format_block(qos_obj.to_qos_block())
- rados_obj = self._rados(cluster_id)
- if not qos_obj_exists:
- rados_obj.write_obj(qos_config, qos_conf_obj_name(cluster_id),
- conf_obj_name(cluster_id))
- else:
- rados_obj.update_obj(qos_config, qos_conf_obj_name(cluster_id),
- conf_obj_name(cluster_id), should_notify=False)
- log.debug(f"Successfully saved {cluster_id}s QOS bandwidth control config: \n {qos_config}")
+ write_cluster_qos_obj(
+ mgr=self.mgr,
+ cluster_id=cluster_id,
+ qos_obj=qos_obj,
+ enable_qos=enable_qos,
+ qos_type=qos_type,
+ bw_obj=bw_obj,
+ ops_obj=ops_obj
+ )
def update_cluster_qos(self,
cluster_id: str,
extra_entrypoint_args: Optional[GeneralArgList] = None,
idmap_conf: Optional[Dict[str, Dict[str, str]]] = None,
custom_configs: Optional[List[CustomConfig]] = None,
+ cluster_qos_config: Optional[Dict[str, Union[str, bool, int]]] = None,
ssl: bool = False,
ssl_cert: Optional[str] = None,
ssl_key: Optional[str] = None,
self.enable_nlm = enable_nlm
self.enable_rdma = enable_rdma
self.rdma_port = rdma_port
+ self.cluster_qos_config = cluster_qos_config
# colocation_ports is a list of port dicts for ADDITIONAL colocated daemons
# The first daemon always uses port and monitoring_port from the spec
# Validate colocation_ports
self.validate_colocation_ports()
+ # validate qos dict
+ if self.cluster_qos_config:
+ qos_enable = self.cluster_qos_config.get('enable_qos', True)
+ enable_bw_ctrl = self.cluster_qos_config.get('enable_bw_control', False)
+ combined_bw_ctrl = self.cluster_qos_config.get('combined_rw_bw_control', False)
+ enable_ops_ctrl = self.cluster_qos_config.get('enable_iops_control', False)
+ for key in [qos_enable, enable_bw_ctrl, combined_bw_ctrl, enable_ops_ctrl]:
+ if not isinstance(key, bool):
+ raise SpecValidationError('Invalid NFS spec: cluster_qos_config is not correct')
+ if not qos_enable or not (enable_bw_ctrl or enable_ops_ctrl):
+ # this means bandwidth or iops qos won't be enable, we don't need to set qos
+ self.cluster_qos_config = None
+ return
+
+ # Verify qos_type
+ qos_type = self.cluster_qos_config.get('qos_type')
+ valid_qos_types = ['PerShare', 'PerClient', 'PerShare_PerClient']
+ if not qos_type:
+ raise SpecValidationError('Invalid NFS spec: to set cluster-level QoS, "qos_type" must be provided.')
+ if qos_type not in valid_qos_types:
+ raise SpecValidationError(
+ f'Invalid NFS spec: "{qos_type}" is not a valid qos_type. Valid types are: {"|".join(valid_qos_types)}.'
+ )
+
+ # Verify bandwidth and IOPS types
+ for key, value in self.cluster_qos_config.items():
+ if key.endswith('bw') and not isinstance(value, str):
+ raise SpecValidationError(f"Invalid NFS spec: bandwidth '{key}' should be a string")
+ if key.endswith('iops') and not isinstance(value, int):
+ raise SpecValidationError(f"Invalid NFS spec: IOPS '{key}' should be an integer")
+
# TLS certificate validation
if self.ssl and not self.certificate_source:
raise SpecValidationError('If SSL is enabled, a certificate source must be provided.')