git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: Implement D3N L1 persistent datacache support for RGW
author Kushal Deb <Kushal.Deb@ibm.com>
Mon, 22 Dec 2025 12:58:29 +0000 (18:28 +0530)
committer Kushal Deb <Kushal.Deb@ibm.com>
Thu, 19 Feb 2026 14:03:29 +0000 (19:33 +0530)
Add RGW D3N L1 persistent datacache support backed by host block devices.
Select devices deterministically per (service, host) with intra-service
sharing, forbid cross-service reuse, prepare/mount devices, and
bind-mount per-daemon cache directories into the container.

Signed-off-by: Kushal Deb <Kushal.Deb@ibm.com>
src/cephadm/cephadm.py
src/cephadm/cephadmlib/context_getters.py
src/cephadm/cephadmlib/daemons/ceph.py
src/pybind/mgr/cephadm/services/cephadmservice.py
src/python-common/ceph/deployment/service_spec.py

index 4e255e551656bf9e793c3bfa7916937c0730adf9..bd3650dc6827b43dfc3e67378e528548de6905a5 100755 (executable)
@@ -873,6 +873,114 @@ def _update_container_args_for_podman(
     )
 
 
def _ensure_fstab_entry(ctx: CephadmContext, device: str, mountpoint: str, fs_type: str) -> None:
    """Persist a UUID-based /etc/fstab entry for *device* at *mountpoint*.

    Looks up the filesystem UUID with blkid and appends an fstab record so
    the D3N cache mount survives reboots.  If *mountpoint* is already listed
    in fstab the file is left untouched (the existing entry wins, even if it
    points at a different device).

    :raises Error: if blkid cannot report a UUID for the device.
    """
    out, _, code = call(ctx, ['blkid', '-s', 'UUID', '-o', 'value', device])
    if code != 0 or not out.strip():
        raise Error(f'Failed to get UUID for {device}')
    uuid = out.strip()

    entry = f'UUID={uuid} {mountpoint} {fs_type} defaults,noatime 0 2\n'

    # Read the file once: we need it both to scan for an existing entry and
    # to know whether the final line is newline-terminated before appending.
    try:
        with open('/etc/fstab', 'r') as f:
            contents = f.read()
    except FileNotFoundError:
        contents = ''

    # check if mountpoint already present in fstab
    for line in contents.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        parts = line.split()
        # field 2 of an fstab record is the mount point
        if len(parts) >= 2 and parts[1] == mountpoint:
            return

    with open('/etc/fstab', 'a') as f:
        # Guard against a last line without a trailing newline, which would
        # otherwise fuse that line with our new entry and corrupt both.
        if contents and not contents.endswith('\n'):
            f.write('\n')
        f.write(entry)
