Add RGW D3N L1 persistent datacache support backed by host block devices.
Select devices deterministically per (service, host) with intra-service
sharing, forbid cross-service reuse, prepare/mount devices, and
bind-mount per-daemon cache directories into the container.
Signed-off-by: Kushal Deb <Kushal.Deb@ibm.com>
from io import StringIO
from threading import Thread, Event
from pathlib import Path
+from enum import Enum
from configparser import ConfigParser
from cephadmlib.constants import (
VersionStatusUpdater,
)
from cephadmlib.container_lookup import infer_local_ceph_image, identify
-
+from ceph.cephadm.d3n_types import D3NCache, D3NCacheError
FuncT = TypeVar('FuncT', bound=Callable)
)
+def _d3n_fstab_entry(uuid: str, mountpoint: str, fs_type: str) -> str:
+ return f'UUID={uuid} {mountpoint} {fs_type} defaults,noatime 0 2\n'
+
+
def _ensure_fstab_entry(ctx: CephadmContext, device: str, mountpoint: str, fs_type: str) -> None:
    """
    Ensure the device is present in /etc/fstab for the given mountpoint.

    The device is referenced by filesystem UUID so the entry survives
    kernel device renames. If an entry for the mountpoint already
    exists, no changes are made.

    :raises Error: if the device UUID cannot be determined via blkid.
    """
    out, _, code = call(ctx, ['blkid', '-s', 'UUID', '-o', 'value', device])
    if code != 0 or not out.strip():
        raise Error(f'Failed to get UUID for {device}')
    uuid = out.strip()

    entry = _d3n_fstab_entry(uuid, mountpoint, fs_type)

    # check if mountpoint already present in fstab; a missing
    # /etc/fstab (e.g. minimal hosts) simply means no entry exists yet.
    try:
        with open('/etc/fstab', 'r') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                parts = line.split()
                # field 2 of an fstab entry is the mount target
                if len(parts) >= 2 and parts[1] == mountpoint:
                    return
    except FileNotFoundError:
        pass

    with open('/etc/fstab', 'a') as f:
        f.write(entry)
+
+
class D3NStateAction(str, Enum):
    """Action selector for d3n_state(): persist state at deploy time, or clean it up at daemon removal."""
    WRITE = 'write'
    CLEANUP = 'cleanup'
+
+
def d3n_state(
    ctx: CephadmContext,
    data_dir: str,
    action: D3NStateAction,
    d3n: Optional[D3NCache] = None,
    uid: int = 0,
    gid: int = 0,
) -> None:
    """
    Persist or clean up minimal D3N info in the daemon's data directory
    so that rm-daemon can cleanup properly.

    WRITE stores the cache/mount paths as JSON (no-op without a D3NCache);
    CLEANUP removes the recorded cache directory and the state file.
    """
    state_file = os.path.join(data_dir, 'd3n_state.json')

    if action == D3NStateAction.WRITE:
        if d3n is None:
            return
        doc = {
            'cache_path': d3n.cache_path,
            'mount_path': d3n.mountpoint,
        }
        with write_new(state_file, owner=(uid, gid)) as fh:
            fh.write(json.dumps(doc, sort_keys=True) + '\n')
        return

    if action == D3NStateAction.CLEANUP:
        if not os.path.exists(state_file):
            return

        # best-effort read; a corrupt state file must not block removal
        try:
            with open(state_file, 'r') as fh:
                loaded = json.load(fh)
        except Exception:
            loaded = {}

        cache_path = loaded.get('cache_path') if isinstance(loaded, dict) else None
        if isinstance(cache_path, str) and cache_path:
            try:
                shutil.rmtree(cache_path, ignore_errors=True)
                logger.info(f'[D3N] removed cache directory: {cache_path}')
            except Exception as e:
                logger.warning(f'[D3N] failed to remove cache directory {cache_path}: {e}')

        try:
            os.unlink(state_file)
        except FileNotFoundError:
            pass
        except Exception as e:
            logger.warning(f'[D3N] failed to remove {state_file}: {e}')
        return

    raise Error(f'[D3N] invalid d3n_state action: {action}')
