]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
fix test_remote_executables.py: correct typo ButOr -> BitOr
authorKushal Deb <Kushal.Deb@ibm.com>
Fri, 2 Jan 2026 09:34:15 +0000 (15:04 +0530)
committerKushal Deb <Kushal.Deb@ibm.com>
Thu, 19 Feb 2026 14:07:38 +0000 (19:37 +0530)
Issue seen:
    def _names(node):
        if isinstance(node, ast.Name):
            return [node.id]
        if isinstance(node, ast.Attribute):
            vn = _names(node.value)
            return vn + [node.attr]
        if isinstance(node, ast.Call):
            return _names(node.func)
        if isinstance(node, ast.Constant):
            return [repr(node.value)]
        if isinstance(node, ast.JoinedStr):
            return [f"<JoinedStr: {node.values!r}>"]
        if isinstance(node, ast.Subscript):
            return [f"<Subscript: {node.value}{node.slice}>"]
        if isinstance(node, ast.BinOp):
            return [f"<BinaryOp: {_names(node.left)} {_names(node.op)} {_names(node.right)}"]
        if (
            isinstance(node, ast.Add)
            or isinstance(node, ast.Sub)
            or isinstance(node, ast.Mult)
            or isinstance(node, ast.Div)
            or isinstance(node, ast.FloorDiv)
            or isinstance(node, ast.Mod)
            or isinstance(node, ast.Pow)
            or isinstance(node, ast.LShift)
            or isinstance(node, ast.RShift)
>           or isinstance(node, ast.ButOr)
            or isinstance(node, ast.BitXor)
            or isinstance(node, ast.BitAnd)
            or isinstance(node, ast.MatMult)
        ):
E       AttributeError: module 'ast' has no attribute 'ButOr'. Did you mean: 'BitOr'?

Signed-off-by: Kushal Deb <Kushal.Deb@ibm.com>
src/cephadm/cephadm.py
src/cephadm/cephadmlib/context_getters.py
src/pybind/mgr/cephadm/services/cephadmservice.py
src/pybind/mgr/cephadm/tests/test_remote_executables.py
src/pybind/mgr/cephadm/utils.py
src/python-common/ceph/cephadm/d3n.py [new file with mode: 0644]
src/python-common/ceph/deployment/service_spec.py

index bd3650dc6827b43dfc3e67378e528548de6905a5..b93f292c9bf34769f0454082380dc2e1ab79b80c 100755 (executable)
@@ -29,6 +29,7 @@ from glob import glob
 from io import StringIO
 from threading import Thread, Event
 from pathlib import Path
