From: Kushal Deb Date: Mon, 22 Dec 2025 12:58:29 +0000 (+0530) Subject: mgr/cephadm: Implement D3N L1 persistent datacache support for RGW X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e9a71927336c8a7790dc243ebbedf97efb1e757e;p=ceph-ci.git mgr/cephadm: Implement D3N L1 persistent datacache support for RGW Add RGW D3N L1 persistent datacache support backed by host block devices. Select devices deterministically per (service, host) with intra-service sharing, forbid cross-service reuse, prepare/mount devices, and bind-mount per-daemon cache directories into the container. Signed-off-by: Kushal Deb --- diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py index 4e255e55165..bd3650dc682 100755 --- a/src/cephadm/cephadm.py +++ b/src/cephadm/cephadm.py @@ -873,6 +873,114 @@ def _update_container_args_for_podman( ) +def _ensure_fstab_entry(ctx: CephadmContext, device: str, mountpoint: str, fs_type: str) -> None: + out, _, code = call(ctx, ['blkid', '-s', 'UUID', '-o', 'value', device]) + if code != 0 or not out.strip(): + raise Error(f'Failed to get UUID for {device}') + uuid = out.strip() + + entry = f'UUID={uuid} {mountpoint} {fs_type} defaults,noatime 0 2\n' + + # check if mountpoint already present in fstab + with open('/etc/fstab', 'r') as f: + for line in f: + line = line.strip() + if not line or line.startswith('#'): + continue + parts = line.split() + if len(parts) >= 2 and parts[1] == mountpoint: + return + + with open('/etc/fstab', 'a') as f: + f.write(entry) + + +def prepare_d3n_cache( + ctx: CephadmContext, + d3n_cache: Dict[str, Any], + uid: int, + gid: int, +) -> None: + device = d3n_cache.get('device') + fs_type = d3n_cache.get('filesystem', 'xfs') + mountpoint = d3n_cache.get('mountpoint') + cache_path = d3n_cache.get('cache_path') + size_bytes = d3n_cache.get('size_bytes') + logger.debug( + f'[D3N] prepare_d3n_cache: device={device!r} fs_type={fs_type!r} mountpoint={mountpoint!r} cache_path={cache_path!r} 
size_bytes={size_bytes!r}' + ) + + if not device: + raise Error('d3n_cache.device must be specified') + if not mountpoint: + raise Error('d3n_cache.mountpoint must be specified') + if not cache_path: + raise Error('d3n_cache.cache_path must be specified') + + # Ensure mountpoint exists + os.makedirs(mountpoint, mode=0o755, exist_ok=True) + logger.debug(f'[D3N] checking filesystem on device {device}') + + # Format the device if needed + if not _has_filesystem(ctx, device): + logger.debug(f'Formatting {device} with {fs_type} for D3N') + call_throws(ctx, ['mkfs', '-t', fs_type, device]) + + # Persist the mount in /etc/fstab + _ensure_fstab_entry(ctx, device, mountpoint, fs_type) + + if not _is_mountpoint(ctx, mountpoint): + logger.debug(f'[D3N] mountpoint not mounted, running mount {mountpoint}') + call_throws(ctx, ['mount', mountpoint]) + else: + logger.debug(f'[D3N] mountpoint already mounted according to _is_mountpoint(): {mountpoint}') + + if size_bytes is not None: + if not isinstance(size_bytes, int) or size_bytes <= 0: + raise Error(f'd3n_cache.size_bytes must be a positive integer, got {size_bytes!r}') + + avail = _avail_bytes(ctx, mountpoint) + if avail < size_bytes: + raise Error( + f'Not enough free space for D3N cache on {mountpoint}: ' + f'need {size_bytes} bytes, have {avail} bytes' + ) + + # Create per-daemon cache directory + os.makedirs(cache_path, mode=0o755, exist_ok=True) + call_throws(ctx, ['chown', '-R', f'{uid}:{gid}', cache_path]) + + +def _has_filesystem(ctx: CephadmContext, device: str) -> bool: + if not os.path.exists(device): + return False + out, _, code = call(ctx, ['blkid', '-o', 'value', '-s', 'TYPE', device]) + return code == 0 and bool(out.strip()) + + +def _is_mountpoint(ctx: CephadmContext, path: str) -> bool: + out, err, code = call( + ctx, + ['findmnt', '-n', '-o', 'TARGET,SOURCE,FSTYPE', '--mountpoint', path], + ) + logger.debug( + f'[D3N] _is_mountpoint({path}): code={code} out={out.strip()!r} err={err.strip()!r}' + ) + return 
code == 0 + + +def _avail_bytes(ctx: CephadmContext, path: str) -> int: + out, _, code = call(ctx, ['df', '-B1', '--output=avail', path]) + if code != 0: + raise Error(f'Failed to check free space for {path}') + lines = [line.strip() for line in out.splitlines() if line.strip()] + if len(lines) < 2: + raise Error(f'Unexpected df output for {path}: {out!r}') + logger.debug(f'[D3N] df avail bytes for {path}: {lines[1]!r}') + + return int(lines[1]) + + def deploy_daemon( ctx: CephadmContext, ident: 'DaemonIdentity', @@ -945,6 +1053,13 @@ def deploy_daemon( # dirs, conf, keyring create_daemon_dirs(ctx, ident, uid, gid, config, keyring) + if ident.daemon_type == 'rgw': + config_json = fetch_configs(ctx) + d3n_cache = config_json.get('d3n_cache') + + if d3n_cache: + prepare_d3n_cache(ctx, d3n_cache, uid, gid) + # only write out unit files and start daemon # with systemd if this is not a reconfig if deployment_type != DeploymentType.RECONFIG: diff --git a/src/cephadm/cephadmlib/context_getters.py b/src/cephadm/cephadmlib/context_getters.py index 9dde5c855a5..fc1462b9ba5 100644 --- a/src/cephadm/cephadmlib/context_getters.py +++ b/src/cephadm/cephadmlib/context_getters.py @@ -68,7 +68,7 @@ def fetch_meta(ctx: CephadmContext) -> Dict[str, Any]: return {} -def fetch_configs(ctx: CephadmContext) -> Dict[str, str]: +def fetch_configs(ctx: CephadmContext) -> Dict[str, Any]: """Return a dict containing arbitrary configuration parameters. This function filters out the key 'custom_config_files' which must not be part of a deployment's configuration key-value pairs. 
diff --git a/src/cephadm/cephadmlib/daemons/ceph.py b/src/cephadm/cephadmlib/daemons/ceph.py index ea359d73ed7..0134f69da15 100644 --- a/src/cephadm/cephadmlib/daemons/ceph.py +++ b/src/cephadm/cephadmlib/daemons/ceph.py @@ -197,6 +197,20 @@ class Ceph(ContainerDaemonForm): ) mounts.update(cm) + if self.identity.daemon_type == 'rgw': + config_json = fetch_configs(ctx) or {} + d3n_cache = config_json.get('d3n_cache') + + if d3n_cache: + if not isinstance(d3n_cache, dict): + raise Error( + f'Invalid d3n_cache config: expected dict got {type(d3n_cache).__name__}' + ) + + cache_path = d3n_cache.get('cache_path') + if cache_path and isinstance(cache_path, str): + mounts[cache_path] = f'{cache_path}:z' + def setup_qat_args(self, ctx: CephadmContext, args: List[str]) -> None: try: out, _, _ = call_throws(ctx, ['ls', '-1', '/dev/vfio/devices']) diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index 482414db486..01e1e82f224 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -1,7 +1,8 @@ import errno +import re +import os import json import logging -import re import socket import time from abc import ABCMeta, abstractmethod @@ -45,6 +46,8 @@ logger = logging.getLogger(__name__) ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec) AuthEntity = NewType('AuthEntity', str) +_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$') + def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEntity: """ @@ -1349,6 +1352,243 @@ class RgwService(CephService): self.mgr.spec_store.save(spec) self.mgr.trigger_connect_dashboard_rgw() + def _size_to_bytes(self, v: str) -> int: + if isinstance(v, int): + return v + if isinstance(v, str): + m = _SIZE_RE.match(v) + if not m: + raise OrchestratorError(f'invalid size "{v}" (examples: 10737418240, 10G, 512M)') + num = int(m.group(1)) + unit = (m.group(2) or '').upper() + mult 
= {
+                '': 1,
+                'K': 1024,
+                'M': 1024**2,
+                'G': 1024**3,
+                'T': 1024**4,
+                'P': 1024**5,
+            }[unit]
+            return num * mult
+        raise OrchestratorError(f'invalid size type {type(v)} (expected int or str)')
+
+    def _d3n_parse_dev_from_path(self, p: str) -> Optional[str]:
+        # expected: /mnt/ceph-d3n/<fsid>/<device>/rgw_datacache/...
+
+        if not p:
+            return None
+        logger.info(f"p: {p}")
+        parts = [x for x in p.split('/') if x]
+        try:
+            i = parts.index('ceph-d3n')
+        except ValueError:
+            return None
+        if len(parts) < i + 3:
+            return None
+        dev = parts[i + 2]
+        return f'/dev/{dev}' if dev else None
+
+    def _d3n_fail_if_devs_used_by_other_rgw_service(self, host: str, devs: List[str], service_name: str) -> None:
+        wanted = set(devs)
+
+        # check rgw daemons on this host
+        rgw_daemons = self.mgr.cache.get_daemons_by_type('rgw', host=host)
+
+        for dd in rgw_daemons:
+            # skip daemons that belong to the same service
+            try:
+                other_service = dd.service_name()
+
+            except Exception:
+                continue
+            if other_service == service_name:
+                continue
+
+            who = utils.name_to_config_section(dd.name())
+            ret, out, err = self.mgr.check_mon_command({
+                'prefix': 'config get',
+                'who': who,
+                'name': 'rgw_d3n_l1_datacache_persistent_path',
+            })
+            if ret != 0:
+                continue
+
+            p = (out or '').strip()
+            if not p:
+                continue
+
+            used_dev = self._d3n_parse_dev_from_path(p)
+            if used_dev and used_dev in wanted:
+                raise OrchestratorError(
+                    f'D3N device conflict on host "{host}": device "{used_dev}" is already used by RGW service "{other_service}" '
+                    f'(daemon {dd.name()}). Refuse to reuse across services.'
+ ) + + def _d3n_get_host_devs(self, d3n: Dict[str, Any], host: str) -> Tuple[str, int, list[str]]: + fs_type = d3n.get('filesystem', 'xfs') + size_raw = d3n.get('size') + devices_map = d3n.get('devices') + + if size_raw is None: + raise OrchestratorError('"d3n_cache.size" is required') + if fs_type not in ('xfs', 'ext4'): + raise OrchestratorError(f'Invalid filesystem "{fs_type}" (supported: xfs, ext4)') + if not isinstance(devices_map, dict): + raise OrchestratorError('"d3n_cache.devices" must be a mapping of host -> [devices]') + + devs = devices_map.get(host) + if not isinstance(devs, list) or not devs: + raise OrchestratorError("no devices found") + devs = sorted(devs) + + for d in devs: + if not isinstance(d, str) or not d.startswith('/dev/'): + raise OrchestratorError(f'invalid device path "{d}" in d3n_cache.devices for host "{host}"') + + size_bytes = self._size_to_bytes(size_raw) + return fs_type, size_bytes, devs + + def _d3n_gc_and_prune_alloc( + self, + alloc: Dict[str, str], + devs: list[str], + daemon_details: list[DaemonDescription], + current_daemon_id: str, + key: tuple[str, str], + ) -> None: + + invalid = [did for did, dev in alloc.items() if dev not in devs] + for did in invalid: + del alloc[did] + logger.debug(f"[D3N][alloc] prune-invalid: removed={invalid} devs={devs} alloc_now={alloc}") + if not daemon_details: + if alloc: + logger.info(f"[D3N][alloc] clear-stale: key={key} alloc_was={alloc}") + alloc.clear() + return + + live_daemon_ids: set[str] = set() + for dd in daemon_details: + if dd.daemon_id: + live_daemon_ids.add(dd.daemon_id) + + if current_daemon_id: + live_daemon_ids.add(current_daemon_id) + + stale = [did for did in list(alloc.keys()) if did not in live_daemon_ids] + for did in stale: + del alloc[did] + logger.debug( + f"gc: key={key} live={sorted(live_daemon_ids)} " + f"removed={stale} alloc_now={alloc}" + ) + + def _d3n_get_allocator(self) -> Dict[Tuple[str, str], Dict[str, str]]: + alloc_all = getattr(self.mgr, 
"_d3n_device_alloc", None) + if alloc_all is None: + alloc_all = {} + setattr(self.mgr, "_d3n_device_alloc", alloc_all) + + assert isinstance(alloc_all, dict) + return alloc_all + + def _d3n_choose_device_for_daemon( + self, + service_name: str, + host: str, + devs: list[str], + daemon_id: str, + daemon_details: list[DaemonDescription], + ) -> str: + alloc_all = self._d3n_get_allocator() + key = (service_name, host) + alloc = alloc_all.setdefault(key, {}) + assert isinstance(alloc, dict) + reason = "unknown" + + self._d3n_gc_and_prune_alloc(alloc, devs, daemon_details, daemon_id, key) + + logger.info( + f"[D3N][alloc] pre-choose key={key} daemon={daemon_id} " + f"daemon_details={len(daemon_details)} devs={devs} alloc={alloc}" + ) + + if daemon_id in alloc: + logger.info(f"[D3N][alloc] reuse existing mapping daemon={daemon_id} dev={alloc[daemon_id]}") + return alloc[daemon_id] + + used = set(alloc.values()) + free = [d for d in devs if d not in used] + if free: + chosen = free[0] # 1:1 mapping whenever possible + else: + chosen = devs[len(alloc) % len(devs)] # share the device when unavoidable + reason = "round-robin/share" + alloc[daemon_id] = chosen + logger.info(f"[D3N][alloc] chosen={chosen} reason={reason} alloc_now={alloc}") + return chosen + + def _compute_d3n_cache_for_daemon( + self, + daemon_spec: CephadmDaemonDeploySpec, + spec: RGWSpec, + ) -> Optional[Dict[str, Any]]: + + d3n = getattr(spec, 'd3n_cache', None) + if not d3n: + return None + if not isinstance(d3n, dict): + raise OrchestratorError('d3n_cache must be a mapping') + + host = daemon_spec.host + if not host: + raise OrchestratorError("missing host in daemon_spec") + + service_name = daemon_spec.service_name + daemon_details = self.mgr.cache.get_daemons_by_service(service_name) + + fs_type, size_bytes, devs = self._d3n_get_host_devs(d3n, host) + + self._d3n_fail_if_devs_used_by_other_rgw_service(host, devs, daemon_spec.service_name) + + device = self._d3n_choose_device_for_daemon( + 
service_name=service_name, + host=host, + devs=devs, + daemon_id=daemon_spec.daemon_id, + daemon_details=daemon_details, + ) + + logger.info( + f"[D3N][alloc] service={service_name} host={host} daemon={daemon_spec.daemon_id} " + f"chosen={device} used=? devs={devs}" + + ) + + if daemon_spec.daemon_id: + # warn if sharing is unavoidable + alloc_all = getattr(self.mgr, "_d3n_device_alloc", {}) + alloc = alloc_all.get((service_name, host), {}) if isinstance(alloc_all, dict) else {} + if isinstance(alloc, dict) and len(alloc) > len(devs): + logger.warning( + f'D3N cache sub-optimal on host "{host}" for service "{service_name}": ' + f'{len(alloc)} RGW daemons but only {len(devs)} devices; devices will be shared.' + ) + + fsid = self.mgr._cluster_fsid + device_key = os.path.basename(device) + mountpoint = f'/mnt/ceph-d3n/{fsid}/{device_key}' + daemon_entity = f'client.rgw.{daemon_spec.daemon_id}' + cache_path = os.path.join(mountpoint, 'rgw_datacache', daemon_entity) + + return { + 'device': device, + 'filesystem': fs_type, + 'size_bytes': size_bytes, + 'mountpoint': mountpoint, + 'cache_path': cache_path, + } + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: assert self.TYPE == daemon_spec.daemon_type self.register_for_certificates(daemon_spec) @@ -1509,6 +1749,31 @@ class RgwService(CephService): daemon_spec.keyring = keyring daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + d3n_cache = daemon_spec.final_config.get('d3n_cache') + + if d3n_cache: + cache_path = d3n_cache.get('cache_path') + size = d3n_cache.get('size_bytes') + + self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': daemon_name, + 'name': 'rgw_d3n_l1_local_datacache_enabled', + 'value': 'true', + }) + self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': daemon_name, + 'name': 'rgw_d3n_l1_datacache_persistent_path', + 'value': cache_path, + }) + self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': 
daemon_name, + 'name': 'rgw_d3n_l1_datacache_size', + 'value': str(size), + }) + return daemon_spec def get_keyring(self, rgw_id: str) -> str: @@ -1542,11 +1807,28 @@ class RgwService(CephService): 'who': utils.name_to_config_section(daemon.name()), 'name': 'rgw_frontends', }) + + self.mgr.check_mon_command({ + 'prefix': 'config rm', + 'who': utils.name_to_config_section(daemon.name()), + 'name': 'rgw_d3n_l1_local_datacache_enabled', + }) + self.mgr.check_mon_command({ + 'prefix': 'config rm', + 'who': utils.name_to_config_section(daemon.name()), + 'name': 'rgw_d3n_l1_datacache_persistent_path', + }) + self.mgr.check_mon_command({ + 'prefix': 'config rm', + 'who': utils.name_to_config_section(daemon.name()), + 'name': 'rgw_d3n_l1_datacache_size', + }) self.mgr.check_mon_command({ 'prefix': 'config rm', 'who': utils.name_to_config_section(daemon.name()), 'name': 'qat_compressor_enabled' }) + self.mgr.check_mon_command({ 'prefix': 'config-key rm', 'key': f'rgw/cert/{daemon.name()}', @@ -1600,6 +1882,10 @@ class RgwService(CephService): if svc_spec.qat: config['qat'] = svc_spec.qat + d3n_cache = self._compute_d3n_cache_for_daemon(daemon_spec, svc_spec) + if d3n_cache: + config['d3n_cache'] = d3n_cache + rgw_deps = parent_deps + self.get_dependencies(self.mgr, svc_spec) return config, rgw_deps diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index eaddc2c1949..5979db72a8c 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -1482,6 +1482,7 @@ class RGWSpec(ServiceSpec): wildcard_enabled: Optional[bool] = False, rgw_exit_timeout_secs: int = 120, qat: Optional[Dict[str, str]] = None, + d3n_cache: Optional[Dict[str, Any]] = None, ): assert service_type == 'rgw', service_type @@ -1550,6 +1551,7 @@ class RGWSpec(ServiceSpec): self.rgw_exit_timeout_secs = rgw_exit_timeout_secs self.qat = qat or {} + self.d3n_cache = d3n_cache or {} def 
get_port_start(self) -> List[int]: ports = self.get_port() @@ -1634,6 +1636,44 @@ class RGWSpec(ServiceSpec): f"Invalid compression mode {compression}. Only 'sw' and 'hw' are allowed" ) + if self.d3n_cache: + if not isinstance(self.d3n_cache, dict): + raise SpecValidationError("d3n_cache must be a mapping") + + filesystem = self.d3n_cache.get('filesystem', 'xfs') + size = self.d3n_cache.get('size') + devices = self.d3n_cache.get('devices') + + if not size: + raise SpecValidationError('"d3n_cache.size" is required') + + if filesystem not in ('xfs', 'ext4'): + raise SpecValidationError( + f'Invalid filesystem "{filesystem}" in d3n_cache (supported: xfs, ext4)' + ) + + if not devices or not isinstance(devices, dict): + raise SpecValidationError( + '"d3n_cache.devices" must be a mapping of host -> list of devices' + ) + + for host, devs in devices.items(): + if not isinstance(host, str) or not host: + raise SpecValidationError( + 'Invalid host key in d3n_cache.devices (must be non-empty string)' + ) + + if not isinstance(devs, list) or not devs: + raise SpecValidationError( + f'"d3n_cache.devices[{host}]" must be a non-empty list of device paths' + ) + + for dev in devs: + if not isinstance(dev, str) or not dev.startswith('/dev/'): + raise SpecValidationError( + f'Invalid device path "{dev}" in d3n_cache.devices[{host}]' + ) + yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)