]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: Implement D3N L1 persistent datacache support for RGW
authorKushal Deb <Kushal.Deb@ibm.com>
Mon, 22 Dec 2025 12:58:29 +0000 (18:28 +0530)
committerKushal Deb <Kushal.Deb@ibm.com>
Mon, 20 Apr 2026 04:43:54 +0000 (10:13 +0530)
Add RGW D3N L1 persistent datacache support backed by host block devices.
Select devices deterministically per (service, host) with intra-service
sharing, forbid cross-service reuse, prepare/mount devices, and
bind-mount per-daemon cache directories into the container.

Signed-off-by: Kushal Deb <Kushal.Deb@ibm.com>
src/cephadm/cephadm.py
src/cephadm/cephadmlib/daemons/ceph.py
src/pybind/mgr/cephadm/services/cephadmservice.py
src/pybind/mgr/cephadm/services/rgw_d3n.py [new file with mode: 0644]
src/python-common/ceph/cephadm/d3n_types.py [new file with mode: 0644]
src/python-common/ceph/deployment/service_spec.py
src/python-common/ceph/utils.py

index 257f2f8cab46ef4e9ded0d38ba291d9e67703e92..8203a40ae2c123488e9f5501d08b60b2e80a8ea1 100755 (executable)
@@ -29,6 +29,7 @@ from glob import glob
 from io import StringIO
 from threading import Thread, Event
 from pathlib import Path