+
+
def prepare_d3n_cache(
    ctx: CephadmContext,
    d3n_cache: Dict[str, Any],
    uid: int,
    gid: int,
) -> None:
    """Prepare a host block device to back the RGW D3N L1 cache.

    The side-effect order is the contract: create the mountpoint, format the
    device if it carries no filesystem, persist the mount in /etc/fstab,
    mount it, optionally verify free space, then create and chown the
    per-daemon cache directory.

    :param d3n_cache: dict with keys 'device', 'mountpoint', 'cache_path',
        optional 'filesystem' (defaults to 'xfs') and optional 'size_bytes'.
    :param uid: numeric owner for the cache directory.
    :param gid: numeric group for the cache directory.
    :raises Error: on missing required keys, a non-positive/non-int
        size_bytes, or insufficient free space on the mounted filesystem.
    """
    device = d3n_cache.get('device')
    fs_type = d3n_cache.get('filesystem', 'xfs')
    mountpoint = d3n_cache.get('mountpoint')
    cache_path = d3n_cache.get('cache_path')
    size_bytes = d3n_cache.get('size_bytes')
    logger.debug(
        f'[D3N] prepare_d3n_cache: device={device!r} fs_type={fs_type!r} mountpoint={mountpoint!r} cache_path={cache_path!r} size_bytes={size_bytes!r}'
    )

    if not device:
        raise Error('d3n_cache.device must be specified')
    if not mountpoint:
        raise Error('d3n_cache.mountpoint must be specified')
    if not cache_path:
        raise Error('d3n_cache.cache_path must be specified')

    # Ensure mountpoint exists
    os.makedirs(mountpoint, mode=0o755, exist_ok=True)
    logger.debug(f'[D3N] checking filesystem on device {device}')

    # Format the device if needed
    # NOTE(review): "needed" is decided purely by blkid reporting no TYPE;
    # a device with an unrecognized signature would be reformatted — confirm
    # this is the intended policy.
    if not _has_filesystem(ctx, device):
        logger.debug(f'Formatting {device} with {fs_type} for D3N')
        call_throws(ctx, ['mkfs', '-t', fs_type, device])

    # Persist the mount in /etc/fstab
    _ensure_fstab_entry(ctx, device, mountpoint, fs_type)

    if not _is_mountpoint(ctx, mountpoint):
        logger.debug(f'[D3N] mountpoint not mounted, running mount {mountpoint}')
        # bare `mount <mountpoint>` resolves the device via the fstab
        # entry written just above
        call_throws(ctx, ['mount', mountpoint])
    else:
        logger.debug(f'[D3N] mountpoint already mounted according to _is_mountpoint(): {mountpoint}')

    if size_bytes is not None:
        if not isinstance(size_bytes, int) or size_bytes <= 0:
            raise Error(f'd3n_cache.size_bytes must be a positive integer, got {size_bytes!r}')

        # checked after mounting so we measure the cache filesystem itself,
        # not whatever filesystem hosts the (unmounted) mountpoint dir
        avail = _avail_bytes(ctx, mountpoint)
        if avail < size_bytes:
            raise Error(
                f'Not enough free space for D3N cache on {mountpoint}: '
                f'need {size_bytes} bytes, have {avail} bytes'
            )

    # Create per-daemon cache directory
    os.makedirs(cache_path, mode=0o755, exist_ok=True)
    # chown -R so the containerized rgw daemon (running as uid:gid) can write
    call_throws(ctx, ['chown', '-R', f'{uid}:{gid}', cache_path])
+
+
+def _has_filesystem(ctx: CephadmContext, device: str) -> bool:
+    if not os.path.exists(device):
+        return False
+    out, _, code = call(ctx, ['blkid', '-o', 'value', '-s', 'TYPE', device])
+    return code == 0 and bool(out.strip())
+
+
def _is_mountpoint(ctx: CephadmContext, path: str) -> bool:
    """Return True if *path* is an active mount point according to findmnt.

    findmnt exits non-zero when nothing is mounted at the given path, so the
    exit status alone answers the question.
    """
    cmd = ['findmnt', '-n', '-o', 'TARGET,SOURCE,FSTYPE', '--mountpoint', path]
    out, err, code = call(ctx, cmd)
    logger.debug(
        f'[D3N] _is_mountpoint({path}): code={code} out={out.strip()!r} err={err.strip()!r}'
    )
    mounted = code == 0
    return mounted