+
+
def prepare_d3n_cache(ctx: CephadmContext, d3n: D3NCache, uid: int, gid: int) -> None:
    """
    Prepare a D3N cache mount and directory.

    Steps:
      1. Ensure the mountpoint directory exists
      2. Format the device if it has no filesystem
      3. Ensure an /etc/fstab entry exists
      4. Mount the filesystem if not already mounted
      5. Ensure cache_path exists and is owned by the daemon uid/gid
    """
    logger.debug(
        f'[D3N] prepare_d3n_cache: device={d3n.device!r} fs_type={d3n.filesystem!r} '
        f'mountpoint={d3n.mountpoint!r} cache_path={d3n.cache_path!r} '
        f'size_bytes={d3n.size_bytes!r}'
    )

    # Step 1: mountpoint directory must exist before mounting
    os.makedirs(d3n.mountpoint, mode=0o755, exist_ok=True)
    logger.debug(f'[D3N] checking filesystem on device {d3n.device}')

    # Step 2: only format when blkid reports no filesystem signature
    if not _has_filesystem(ctx, d3n.device):
        logger.debug(f'Formatting {d3n.device} with {d3n.filesystem} for D3N')
        call_throws(ctx, ['mkfs', '-t', d3n.filesystem, d3n.device])

    # Step 3: keep the mount persistent across reboots
    _ensure_fstab_entry(ctx, d3n.device, d3n.mountpoint, d3n.filesystem)

    # Step 4: mount via fstab so options stay defined in one place
    if _is_mountpoint(ctx, d3n.mountpoint):
        logger.debug(f'[D3N] mountpoint already mounted according to _is_mountpoint(): {d3n.mountpoint}')
    else:
        logger.debug(f'[D3N] mountpoint not mounted, running mount {d3n.mountpoint}')
        call_throws(ctx, ['mount', d3n.mountpoint])

    # optional free-space sanity check against the configured cache size
    if d3n.size_bytes is not None:
        avail = _avail_bytes(ctx, d3n.mountpoint)
        if avail < d3n.size_bytes:
            raise Error(
                f'Not enough free space for D3N cache on {d3n.mountpoint}: '
                f'need {d3n.size_bytes} bytes, have {avail} bytes'
            )

    # Step 5: per-daemon cache directory, owned by the daemon user
    os.makedirs(d3n.cache_path, mode=0o755, exist_ok=True)
    call_throws(ctx, ['chown', '-R', f'{uid}:{gid}', d3n.cache_path])
+
+
def _has_filesystem(ctx: CephadmContext, device: str) -> bool:
    """Return True if blkid reports a filesystem signature on the device.

    :raises Error: if the device node does not exist.
    """
    if not os.path.exists(device):
        raise Error(f'D3N device does not exist: {device}')
    out, _, code = call(ctx, ['blkid', '-o', 'value', '-s', 'TYPE', device])
    if code != 0:
        return False
    return bool(out.strip())
+
+
def _is_mountpoint(ctx: CephadmContext, path: str) -> bool:
    """Return True if findmnt reports an active mount at exactly this path."""
    cmd = ['findmnt', '-n', '-o', 'TARGET,SOURCE,FSTYPE', '--mountpoint', path]
    out, err, code = call(ctx, cmd)
    logger.debug(
        f'[D3N] _is_mountpoint({path}): code={code} out={out.strip()!r} err={err.strip()!r}'
    )
    # findmnt exits non-zero when nothing is mounted at the path
    return code == 0
+
+
def _avail_bytes(ctx: CephadmContext, path: str) -> int:
    """Return free space in bytes on the filesystem containing path.

    :raises Error: if df fails or its output cannot be parsed.
    """
    out, _, code = call(ctx, ['df', '-B1', '--output=avail', path])
    if code != 0:
        raise Error(f'Failed to check free space for {path}')
    rows = [ln.strip() for ln in out.splitlines() if ln.strip()]
    if len(rows) < 2:
        raise Error(f'Unexpected df output for {path}: {out!r}')
    # rows[0] is the 'Avail' header line; rows[1] holds the value
    logger.debug(f'[D3N] df avail bytes for {path}: {rows[1]!r}')

    return int(rows[1])