+from enum import Enum
 from configparser import ConfigParser
 
 from cephadmlib.constants import (
@@ -228,7 +229,7 @@ from cephadmlib.listing_updaters import (
     VersionStatusUpdater,
 )
 from cephadmlib.container_lookup import infer_local_ceph_image, identify
-
+from ceph.cephadm.d3n_types import D3NCache, D3NCacheError
 
 FuncT = TypeVar('FuncT', bound=Callable)
 
@@ -898,6 +899,178 @@ def _update_container_args_for_podman(
     )
 
 
+def _d3n_fstab_entry(uuid: str, mountpoint: str, fs_type: str) -> str:
+    """Return a single /etc/fstab line mounting the filesystem by UUID."""
+    return f'UUID={uuid} {mountpoint} {fs_type} defaults,noatime 0 2\n'
+
+
+def _ensure_fstab_entry(ctx: CephadmContext, device: str, mountpoint: str, fs_type: str) -> None:
+    """
+    Ensure the device is present in /etc/fstab for the given mountpoint.
+    If an entry for mountpoint already exists, no changes are made.
+
+    Raises Error if the device's filesystem UUID cannot be determined
+    (e.g. the device has not been formatted yet).
+    """
+    # resolve the filesystem UUID so the fstab entry survives device renames
+    out, _, code = call(ctx, ['blkid', '-s', 'UUID', '-o', 'value', device])
+    if code != 0 or not out.strip():
+        raise Error(f'Failed to get UUID for {device}')
+    uuid = out.strip()
+
+    entry = _d3n_fstab_entry(uuid, mountpoint, fs_type)
+
+    # check if mountpoint already present in fstab
+    with open('/etc/fstab', 'r') as f:
+        for line in f:
+            line = line.strip()
+            if not line or line.startswith('#'):
+                continue
+            parts = line.split()
+            # second fstab field is the mount point
+            if len(parts) >= 2 and parts[1] == mountpoint:
+                return
+
+    with open('/etc/fstab', 'a') as f:
+        f.write(entry)
+
+
+class D3NStateAction(str, Enum):
+    """Actions understood by d3n_state(): persist state, or clean it up."""
+    WRITE = 'write'
+    CLEANUP = 'cleanup'
+
+
+def d3n_state(
+        ctx: CephadmContext,
+        data_dir: str,
+        action: D3NStateAction,
+        d3n: Optional[D3NCache] = None,
+        uid: int = 0,
+        gid: int = 0,
+) -> None:
+    """
+    Persist/read minimal D3N info in the daemon's data directory
+    so that rm-daemon can cleanup properly.
+
+    WRITE: serialize cache_path/mount_path to <data_dir>/d3n_state.json,
+    owned by (uid, gid); a no-op when d3n is None.
+    CLEANUP: best-effort removal of the recorded cache directory and the
+    state file itself; missing or corrupt state never raises.
+
+    Raises Error for an unknown action value.
+    """
+    path = os.path.join(data_dir, 'd3n_state.json')
+
+    if action == D3NStateAction.WRITE:
+        if d3n is None:
+            return
+        state = {
+            'cache_path': d3n.cache_path,
+            'mount_path': d3n.mountpoint,
+        }
+        payload = json.dumps(state, sort_keys=True) + '\n'
+        with write_new(path, owner=(uid, gid)) as f:
+            f.write(payload)
+        return
+
+    if action == D3NStateAction.CLEANUP:
+        if not os.path.exists(path):
+            return
+
+        try:
+            with open(path, 'r') as f:
+                state = json.load(f)
+        except Exception:
+            # corrupt/unreadable state file: still remove the file below
+            state = {}
+
+        cache_path = state.get('cache_path') if isinstance(state, dict) else None
+        if isinstance(cache_path, str) and cache_path:
+            try:
+                shutil.rmtree(cache_path, ignore_errors=True)
+                logger.info(f'[D3N] removed cache directory: {cache_path}')
+            except Exception as e:
+                logger.warning(f'[D3N] failed to remove cache directory {cache_path}: {e}')
+
+        try:
+            os.unlink(path)
+        except FileNotFoundError:
+            pass
+        except Exception as e:
+            logger.warning(f'[D3N] failed to remove {path}: {e}')
+        return
+
+    raise Error(f'[D3N] invalid d3n_state action: {action}')
+
+
+def prepare_d3n_cache(ctx: CephadmContext, d3n: D3NCache, uid: int, gid: int) -> None:
+    """
+    Prepare a D3N cache mount and directory.
+
+    Steps:
+      1. Ensure mountpoint directory exists
+      2. Format device if it has no filesystem
+      3. Ensure /etc/fstab entry exists
+      4. Mount if not mounted
+      5. Ensure cache_path exists and is owned by daemon uid/gid
+
+    Raises Error if the device is missing, its UUID cannot be read, or
+    (when d3n.size_bytes is set) the mounted filesystem is too small.
+    """
+    device = d3n.device
+    fs_type = d3n.filesystem
+    mountpoint = d3n.mountpoint
+    cache_path = d3n.cache_path
+    size_bytes = d3n.size_bytes
+
+    logger.debug(
+        f'[D3N] prepare_d3n_cache: device={device!r} fs_type={fs_type!r} mountpoint={mountpoint!r} cache_path={cache_path!r} size_bytes={size_bytes!r}'
+    )
+
+    # Ensure mountpoint exists
+    os.makedirs(mountpoint, mode=0o755, exist_ok=True)
+    logger.debug(f'[D3N] checking filesystem on device {device}')
+
+    # Format the device if needed
+    if not _has_filesystem(ctx, device):
+        logger.debug(f'Formatting {device} with {fs_type} for D3N')
+        call_throws(ctx, ['mkfs', '-t', fs_type, device])
+
+    # Ensure the mount is persistent across reboot by ensuring an /etc/fstab entry exists.
+    _ensure_fstab_entry(ctx, device, mountpoint, fs_type)
+
+    # 'mount <mountpoint>' resolves the device via the fstab entry written above
+    if not _is_mountpoint(ctx, mountpoint):
+        logger.debug(f'[D3N] mountpoint not mounted, running mount {mountpoint}')
+        call_throws(ctx, ['mount', mountpoint])
+    else:
+        logger.debug(f'[D3N] mountpoint already mounted according to _is_mountpoint(): {mountpoint}')
+
+    # Fail early if the requested cache size cannot fit on the filesystem
+    if size_bytes is not None:
+        avail = _avail_bytes(ctx, mountpoint)
+        if avail < size_bytes:
+            raise Error(
+                f'Not enough free space for D3N cache on {mountpoint}: '
+                f'need {size_bytes} bytes, have {avail} bytes'
+            )
+
+    # Create per-daemon cache directory
+    os.makedirs(cache_path, mode=0o755, exist_ok=True)
+    # recursive chown so a pre-existing/reused directory is fixed up too
+    call_throws(ctx, ['chown', '-R', f'{uid}:{gid}', cache_path])
+
+
+def _has_filesystem(ctx: CephadmContext, device: str) -> bool:
+    """Return True if blkid reports a filesystem TYPE on the device.
+
+    Raises Error if the device node does not exist.
+    """
+    if not os.path.exists(device):
+        raise Error(f'D3N device does not exist: {device}')
+    out, _, code = call(ctx, ['blkid', '-o', 'value', '-s', 'TYPE', device])
+    # blkid exits non-zero / prints nothing when no filesystem is present
+    return code == 0 and bool(out.strip())
+
+
+def _is_mountpoint(ctx: CephadmContext, path: str) -> bool:
+    """Return True if findmnt reports something mounted exactly at path."""
+    # findmnt --mountpoint exits non-zero when path is not a mount point
+    out, err, code = call(
+        ctx,
+        ['findmnt', '-n', '-o', 'TARGET,SOURCE,FSTYPE', '--mountpoint', path],
+    )
+    logger.debug(
+        f'[D3N] _is_mountpoint({path}): code={code} out={out.strip()!r} err={err.strip()!r}'
+    )
+    return code == 0
+
+
+def _avail_bytes(ctx: CephadmContext, path: str) -> int:
+    """Return the free space in bytes on the filesystem containing path.
+
+    Raises Error if df fails or produces unexpected output.
+    """
+    out, _, code = call(ctx, ['df', '-B1', '--output=avail', path])
+    if code != 0:
+        raise Error(f'Failed to check free space for {path}')
+    # df prints a header line ('Avail') followed by the value
+    lines = [line.strip() for line in out.splitlines() if line.strip()]
+    if len(lines) < 2:
+        raise Error(f'Unexpected df output for {path}: {out!r}')
+    logger.debug(f'[D3N] df avail bytes for {path}: {lines[1]!r}')
+
+    return int(lines[1])
+
+
 def deploy_daemon(
     ctx: CephadmContext,
     ident: 'DaemonIdentity',
@@ -970,6 +1143,21 @@ def deploy_daemon(
         # dirs, conf, keyring
         create_daemon_dirs(ctx, ident, uid, gid, config, keyring)
 
+        if ident.daemon_type == 'rgw':
+            config_json = fetch_configs(ctx)
+            d3n_cache: Any = config_json.get('d3n_cache')
+
+            if d3n_cache:
+                try:
+                    d3n = D3NCache.from_json(d3n_cache)
+                except D3NCacheError as e:
+                    raise Error(str(e))
+                prepare_d3n_cache(ctx, d3n, uid, gid)
+                try:
+                    d3n_state(ctx, data_dir, D3NStateAction.WRITE, d3n, uid, gid)
+                except Exception as e:
+                    logger.warning(f'[D3N] failed to persist D3N state in {data_dir}: {e}')
+
     # only write out unit files and start daemon
     # with systemd if this is not a reconfig
     if deployment_type != DeploymentType.RECONFIG:
@@ -3948,6 +4136,10 @@ def command_rm_daemon(ctx):
              verbosity=CallVerbosity.DEBUG)
 
     data_dir = ident.data_dir(ctx.data_dir)
+
+    if ident.daemon_type == 'rgw':
+        d3n_state(ctx, data_dir, action=D3NStateAction.CLEANUP)
+
     if ident.daemon_type in ['mon', 'osd', 'prometheus'] and \
        not ctx.force_delete_data:
         # rename it out of the way -- do not delete
index ea359d73ed77edebc42aa87effbfe2888a0f731f..0134f69da1512812157646a70401d548f6e09279 100644 (file)
@@ -197,6 +197,20 @@ class Ceph(ContainerDaemonForm):
         )
         mounts.update(cm)
 
+        if self.identity.daemon_type == 'rgw':
+            config_json = fetch_configs(ctx) or {}
+            d3n_cache = config_json.get('d3n_cache')
+
+            if d3n_cache:
+                if not isinstance(d3n_cache, dict):
+                    raise Error(
+                        f'Invalid d3n_cache config: expected dict got {type(d3n_cache).__name__}'
+                    )
+
+                cache_path = d3n_cache.get('cache_path')
+                if cache_path and isinstance(cache_path, str):
+                    mounts[cache_path] = f'{cache_path}:z'
+
     def setup_qat_args(self, ctx: CephadmContext, args: List[str]) -> None:
         try:
             out, _, _ = call_throws(ctx, ['ls', '-1', '/dev/vfio/devices'])
index fd0190f2f314973a0edec1d5c3d37ae7ca2eddfd..c6705ca03790d4c0252bc7350ab39978b3f5eb9c 100644 (file)
@@ -1,7 +1,7 @@
 import errno
+import re
 import json
 import logging
-import re
 import socket
 import time
 from abc import ABCMeta, abstractmethod
@@ -33,6 +33,14 @@ from orchestrator import (
 )
 from orchestrator._interface import daemon_type_to_service
 from cephadm import utils
+from ceph.cephadm.d3n_types import (
+    D3NCache,
+    D3NCacheError,
+    D3NCacheSpec,
+    d3n_get_host_devs,
+    d3n_paths,
+)
+from cephadm.services.rgw_d3n import D3NDevicePlanner
 from .service_registry import register_cephadm_service
 from cephadm.tlsobject_types import TLSObjectScope, TLSCredentials, EMPTY_TLS_CREDENTIALS
 from cephadm.ssl_cert_utils import extract_ips_and_fqdns_from_cert
@@ -1390,6 +1398,53 @@ class RgwService(CephService):
         self.mgr.spec_store.save(spec)
         self.mgr.trigger_connect_dashboard_rgw()
 
+    def _compute_d3n_cache_for_daemon(
+        self,
+        daemon_spec: CephadmDaemonDeploySpec,
+        spec: RGWSpec,
+    ) -> Optional[D3NCache]:
+
+        alloc = D3NDevicePlanner(self.mgr)
+        d3n_raw = getattr(spec, 'd3n_cache', None)
+
+        if not d3n_raw:
+            return None
+
+        try:
+            d3n = D3NCacheSpec.from_json(d3n_raw)
+        except D3NCacheError as e:
+            raise OrchestratorError(str(e))
+
+        host = daemon_spec.host
+        if not host:
+            raise OrchestratorError("missing host in daemon_spec")
+
+        service_name = daemon_spec.service_name
+        daemon_details = self.mgr.cache.get_daemons_by_service(service_name)
+
+        fs_type, size_bytes, devs = d3n_get_host_devs(d3n, host)
+        device = alloc.plan_device_for_daemon(service_name, host, devs, daemon_spec.daemon_id, daemon_details)
+
+        logger.info(
+            f"[D3N][alloc] service={service_name} host={host} daemon={daemon_spec.daemon_id} "
+            f"chosen={device} used=? devs={devs}"
+
+        )
+
+        if daemon_spec.daemon_id:
+            # warn if sharing is unavoidable
+            alloc.warn_if_sharing_unavoidable(service_name, host, devs)
+
+        mountpoint, cache_path = d3n_paths(self.mgr._cluster_fsid, device, daemon_spec.daemon_id)
+
+        return D3NCache(
+            device=device,
+            filesystem=fs_type,
+            size_bytes=size_bytes,
+            mountpoint=mountpoint,
+            cache_path=cache_path,
+        )
+
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
         self.register_for_certificates(daemon_spec)
@@ -1550,6 +1605,36 @@ class RgwService(CephService):
         daemon_spec.keyring = keyring
         daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
 
+        d3n_cache = daemon_spec.final_config.get('d3n_cache')
+
+        if d3n_cache:
+            try:
+                d3n = D3NCache.from_json(d3n_cache)
+            except D3NCacheError as e:
+                raise OrchestratorError(str(e))
+
+            cache_path = d3n.cache_path
+            size = d3n.size_bytes
+
+            self.mgr.check_mon_command({
+                'prefix': 'config set',
+                'who': daemon_name,
+                'name': 'rgw_d3n_l1_local_datacache_enabled',
+                'value': 'true',
+            })
+            self.mgr.check_mon_command({
+                'prefix': 'config set',
+                'who': daemon_name,
+                'name': 'rgw_d3n_l1_datacache_persistent_path',
+                'value': cache_path,
+            })
+            self.mgr.check_mon_command({
+                'prefix': 'config set',
+                'who': daemon_name,
+                'name': 'rgw_d3n_l1_datacache_size',
+                'value': str(size),
+            })
+
         return daemon_spec
 
     def get_keyring(self, rgw_id: str) -> str:
@@ -1583,11 +1668,28 @@ class RgwService(CephService):
             'who': utils.name_to_config_section(daemon.name()),
             'name': 'rgw_frontends',
         })
