From: Kushal Deb Date: Mon, 22 Dec 2025 12:58:29 +0000 (+0530) Subject: mgr/cephadm: Implement D3N L1 persistent datacache support for RGW X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=79d72fc70cdedd0253419c669ea5fb156f267268;p=ceph.git mgr/cephadm: Implement D3N L1 persistent datacache support for RGW Add RGW D3N L1 persistent datacache support backed by host block devices. Select devices deterministically per (service, host) with intra-service sharing, forbid cross-service reuse, prepare/mount devices, and bind-mount per-daemon cache directories into the container. Signed-off-by: Kushal Deb --- diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py index 257f2f8cab46..8203a40ae2c1 100755 --- a/src/cephadm/cephadm.py +++ b/src/cephadm/cephadm.py @@ -29,6 +29,7 @@ from glob import glob from io import StringIO from threading import Thread, Event from pathlib import Path +from enum import Enum from configparser import ConfigParser from cephadmlib.constants import ( @@ -228,7 +229,7 @@ from cephadmlib.listing_updaters import ( VersionStatusUpdater, ) from cephadmlib.container_lookup import infer_local_ceph_image, identify - +from ceph.cephadm.d3n_types import D3NCache, D3NCacheError FuncT = TypeVar('FuncT', bound=Callable) @@ -898,6 +899,178 @@ def _update_container_args_for_podman( ) +def _d3n_fstab_entry(uuid: str, mountpoint: str, fs_type: str) -> str: + return f'UUID={uuid} {mountpoint} {fs_type} defaults,noatime 0 2\n' + + +def _ensure_fstab_entry(ctx: CephadmContext, device: str, mountpoint: str, fs_type: str) -> None: + """ + Ensure the device is present in /etc/fstab for the given mountpoint. + If an entry for mountpoint already exists, no changes are made. + """ + out, _, code = call(ctx, ['blkid', '-s', 'UUID', '-o', 'value', device]) + if code != 0 or not out.strip(): + raise Error(f'Failed to get UUID for {device}') + uuid = out.strip() + + entry = _d3n_fstab_entry(uuid, mountpoint, fs_type) + + # check if mountpoint already present in fstab + with open('/etc/fstab', 'r') as f: + for line in f: + line = line.strip() + if not line or line.startswith('#'): + continue + parts = line.split() + if len(parts) >= 2 and parts[1] == mountpoint: + return + + with open('/etc/fstab', 'a') as f: + f.write(entry) + + +class D3NStateAction(str, Enum): + WRITE = 'write' + CLEANUP = 'cleanup' + + +def d3n_state( + ctx: CephadmContext, + data_dir: str, + action: D3NStateAction, + d3n: Optional[D3NCache] = None, + uid: int = 0, + gid: int = 0, +) -> None: + """ + Persist/read minimal D3N info in the daemon's data directory + so that rm-daemon can cleanup properly. + """ + path = os.path.join(data_dir, 'd3n_state.json') + + if action == D3NStateAction.WRITE: + if d3n is None: + return + state = { + 'cache_path': d3n.cache_path, + 'mount_path': d3n.mountpoint, + } + payload = json.dumps(state, sort_keys=True) + '\n' + with write_new(path, owner=(uid, gid)) as f: + f.write(payload) + return + + if action == D3NStateAction.CLEANUP: + if not os.path.exists(path): + return + + try: + with open(path, 'r') as f: + state = json.load(f) + except Exception: + state = {} + + cache_path = state.get('cache_path') if isinstance(state, dict) else None + if isinstance(cache_path, str) and cache_path: + try: + shutil.rmtree(cache_path, ignore_errors=True) + logger.info(f'[D3N] removed cache directory: {cache_path}') + except Exception as e: + logger.warning(f'[D3N] failed to remove cache directory {cache_path}: {e}') + + try: + os.unlink(path) + except FileNotFoundError: + pass + except Exception as e: + logger.warning(f'[D3N] failed to remove {path}: {e}') + return + + raise Error(f'[D3N] invalid d3n_state action: {action}') + + +def prepare_d3n_cache(ctx: CephadmContext, d3n: D3NCache, uid: int, gid: int) -> None: + """ + Prepare a D3N cache mount and directory. + + Steps: + 1. Ensure mountpoint directory exists + 2. Format device if it has no filesystem + 3. Ensure /etc/fstab entry exists + 4. Mount if not mounted + 5. Ensure cache_path exists and is owned by daemon uid/gid + """ + device = d3n.device + fs_type = d3n.filesystem + mountpoint = d3n.mountpoint + cache_path = d3n.cache_path + size_bytes = d3n.size_bytes + + logger.debug( + f'[D3N] prepare_d3n_cache: device={device!r} fs_type={fs_type!r} mountpoint={mountpoint!r} cache_path={cache_path!r} size_bytes={size_bytes!r}' + ) + + # Ensure mountpoint exists + os.makedirs(mountpoint, mode=0o755, exist_ok=True) + logger.debug(f'[D3N] checking filesystem on device {device}') + + # Format the device if needed + if not _has_filesystem(ctx, device): + logger.debug(f'Formatting {device} with {fs_type} for D3N') + call_throws(ctx, ['mkfs', '-t', fs_type, device]) + + # Ensure the mount is persistent across reboot by ensuring an /etc/fstab entry exists. + _ensure_fstab_entry(ctx, device, mountpoint, fs_type) + + if not _is_mountpoint(ctx, mountpoint): + logger.debug(f'[D3N] mountpoint not mounted, running mount {mountpoint}') + call_throws(ctx, ['mount', mountpoint]) + else: + logger.debug(f'[D3N] mountpoint already mounted according to _is_mountpoint(): {mountpoint}') + + if size_bytes is not None: + avail = _avail_bytes(ctx, mountpoint) + if avail < size_bytes: + raise Error( + f'Not enough free space for D3N cache on {mountpoint}: ' + f'need {size_bytes} bytes, have {avail} bytes' + ) + + # Create per-daemon cache directory + os.makedirs(cache_path, mode=0o755, exist_ok=True) + call_throws(ctx, ['chown', '-R', f'{uid}:{gid}', cache_path]) + + +def _has_filesystem(ctx: CephadmContext, device: str) -> bool: + if not os.path.exists(device): + raise Error(f'D3N device does not exist: {device}') + out, _, code = call(ctx, ['blkid', '-o', 'value', '-s', 'TYPE', device]) + return code == 0 and bool(out.strip()) + + +def _is_mountpoint(ctx: CephadmContext, path: str) -> bool: + out, err, code = call( + ctx, + ['findmnt', '-n', '-o', 'TARGET,SOURCE,FSTYPE', '--mountpoint', path], + ) + logger.debug( + f'[D3N] _is_mountpoint({path}): code={code} out={out.strip()!r} err={err.strip()!r}' + ) + return code == 0 + + +def _avail_bytes(ctx: CephadmContext, path: str) -> int: + out, _, code = call(ctx, ['df', '-B1', '--output=avail', path]) + if code != 0: + raise Error(f'Failed to check free space for {path}') + lines = [line.strip() for line in out.splitlines() if line.strip()] + if len(lines) < 2: + raise Error(f'Unexpected df output for {path}: {out!r}') + logger.debug(f'[D3N] df avail bytes for {path}: {lines[1]!r}') + + return int(lines[1]) + + def deploy_daemon( ctx: CephadmContext, ident: 'DaemonIdentity', @@ -970,6 +1143,21 @@ def deploy_daemon( # dirs, conf, keyring create_daemon_dirs(ctx, ident, uid, gid, config, keyring) + if ident.daemon_type == 'rgw': + config_json = fetch_configs(ctx) + d3n_cache: Any = config_json.get('d3n_cache') + + if d3n_cache: + try: + d3n = D3NCache.from_json(d3n_cache) + except D3NCacheError as e: + raise Error(str(e)) + prepare_d3n_cache(ctx, d3n, uid, gid) + try: + d3n_state(ctx, data_dir, D3NStateAction.WRITE, d3n, uid, gid) + except Exception as e: + logger.warning(f'[D3N] failed to persist D3N state in {data_dir}: {e}') + # only write out unit files and start daemon # with systemd if this is not a reconfig if deployment_type != DeploymentType.RECONFIG: @@ -3948,6 +4136,10 @@ def command_rm_daemon(ctx): verbosity=CallVerbosity.DEBUG) data_dir = ident.data_dir(ctx.data_dir) + + if ident.daemon_type == 'rgw': + d3n_state(ctx, data_dir, action=D3NStateAction.CLEANUP) + if ident.daemon_type in ['mon', 'osd', 'prometheus'] and \ not ctx.force_delete_data: # rename it out of the way -- do not delete diff --git a/src/cephadm/cephadmlib/daemons/ceph.py b/src/cephadm/cephadmlib/daemons/ceph.py index ea359d73ed77..0134f69da151 100644 --- a/src/cephadm/cephadmlib/daemons/ceph.py +++ b/src/cephadm/cephadmlib/daemons/ceph.py @@ -197,6 +197,20 @@ class Ceph(ContainerDaemonForm): ) mounts.update(cm) + if self.identity.daemon_type == 'rgw': + config_json = fetch_configs(ctx) or {} + d3n_cache = config_json.get('d3n_cache') + + if d3n_cache: + if not isinstance(d3n_cache, dict): + raise Error( + f'Invalid d3n_cache config: expected dict got {type(d3n_cache).__name__}' + ) + + cache_path = d3n_cache.get('cache_path') + if cache_path and isinstance(cache_path, str): + mounts[cache_path] = f'{cache_path}:z' + def setup_qat_args(self, ctx: CephadmContext, args: List[str]) -> None: try: out, _, _ = call_throws(ctx, ['ls', '-1', '/dev/vfio/devices']) diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index fd0190f2f314..c6705ca03790 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -1,7 +1,7 @@ import errno +import re import json import logging -import re import socket import time from abc import ABCMeta, abstractmethod @@ -33,6 +33,14 @@ from orchestrator import ( ) from orchestrator._interface import daemon_type_to_service from cephadm import utils +from ceph.cephadm.d3n_types import ( + D3NCache, + D3NCacheError, + D3NCacheSpec, + d3n_get_host_devs, + d3n_paths, +) +from cephadm.services.rgw_d3n import D3NDevicePlanner from .service_registry import register_cephadm_service from cephadm.tlsobject_types import TLSObjectScope, TLSCredentials, EMPTY_TLS_CREDENTIALS from cephadm.ssl_cert_utils import extract_ips_and_fqdns_from_cert @@ -1390,6 +1398,53 @@ class RgwService(CephService): self.mgr.spec_store.save(spec) self.mgr.trigger_connect_dashboard_rgw() + def _compute_d3n_cache_for_daemon( + self, + daemon_spec: CephadmDaemonDeploySpec, + spec: RGWSpec, + ) -> Optional[D3NCache]: + + alloc = D3NDevicePlanner(self.mgr) + d3n_raw = getattr(spec, 'd3n_cache', None) + + if not d3n_raw: + return None + + try: + d3n = D3NCacheSpec.from_json(d3n_raw) + except D3NCacheError as e: + raise OrchestratorError(str(e)) + + host = daemon_spec.host + if not host: + raise OrchestratorError("missing host in daemon_spec") + + service_name = daemon_spec.service_name + daemon_details = self.mgr.cache.get_daemons_by_service(service_name) + + fs_type, size_bytes, devs = d3n_get_host_devs(d3n, host) + device = alloc.plan_device_for_daemon(service_name, host, devs, daemon_spec.daemon_id, daemon_details) + + logger.info( + f"[D3N][alloc] service={service_name} host={host} daemon={daemon_spec.daemon_id} " + f"chosen={device} used=? devs={devs}" + + ) + + if daemon_spec.daemon_id: + # warn if sharing is unavoidable + alloc.warn_if_sharing_unavoidable(service_name, host, devs) + + mountpoint, cache_path = d3n_paths(self.mgr._cluster_fsid, device, daemon_spec.daemon_id) + + return D3NCache( + device=device, + filesystem=fs_type, + size_bytes=size_bytes, + mountpoint=mountpoint, + cache_path=cache_path, + ) + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: assert self.TYPE == daemon_spec.daemon_type self.register_for_certificates(daemon_spec) @@ -1550,6 +1605,36 @@ class RgwService(CephService): daemon_spec.keyring = keyring daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + d3n_cache = daemon_spec.final_config.get('d3n_cache') + + if d3n_cache: + try: + d3n = D3NCache.from_json(d3n_cache) + except D3NCacheError as e: + raise OrchestratorError(str(e)) + + cache_path = d3n.cache_path + size = d3n.size_bytes + + self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': daemon_name, + 'name': 'rgw_d3n_l1_local_datacache_enabled', + 'value': 'true', + }) + self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': daemon_name, + 'name': 'rgw_d3n_l1_datacache_persistent_path', + 'value': cache_path, + }) + self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': daemon_name, + 'name': 'rgw_d3n_l1_datacache_size', + 'value': str(size), + }) + return daemon_spec def get_keyring(self, rgw_id: str) -> str: @@ -1583,11 +1668,28 @@ class RgwService(CephService): 'who': utils.name_to_config_section(daemon.name()), 'name': 'rgw_frontends', }) + + self.mgr.check_mon_command({ + 'prefix': 'config rm', + 'who': utils.name_to_config_section(daemon.name()), + 'name': 'rgw_d3n_l1_local_datacache_enabled', + }) + self.mgr.check_mon_command({ + 'prefix': 'config rm', + 'who': utils.name_to_config_section(daemon.name()), + 'name': 'rgw_d3n_l1_datacache_persistent_path', + }) + self.mgr.check_mon_command({ + 'prefix': 'config rm', + 'who': utils.name_to_config_section(daemon.name()), + 'name': 'rgw_d3n_l1_datacache_size', + }) self.mgr.check_mon_command({ 'prefix': 'config rm', 'who': utils.name_to_config_section(daemon.name()), 'name': 'qat_compressor_enabled' }) + self.mgr.check_mon_command({ 'prefix': 'config-key rm', 'key': f'rgw/cert/{daemon.name()}', @@ -1641,6 +1743,10 @@ class RgwService(CephService): if svc_spec.qat: config['qat'] = svc_spec.qat + d3n_cache = self._compute_d3n_cache_for_daemon(daemon_spec, svc_spec) + if d3n_cache: + config['d3n_cache'] = d3n_cache.to_json() + rgw_deps = parent_deps + self.get_dependencies(self.mgr, svc_spec) return config, rgw_deps diff --git a/src/pybind/mgr/cephadm/services/rgw_d3n.py b/src/pybind/mgr/cephadm/services/rgw_d3n.py new file mode 100644 index 000000000000..34bbbed01277 --- /dev/null +++ b/src/pybind/mgr/cephadm/services/rgw_d3n.py @@ -0,0 +1,172 @@ +import logging + +from typing import List, Dict, Tuple, TYPE_CHECKING + +from dataclasses import dataclass +from orchestrator import ( + OrchestratorError, + DaemonDescription, +) +from cephadm import utils +from ceph.cephadm.d3n_types import d3n_parse_dev_from_path +if TYPE_CHECKING: + from cephadm.module import CephadmOrchestrator + + +logger = logging.getLogger(__name__) + + +@dataclass +class D3NDevicePlanner: + mgr: "CephadmOrchestrator" + + def _d3n_fail_if_devs_used_by_other_rgw_service(self, host: str, devs: List[str], service_name: str) -> None: + logger.info("1361") + wanted = set(devs) + + # check rgw daemons on this host + rgw_daemons = self.mgr.cache.get_daemons_by_type('rgw', host=host) + + for dd in rgw_daemons: + # skip daemons that belong to the same service + other_service = dd.service_name() + + if other_service == service_name: + continue + + who = utils.name_to_config_section(dd.name()) + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'config get', + 'who': who, + 'name': 'rgw_d3n_l1_datacache_persistent_path', + }) + if ret != 0: + continue + + p = out or '' + p = p.strip() + if not p: + continue + + used_dev = d3n_parse_dev_from_path(p) + if used_dev and used_dev in wanted: + raise OrchestratorError( + f'D3N device conflict on host "{host}": device "{used_dev}" is already used by RGW service "{other_service}" ' + f'(daemon {dd.name()}). Refuse to reuse across services.' + ) + + def _d3n_gc_and_prune_alloc( + self, + alloc: Dict[str, str], + devs: list[str], + daemon_details: list[DaemonDescription], + current_daemon_id: str, + key: tuple[str, str], + ) -> None: + + invalid = [did for did, dev in alloc.items() if dev not in devs] + for did in invalid: + del alloc[did] + logger.debug(f"[D3N][alloc] prune-invalid: removed={invalid} devs={devs} alloc_now={alloc}") + if not daemon_details: + if alloc: + logger.info(f"[D3N][alloc] clear-stale: key={key} alloc_was={alloc}") + alloc.clear() + return + + live_daemon_ids: set[str] = set() + for dd in daemon_details: + if dd.daemon_id: + live_daemon_ids.add(dd.daemon_id) + + if current_daemon_id: + live_daemon_ids.add(current_daemon_id) + + stale = [did for did in list(alloc.keys()) if did not in live_daemon_ids] + for did in stale: + del alloc[did] + logger.debug( + f"gc: key={key} live={sorted(live_daemon_ids)} " + f"removed={stale} alloc_now={alloc}" + ) + + def _d3n_get_allocator(self) -> Dict[Tuple[str, str], Dict[str, str]]: + """ + Return the in-memory D3N device allocation map. + + This is intentionally stored on the mgr module instance (self.mgr) as + ephemeral state to keep per-(service, host) device selections stable across + repeated scheduling cycles within the lifetime of the mgr process. + + It is not persisted to disk/mon-store; it will be rebuilt after mgr restart + and also pruned via _d3n_gc_and_prune_alloc() based on currently running + daemons. + """ + alloc_all = getattr(self.mgr, "_d3n_device_alloc", None) + if alloc_all is None: + alloc_all = {} + setattr(self.mgr, "_d3n_device_alloc", alloc_all) + + assert isinstance(alloc_all, dict) + return alloc_all + + def _d3n_choose_device_for_daemon( + self, + service_name: str, + host: str, + devs: list[str], + daemon_id: str, + daemon_details: list[DaemonDescription], + ) -> str: + alloc_all = self._d3n_get_allocator() + key = (service_name, host) + alloc = alloc_all.setdefault(key, {}) + assert isinstance(alloc, dict) + reason = "unknown" + + self._d3n_gc_and_prune_alloc(alloc, devs, daemon_details, daemon_id, key) + + logger.info( + f"[D3N][alloc] pre-choose key={key} daemon={daemon_id} " + f"daemon_details={len(daemon_details)} devs={devs} alloc={alloc}" + ) + + if daemon_id in alloc: + logger.info(f"[D3N][alloc] reuse existing mapping daemon={daemon_id} dev={alloc[daemon_id]}") + return alloc[daemon_id] + + used = set(alloc.values()) + free = [d for d in devs if d not in used] + if free: + chosen = free[0] # 1:1 mapping whenever possible + else: + chosen = devs[len(alloc) % len(devs)] # share the device when unavoidable + reason = "round-robin/share" + alloc[daemon_id] = chosen + logger.info(f"[D3N][alloc] chosen={chosen} reason={reason} alloc_now={alloc}") + return chosen + + def warn_if_sharing_unavoidable( + self, + service_name: str, + host: str, + devs: list[str], + ) -> None: + alloc_all = getattr(self.mgr, "_d3n_device_alloc", {}) + alloc = alloc_all.get((service_name, host), {}) if isinstance(alloc_all, dict) else {} + if isinstance(alloc, dict) and len(alloc) > len(devs): + logger.warning( + f'D3N cache sub-optimal on host "{host}" for service "{service_name}": ' + f'{len(alloc)} RGW daemons but only {len(devs)} devices; devices will be shared.' + ) + + def plan_device_for_daemon( + self, + service_name: str, + host: str, + devs: list[str], + daemon_id: str, + daemon_details: list[DaemonDescription], + ) -> str: + self._d3n_fail_if_devs_used_by_other_rgw_service(host, devs, service_name) + return self._d3n_choose_device_for_daemon(service_name, host, devs, daemon_id, daemon_details) diff --git a/src/python-common/ceph/cephadm/d3n_types.py b/src/python-common/ceph/cephadm/d3n_types.py new file mode 100644 index 000000000000..589b47abb6a4 --- /dev/null +++ b/src/python-common/ceph/cephadm/d3n_types.py @@ -0,0 +1,237 @@ +import os +from dataclasses import dataclass +from typing import Any, Dict, Mapping, Optional, Union, Tuple +from ceph.utils import size_to_bytes + + +class D3NCacheError(ValueError): + pass + + +@dataclass(frozen=True) +class D3NCacheSpec: + """ + RGW spec's d3n_cache section: + d3n_cache: + filesystem: xfs|ext4 (optional, default xfs) + size: 10G|512M|... (required) + devices: + host1: [/dev/nvme0n1, ...] (required) + host2: [/dev/nvme1n1, ...] + """ + + filesystem: str + size: Union[str, int] + devices: Dict[str, list[str]] + + _SUPPORTED_FILESYSTEMS = ("xfs", "ext4") + _KNOWN_KEYS = {"filesystem", "size", "devices"} + + @classmethod + def from_json(cls, obj: Mapping[str, Any]) -> "D3NCacheSpec": + if not isinstance(obj, Mapping): + raise D3NCacheError( + f"d3n_cache must be a dict, got {type(obj).__name__}" + ) + + unknown = set(obj.keys()) - cls._KNOWN_KEYS + if unknown: + raise D3NCacheError( + f"Unknown d3n_cache field(s): {sorted(unknown)!r}" + ) + + filesystem = str(obj.get("filesystem", "xfs")).strip() + size = obj.get("size", None) + if not isinstance(size, (str, int)) or ( + isinstance(size, str) and not size.strip() + ): + raise D3NCacheError( + '"d3n_cache.size" must be an int (bytes) or a string like "10G"' + ) + + devices_raw = obj.get("devices", None) + + if not isinstance(devices_raw, Mapping): + raise D3NCacheError( + '"d3n_cache.devices" must be a mapping of host -> list of device paths' + ) + + devices: Dict[str, list[str]] = {} + for host, devs in devices_raw.items(): + if not isinstance(host, str) or not host.strip(): + raise D3NCacheError( + "Invalid host key in d3n_cache.devices (must be non-empty string)" + ) + if not isinstance(devs, list) or not devs: + raise D3NCacheError( + f'"d3n_cache.devices[{host}]" must be a non-empty list of device paths' + ) + + norm: list[str] = [] + for d in devs: + if not isinstance(d, str) or not d.startswith("/dev/"): + raise D3NCacheError( + f'Invalid device path "{d}" in d3n_cache.devices[{host}]' + ) + norm.append(d) + + devices[host] = sorted(set(norm)) + + spec = cls(filesystem=filesystem, size=size, devices=devices) + spec.validate() + return spec + + def validate(self) -> None: + if self.filesystem not in self._SUPPORTED_FILESYSTEMS: + raise D3NCacheError( + f'Invalid filesystem "{self.filesystem}" in d3n_cache ' + f'(supported: {", ".join(self._SUPPORTED_FILESYSTEMS)})' + ) + if self.size is None or self.size == "": + raise D3NCacheError('"d3n_cache.size" is required') + + def devices_for_host(self, host: str) -> list[str]: + devs = self.devices.get(host) + if not devs: + raise D3NCacheError( + f'no devices found for host "{host}" in d3n_cache.devices' + ) + return devs + + +@dataclass(frozen=True) +class D3NCache: + device: str + filesystem: str + mountpoint: str + cache_path: str + size_bytes: Optional[int] = None + + _SUPPORTED_FILESYSTEMS = ("xfs", "ext4") + _KNOWN_KEYS = { + "device", + "filesystem", + "mountpoint", + "cache_path", + "size_bytes", + } + + @classmethod + def from_json(cls, obj: Mapping[str, Any]) -> "D3NCache": + if not isinstance(obj, Mapping): + raise D3NCacheError( + f"d3n_cache must be a dict, got {type(obj).__name__}" + ) + + unknown = set(obj.keys()) - cls._KNOWN_KEYS + if unknown: + raise D3NCacheError( + f"Unknown d3n_cache field(s): {sorted(unknown)!r}" + ) + + d = cls( + device=str(obj.get("device", "")).strip(), + filesystem=str(obj.get("filesystem", "xfs")).strip(), + mountpoint=str(obj.get("mountpoint", "")).strip(), + cache_path=str(obj.get("cache_path", "")).strip(), + size_bytes=obj.get("size_bytes"), + ) + d.validate() + return d + + def to_json(self) -> Dict[str, Any]: + out: Dict[str, Any] = { + "device": self.device, + "filesystem": self.filesystem, + "mountpoint": self.mountpoint, + "cache_path": self.cache_path, + } + if self.size_bytes is not None: + out["size_bytes"] = self.size_bytes + return out + + def validate(self) -> None: + if not self.device: + raise D3NCacheError("d3n_cache.device must be a non-empty string") + if not self.device.startswith("/"): + raise D3NCacheError( + f"d3n_cache.device must be an absolute path, got {self.device!r}" + ) + + if self.filesystem not in self._SUPPORTED_FILESYSTEMS: + raise D3NCacheError( + f"Invalid filesystem {self.filesystem!r} " + f"(supported: {', '.join(self._SUPPORTED_FILESYSTEMS)})" + ) + + if not self.mountpoint: + raise D3NCacheError( + "d3n_cache.mountpoint must be a non-empty string" + ) + if not self.mountpoint.startswith("/"): + raise D3NCacheError( + f"d3n_cache.mountpoint must be an absolute path, got {self.mountpoint!r}" + ) + + if not self.cache_path: + raise D3NCacheError( + "d3n_cache.cache_path must be a non-empty string" + ) + if not self.cache_path.startswith("/"): + raise D3NCacheError( + f"d3n_cache.cache_path must be an absolute path, got {self.cache_path!r}" + ) + + if self.size_bytes is not None: + if not isinstance(self.size_bytes, int) or self.size_bytes <= 0: + raise D3NCacheError( + "d3n_cache.size_bytes must be a positive integer" + ) + + +def d3n_get_host_devs( + d3n: D3NCacheSpec, host: str +) -> Tuple[str, int, list[str]]: + + fs_type = d3n.filesystem + devs = d3n.devices_for_host(host) + try: + size_bytes = size_to_bytes(d3n.size) + except Exception as e: + raise D3NCacheError( + f'invalid d3n_cache.size {d3n.size!r}: {e}' + ) from e + + return fs_type, size_bytes, devs + + +def d3n_parse_dev_from_path(p: str) -> Optional[str]: + # expected: /mnt/ceph-d3n///rgw_datacache/... + + if not p: + return None + + parts = [x for x in p.split('/') if x] + try: + i = parts.index('ceph-d3n') + except ValueError: + return None + if len(parts) < i + 3: + return None + dev = parts[i + 2] + return f'/dev/{dev}' if dev else None + + +def d3n_mountpoint(fsid: str, dev: str) -> str: + dev_key = os.path.basename(dev) + return f"/mnt/ceph-d3n/{fsid}/{dev_key}" + + +def d3n_cache_path(mountpoint: str, daemon_id: str) -> str: + daemon_entity = f"client.rgw.{daemon_id}" + return os.path.join(mountpoint, "rgw_datacache", daemon_entity) + + +def d3n_paths(fsid: str, device: str, daemon_id: str) -> tuple[str, str]: + mp = d3n_mountpoint(fsid, device) + return mp, d3n_cache_path(mp, daemon_id) diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index 93d30264eeb4..246a9b898c6b 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -38,6 +38,7 @@ from ceph.deployment.utils import unwrap_ipv6, valid_addr, verify_non_negative_i from ceph.deployment.utils import verify_positive_int, verify_non_negative_number from ceph.deployment.utils import verify_boolean, verify_enum, verify_int from ceph.deployment.utils import parse_combined_pem_file +from ceph.cephadm.d3n_types import D3NCacheSpec, D3NCacheError from ceph.utils import is_hex from ceph.smb import constants as smbconst from ceph.smb import network as smbnet @@ -1459,6 +1460,16 @@ class RGWSpec(ServiceSpec): rgw_frontend_port: 1234 rgw_frontend_type: beast rgw_frontend_ssl_certificate: ... + # Optional: enable D3N (L1 datacache) for RGW + d3n_cache: + filesystem: xfs # default: xfs + size: 10G # required; int bytes or string with K/M/G/T/P + devices: # required; per-host list of devices + host1: + - /dev/nvme0n1 + host2: + - /dev/nvme1n1 + - /dev/nvme2n1 See also: :ref:`orchestrator-cli-service-spec` """ @@ -1509,6 +1520,7 @@ class RGWSpec(ServiceSpec): wildcard_enabled: Optional[bool] = False, rgw_exit_timeout_secs: int = 120, qat: Optional[Dict[str, str]] = None, + d3n_cache: Optional[Dict[str, Any]] = None, ): assert service_type == 'rgw', service_type @@ -1577,6 +1589,7 @@ class RGWSpec(ServiceSpec): self.rgw_exit_timeout_secs = rgw_exit_timeout_secs self.qat = qat or {} + self.d3n_cache = d3n_cache or {} def get_port_start(self) -> List[int]: ports = self.get_port() @@ -1661,6 +1674,12 @@ class RGWSpec(ServiceSpec): f"Invalid compression mode {compression}. Only 'sw' and 'hw' are allowed" ) + if self.d3n_cache: + try: + D3NCacheSpec.from_json(self.d3n_cache) + except D3NCacheError as e: + raise SpecValidationError(str(e)) + yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer) diff --git a/src/python-common/ceph/utils.py b/src/python-common/ceph/utils.py index 5419de7da32b..4bd3c2c329f1 100644 --- a/src/python-common/ceph/utils.py +++ b/src/python-common/ceph/utils.py @@ -3,7 +3,7 @@ import re import string import ssl -from typing import Optional, MutableMapping, Tuple, Any +from typing import Optional, MutableMapping, Tuple, Any, Union from urllib.error import HTTPError, URLError from urllib.request import urlopen, Request @@ -11,6 +11,8 @@ import logging log = logging.getLogger(__name__) +_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$') + def datetime_now() -> datetime.datetime: """ @@ -189,3 +191,27 @@ def strtobool(value: str) -> bool: if value.lower() in _FALSE_VALS: return False raise ValueError(f'invalid truth value {value!r}') + + +def size_to_bytes(v: Union[str, int]) -> int: + if isinstance(v, int): + return v + if isinstance(v, str): + m = _SIZE_RE.match(v) + if not m: + raise ValueError( + f'invalid size "{v}" (examples: 10737418240, 10G, 512M)' + ) + num = int(m.group(1)) + unit = m.group(2) or '' + unit = unit.upper() + mult = { + '': 1, + 'K': 1024, + 'M': 1024**2, + 'G': 1024**3, + 'T': 1024**4, + 'P': 1024**5, + }[unit] + return num * mult + raise ValueError(f'invalid size type {type(v)} (expected int or str)')