+
+
def deploy_daemon(
ctx: CephadmContext,
ident: 'DaemonIdentity',
# dirs, conf, keyring
create_daemon_dirs(ctx, ident, uid, gid, config, keyring)
+ if ident.daemon_type == 'rgw':
+ config_json = fetch_configs(ctx)
+ d3n_cache: Any = config_json.get('d3n_cache')
+
+ if d3n_cache:
+ try:
+ d3n = D3NCache.from_json(d3n_cache)
+ except D3NCacheError as e:
+ raise Error(str(e))
+ prepare_d3n_cache(ctx, d3n, uid, gid)
+ try:
+ d3n_state(ctx, data_dir, D3NStateAction.WRITE, d3n, uid, gid)
+ except Exception as e:
+ logger.warning(f'[D3N] failed to persist D3N state in {data_dir}: {e}')
+
# only write out unit files and start daemon
# with systemd if this is not a reconfig
if deployment_type != DeploymentType.RECONFIG:
verbosity=CallVerbosity.DEBUG)
data_dir = ident.data_dir(ctx.data_dir)
+
+ if ident.daemon_type == 'rgw':
+ d3n_state(ctx, data_dir, action=D3NStateAction.CLEANUP)
+
if ident.daemon_type in ['mon', 'osd', 'prometheus'] and \
not ctx.force_delete_data:
# rename it out of the way -- do not delete
)
mounts.update(cm)
+ if self.identity.daemon_type == 'rgw':
+ config_json = fetch_configs(ctx) or {}
+ d3n_cache = config_json.get('d3n_cache')
+
+ if d3n_cache:
+ if not isinstance(d3n_cache, dict):
+ raise Error(
+ f'Invalid d3n_cache config: expected dict got {type(d3n_cache).__name__}'
+ )
+
+ cache_path = d3n_cache.get('cache_path')
+ if cache_path and isinstance(cache_path, str):
+ mounts[cache_path] = f'{cache_path}:z'
+
def setup_qat_args(self, ctx: CephadmContext, args: List[str]) -> None:
try:
out, _, _ = call_throws(ctx, ['ls', '-1', '/dev/vfio/devices'])
import errno
+import re
import json
import logging
-import re
import socket
import time
from abc import ABCMeta, abstractmethod
)
from orchestrator._interface import daemon_type_to_service
from cephadm import utils
+from ceph.cephadm.d3n_types import (
+ D3NCache,
+ D3NCacheError,
+ D3NCacheSpec,
+ d3n_get_host_devs,
+ d3n_paths,
+)
+from cephadm.services.rgw_d3n import D3NDevicePlanner
from .service_registry import register_cephadm_service
from cephadm.tlsobject_types import TLSObjectScope, TLSCredentials, EMPTY_TLS_CREDENTIALS
from cephadm.ssl_cert_utils import extract_ips_and_fqdns_from_cert
self.mgr.spec_store.save(spec)
self.mgr.trigger_connect_dashboard_rgw()
+ def _compute_d3n_cache_for_daemon(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ spec: RGWSpec,
+ ) -> Optional[D3NCache]:
+
+ alloc = D3NDevicePlanner(self.mgr)
+ d3n_raw = getattr(spec, 'd3n_cache', None)
+
+ if not d3n_raw:
+ return None
+
+ try:
+ d3n = D3NCacheSpec.from_json(d3n_raw)
+ except D3NCacheError as e:
+ raise OrchestratorError(str(e))
+
+ host = daemon_spec.host
+ if not host:
+ raise OrchestratorError("missing host in daemon_spec")
+
+ service_name = daemon_spec.service_name
+ daemon_details = self.mgr.cache.get_daemons_by_service(service_name)
+
+ fs_type, size_bytes, devs = d3n_get_host_devs(d3n, host)
+ device = alloc.plan_device_for_daemon(service_name, host, devs, daemon_spec.daemon_id, daemon_details)
+
+ logger.info(
+ f"[D3N][alloc] service={service_name} host={host} daemon={daemon_spec.daemon_id} "
+ f"chosen={device} used=? devs={devs}"
+
+ )
+
+ if daemon_spec.daemon_id:
+ # warn if sharing is unavoidable
+ alloc.warn_if_sharing_unavoidable(service_name, host, devs)
+
+ mountpoint, cache_path = d3n_paths(self.mgr._cluster_fsid, device, daemon_spec.daemon_id)
+
+ return D3NCache(
+ device=device,
+ filesystem=fs_type,
+ size_bytes=size_bytes,
+ mountpoint=mountpoint,
+ cache_path=cache_path,
+ )
+
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
self.register_for_certificates(daemon_spec)
daemon_spec.keyring = keyring
daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ d3n_cache = daemon_spec.final_config.get('d3n_cache')
+
+ if d3n_cache:
+ try:
+ d3n = D3NCache.from_json(d3n_cache)
+ except D3NCacheError as e:
+ raise OrchestratorError(str(e))
+
+ cache_path = d3n.cache_path
+ size = d3n.size_bytes
+
+ self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': daemon_name,
+ 'name': 'rgw_d3n_l1_local_datacache_enabled',
+ 'value': 'true',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': daemon_name,
+ 'name': 'rgw_d3n_l1_datacache_persistent_path',
+ 'value': cache_path,
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': daemon_name,
+ 'name': 'rgw_d3n_l1_datacache_size',
+ 'value': str(size),
+ })
+
return daemon_spec
def get_keyring(self, rgw_id: str) -> str:
'who': utils.name_to_config_section(daemon.name()),
'name': 'rgw_frontends',
})
+
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(daemon.name()),
+ 'name': 'rgw_d3n_l1_local_datacache_enabled',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(daemon.name()),
+ 'name': 'rgw_d3n_l1_datacache_persistent_path',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(daemon.name()),
+ 'name': 'rgw_d3n_l1_datacache_size',
+ })
self.mgr.check_mon_command({
'prefix': 'config rm',
'who': utils.name_to_config_section(daemon.name()),
'name': 'qat_compressor_enabled'
})
+
self.mgr.check_mon_command({
'prefix': 'config-key rm',
'key': f'rgw/cert/{daemon.name()}',
if svc_spec.qat:
config['qat'] = svc_spec.qat
+ d3n_cache = self._compute_d3n_cache_for_daemon(daemon_spec, svc_spec)
+ if d3n_cache:
+ config['d3n_cache'] = d3n_cache.to_json()
+
rgw_deps = parent_deps + self.get_dependencies(self.mgr, svc_spec)
return config, rgw_deps
--- /dev/null
+import logging
+
+from typing import List, Dict, Tuple, TYPE_CHECKING
+
+from dataclasses import dataclass
+from orchestrator import (
+ OrchestratorError,
+ DaemonDescription,
+)
+from cephadm import utils
+from ceph.cephadm.d3n_types import d3n_parse_dev_from_path
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+
+logger = logging.getLogger(__name__)
+
+
@dataclass
class D3NDevicePlanner:
    """
    Plan which host block device backs each RGW daemon's D3N cache.

    Selections are deterministic per (service, host), existing mappings
    are reused, cross-service reuse of a device on the same host is
    refused, and a device is only shared between daemons of one service
    when there are more daemons than devices.
    """

    mgr: "CephadmOrchestrator"

    def _d3n_fail_if_devs_used_by_other_rgw_service(self, host: str, devs: List[str], service_name: str) -> None:
        """Raise OrchestratorError if any of devs is already used for D3N by a different RGW service on this host."""
        wanted = set(devs)

        # check rgw daemons on this host
        rgw_daemons = self.mgr.cache.get_daemons_by_type('rgw', host=host)

        for dd in rgw_daemons:
            # skip daemons that belong to the same service
            other_service = dd.service_name()
            if other_service == service_name:
                continue

            who = utils.name_to_config_section(dd.name())
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': who,
                'name': 'rgw_d3n_l1_datacache_persistent_path',
            })
            if ret != 0:
                continue

            p = (out or '').strip()
            if not p:
                continue

            # the backing device is encoded in the cache path
            used_dev = d3n_parse_dev_from_path(p)
            if used_dev and used_dev in wanted:
                raise OrchestratorError(
                    f'D3N device conflict on host "{host}": device "{used_dev}" is already used by RGW service "{other_service}" '
                    f'(daemon {dd.name()}). Refuse to reuse across services.'
                )

    def _d3n_gc_and_prune_alloc(
        self,
        alloc: Dict[str, str],
        devs: List[str],
        daemon_details: List[DaemonDescription],
        current_daemon_id: str,
        key: Tuple[str, str],
    ) -> None:
        """Drop allocations pointing at devices removed from the spec or at daemons that no longer exist."""
        # prune mappings to devices no longer listed in the spec
        invalid = [did for did, dev in alloc.items() if dev not in devs]
        for did in invalid:
            del alloc[did]
        logger.debug(f"[D3N][alloc] prune-invalid: removed={invalid} devs={devs} alloc_now={alloc}")

        # no daemons at all for this service -> everything here is stale
        if not daemon_details:
            if alloc:
                logger.info(f"[D3N][alloc] clear-stale: key={key} alloc_was={alloc}")
            alloc.clear()
            return

        live_daemon_ids = {dd.daemon_id for dd in daemon_details if dd.daemon_id}
        # the daemon currently being planned may not be in the cache yet
        if current_daemon_id:
            live_daemon_ids.add(current_daemon_id)

        stale = [did for did in list(alloc.keys()) if did not in live_daemon_ids]
        for did in stale:
            del alloc[did]
        logger.debug(
            f"gc: key={key} live={sorted(live_daemon_ids)} "
            f"removed={stale} alloc_now={alloc}"
        )

    def _d3n_get_allocator(self) -> Dict[Tuple[str, str], Dict[str, str]]:
        """
        Return the in-memory D3N device allocation map.

        This is intentionally stored on the mgr module instance (self.mgr) as
        ephemeral state to keep per-(service, host) device selections stable across
        repeated scheduling cycles within the lifetime of the mgr process.

        It is not persisted to disk/mon-store; it will be rebuilt after mgr restart
        and also pruned via _d3n_gc_and_prune_alloc() based on currently running
        daemons.
        """
        alloc_all = getattr(self.mgr, "_d3n_device_alloc", None)
        if alloc_all is None:
            alloc_all = {}
            setattr(self.mgr, "_d3n_device_alloc", alloc_all)

        assert isinstance(alloc_all, dict)
        return alloc_all

    def _d3n_choose_device_for_daemon(
        self,
        service_name: str,
        host: str,
        devs: List[str],
        daemon_id: str,
        daemon_details: List[DaemonDescription],
    ) -> str:
        """Pick a device for this daemon: reuse an existing mapping, prefer an unused device, else share round-robin."""
        alloc_all = self._d3n_get_allocator()
        key = (service_name, host)
        alloc = alloc_all.setdefault(key, {})
        assert isinstance(alloc, dict)

        self._d3n_gc_and_prune_alloc(alloc, devs, daemon_details, daemon_id, key)

        logger.info(
            f"[D3N][alloc] pre-choose key={key} daemon={daemon_id} "
            f"daemon_details={len(daemon_details)} devs={devs} alloc={alloc}"
        )

        if daemon_id in alloc:
            logger.info(f"[D3N][alloc] reuse existing mapping daemon={daemon_id} dev={alloc[daemon_id]}")
            return alloc[daemon_id]

        used = set(alloc.values())
        free = [d for d in devs if d not in used]
        if free:
            chosen = free[0]  # 1:1 mapping whenever possible
            reason = "first-free/1:1"
        else:
            chosen = devs[len(alloc) % len(devs)]  # share the device when unavoidable
            reason = "round-robin/share"
        alloc[daemon_id] = chosen
        logger.info(f"[D3N][alloc] chosen={chosen} reason={reason} alloc_now={alloc}")
        return chosen

    def warn_if_sharing_unavoidable(
        self,
        service_name: str,
        host: str,
        devs: List[str],
    ) -> None:
        """Log a warning when this (service, host) has more daemons than devices, forcing sharing."""
        alloc_all = getattr(self.mgr, "_d3n_device_alloc", {})
        alloc = alloc_all.get((service_name, host), {}) if isinstance(alloc_all, dict) else {}
        if isinstance(alloc, dict) and len(alloc) > len(devs):
            logger.warning(
                f'D3N cache sub-optimal on host "{host}" for service "{service_name}": '
                f'{len(alloc)} RGW daemons but only {len(devs)} devices; devices will be shared.'
            )

    def plan_device_for_daemon(
        self,
        service_name: str,
        host: str,
        devs: List[str],
        daemon_id: str,
        daemon_details: List[DaemonDescription],
    ) -> str:
        """Entry point: refuse cross-service conflicts, then choose (or reuse) a device for this daemon."""
        self._d3n_fail_if_devs_used_by_other_rgw_service(host, devs, service_name)
        return self._d3n_choose_device_for_daemon(service_name, host, devs, daemon_id, daemon_details)