+
+        self.mgr.check_mon_command({
+            'prefix': 'config rm',
+            'who': utils.name_to_config_section(daemon.name()),
+            'name': 'rgw_d3n_l1_local_datacache_enabled',
+        })
+        self.mgr.check_mon_command({
+            'prefix': 'config rm',
+            'who': utils.name_to_config_section(daemon.name()),
+            'name': 'rgw_d3n_l1_datacache_persistent_path',
+        })
+        self.mgr.check_mon_command({
+            'prefix': 'config rm',
+            'who': utils.name_to_config_section(daemon.name()),
+            'name': 'rgw_d3n_l1_datacache_size',
+        })
         self.mgr.check_mon_command({
             'prefix': 'config rm',
             'who': utils.name_to_config_section(daemon.name()),
             'name': 'qat_compressor_enabled'
         })
+
         self.mgr.check_mon_command({
             'prefix': 'config-key rm',
             'key': f'rgw/cert/{daemon.name()}',
@@ -1641,6 +1743,10 @@ class RgwService(CephService):
         if svc_spec.qat:
             config['qat'] = svc_spec.qat
 
+        d3n_cache = self._compute_d3n_cache_for_daemon(daemon_spec, svc_spec)
+        if d3n_cache:
+            config['d3n_cache'] = d3n_cache.to_json()
+
         rgw_deps = parent_deps + self.get_dependencies(self.mgr, svc_spec)
         return config, rgw_deps
 
diff --git a/src/pybind/mgr/cephadm/services/rgw_d3n.py b/src/pybind/mgr/cephadm/services/rgw_d3n.py
new file mode 100644 (file)
index 0000000..34bbbed
--- /dev/null
@@ -0,0 +1,172 @@
+import logging
+
+from typing import List, Dict, Tuple, TYPE_CHECKING
+
+from dataclasses import dataclass
+from orchestrator import (
+    OrchestratorError,
+    DaemonDescription,
+)
+from cephadm import utils
+from ceph.cephadm.d3n_types import d3n_parse_dev_from_path
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class D3NDevicePlanner:
+    mgr: "CephadmOrchestrator"
+
+    def _d3n_fail_if_devs_used_by_other_rgw_service(self, host: str, devs: List[str], service_name: str) -> None:
+        logger.info("1361")
+        wanted = set(devs)
+
+        # check rgw daemons on this host
+        rgw_daemons = self.mgr.cache.get_daemons_by_type('rgw', host=host)
+
+        for dd in rgw_daemons:
+            # skip daemons that belong to the same service
+            other_service = dd.service_name()
+
+            if other_service == service_name:
+                continue
+
+            who = utils.name_to_config_section(dd.name())
+            ret, out, err = self.mgr.check_mon_command({
+                'prefix': 'config get',
+                'who': who,
+                'name': 'rgw_d3n_l1_datacache_persistent_path',
+            })
+            if ret != 0:
+                continue
+
+            p = out or ''
+            p = p.strip()
+            if not p:
+                continue
+
+            used_dev = d3n_parse_dev_from_path(p)
+            if used_dev and used_dev in wanted:
+                raise OrchestratorError(
+                    f'D3N device conflict on host "{host}": device "{used_dev}" is already used by RGW service "{other_service}" '
+                    f'(daemon {dd.name()}). Refuse to reuse across services.'
+                )
+
+    def _d3n_gc_and_prune_alloc(
+            self,
+            alloc: Dict[str, str],
+            devs: list[str],
+            daemon_details: list[DaemonDescription],
+            current_daemon_id: str,
+            key: tuple[str, str],
+    ) -> None:
+
+        invalid = [did for did, dev in alloc.items() if dev not in devs]
+        for did in invalid:
+            del alloc[did]
+        logger.debug(f"[D3N][alloc] prune-invalid: removed={invalid} devs={devs} alloc_now={alloc}")
+        if not daemon_details:
+            if alloc:
+                logger.info(f"[D3N][alloc] clear-stale: key={key} alloc_was={alloc}")
+                alloc.clear()
+            return
+
+        live_daemon_ids: set[str] = set()
+        for dd in daemon_details:
+            if dd.daemon_id:
+                live_daemon_ids.add(dd.daemon_id)
+
+        if current_daemon_id:
+            live_daemon_ids.add(current_daemon_id)
+
+        stale = [did for did in list(alloc.keys()) if did not in live_daemon_ids]
+        for did in stale:
+            del alloc[did]
+        logger.debug(
+            f"gc: key={key} live={sorted(live_daemon_ids)} "
+            f"removed={stale} alloc_now={alloc}"
+        )
+
+    def _d3n_get_allocator(self) -> Dict[Tuple[str, str], Dict[str, str]]:
+        """
+        Return the in-memory D3N device allocation map.
+
+        This is intentionally stored on the mgr module instance (self.mgr) as
+        ephemeral state to keep per-(service, host) device selections stable across
+        repeated scheduling cycles within the lifetime of the mgr process.
+
+        It is not persisted to disk/mon-store; it will be rebuilt after mgr restart
+        and also pruned via _d3n_gc_and_prune_alloc() based on currently running
+        daemons.
+        """
+        alloc_all = getattr(self.mgr, "_d3n_device_alloc", None)
+        if alloc_all is None:
+            alloc_all = {}
+            setattr(self.mgr, "_d3n_device_alloc", alloc_all)
+
+        assert isinstance(alloc_all, dict)
+        return alloc_all
+
+    def _d3n_choose_device_for_daemon(
+            self,
+            service_name: str,
+            host: str,
+            devs: list[str],
+            daemon_id: str,
+            daemon_details: list[DaemonDescription],
+    ) -> str:
+        alloc_all = self._d3n_get_allocator()
+        key = (service_name, host)
+        alloc = alloc_all.setdefault(key, {})
+        assert isinstance(alloc, dict)
+        reason = "unknown"
+
+        self._d3n_gc_and_prune_alloc(alloc, devs, daemon_details, daemon_id, key)
+
+        logger.info(
+            f"[D3N][alloc] pre-choose key={key} daemon={daemon_id} "
+            f"daemon_details={len(daemon_details)} devs={devs} alloc={alloc}"
+        )
+
+        if daemon_id in alloc:
+            logger.info(f"[D3N][alloc] reuse existing mapping daemon={daemon_id} dev={alloc[daemon_id]}")
+            return alloc[daemon_id]
+
+        used = set(alloc.values())
+        free = [d for d in devs if d not in used]
+        if free:
+            chosen = free[0]  # 1:1 mapping whenever possible
+        else:
+            chosen = devs[len(alloc) % len(devs)]  # share the device when unavoidable
+            reason = "round-robin/share"
+        alloc[daemon_id] = chosen
+        logger.info(f"[D3N][alloc] chosen={chosen} reason={reason} alloc_now={alloc}")
+        return chosen
+
+    def warn_if_sharing_unavoidable(
+        self,
+        service_name: str,
+        host: str,
+        devs: list[str],
+    ) -> None:
+        alloc_all = getattr(self.mgr, "_d3n_device_alloc", {})
+        alloc = alloc_all.get((service_name, host), {}) if isinstance(alloc_all, dict) else {}
+        if isinstance(alloc, dict) and len(alloc) > len(devs):
+            logger.warning(
+                f'D3N cache sub-optimal on host "{host}" for service "{service_name}": '
+                f'{len(alloc)} RGW daemons but only {len(devs)} devices; devices will be shared.'
+            )
+
+    def plan_device_for_daemon(
+        self,
+        service_name: str,
+        host: str,
+        devs: list[str],
+        daemon_id: str,
+        daemon_details: list[DaemonDescription],
+    ) -> str:
+        self._d3n_fail_if_devs_used_by_other_rgw_service(host, devs, service_name)
+        return self._d3n_choose_device_for_daemon(service_name, host, devs, daemon_id, daemon_details)
diff --git a/src/python-common/ceph/cephadm/d3n_types.py b/src/python-common/ceph/cephadm/d3n_types.py
new file mode 100644 (file)
index 0000000..589b47a
--- /dev/null
@@ -0,0 +1,237 @@
+import os
+from dataclasses import dataclass
+from typing import Any, Dict, Mapping, Optional, Union, Tuple
+from ceph.utils import size_to_bytes
+
+
+class D3NCacheError(ValueError):
+    """Raised when a d3n_cache spec or runtime config is invalid."""
+    pass
+
+
+@dataclass(frozen=True)
+class D3NCacheSpec:
+    """
+    RGW spec's d3n_cache section:
+      d3n_cache:
+        filesystem: xfs|ext4           (optional, default xfs)
+        size: 10G|512M|...             (required)
+        devices:
+          host1: [/dev/nvme0n1, ...]   (required)
+          host2: [/dev/nvme1n1, ...]
+    """
+
+    filesystem: str
+    size: Union[str, int]
+    devices: Dict[str, list[str]]
+
+    _SUPPORTED_FILESYSTEMS = ("xfs", "ext4")
+    _KNOWN_KEYS = {"filesystem", "size", "devices"}
+
+    @classmethod
+    def from_json(cls, obj: Mapping[str, Any]) -> "D3NCacheSpec":
+        """Build and validate a spec from the raw d3n_cache mapping.
+
+        Raises D3NCacheError on any structural or value problem.
+        """
+        if not isinstance(obj, Mapping):
+            raise D3NCacheError(
+                f"d3n_cache must be a dict, got {type(obj).__name__}"
+            )
+
+        # reject typos/unknown fields rather than silently ignoring them
+        unknown = set(obj.keys()) - cls._KNOWN_KEYS
+        if unknown:
+            raise D3NCacheError(
+                f"Unknown d3n_cache field(s): {sorted(unknown)!r}"
+            )
+
+        filesystem = str(obj.get("filesystem", "xfs")).strip()
+        size = obj.get("size", None)
+        if not isinstance(size, (str, int)) or (
+            isinstance(size, str) and not size.strip()
+        ):
+            raise D3NCacheError(
+                '"d3n_cache.size" must be an int (bytes) or a string like "10G"'
+            )
+
+        devices_raw = obj.get("devices", None)
+
+        if not isinstance(devices_raw, Mapping):
+            raise D3NCacheError(
+                '"d3n_cache.devices" must be a mapping of host -> list of device paths'
+            )
+
+        devices: Dict[str, list[str]] = {}
+        for host, devs in devices_raw.items():
+            if not isinstance(host, str) or not host.strip():
+                raise D3NCacheError(
+                    "Invalid host key in d3n_cache.devices (must be non-empty string)"
+                )
+            if not isinstance(devs, list) or not devs:
+                raise D3NCacheError(
+                    f'"d3n_cache.devices[{host}]" must be a non-empty list of device paths'
+                )
+
+            norm: list[str] = []
+            for d in devs:
+                if not isinstance(d, str) or not d.startswith("/dev/"):
+                    raise D3NCacheError(
+                        f'Invalid device path "{d}" in d3n_cache.devices[{host}]'
+                    )
+                norm.append(d)
+
+            # dedupe and sort for a deterministic device selection order
+            devices[host] = sorted(set(norm))
+
+        spec = cls(filesystem=filesystem, size=size, devices=devices)
+        spec.validate()
+        return spec
+
+    def validate(self) -> None:
+        """Check field values; raises D3NCacheError on failure."""
+        if self.filesystem not in self._SUPPORTED_FILESYSTEMS:
+            raise D3NCacheError(
+                f'Invalid filesystem "{self.filesystem}" in d3n_cache '
+                f'(supported: {", ".join(self._SUPPORTED_FILESYSTEMS)})'
+            )
+        if self.size is None or self.size == "":
+            raise D3NCacheError('"d3n_cache.size" is required')
+
+    def devices_for_host(self, host: str) -> list[str]:
+        """Return the device list for host; raises D3NCacheError if absent."""
+        devs = self.devices.get(host)
+        if not devs:
+            raise D3NCacheError(
+                f'no devices found for host "{host}" in d3n_cache.devices'
+            )
+        return devs
+
+
+@dataclass(frozen=True)
+class D3NCache:
+    """
+    Fully-resolved D3N cache placement for a single RGW daemon:
+    the chosen device, filesystem type, host mountpoint, per-daemon
+    cache directory, and an optional required size in bytes.
+
+    Serialized into the daemon's config via to_json()/from_json().
+    """
+    device: str
+    filesystem: str
+    mountpoint: str
+    cache_path: str
+    size_bytes: Optional[int] = None
+
+    _SUPPORTED_FILESYSTEMS = ("xfs", "ext4")
+    _KNOWN_KEYS = {
+        "device",
+        "filesystem",
+        "mountpoint",
+        "cache_path",
+        "size_bytes",
+    }
+
+    @classmethod
+    def from_json(cls, obj: Mapping[str, Any]) -> "D3NCache":
+        """Build and validate an instance from a JSON-style mapping.
+
+        Raises D3NCacheError on unknown fields or invalid values.
+        """
+        if not isinstance(obj, Mapping):
+            raise D3NCacheError(
+                f"d3n_cache must be a dict, got {type(obj).__name__}"
+            )
+
+        unknown = set(obj.keys()) - cls._KNOWN_KEYS
+        if unknown:
+            raise D3NCacheError(
+                f"Unknown d3n_cache field(s): {sorted(unknown)!r}"
+            )
+
+        d = cls(
+            device=str(obj.get("device", "")).strip(),
+            filesystem=str(obj.get("filesystem", "xfs")).strip(),
+            mountpoint=str(obj.get("mountpoint", "")).strip(),
+            cache_path=str(obj.get("cache_path", "")).strip(),
+            size_bytes=obj.get("size_bytes"),
+        )
+        d.validate()
+        return d
+
+    def to_json(self) -> Dict[str, Any]:
+        """Return a JSON-serializable dict; size_bytes only when set."""
+        out: Dict[str, Any] = {
+            "device": self.device,
+            "filesystem": self.filesystem,
+            "mountpoint": self.mountpoint,
+            "cache_path": self.cache_path,
+        }
+        if self.size_bytes is not None:
+            out["size_bytes"] = self.size_bytes
+        return out
+
+    def validate(self) -> None:
+        """Check all fields; raises D3NCacheError on the first failure."""
+        if not self.device:
+            raise D3NCacheError("d3n_cache.device must be a non-empty string")
+        if not self.device.startswith("/"):
+            raise D3NCacheError(
+                f"d3n_cache.device must be an absolute path, got {self.device!r}"
+            )
+
+        if self.filesystem not in self._SUPPORTED_FILESYSTEMS:
+            raise D3NCacheError(
+                f"Invalid filesystem {self.filesystem!r} "
+                f"(supported: {', '.join(self._SUPPORTED_FILESYSTEMS)})"
+            )
+
+        if not self.mountpoint:
+            raise D3NCacheError(
+                "d3n_cache.mountpoint must be a non-empty string"
+            )
+        if not self.mountpoint.startswith("/"):
+            raise D3NCacheError(
+                f"d3n_cache.mountpoint must be an absolute path, got {self.mountpoint!r}"
+            )
+
+        if not self.cache_path:
+            raise D3NCacheError(
+                "d3n_cache.cache_path must be a non-empty string"
+            )
+        if not self.cache_path.startswith("/"):
+            raise D3NCacheError(
+                f"d3n_cache.cache_path must be an absolute path, got {self.cache_path!r}"
+            )
+
+        if self.size_bytes is not None:
+            # note: isinstance(True, int) is True, so bool would pass here
+            if not isinstance(self.size_bytes, int) or self.size_bytes <= 0:
+                raise D3NCacheError(
+                    "d3n_cache.size_bytes must be a positive integer"
+                )
+
+
+def d3n_get_host_devs(
+    d3n: D3NCacheSpec, host: str
+) -> Tuple[str, int, list[str]]:
+    """
+    Resolve (filesystem, size_bytes, devices) for one host from the spec.
+
+    Raises D3NCacheError if the host has no devices or the size string
+    cannot be parsed.
+    """
+    fs_type = d3n.filesystem
+    devs = d3n.devices_for_host(host)
+    try:
+        size_bytes = size_to_bytes(d3n.size)
+    except Exception as e:
+        raise D3NCacheError(
+            f'invalid d3n_cache.size {d3n.size!r}: {e}'
+        ) from e
+
+    return fs_type, size_bytes, devs
+
+
+def d3n_parse_dev_from_path(p: str) -> Optional[str]:
+    """Extract '/dev/<dev>' from a D3N cache path, or None if the path
+    does not follow the expected layout."""
+    # expected: /mnt/ceph-d3n/<fsid>/<dev>/rgw_datacache/...
+
+    if not p:
+        return None
+
+    parts = [x for x in p.split('/') if x]
+    try:
+        i = parts.index('ceph-d3n')
+    except ValueError:
+        return None
+    # need at least <fsid> and <dev> after the 'ceph-d3n' component
+    if len(parts) < i + 3:
+        return None
+    dev = parts[i + 2]
+    return f'/dev/{dev}' if dev else None
+
+
+def d3n_mountpoint(fsid: str, dev: str) -> str:
+    """Return the canonical mountpoint for a D3N device in this cluster."""
+    # key on the device basename so /dev/nvme0n1 -> .../nvme0n1
+    dev_key = os.path.basename(dev)
+    return f"/mnt/ceph-d3n/{fsid}/{dev_key}"
+
+
+def d3n_cache_path(mountpoint: str, daemon_id: str) -> str:
+    """Return the per-daemon cache directory beneath a D3N mountpoint."""
+    daemon_entity = f"client.rgw.{daemon_id}"
+    return os.path.join(mountpoint, "rgw_datacache", daemon_entity)
+
+
+def d3n_paths(fsid: str, device: str, daemon_id: str) -> tuple[str, str]:
+    """Return (mountpoint, cache_path) for one daemon's D3N device."""
+    mp = d3n_mountpoint(fsid, device)
+    return mp, d3n_cache_path(mp, daemon_id)
index 93d30264eeb458c1fcfd1e8a28e529914170a1ed..246a9b898c6b8d87da3b1537ba2ac30e9608cda9 100644 (file)
@@ -38,6 +38,7 @@ from ceph.deployment.utils import unwrap_ipv6, valid_addr, verify_non_negative_i
 from ceph.deployment.utils import verify_positive_int, verify_non_negative_number
 from ceph.deployment.utils import verify_boolean, verify_enum, verify_int
 from ceph.deployment.utils import parse_combined_pem_file
+from ceph.cephadm.d3n_types import D3NCacheSpec, D3NCacheError
 from ceph.utils import is_hex
 from ceph.smb import constants as smbconst
 from ceph.smb import network as smbnet
@@ -1459,6 +1460,16 @@ class RGWSpec(ServiceSpec):
             rgw_frontend_port: 1234
             rgw_frontend_type: beast
             rgw_frontend_ssl_certificate: ...
+            # Optional: enable D3N (L1 datacache) for RGW
+            d3n_cache:
+                filesystem: xfs          # default: xfs
+                size: 10G                # required; int bytes or string with K/M/G/T/P
+                devices:                 # required; per-host list of devices
+                    host1:
+                      - /dev/nvme0n1
+                    host2:
+                      - /dev/nvme1n1
+                      - /dev/nvme2n1
 
     See also: :ref:`orchestrator-cli-service-spec`
     """
@@ -1509,6 +1520,7 @@ class RGWSpec(ServiceSpec):
                  wildcard_enabled: Optional[bool] = False,
                  rgw_exit_timeout_secs: int = 120,
                  qat: Optional[Dict[str, str]] = None,
+                 d3n_cache: Optional[Dict[str, Any]] = None,
                  ):
         assert service_type == 'rgw', service_type
 
@@ -1577,6 +1589,7 @@ class RGWSpec(ServiceSpec):
         self.rgw_exit_timeout_secs = rgw_exit_timeout_secs
 
         self.qat = qat or {}
+        self.d3n_cache = d3n_cache or {}
 
     def get_port_start(self) -> List[int]:
         ports = self.get_port()
@@ -1661,6 +1674,12 @@ class RGWSpec(ServiceSpec):
                     f"Invalid compression mode {compression}. Only 'sw' and 'hw' are allowed"
                     )
 
+        if self.d3n_cache:
+            try:
+                D3NCacheSpec.from_json(self.d3n_cache)
+            except D3NCacheError as e:
+                raise SpecValidationError(str(e))
+
 
 yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)
 
index 5419de7da32bfb4d17728a28e1e61e697a629c86..4bd3c2c329f1f8969a32894463007b0a9bda828a 100644 (file)
@@ -3,7 +3,7 @@ import re
 import string
 import ssl
 
-from typing import Optional, MutableMapping, Tuple, Any
+from typing import Optional, MutableMapping, Tuple, Any, Union
 from urllib.error import HTTPError, URLError
 from urllib.request import urlopen, Request
 
@@ -11,6 +11,8 @@ import logging
 
 log = logging.getLogger(__name__)
 
+_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$')
+
 
 def datetime_now() -> datetime.datetime:
     """
@@ -189,3 +191,27 @@ def strtobool(value: str) -> bool:
     if value.lower() in _FALSE_VALS:
         return False
     raise ValueError(f'invalid truth value {value!r}')
+
+
+def size_to_bytes(v: Union[str, int]) -> int:
+    if isinstance(v, int):
+        return v
+    if isinstance(v, str):
+        m = _SIZE_RE.match(v)
+        if not m:
+            raise ValueError(
+                f'invalid size "{v}" (examples: 10737418240, 10G, 512M)'
+            )
+        num = int(m.group(1))
+        unit = m.group(2) or ''
+        unit = unit.upper()
+        mult = {
+            '': 1,
+            'K': 1024,
+            'M': 1024**2,
+            'G': 1024**3,
+            'T': 1024**4,
+            'P': 1024**5,
+        }[unit]
+        return num * mult
+    raise ValueError(f'invalid size type {type(v)} (expected int or str)')