+
+
+def _avail_bytes(ctx: CephadmContext, path: str) -> int:
+    out, _, code = call(ctx, ['df', '-B1', '--output=avail', path])
+    if code != 0:
+        raise Error(f'Failed to check free space for {path}')
+    lines = [line.strip() for line in out.splitlines() if line.strip()]
+    if len(lines) < 2:
+        raise Error(f'Unexpected df output for {path}: {out!r}')
+    logger.debug(f'[D3N] df avail bytes for {path}: {lines[1]!r}')
+
+    return int(lines[1])
+
+
 def deploy_daemon(
     ctx: CephadmContext,
     ident: 'DaemonIdentity',
@@ -945,6 +1053,13 @@ def deploy_daemon(
         # dirs, conf, keyring
         create_daemon_dirs(ctx, ident, uid, gid, config, keyring)
 
+        if ident.daemon_type == 'rgw':
+            config_json = fetch_configs(ctx)
+            d3n_cache = config_json.get('d3n_cache')
+
+            if d3n_cache:
+                prepare_d3n_cache(ctx, d3n_cache, uid, gid)
+
     # only write out unit files and start daemon
     # with systemd if this is not a reconfig
     if deployment_type != DeploymentType.RECONFIG:
index 9dde5c855a5032ca822d4f7de42c2b41e7a0ac14..fc1462b9ba584064bc44d0899b50d2b66447c947 100644 (file)
@@ -68,7 +68,7 @@ def fetch_meta(ctx: CephadmContext) -> Dict[str, Any]:
     return {}
 
 
-def fetch_configs(ctx: CephadmContext) -> Dict[str, str]:
+def fetch_configs(ctx: CephadmContext) -> Dict[str, Any]:
     """Return a dict containing arbitrary configuration parameters.
     This function filters out the key 'custom_config_files' which
     must not be part of a deployment's configuration key-value pairs.
index ea359d73ed77edebc42aa87effbfe2888a0f731f..0134f69da1512812157646a70401d548f6e09279 100644 (file)
@@ -197,6 +197,20 @@ class Ceph(ContainerDaemonForm):
         )
         mounts.update(cm)
 
+        if self.identity.daemon_type == 'rgw':
+            config_json = fetch_configs(ctx) or {}
+            d3n_cache = config_json.get('d3n_cache')
+
+            if d3n_cache:
+                if not isinstance(d3n_cache, dict):
+                    raise Error(
+                        f'Invalid d3n_cache config: expected dict got {type(d3n_cache).__name__}'
+                    )
+
+                cache_path = d3n_cache.get('cache_path')
+                if cache_path and isinstance(cache_path, str):
+                    mounts[cache_path] = f'{cache_path}:z'
+
     def setup_qat_args(self, ctx: CephadmContext, args: List[str]) -> None:
         try:
             out, _, _ = call_throws(ctx, ['ls', '-1', '/dev/vfio/devices'])
index 482414db4869aca8008d001f31f087e1a37df939..01e1e82f224bfdbbf3541efb7a7bcfc3e09cb655 100644 (file)
@@ -1,7 +1,8 @@
 import errno
+import re
+import os
 import json
 import logging
-import re
 import socket
 import time
 from abc import ABCMeta, abstractmethod
@@ -45,6 +46,8 @@ logger = logging.getLogger(__name__)
 ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
 AuthEntity = NewType('AuthEntity', str)
 
+_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$')
+
 
 def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEntity:
     """
@@ -1349,6 +1352,243 @@ class RgwService(CephService):
         self.mgr.spec_store.save(spec)
         self.mgr.trigger_connect_dashboard_rgw()
 
+    def _size_to_bytes(self, v: str) -> int:
+        if isinstance(v, int):
+            return v
+        if isinstance(v, str):
+            m = _SIZE_RE.match(v)
+            if not m:
+                raise OrchestratorError(f'invalid size "{v}" (examples: 10737418240, 10G, 512M)')
+            num = int(m.group(1))
+            unit = (m.group(2) or '').upper()
+            mult = {
+                '': 1,
+                'K': 1024,
+                'M': 1024**2,
+                'G': 1024**3,
+                'T': 1024**4,
+                'P': 1024**5,
+            }[unit]
+            return num * mult
+        raise OrchestratorError(f'invalid size type {type(v)} (expected int or str)')
+
+    def _d3n_parse_dev_from_path(self, p: str) -> Optional[str]:
+        # expected: /mnt/ceph-d3n/<fsid>/<dev>/rgw_datacache/...
+
+        if not p:
+            return None
+        logger.info(f"p: {p}")
+        parts = [x for x in p.split('/') if x]
+        try:
+            i = parts.index('ceph-d3n')
+        except ValueError:
+            return None
+        if len(parts) < i + 3:
+            return None
+        dev = parts[i + 2]
+        return f'/dev/{dev}' if dev else None
+
    def _d3n_fail_if_devs_used_by_other_rgw_service(self, host: str, devs: List[str], service_name: str) -> None:
        """Raise if any device in *devs* is already used for D3N caching by
        an RGW daemon of a DIFFERENT service on *host*.

        Detection reads each other daemon's configured
        rgw_d3n_l1_datacache_persistent_path and parses the device name
        back out of that path; daemons of *service_name* itself are skipped
        (intra-service sharing is allowed).

        :raises OrchestratorError: on a cross-service device conflict.
        """
        wanted = set(devs)

        # check rgw daemons on this host
        rgw_daemons = self.mgr.cache.get_daemons_by_type('rgw', host=host)

        for dd in rgw_daemons:
            # skip daemons that belong to the same service
            try:
                other_service = dd.service_name()

            except Exception:
                # NOTE(review): broad catch — a daemon whose service name
                # cannot be derived is silently skipped (best-effort check)
                continue
            if other_service == service_name:
                continue

            who = utils.name_to_config_section(dd.name())
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': who,
                'name': 'rgw_d3n_l1_datacache_persistent_path',
            })
            # NOTE(review): check_mon_command usually raises on failure, so
            # ret is expected to be 0 here — this guard may be dead code;
            # confirm against the mgr module API.
            if ret != 0:
                continue

            p = (out or '').strip()
            if not p:
                # no persistent path configured -> daemon does not use D3N
                continue

            used_dev = self._d3n_parse_dev_from_path(p)
            if used_dev and used_dev in wanted:
                raise OrchestratorError(
                    f'D3N device conflict on host "{host}": device "{used_dev}" is already used by RGW service "{other_service}" '
                    f'(daemon {dd.name()}). Refuse to reuse across services.'
                )