--- /dev/null
+import os
+from dataclasses import dataclass
+from typing import Any, Dict, Mapping, Optional, Union, Tuple
+from ceph.utils import size_to_bytes
+
+
class D3NCacheError(ValueError):
    """Raised when a d3n_cache spec or resolved cache setting is invalid."""
    pass
+
+
@dataclass(frozen=True)
class D3NCacheSpec:
    """
    RGW spec's d3n_cache section:
    d3n_cache:
      filesystem: xfs|ext4    (optional, default xfs)
      size: 10G|512M|...      (required)
      devices:
        host1: [/dev/nvme0n1, ...]  (required)
        host2: [/dev/nvme1n1, ...]
    """

    filesystem: str
    size: Union[str, int]
    devices: Dict[str, list[str]]

    _SUPPORTED_FILESYSTEMS = ("xfs", "ext4")
    _KNOWN_KEYS = {"filesystem", "size", "devices"}

    @classmethod
    def from_json(cls, obj: Mapping[str, Any]) -> "D3NCacheSpec":
        """Parse and validate a raw d3n_cache mapping.

        :raises D3NCacheError: on unknown keys, bad types or bad values.
        """
        if not isinstance(obj, Mapping):
            raise D3NCacheError(
                f"d3n_cache must be a dict, got {type(obj).__name__}"
            )

        unknown = set(obj.keys()) - cls._KNOWN_KEYS
        if unknown:
            raise D3NCacheError(
                f"Unknown d3n_cache field(s): {sorted(unknown)!r}"
            )

        filesystem = str(obj.get("filesystem", "xfs")).strip()
        size = obj.get("size", None)
        # bool is a subclass of int; reject it rather than treating it as 0/1 bytes
        if isinstance(size, bool) or not isinstance(size, (str, int)) or (
            isinstance(size, str) and not size.strip()
        ):
            raise D3NCacheError(
                '"d3n_cache.size" must be an int (bytes) or a string like "10G"'
            )

        devices_raw = obj.get("devices", None)

        if not isinstance(devices_raw, Mapping):
            raise D3NCacheError(
                '"d3n_cache.devices" must be a mapping of host -> list of device paths'
            )

        devices: Dict[str, list[str]] = {}
        for host, devs in devices_raw.items():
            if not isinstance(host, str) or not host.strip():
                raise D3NCacheError(
                    "Invalid host key in d3n_cache.devices (must be non-empty string)"
                )
            if not isinstance(devs, list) or not devs:
                raise D3NCacheError(
                    f'"d3n_cache.devices[{host}]" must be a non-empty list of device paths'
                )

            norm: list[str] = []
            for d in devs:
                if not isinstance(d, str) or not d.startswith("/dev/"):
                    raise D3NCacheError(
                        f'Invalid device path "{d}" in d3n_cache.devices[{host}]'
                    )
                norm.append(d)

            # de-duplicate and keep a stable, sorted order per host
            devices[host] = sorted(set(norm))

        spec = cls(filesystem=filesystem, size=size, devices=devices)
        spec.validate()
        return spec

    def validate(self) -> None:
        """Validate field values; kept separate from parsing so directly-constructed specs can be checked.

        :raises D3NCacheError: on invalid filesystem or size.
        """
        if self.filesystem not in self._SUPPORTED_FILESYSTEMS:
            raise D3NCacheError(
                f'Invalid filesystem "{self.filesystem}" in d3n_cache '
                f'(supported: {", ".join(self._SUPPORTED_FILESYSTEMS)})'
            )
        if self.size is None or self.size == "":
            raise D3NCacheError('"d3n_cache.size" is required')
        # an integer size is raw bytes and must be positive
        if isinstance(self.size, int) and not isinstance(self.size, bool) and self.size <= 0:
            raise D3NCacheError('"d3n_cache.size" must be a positive size')

    def devices_for_host(self, host: str) -> list[str]:
        """Return the configured device list for host.

        :raises D3NCacheError: if the host has no devices configured.
        """
        devs = self.devices.get(host)
        if not devs:
            raise D3NCacheError(
                f'no devices found for host "{host}" in d3n_cache.devices'
            )
        return devs
