}[cephfs.provider.expand()]
except KeyError:
raise ValueError(f'unsupported provider: {cephfs.provider}')
+ modules = ["acl_xattr", "ceph_snapshots"]
+
+ if qos := cephfs.qos:
+ vfs_rl = "aio_ratelimit"
+ modules.extend([vfs_rl, ceph_vfs])
+ else:
+ modules.append(ceph_vfs)
+
cfg = {
# smb.conf options
'options': {
'path': path,
- "vfs objects": f"acl_xattr ceph_snapshots {ceph_vfs}",
+ "vfs objects": " ".join(modules),
'acl_xattr:security_acl_name': 'user.NTACL',
f'{ceph_vfs}:config_file': '/etc/ceph/ceph.conf',
f'{ceph_vfs}:filesystem': cephfs.volume,
'posix locking': 'no',
}
}
+
+ if qos:
+ opts = cfg["options"]
+ for field in (
+ "read_iops_limit",
+ "read_bw_limit",
+ "read_delay_max",
+ "write_iops_limit",
+ "write_bw_limit",
+ "write_delay_max",
+ ):
+ if value := getattr(qos, field):
+ opts[f"{vfs_rl}:{field}"] = str(value)
if share.comment is not None:
cfg['options']['comment'] = share.comment
from typing import TYPE_CHECKING, Any, List, Optional, cast
import logging
+from dataclasses import replace
import orchestrator
from ceph.deployment.service_spec import PlacementSpec, SMBSpec
)
return self._apply_res([share]).one()
+ @cli.SMBCommand('share update cephfs qos', perm='rw')
+ def share_update_qos(
+ self,
+ cluster_id: str,
+ share_id: str,
+ read_iops_limit: Optional[int] = None,
+ write_iops_limit: Optional[int] = None,
+ read_bw_limit: Optional[int] = None,
+ write_bw_limit: Optional[int] = None,
+ read_delay_max: Optional[int] = 30,
+ write_delay_max: Optional[int] = 30,
+ ) -> results.Result:
+ """Update QoS settings for a CephFS share"""
+ try:
+ shares = self._handler.matching_resources(
+ [f'ceph.smb.share.{cluster_id}.{share_id}']
+ )
+ if not shares or not isinstance(shares[0], resources.Share):
+ raise ValueError(f"Share {cluster_id}/{share_id} not found")
+
+ share = shares[0]
+ if not share.cephfs:
+ raise ValueError("Share has no CephFS configuration")
+
+ updated_cephfs = share.cephfs.update_qos(
+ read_iops_limit=read_iops_limit,
+ write_iops_limit=write_iops_limit,
+ read_bw_limit=read_bw_limit,
+ write_bw_limit=write_bw_limit,
+ read_delay_max=read_delay_max,
+ write_delay_max=write_delay_max,
+ )
+
+ updated_share = replace(share, cephfs=updated_cephfs)
+ return self._apply_res([updated_share]).one()
+
+ except resources.InvalidResourceError as err:
+ return results.InvalidResourceResult(err.resource_data, str(err))
+
@cli.SMBCommand("show", perm="r")
def show(
self,
import dataclasses
import errno
import json
+from dataclasses import replace
import yaml
return self
+@resourcelib.component()
+class QoSConfig(_RBase):
+ """Quality of Service configuration for CephFS shares."""
+
+ read_iops_limit: Optional[int] = None
+ write_iops_limit: Optional[int] = None
+ read_bw_limit: Optional[int] = None
+ write_bw_limit: Optional[int] = None
+ read_delay_max: Optional[int] = 30
+ write_delay_max: Optional[int] = 30
+
+
@resourcelib.component()
class CephFSStorage(_RBase):
"""Description of where in a CephFS file system a share is located."""
subvolumegroup: str = ''
subvolume: str = ''
provider: CephFSStorageProvider = CephFSStorageProvider.SAMBA_VFS
+ qos: Optional[QoSConfig] = None
+ DELAY_MAX_LIMIT = 300
+ # Maximal value for iops_limit
+ IOPS_LIMIT_MAX = 1_000_000
+ # Maximal value for bw_limit (1 << 40 = 1 TB)
+ BYTES_LIMIT_MAX = 1 << 40
def __post_init__(self) -> None:
# Allow a shortcut form of <subvolgroup>/<subvol> in the subvolume
def _customize_resource(rc: resourcelib.Resource) -> resourcelib.Resource:
rc.subvolumegroup.quiet = True
rc.subvolume.quiet = True
+ rc.qos.quiet = True
return rc
+ def update_qos(
+ self,
+ *,
+ read_iops_limit: Optional[int] = None,
+ write_iops_limit: Optional[int] = None,
+ read_bw_limit: Optional[int] = None,
+ write_bw_limit: Optional[int] = None,
+ read_delay_max: Optional[int] = 30,
+ write_delay_max: Optional[int] = 30,
+ ) -> Self:
+ """Return a new CephFSStorage instance with updated QoS values."""
+
+ qos_updates = {}
+ new_qos: Optional[QoSConfig] = None
+ if read_iops_limit is not None and read_iops_limit > 0:
+ qos_updates["read_iops_limit"] = min(
+ read_iops_limit, self.IOPS_LIMIT_MAX
+ )
+ if write_iops_limit is not None and write_iops_limit > 0:
+ qos_updates["write_iops_limit"] = min(
+ write_iops_limit, self.IOPS_LIMIT_MAX
+ )
+ if read_bw_limit is not None and read_bw_limit > 0:
+ qos_updates["read_bw_limit"] = min(
+ read_bw_limit, self.BYTES_LIMIT_MAX
+ )
+ if write_bw_limit is not None and write_bw_limit > 0:
+ qos_updates["write_bw_limit"] = min(
+ write_bw_limit, self.BYTES_LIMIT_MAX
+ )
+ if read_delay_max is not None and read_delay_max > 0:
+ qos_updates["read_delay_max"] = min(
+ read_delay_max, self.DELAY_MAX_LIMIT
+ )
+ if write_delay_max is not None and write_delay_max > 0:
+ qos_updates["write_delay_max"] = min(
+ write_delay_max, self.DELAY_MAX_LIMIT
+ )
+
+ if qos_updates:
+ new_qos = replace(self.qos or QoSConfig(), **qos_updates)
+
+ return replace(self, qos=new_qos)
+
@resourcelib.component()
class LoginAccessEntry(_RBase):