+
+    def _d3n_get_host_devs(self, d3n: Dict[str, Any], host: str) -> Tuple[str, int, list[str]]:
+        fs_type = d3n.get('filesystem', 'xfs')
+        size_raw = d3n.get('size')
+        devices_map = d3n.get('devices')
+
+        if size_raw is None:
+            raise OrchestratorError('"d3n_cache.size" is required')
+        if fs_type not in ('xfs', 'ext4'):
+            raise OrchestratorError(f'Invalid filesystem "{fs_type}" (supported: xfs, ext4)')
+        if not isinstance(devices_map, dict):
+            raise OrchestratorError('"d3n_cache.devices" must be a mapping of host -> [devices]')
+
+        devs = devices_map.get(host)
+        if not isinstance(devs, list) or not devs:
+            raise OrchestratorError("no devices found")
+        devs = sorted(devs)
+
+        for d in devs:
+            if not isinstance(d, str) or not d.startswith('/dev/'):
+                raise OrchestratorError(f'invalid device path "{d}" in d3n_cache.devices for host "{host}"')
+
+        size_bytes = self._size_to_bytes(size_raw)
+        return fs_type, size_bytes, devs
+
+    def _d3n_gc_and_prune_alloc(
+            self,
+            alloc: Dict[str, str],
+            devs: list[str],
+            daemon_details: list[DaemonDescription],
+            current_daemon_id: str,
+            key: tuple[str, str],
+    ) -> None:
+
+        invalid = [did for did, dev in alloc.items() if dev not in devs]
+        for did in invalid:
+            del alloc[did]
+        logger.debug(f"[D3N][alloc] prune-invalid: removed={invalid} devs={devs} alloc_now={alloc}")
+        if not daemon_details:
+            if alloc:
+                logger.info(f"[D3N][alloc] clear-stale: key={key} alloc_was={alloc}")
+                alloc.clear()
+            return
+
+        live_daemon_ids: set[str] = set()
+        for dd in daemon_details:
+            if dd.daemon_id:
+                live_daemon_ids.add(dd.daemon_id)
+
+        if current_daemon_id:
+            live_daemon_ids.add(current_daemon_id)
+
+        stale = [did for did in list(alloc.keys()) if did not in live_daemon_ids]
+        for did in stale:
+            del alloc[did]
+        logger.debug(
+            f"gc: key={key} live={sorted(live_daemon_ids)} "
+            f"removed={stale} alloc_now={alloc}"
+        )
+
+    def _d3n_get_allocator(self) -> Dict[Tuple[str, str], Dict[str, str]]:
+        alloc_all = getattr(self.mgr, "_d3n_device_alloc", None)
+        if alloc_all is None:
+            alloc_all = {}
+            setattr(self.mgr, "_d3n_device_alloc", alloc_all)
+
+        assert isinstance(alloc_all, dict)
+        return alloc_all
+
+    def _d3n_choose_device_for_daemon(
+            self,
+            service_name: str,
+            host: str,
+            devs: list[str],
+            daemon_id: str,
+            daemon_details: list[DaemonDescription],
+    ) -> str:
+        alloc_all = self._d3n_get_allocator()
+        key = (service_name, host)
+        alloc = alloc_all.setdefault(key, {})
+        assert isinstance(alloc, dict)
+        reason = "unknown"
+
+        self._d3n_gc_and_prune_alloc(alloc, devs, daemon_details, daemon_id, key)
+
+        logger.info(
+            f"[D3N][alloc] pre-choose key={key} daemon={daemon_id} "
+            f"daemon_details={len(daemon_details)} devs={devs} alloc={alloc}"
+        )
+
+        if daemon_id in alloc:
+            logger.info(f"[D3N][alloc] reuse existing mapping daemon={daemon_id} dev={alloc[daemon_id]}")
+            return alloc[daemon_id]
+
+        used = set(alloc.values())
+        free = [d for d in devs if d not in used]
+        if free:
+            chosen = free[0]  # 1:1 mapping whenever possible
+        else:
+            chosen = devs[len(alloc) % len(devs)]  # share the device when unavoidable
+            reason = "round-robin/share"
+        alloc[daemon_id] = chosen
+        logger.info(f"[D3N][alloc] chosen={chosen} reason={reason} alloc_now={alloc}")
+        return chosen
+
+    def _compute_d3n_cache_for_daemon(
+        self,
+        daemon_spec: CephadmDaemonDeploySpec,
+        spec: RGWSpec,
+    ) -> Optional[Dict[str, Any]]:
+
+        d3n = getattr(spec, 'd3n_cache', None)
+        if not d3n:
+            return None
+        if not isinstance(d3n, dict):
+            raise OrchestratorError('d3n_cache must be a mapping')
+
+        host = daemon_spec.host
+        if not host:
+            raise OrchestratorError("missing host in daemon_spec")
+
+        service_name = daemon_spec.service_name
+        daemon_details = self.mgr.cache.get_daemons_by_service(service_name)
+
+        fs_type, size_bytes, devs = self._d3n_get_host_devs(d3n, host)
+
+        self._d3n_fail_if_devs_used_by_other_rgw_service(host, devs, daemon_spec.service_name)
+
+        device = self._d3n_choose_device_for_daemon(
+            service_name=service_name,
+            host=host,
+            devs=devs,
+            daemon_id=daemon_spec.daemon_id,
+            daemon_details=daemon_details,
+        )
+
+        logger.info(
+            f"[D3N][alloc] service={service_name} host={host} daemon={daemon_spec.daemon_id} "
+            f"chosen={device} used=? devs={devs}"
+
+        )
+
+        if daemon_spec.daemon_id:
+            # warn if sharing is unavoidable
+            alloc_all = getattr(self.mgr, "_d3n_device_alloc", {})
+            alloc = alloc_all.get((service_name, host), {}) if isinstance(alloc_all, dict) else {}
+            if isinstance(alloc, dict) and len(alloc) > len(devs):
+                logger.warning(
+                    f'D3N cache sub-optimal on host "{host}" for service "{service_name}": '
+                    f'{len(alloc)} RGW daemons but only {len(devs)} devices; devices will be shared.'
+                )
+
+        fsid = self.mgr._cluster_fsid
+        device_key = os.path.basename(device)
+        mountpoint = f'/mnt/ceph-d3n/{fsid}/{device_key}'
+        daemon_entity = f'client.rgw.{daemon_spec.daemon_id}'
+        cache_path = os.path.join(mountpoint, 'rgw_datacache', daemon_entity)
+
+        return {
+            'device': device,
+            'filesystem': fs_type,
+            'size_bytes': size_bytes,
+            'mountpoint': mountpoint,
+            'cache_path': cache_path,
+        }
+
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
         self.register_for_certificates(daemon_spec)