+
+
@dataclass(frozen=True)
class D3NCache:
    """Fully-resolved D3N cache settings for one RGW daemon (device, mount, cache dir, optional size)."""

    device: str
    filesystem: str
    mountpoint: str
    cache_path: str
    size_bytes: Optional[int] = None

    _SUPPORTED_FILESYSTEMS = ("xfs", "ext4")
    _KNOWN_KEYS = {
        "device",
        "filesystem",
        "mountpoint",
        "cache_path",
        "size_bytes",
    }

    @classmethod
    def from_json(cls, obj: Mapping[str, Any]) -> "D3NCache":
        """Build and validate a D3NCache from a raw mapping.

        :raises D3NCacheError: on unknown keys or invalid values.
        """
        if not isinstance(obj, Mapping):
            raise D3NCacheError(
                f"d3n_cache must be a dict, got {type(obj).__name__}"
            )

        extra = set(obj.keys()) - cls._KNOWN_KEYS
        if extra:
            raise D3NCacheError(
                f"Unknown d3n_cache field(s): {sorted(extra)!r}"
            )

        def _clean(key: str, default: str = "") -> str:
            # normalize every string field the same way
            return str(obj.get(key, default)).strip()

        cache = cls(
            device=_clean("device"),
            filesystem=_clean("filesystem", "xfs"),
            mountpoint=_clean("mountpoint"),
            cache_path=_clean("cache_path"),
            size_bytes=obj.get("size_bytes"),
        )
        cache.validate()
        return cache

    def to_json(self) -> Dict[str, Any]:
        """Serialize to a plain dict; size_bytes is omitted when unset."""
        payload: Dict[str, Any] = {
            "device": self.device,
            "filesystem": self.filesystem,
            "mountpoint": self.mountpoint,
            "cache_path": self.cache_path,
        }
        if self.size_bytes is not None:
            payload["size_bytes"] = self.size_bytes
        return payload

    def validate(self) -> None:
        """Check all fields, raising D3NCacheError on the first invalid one."""
        if not self.device:
            raise D3NCacheError("d3n_cache.device must be a non-empty string")
        if not self.device.startswith("/"):
            raise D3NCacheError(
                f"d3n_cache.device must be an absolute path, got {self.device!r}"
            )

        if self.filesystem not in self._SUPPORTED_FILESYSTEMS:
            raise D3NCacheError(
                f"Invalid filesystem {self.filesystem!r} "
                f"(supported: {', '.join(self._SUPPORTED_FILESYSTEMS)})"
            )

        # mountpoint and cache_path share the same constraints
        for attr in ("mountpoint", "cache_path"):
            value = getattr(self, attr)
            if not value:
                raise D3NCacheError(
                    f"d3n_cache.{attr} must be a non-empty string"
                )
            if not value.startswith("/"):
                raise D3NCacheError(
                    f"d3n_cache.{attr} must be an absolute path, got {value!r}"
                )

        if self.size_bytes is not None:
            if not isinstance(self.size_bytes, int) or self.size_bytes <= 0:
                raise D3NCacheError(
                    "d3n_cache.size_bytes must be a positive integer"
                )