+from enum import Enum
 from configparser import ConfigParser
 
 from cephadmlib.constants import (
@@ -203,7 +204,7 @@ from cephadmlib.listing_updaters import (
     VersionStatusUpdater,
 )
 from cephadmlib.container_lookup import infer_local_ceph_image, identify
-
+from ceph.cephadm.d3n import D3NCache, D3NCacheError
 
 FuncT = TypeVar('FuncT', bound=Callable)
 
@@ -873,13 +874,21 @@ def _update_container_args_for_podman(
     )
 
 
+def _d3n_fstab_entry(uuid: str, mountpoint: str, fs_type: str) -> str:
+    return f'UUID={uuid} {mountpoint} {fs_type} defaults,noatime 0 2\n'
+
+
 def _ensure_fstab_entry(ctx: CephadmContext, device: str, mountpoint: str, fs_type: str) -> None:
+    """
+    Ensure the device is present in /etc/fstab for the given mountpoint.
+    If an entry for mountpoint already exists, no changes are made.
+    """
     out, _, code = call(ctx, ['blkid', '-s', 'UUID', '-o', 'value', device])
     if code != 0 or not out.strip():
         raise Error(f'Failed to get UUID for {device}')
     uuid = out.strip()
 
-    entry = f'UUID={uuid} {mountpoint} {fs_type} defaults,noatime 0 2\n'
+    entry = _d3n_fstab_entry(uuid, mountpoint, fs_type)
 
     # check if mountpoint already present in fstab
     with open('/etc/fstab', 'r') as f:
@@ -895,28 +904,87 @@ def _ensure_fstab_entry(ctx: CephadmContext, device: str, mountpoint: str, fs_ty
         f.write(entry)
 
 
-def prepare_d3n_cache(
-    ctx: CephadmContext,
-    d3n_cache: Dict[str, Any],
-    uid: int,
-    gid: int,
+class D3NStateAction(str, Enum):
+    WRITE = 'write'
+    CLEANUP = 'cleanup'
+
+
+def d3n_state(
+        ctx: CephadmContext,
+        data_dir: str,
+        action: D3NStateAction,
+        d3n: Optional[D3NCache] = None,
+        uid: int = 0,
+        gid: int = 0,
 ) -> None:
-    device = d3n_cache.get('device')
-    fs_type = d3n_cache.get('filesystem', 'xfs')
-    mountpoint = d3n_cache.get('mountpoint')
-    cache_path = d3n_cache.get('cache_path')
-    size_bytes = d3n_cache.get('size_bytes')
+    """
+    Persist/read minimal D3N info in the daemon's data directory
+    so that rm-daemon can cleanup properly.
+    """
+    path = os.path.join(data_dir, 'd3n_state.json')
+
+    if action == D3NStateAction.WRITE:
+        if d3n is None:
+            return
+        state = {
+            'cache_path': d3n.cache_path,
+            'mount_path': d3n.mountpoint,
+        }
+        payload = json.dumps(state, sort_keys=True) + '\n'
+        with write_new(path, owner=(uid, gid)) as f:
+            f.write(payload)
+        return
+
+    if action == D3NStateAction.CLEANUP:
+        if not os.path.exists(path):
+            return
+
+        try:
+            with open(path, 'r') as f:
+                state = json.load(f)
+        except Exception:
+            state = {}
+
+        cache_path = state.get('cache_path') if isinstance(state, dict) else None
+        if isinstance(cache_path, str) and cache_path:
+            try:
+                shutil.rmtree(cache_path, ignore_errors=True)
+                logger.info(f'[D3N] removed cache directory: {cache_path}')
+            except Exception as e:
+                logger.warning(f'[D3N] failed to remove cache directory {cache_path}: {e}')
+
+        try:
+            os.unlink(path)
+        except FileNotFoundError:
+            pass
+        except Exception as e:
+            logger.warning(f'[D3N] failed to remove {path}: {e}')
+        return
+
+    raise Error(f'[D3N] invalid d3n_state action: {action}')
+
+
+def prepare_d3n_cache(ctx: CephadmContext, d3n: D3NCache, uid: int, gid: int) -> None:
+    """
+    Prepare a D3N cache mount and directory.
+
+    Steps:
+      1. Ensure mountpoint directory exists
+      2. Format device if it has no filesystem
+      3. Ensure /etc/fstab entry exists
+      4. Mount if not mounted
+      5. Ensure cache_path exists and is owned by daemon uid/gid
+    """
+    device = d3n.device
+    fs_type = d3n.filesystem
+    mountpoint = d3n.mountpoint
+    cache_path = d3n.cache_path
+    size_bytes = d3n.size_bytes
+
     logger.debug(
         f'[D3N] prepare_d3n_cache: device={device!r} fs_type={fs_type!r} mountpoint={mountpoint!r} cache_path={cache_path!r} size_bytes={size_bytes!r}'
     )
 
-    if not device:
-        raise Error('d3n_cache.device must be specified')
-    if not mountpoint:
-        raise Error('d3n_cache.mountpoint must be specified')
-    if not cache_path:
-        raise Error('d3n_cache.cache_path must be specified')
-
     # Ensure mountpoint exists
     os.makedirs(mountpoint, mode=0o755, exist_ok=True)
     logger.debug(f'[D3N] checking filesystem on device {device}')
@@ -926,7 +994,7 @@ def prepare_d3n_cache(
         logger.debug(f'Formatting {device} with {fs_type} for D3N')
         call_throws(ctx, ['mkfs', '-t', fs_type, device])
 
-    # Persist the mount in /etc/fstab
+    # Ensure the mount persists across reboots by adding an /etc/fstab entry if one is missing.
     _ensure_fstab_entry(ctx, device, mountpoint, fs_type)
 
     if not _is_mountpoint(ctx, mountpoint):
@@ -936,9 +1004,6 @@ def prepare_d3n_cache(
         logger.debug(f'[D3N] mountpoint already mounted according to _is_mountpoint(): {mountpoint}')
 
     if size_bytes is not None:
-        if not isinstance(size_bytes, int) or size_bytes <= 0:
-            raise Error(f'd3n_cache.size_bytes must be a positive integer, got {size_bytes!r}')
-
         avail = _avail_bytes(ctx, mountpoint)
         if avail < size_bytes:
             raise Error(
@@ -953,7 +1018,7 @@ def prepare_d3n_cache(
 
 def _has_filesystem(ctx: CephadmContext, device: str) -> bool:
     if not os.path.exists(device):
-        return False
+        raise Error(f'D3N device does not exist: {device}')
     out, _, code = call(ctx, ['blkid', '-o', 'value', '-s', 'TYPE', device])
     return code == 0 and bool(out.strip())
 
@@ -1046,7 +1111,7 @@ def deploy_daemon(
             },
         ).run()
 
-        # write conf
+        # write conf
         with write_new(mon_dir + '/config', owner=(uid, gid)) as f:
             f.write(config)
     else:
@@ -1055,10 +1120,18 @@ def deploy_daemon(
 
         if ident.daemon_type == 'rgw':
             config_json = fetch_configs(ctx)
-            d3n_cache = config_json.get('d3n_cache')
+            d3n_cache: Any = config_json.get('d3n_cache')
 
             if d3n_cache:
-                prepare_d3n_cache(ctx, d3n_cache, uid, gid)
+                try:
+                    d3n = D3NCache.from_json(d3n_cache)
+                except D3NCacheError as e:
+                    raise Error(str(e))
+                prepare_d3n_cache(ctx, d3n, uid, gid)
+                try:
+                    d3n_state(ctx, data_dir, D3NStateAction.WRITE, d3n, uid, gid)
+                except Exception as e:
+                    logger.warning(f'[D3N] failed to persist D3N state in {data_dir}: {e}')
 
     # only write out unit files and start daemon
     # with systemd if this is not a reconfig
@@ -4038,6 +4111,10 @@ def command_rm_daemon(ctx):
              verbosity=CallVerbosity.DEBUG)
 
     data_dir = ident.data_dir(ctx.data_dir)
+
+    if ident.daemon_type == 'rgw':
+        d3n_state(ctx, data_dir, action=D3NStateAction.CLEANUP)
+
     if ident.daemon_type in ['mon', 'osd', 'prometheus'] and \
        not ctx.force_delete_data:
         # rename it out of the way -- do not delete
index fc1462b9ba584064bc44d0899b50d2b66447c947..9dde5c855a5032ca822d4f7de42c2b41e7a0ac14 100644 (file)
@@ -68,7 +68,7 @@ def fetch_meta(ctx: CephadmContext) -> Dict[str, Any]:
     return {}
 
 
-def fetch_configs(ctx: CephadmContext) -> Dict[str, Any]:
+def fetch_configs(ctx: CephadmContext) -> Dict[str, Any]:
     """Return a dict containing arbitrary configuration parameters.
     This function filters out the key 'custom_config_files' which
     must not be part of a deployment's configuration key-value pairs.
index 01e1e82f224bfdbbf3541efb7a7bcfc3e09cb655..2b8c4d1a003bcd4b7f846ecc425b6e65cbcf48b7 100644 (file)
@@ -34,6 +34,7 @@ from orchestrator import (
 )
 from orchestrator._interface import daemon_type_to_service
 from cephadm import utils
+from ceph.cephadm.d3n import D3NCache, D3NCacheError, D3NCacheSpec
 from .service_registry import register_cephadm_service
 from cephadm.tlsobject_types import TLSObjectScope, TLSCredentials, EMPTY_TLS_CREDENTIALS
 from cephadm.ssl_cert_utils import extract_ips_and_fqdns_from_cert
@@ -46,8 +47,6 @@ logger = logging.getLogger(__name__)
 ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
 AuthEntity = NewType('AuthEntity', str)
 
-_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$')
-
 
 def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEntity:
     """
@@ -1352,32 +1351,12 @@ class RgwService(CephService):
         self.mgr.spec_store.save(spec)
         self.mgr.trigger_connect_dashboard_rgw()
 
-    def _size_to_bytes(self, v: str) -> int:
-        if isinstance(v, int):
-            return v
-        if isinstance(v, str):
-            m = _SIZE_RE.match(v)
-            if not m:
-                raise OrchestratorError(f'invalid size "{v}" (examples: 10737418240, 10G, 512M)')
-            num = int(m.group(1))
-            unit = (m.group(2) or '').upper()
-            mult = {
-                '': 1,
-                'K': 1024,
-                'M': 1024**2,
-                'G': 1024**3,
-                'T': 1024**4,
-                'P': 1024**5,
-            }[unit]
-            return num * mult
-        raise OrchestratorError(f'invalid size type {type(v)} (expected int or str)')
-
     def _d3n_parse_dev_from_path(self, p: str) -> Optional[str]:
         # expected: /mnt/ceph-d3n/<fsid>/<dev>/rgw_datacache/...
 
         if not p:
             return None
-        logger.info(f"p: {p}")
+
         parts = [x for x in p.split('/') if x]
         try:
             i = parts.index('ceph-d3n')
@@ -1396,11 +1375,8 @@ class RgwService(CephService):
 
         for dd in rgw_daemons:
             # skip daemons that belong to the same service
-            try:
-                other_service = dd.service_name()
+            other_service = dd.service_name()
 
-            except Exception:
-                continue
             if other_service == service_name:
                 continue
 
@@ -1413,7 +1389,8 @@ class RgwService(CephService):
             if ret != 0:
                 continue
 
-            p = (out or '').strip()
+            p = out or ''
+            p = p.strip()
             if not p:
                 continue
 
@@ -1424,28 +1401,10 @@ class RgwService(CephService):
                     f'(daemon {dd.name()}). Refuse to reuse across services.'
                 )
 
-    def _d3n_get_host_devs(self, d3n: Dict[str, Any], host: str) -> Tuple[str, int, list[str]]:
-        fs_type = d3n.get('filesystem', 'xfs')
-        size_raw = d3n.get('size')
-        devices_map = d3n.get('devices')
-
-        if size_raw is None:
-            raise OrchestratorError('"d3n_cache.size" is required')
-        if fs_type not in ('xfs', 'ext4'):
-            raise OrchestratorError(f'Invalid filesystem "{fs_type}" (supported: xfs, ext4)')
-        if not isinstance(devices_map, dict):
-            raise OrchestratorError('"d3n_cache.devices" must be a mapping of host -> [devices]')
-
-        devs = devices_map.get(host)
-        if not isinstance(devs, list) or not devs:
-            raise OrchestratorError("no devices found")
-        devs = sorted(devs)
-
-        for d in devs:
-            if not isinstance(d, str) or not d.startswith('/dev/'):
-                raise OrchestratorError(f'invalid device path "{d}" in d3n_cache.devices for host "{host}"')
-
-        size_bytes = self._size_to_bytes(size_raw)
+    def _d3n_get_host_devs(self, d3n: D3NCacheSpec, host: str) -> Tuple[str, int, list[str]]:
+        fs_type = d3n.filesystem
+        devs = d3n.devices_for_host(host)
+        size_bytes = utils._size_to_bytes(d3n.size)
         return fs_type, size_bytes, devs
 
     def _d3n_gc_and_prune_alloc(
@@ -1484,6 +1443,17 @@ class RgwService(CephService):
         )
 
     def _d3n_get_allocator(self) -> Dict[Tuple[str, str], Dict[str, str]]:
+        """
+        Return the in-memory D3N device allocation map.
+
+        This is intentionally stored on the mgr module instance (self.mgr) as
+        ephemeral state to keep per-(service, host) device selections stable across
+        repeated scheduling cycles within the lifetime of the mgr process.
+
+        It is not persisted to disk/mon-store; it will be rebuilt after mgr restart
+        and also pruned via _d3n_gc_and_prune_alloc() based on currently running
+        daemons.
+        """
         alloc_all = getattr(self.mgr, "_d3n_device_alloc", None)
         if alloc_all is None:
             alloc_all = {}
@@ -1532,13 +1502,15 @@ class RgwService(CephService):
         self,
         daemon_spec: CephadmDaemonDeploySpec,
         spec: RGWSpec,
-    ) -> Optional[Dict[str, Any]]:
+    ) -> Optional[D3NCache]:
 
-        d3n = getattr(spec, 'd3n_cache', None)
-        if not d3n:
+        d3n_raw = getattr(spec, 'd3n_cache', None)
+        if not d3n_raw:
             return None
-        if not isinstance(d3n, dict):
-            raise OrchestratorError('d3n_cache must be a mapping')
+        try:
+            d3n = D3NCacheSpec.from_json(d3n_raw)
+        except D3NCacheError as e:
+            raise OrchestratorError(str(e))
 
         host = daemon_spec.host
         if not host:
@@ -1581,13 +1553,13 @@ class RgwService(CephService):
         daemon_entity = f'client.rgw.{daemon_spec.daemon_id}'
         cache_path = os.path.join(mountpoint, 'rgw_datacache', daemon_entity)
 
-        return {
-            'device': device,
-            'filesystem': fs_type,
-            'size_bytes': size_bytes,
-            'mountpoint': mountpoint,
-            'cache_path': cache_path,
-        }
+        return D3NCache(
+            device=device,
+            filesystem=fs_type,
+            size_bytes=size_bytes,
+            mountpoint=mountpoint,
+            cache_path=cache_path,
+        )
 
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
@@ -1752,8 +1724,13 @@ class RgwService(CephService):
         d3n_cache = daemon_spec.final_config.get('d3n_cache')
 
         if d3n_cache:
-            cache_path = d3n_cache.get('cache_path')
-            size = d3n_cache.get('size_bytes')
+            try:
+                d3n = D3NCache.from_json(d3n_cache)
+            except D3NCacheError as e:
+                raise OrchestratorError(str(e))
+
+            cache_path = d3n.cache_path
+            size = d3n.size_bytes
 
             self.mgr.check_mon_command({
                 'prefix': 'config set',
@@ -1884,7 +1861,7 @@ class RgwService(CephService):
 
         d3n_cache = self._compute_d3n_cache_for_daemon(daemon_spec, svc_spec)
         if d3n_cache:
-            config['d3n_cache'] = d3n_cache
+            config['d3n_cache'] = d3n_cache.to_json()
 
         rgw_deps = parent_deps + self.get_dependencies(self.mgr, svc_spec)
         return config, rgw_deps
index 9d5bd458254c316aaf70499e973d58dd01c4c4b0..433dc916d129d91b30571de0959517fe12e51067 100644 (file)
@@ -114,7 +114,7 @@ def _names(node):
         or isinstance(node, ast.Pow)
         or isinstance(node, ast.LShift)
         or isinstance(node, ast.RShift)
-        or isinstance(node, ast.ButOr)
+        or isinstance(node, ast.BitOr)
         or isinstance(node, ast.BitXor)
         or isinstance(node, ast.BitAnd)
         or isinstance(node, ast.MatMult)
index 805ca953e918bd5796a76612745035c2a1f2c866..077e6387fac554c9cb17ee3a523e4af01747cd9a 100644 (file)
@@ -1,9 +1,10 @@
 import logging
+import re
 import json
 import socket
 from enum import Enum
 from functools import wraps
-from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any, NamedTuple
+from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any, NamedTuple, Union
 from orchestrator import OrchestratorError
 import hashlib
 
@@ -43,6 +44,8 @@ NON_CEPH_IMAGE_TYPES = MONITORING_STACK_TYPES + ['nvmeof', 'smb'] + MGMT_GATEWAY
 # Used for _run_cephadm used for check-host etc that don't require an --image parameter
 cephadmNoImage = CephadmNoImage.token
 
+_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$')
+
 
 class ContainerInspectInfo(NamedTuple):
     image_id: str
@@ -174,3 +177,27 @@ def get_node_proxy_status_value(data: Any, key: str, lower: bool = False) -> str
     if not isinstance(value, str):
         return ''
     return value.lower() if lower else value
+
+
+def _size_to_bytes(v: Union[str, int]) -> int:
+    if isinstance(v, int):
+        return v
+    if isinstance(v, str):
+        m = _SIZE_RE.match(v)
+        if not m:
+            raise OrchestratorError(
+                f'invalid size "{v}" (examples: 10737418240, 10G, 512M)'
+            )
+        num = int(m.group(1))
+        unit = m.group(2) or ''
+        unit = unit.upper()
+        mult = {
+            '': 1,
+            'K': 1024,
+            'M': 1024**2,
+            'G': 1024**3,
+            'T': 1024**4,
+            'P': 1024**5,
+        }[unit]
+        return num * mult
+    raise OrchestratorError(f'invalid size type {type(v)} (expected int or str)')
diff --git a/src/python-common/ceph/cephadm/d3n.py b/src/python-common/ceph/cephadm/d3n.py
new file mode 100644 (file)
index 0000000..4b41aa2
--- /dev/null
@@ -0,0 +1,187 @@
+from dataclasses import dataclass
+from typing import Any, Dict, Mapping, Optional, Union
+
+
+class D3NCacheError(ValueError):
+    pass
+
+
+@dataclass(frozen=True)
+class D3NCacheSpec:
+    """
+    RGW spec's d3n_cache section:
+      d3n_cache:
+        filesystem: xfs|ext4           (optional, default xfs)
+        size: 10G|512M|...             (required)
+        devices:
+          host1: [/dev/nvme0n1, ...]   (required)
+          host2: [/dev/nvme1n1, ...]
+    """
+
+    filesystem: str
+    size: Union[str, int]
+    devices: Dict[str, list[str]]
+
+    _SUPPORTED_FILESYSTEMS = ("xfs", "ext4")
+    _KNOWN_KEYS = {"filesystem", "size", "devices"}
+
+    @classmethod
+    def from_json(cls, obj: Mapping[str, Any]) -> "D3NCacheSpec":
+        if not isinstance(obj, Mapping):
+            raise D3NCacheError(
+                f"d3n_cache must be a dict, got {type(obj).__name__}"
+            )
+
+        unknown = set(obj.keys()) - cls._KNOWN_KEYS
+        if unknown:
+            raise D3NCacheError(
+                f"Unknown d3n_cache field(s): {sorted(unknown)!r}"
+            )
+
+        filesystem = str(obj.get("filesystem", "xfs")).strip()
+        size = obj.get("size", None)
+        if not isinstance(size, (str, int)) or (
+            isinstance(size, str) and not size.strip()
+        ):
+            raise D3NCacheError(
+                '"d3n_cache.size" must be an int (bytes) or a string like "10G"'
+            )
+
+        devices_raw = obj.get("devices", None)
+
+        if not isinstance(devices_raw, Mapping):
+            raise D3NCacheError(
+                '"d3n_cache.devices" must be a mapping of host -> list of device paths'
+            )
+
+        devices: Dict[str, list[str]] = {}
+        for host, devs in devices_raw.items():
+            if not isinstance(host, str) or not host.strip():
+                raise D3NCacheError(
+                    "Invalid host key in d3n_cache.devices (must be non-empty string)"
+                )
+            if not isinstance(devs, list) or not devs:
+                raise D3NCacheError(
+                    f'"d3n_cache.devices[{host}]" must be a non-empty list of device paths'
+                )
+
+            norm: list[str] = []
+            for d in devs:
+                if not isinstance(d, str) or not d.startswith("/dev/"):
+                    raise D3NCacheError(
+                        f'Invalid device path "{d}" in d3n_cache.devices[{host}]'
+                    )
+                norm.append(d)
+
+            devices[host] = sorted(set(norm))
+
+        spec = cls(filesystem=filesystem, size=size, devices=devices)
+        spec.validate()
+        return spec
+
+    def validate(self) -> None:
+        if self.filesystem not in self._SUPPORTED_FILESYSTEMS:
+            raise D3NCacheError(
+                f'Invalid filesystem "{self.filesystem}" in d3n_cache '
+                f'(supported: {", ".join(self._SUPPORTED_FILESYSTEMS)})'
+            )
+        if self.size is None or self.size == "":
+            raise D3NCacheError('"d3n_cache.size" is required')
+
+    def devices_for_host(self, host: str) -> list[str]:
+        devs = self.devices.get(host)
+        if not devs:
+            raise D3NCacheError(
+                f'no devices found for host "{host}" in d3n_cache.devices'
+            )
+        return devs
+
+
+@dataclass(frozen=True)
+class D3NCache:
+    device: str
+    filesystem: str
+    mountpoint: str
+    cache_path: str
+    size_bytes: Optional[int] = None
+
+    _SUPPORTED_FILESYSTEMS = ("xfs", "ext4")
+    _KNOWN_KEYS = {
+        "device",
+        "filesystem",
+        "mountpoint",
+        "cache_path",
+        "size_bytes",
+    }
+
+    @classmethod
+    def from_json(cls, obj: Mapping[str, Any]) -> "D3NCache":
+        if not isinstance(obj, Mapping):
+            raise D3NCacheError(
+                f"d3n_cache must be a dict, got {type(obj).__name__}"
+            )
+
+        unknown = set(obj.keys()) - cls._KNOWN_KEYS
+        if unknown:
+            raise D3NCacheError(
+                f"Unknown d3n_cache field(s): {sorted(unknown)!r}"
+            )
+
+        d = cls(
+            device=str(obj.get("device", "")).strip(),
+            filesystem=str(obj.get("filesystem", "xfs")).strip(),
+            mountpoint=str(obj.get("mountpoint", "")).strip(),
+            cache_path=str(obj.get("cache_path", "")).strip(),
+            size_bytes=obj.get("size_bytes"),
+        )
+        d.validate()
+        return d
+
+    def to_json(self) -> Dict[str, Any]:
+        out: Dict[str, Any] = {
+            "device": self.device,
+            "filesystem": self.filesystem,
+            "mountpoint": self.mountpoint,
+            "cache_path": self.cache_path,
+        }
+        if self.size_bytes is not None:
+            out["size_bytes"] = self.size_bytes
+        return out
+
+    def validate(self) -> None:
+        if not self.device:
+            raise D3NCacheError("d3n_cache.device must be a non-empty string")
+        if not self.device.startswith("/"):
+            raise D3NCacheError(
+                f"d3n_cache.device must be an absolute path, got {self.device!r}"
+            )
+
+        if self.filesystem not in self._SUPPORTED_FILESYSTEMS:
+            raise D3NCacheError(
+                f"Invalid filesystem {self.filesystem!r} "
+                f"(supported: {', '.join(self._SUPPORTED_FILESYSTEMS)})"
+            )
+
+        if not self.mountpoint:
+            raise D3NCacheError(
+                "d3n_cache.mountpoint must be a non-empty string"
+            )
+        if not self.mountpoint.startswith("/"):
+            raise D3NCacheError(
+                f"d3n_cache.mountpoint must be an absolute path, got {self.mountpoint!r}"
+            )
+
+        if not self.cache_path:
+            raise D3NCacheError(
+                "d3n_cache.cache_path must be a non-empty string"
+            )
+        if not self.cache_path.startswith("/"):
+            raise D3NCacheError(
+                f"d3n_cache.cache_path must be an absolute path, got {self.cache_path!r}"
+            )
+
+        if self.size_bytes is not None:
+            if not isinstance(self.size_bytes, int) or self.size_bytes <= 0:
+                raise D3NCacheError(
+                    "d3n_cache.size_bytes must be a positive integer"
+                )
index 5979db72a8c42bec4d2a8b5eb5f9d81d45be6de4..74337de65a31a6b60674e29b01d73f42fe6e469c 100644 (file)
@@ -38,6 +38,7 @@ from ceph.deployment.utils import unwrap_ipv6, valid_addr, verify_non_negative_i
 from ceph.deployment.utils import verify_positive_int, verify_non_negative_number
 from ceph.deployment.utils import verify_boolean, verify_enum, verify_int
 from ceph.deployment.utils import parse_combined_pem_file
+from ceph.cephadm.d3n import D3NCacheSpec, D3NCacheError
 from ceph.utils import is_hex
 from ceph.smb import constants as smbconst
 from ceph.smb import network as smbnet
@@ -1432,6 +1433,16 @@ class RGWSpec(ServiceSpec):
             rgw_frontend_port: 1234
             rgw_frontend_type: beast
             rgw_frontend_ssl_certificate: ...
+            # Optional: enable D3N (L1 datacache) for RGW
+            d3n_cache:
+                filesystem: xfs          # default: xfs
+                size: 10G                # required; int bytes or string with K/M/G/T/P
+                devices:                 # required; per-host list of devices
+                    host1:
+                      - /dev/nvme0n1
+                    host2:
+                      - /dev/nvme1n1
+                      - /dev/nvme2n1
 
     See also: :ref:`orchestrator-cli-service-spec`
     """
@@ -1637,42 +1648,10 @@ class RGWSpec(ServiceSpec):
                     )
 
         if self.d3n_cache:
-            if not isinstance(self.d3n_cache, dict):
-                raise SpecValidationError("d3n_cache must be a mapping")
-
-            filesystem = self.d3n_cache.get('filesystem', 'xfs')
-            size = self.d3n_cache.get('size')
-            devices = self.d3n_cache.get('devices')
-
-            if not size:
-                raise SpecValidationError('"d3n_cache.size" is required')
-
-            if filesystem not in ('xfs', 'ext4'):
-                raise SpecValidationError(
-                    f'Invalid filesystem "{filesystem}" in d3n_cache (supported: xfs, ext4)'
-                )
-
-            if not devices or not isinstance(devices, dict):
-                raise SpecValidationError(
-                    '"d3n_cache.devices" must be a mapping of host -> list of devices'
-                )
-
-            for host, devs in devices.items():
-                if not isinstance(host, str) or not host:
-                    raise SpecValidationError(
-                        'Invalid host key in d3n_cache.devices (must be non-empty string)'
-                    )
-
-                if not isinstance(devs, list) or not devs:
-                    raise SpecValidationError(
-                        f'"d3n_cache.devices[{host}]" must be a non-empty list of device paths'
-                    )
-
-                for dev in devs:
-                    if not isinstance(dev, str) or not dev.startswith('/dev/'):
-                        raise SpecValidationError(
-                            f'Invalid device path "{dev}" in d3n_cache.devices[{host}]'
-                        )
+            try:
+                D3NCacheSpec.from_json(self.d3n_cache)
+            except D3NCacheError as e:
+                raise SpecValidationError(str(e))
 
 
 yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)