@@ -1509,6 +1749,31 @@ class RgwService(CephService):
         daemon_spec.keyring = keyring
         daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
 
+        d3n_cache = daemon_spec.final_config.get('d3n_cache')
+
+        if d3n_cache:
+            cache_path = d3n_cache.get('cache_path')
+            size = d3n_cache.get('size_bytes')
+
+            self.mgr.check_mon_command({
+                'prefix': 'config set',
+                'who': daemon_name,
+                'name': 'rgw_d3n_l1_local_datacache_enabled',
+                'value': 'true',
+            })
+            self.mgr.check_mon_command({
+                'prefix': 'config set',
+                'who': daemon_name,
+                'name': 'rgw_d3n_l1_datacache_persistent_path',
+                'value': cache_path,
+            })
+            self.mgr.check_mon_command({
+                'prefix': 'config set',
+                'who': daemon_name,
+                'name': 'rgw_d3n_l1_datacache_size',
+                'value': str(size),
+            })
+
         return daemon_spec
 
     def get_keyring(self, rgw_id: str) -> str:
@@ -1542,11 +1807,28 @@ class RgwService(CephService):
             'who': utils.name_to_config_section(daemon.name()),
             'name': 'rgw_frontends',
         })
+
+        self.mgr.check_mon_command({
+            'prefix': 'config rm',
+            'who': utils.name_to_config_section(daemon.name()),
+            'name': 'rgw_d3n_l1_local_datacache_enabled',
+        })
+        self.mgr.check_mon_command({
+            'prefix': 'config rm',
+            'who': utils.name_to_config_section(daemon.name()),
+            'name': 'rgw_d3n_l1_datacache_persistent_path',
+        })
+        self.mgr.check_mon_command({
+            'prefix': 'config rm',
+            'who': utils.name_to_config_section(daemon.name()),
+            'name': 'rgw_d3n_l1_datacache_size',
+        })
         self.mgr.check_mon_command({
             'prefix': 'config rm',
             'who': utils.name_to_config_section(daemon.name()),
             'name': 'qat_compressor_enabled'
         })