+
+
def d3n_get_host_devs(
    d3n: D3NCacheSpec, host: str
) -> Tuple[str, int, list[str]]:
    """Resolve (fs_type, size_bytes, devices) for one host from a D3NCacheSpec.

    :raises D3NCacheError: if the host has no devices or the size is unparseable.
    """
    devs = d3n.devices_for_host(host)
    try:
        size_bytes = size_to_bytes(d3n.size)
    except Exception as e:
        raise D3NCacheError(
            f'invalid d3n_cache.size {d3n.size!r}: {e}'
        ) from e

    return d3n.filesystem, size_bytes, devs
+
+
def d3n_parse_dev_from_path(p: str) -> Optional[str]:
    """Extract the /dev/<name> device from a D3N cache path, or None if unrecognizable."""
    # expected: /mnt/ceph-d3n/<fsid>/<dev>/rgw_datacache/...
    if not p:
        return None

    segments = [seg for seg in p.split('/') if seg]
    if 'ceph-d3n' not in segments:
        return None

    anchor = segments.index('ceph-d3n')
    # need at least <fsid> and <dev> after the anchor
    if len(segments) < anchor + 3:
        return None

    name = segments[anchor + 2]
    if not name:
        return None
    return f'/dev/{name}'
+
+
def d3n_mountpoint(fsid: str, dev: str) -> str:
    """Deterministic mountpoint for a D3N cache device within a cluster."""
    return f"/mnt/ceph-d3n/{fsid}/{os.path.basename(dev)}"