+
         self.mgr.check_mon_command({
             'prefix': 'config-key rm',
             'key': f'rgw/cert/{daemon.name()}',
@@ -1600,6 +1882,10 @@ class RgwService(CephService):
         if svc_spec.qat:
             config['qat'] = svc_spec.qat
 
+        d3n_cache = self._compute_d3n_cache_for_daemon(daemon_spec, svc_spec)
+        if d3n_cache:
+            config['d3n_cache'] = d3n_cache
+
         rgw_deps = parent_deps + self.get_dependencies(self.mgr, svc_spec)
         return config, rgw_deps
 
index eaddc2c1949a4f9408e17413592068b9d05bdb26..5979db72a8c42bec4d2a8b5eb5f9d81d45be6de4 100644 (file)
@@ -1482,6 +1482,7 @@ class RGWSpec(ServiceSpec):
                  wildcard_enabled: Optional[bool] = False,
                  rgw_exit_timeout_secs: int = 120,
                  qat: Optional[Dict[str, str]] = None,
+                 d3n_cache: Optional[Dict[str, Any]] = None,
                  ):
         assert service_type == 'rgw', service_type
 
@@ -1550,6 +1551,7 @@ class RGWSpec(ServiceSpec):
         self.rgw_exit_timeout_secs = rgw_exit_timeout_secs
 
         self.qat = qat or {}
+        self.d3n_cache = d3n_cache or {}
 
     def get_port_start(self) -> List[int]:
         ports = self.get_port()
@@ -1634,6 +1636,44 @@ class RGWSpec(ServiceSpec):
                     f"Invalid compression mode {compression}. Only 'sw' and 'hw' are allowed"
                     )
 
+        if self.d3n_cache:
+            if not isinstance(self.d3n_cache, dict):
+                raise SpecValidationError("d3n_cache must be a mapping")
+
+            filesystem = self.d3n_cache.get('filesystem', 'xfs')
+            size = self.d3n_cache.get('size')
+            devices = self.d3n_cache.get('devices')
+
+            if not size:
+                raise SpecValidationError('"d3n_cache.size" is required')
+
+            if filesystem not in ('xfs', 'ext4'):
+                raise SpecValidationError(
+                    f'Invalid filesystem "{filesystem}" in d3n_cache (supported: xfs, ext4)'
+                )
+
+            if not devices or not isinstance(devices, dict):
+                raise SpecValidationError(
+                    '"d3n_cache.devices" must be a mapping of host -> list of devices'
+                )
+
+            for host, devs in devices.items():
+                if not isinstance(host, str) or not host:
+                    raise SpecValidationError(
+                        'Invalid host key in d3n_cache.devices (must be non-empty string)'
+                    )
+
+                if not isinstance(devs, list) or not devs:
+                    raise SpecValidationError(
+                        f'"d3n_cache.devices[{host}]" must be a non-empty list of device paths'
+                    )
+
+                for dev in devs:
+                    if not isinstance(dev, str) or not dev.startswith('/dev/'):
+                        raise SpecValidationError(
+                            f'Invalid device path "{dev}" in d3n_cache.devices[{host}]'
+                        )
+
 
 yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)