+
+
def d3n_cache_path(mountpoint: str, daemon_id: str) -> str:
    """Per-daemon cache directory under the device mountpoint."""
    return os.path.join(mountpoint, "rgw_datacache", f"client.rgw.{daemon_id}")
+
+
def d3n_paths(fsid: str, device: str, daemon_id: str) -> tuple[str, str]:
    """Return (mountpoint, cache_path) for a daemon's D3N cache device."""
    mountpoint = d3n_mountpoint(fsid, device)
    cache_path = d3n_cache_path(mountpoint, daemon_id)
    return mountpoint, cache_path
from ceph.deployment.utils import verify_positive_int, verify_non_negative_number
from ceph.deployment.utils import verify_boolean, verify_enum, verify_int
from ceph.deployment.utils import parse_combined_pem_file
+from ceph.cephadm.d3n_types import D3NCacheSpec, D3NCacheError
from ceph.utils import is_hex
from ceph.smb import constants as smbconst
from ceph.smb import network as smbnet
rgw_frontend_port: 1234
rgw_frontend_type: beast
rgw_frontend_ssl_certificate: ...
+ # Optional: enable D3N (L1 datacache) for RGW
+ d3n_cache:
+ filesystem: xfs # default: xfs
+ size: 10G # required; int bytes or string with K/M/G/T/P
+ devices: # required; per-host list of devices
+ host1:
+ - /dev/nvme0n1
+ host2:
+ - /dev/nvme1n1
+ - /dev/nvme2n1
See also: :ref:`orchestrator-cli-service-spec`
"""
wildcard_enabled: Optional[bool] = False,
rgw_exit_timeout_secs: int = 120,
qat: Optional[Dict[str, str]] = None,
+ d3n_cache: Optional[Dict[str, Any]] = None,
):
assert service_type == 'rgw', service_type
self.rgw_exit_timeout_secs = rgw_exit_timeout_secs
self.qat = qat or {}
+ self.d3n_cache = d3n_cache or {}
def get_port_start(self) -> List[int]:
ports = self.get_port()
f"Invalid compression mode {compression}. Only 'sw' and 'hw' are allowed"
)
+ if self.d3n_cache:
+ try:
+ D3NCacheSpec.from_json(self.d3n_cache)
+ except D3NCacheError as e:
+ raise SpecValidationError(str(e))
+
yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)
import string
import ssl
-from typing import Optional, MutableMapping, Tuple, Any
+from typing import Optional, MutableMapping, Tuple, Any, Union
from urllib.error import HTTPError, URLError
from urllib.request import urlopen, Request
log = logging.getLogger(__name__)
+_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$')
+
def datetime_now() -> datetime.datetime:
"""
if value.lower() in _FALSE_VALS:
return False
raise ValueError(f'invalid truth value {value!r}')
+
+
def size_to_bytes(v: Union[str, int]) -> int:
    """Convert a size given as int bytes or a string like '10G'/'512M' to bytes.

    :raises ValueError: for unparseable strings or unsupported types.
    """
    if isinstance(v, int):
        return v
    if not isinstance(v, str):
        raise ValueError(f'invalid size type {type(v)} (expected int or str)')

    # digits, optional binary-prefix letter, optional 'iB'/'B' suffix
    m = re.match(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$', v)
    if m is None:
        raise ValueError(
            f'invalid size "{v}" (examples: 10737418240, 10G, 512M)'
        )

    multipliers = {
        '': 1,
        'K': 1024,
        'M': 1024 ** 2,
        'G': 1024 ** 3,
        'T': 1024 ** 4,
        'P': 1024 ** 5,
    }
    return int(m.group(1)) * multipliers[(m.group(2) or '').upper()]