from io import StringIO
from threading import Thread, Event
from pathlib import Path
+from enum import Enum
from configparser import ConfigParser
from cephadmlib.constants import (
VersionStatusUpdater,
)
from cephadmlib.container_lookup import infer_local_ceph_image, identify
-
+from ceph.cephadm.d3n import D3NCache, D3NCacheError
FuncT = TypeVar('FuncT', bound=Callable)
)
+def _d3n_fstab_entry(uuid: str, mountpoint: str, fs_type: str) -> str:
+ return f'UUID={uuid} {mountpoint} {fs_type} defaults,noatime 0 2\n'
+
+
def _ensure_fstab_entry(ctx: CephadmContext, device: str, mountpoint: str, fs_type: str) -> None:
    """
    Ensure the device is present in /etc/fstab for the given mountpoint.
    If an entry for mountpoint already exists, no changes are made.

    :raises Error: if the device UUID cannot be determined via blkid.
    """
    out, _, code = call(ctx, ['blkid', '-s', 'UUID', '-o', 'value', device])
    if code != 0 or not out.strip():
        raise Error(f'Failed to get UUID for {device}')
    uuid = out.strip()
    entry = _d3n_fstab_entry(uuid, mountpoint, fs_type)
    # check if mountpoint already present in fstab: the mountpoint is the
    # second whitespace-separated field of a non-comment fstab line
    with open('/etc/fstab', 'r') as f:
        for line in f:
            if line.lstrip().startswith('#'):
                continue
            fields = line.split()
            if len(fields) >= 2 and fields[1] == mountpoint:
                return
    # not present: append the new entry (the file must be opened for
    # appending; writing to a handle opened 'r' raises)
    with open('/etc/fstab', 'a') as f:
        f.write(entry)
-def prepare_d3n_cache(
- ctx: CephadmContext,
- d3n_cache: Dict[str, Any],
- uid: int,
- gid: int,
class D3NStateAction(str, Enum):
    """Action selector for d3n_state(): 'write' persists state, 'cleanup' removes it."""
    WRITE = 'write'
    CLEANUP = 'cleanup'
+
+
def d3n_state(
    ctx: CephadmContext,
    data_dir: str,
    action: D3NStateAction,
    d3n: Optional[D3NCache] = None,
    uid: int = 0,
    gid: int = 0,
) -> None:
    """
    Persist/read minimal D3N info in the daemon's data directory
    so that rm-daemon can cleanup properly.

    WRITE stores the cache and mount paths of `d3n` in d3n_state.json
    (owned by uid/gid); CLEANUP best-effort removes the recorded cache
    directory and then the state file itself.

    :param d3n: required for WRITE (no-op when None); ignored for CLEANUP.
    :raises Error: if `action` is not a known D3NStateAction.
    """
    path = os.path.join(data_dir, 'd3n_state.json')

    if action == D3NStateAction.WRITE:
        if d3n is None:
            # nothing to persist
            return
        state = {
            'cache_path': d3n.cache_path,
            'mount_path': d3n.mountpoint,
        }
        payload = json.dumps(state, sort_keys=True) + '\n'
        with write_new(path, owner=(uid, gid)) as f:
            f.write(payload)
        return

    if action == D3NStateAction.CLEANUP:
        if not os.path.exists(path):
            return

        try:
            with open(path, 'r') as f:
                state = json.load(f)
        except Exception:
            # corrupt/unreadable state file: still remove it below
            state = {}

        cache_path = state.get('cache_path') if isinstance(state, dict) else None
        if isinstance(cache_path, str) and cache_path:
            try:
                # no ignore_errors here: with it the except below is dead
                # and "removed" would be logged even when removal failed
                shutil.rmtree(cache_path)
                logger.info(f'[D3N] removed cache directory: {cache_path}')
            except FileNotFoundError:
                pass
            except Exception as e:
                logger.warning(f'[D3N] failed to remove cache directory {cache_path}: {e}')

        try:
            os.unlink(path)
        except FileNotFoundError:
            pass
        except Exception as e:
            logger.warning(f'[D3N] failed to remove {path}: {e}')
        return

    raise Error(f'[D3N] invalid d3n_state action: {action}')
+
+
+def prepare_d3n_cache(ctx: CephadmContext, d3n: D3NCache, uid: int, gid: int) -> None:
+ """
+ Prepare a D3N cache mount and directory.
+
+ Steps:
+ 1. Ensure mountpoint directory exists
+ 2. Format device if it has no filesystem
+ 3. Ensure /etc/fstab entry exists
+ 4. Mount if not mounted
+ 5. Ensure cache_path exists and is owned by daemon uid/gid
+ """
+ device = d3n.device
+ fs_type = d3n.filesystem
+ mountpoint = d3n.mountpoint
+ cache_path = d3n.cache_path
+ size_bytes = d3n.size_bytes
+
logger.debug(
f'[D3N] prepare_d3n_cache: device={device!r} fs_type={fs_type!r} mountpoint={mountpoint!r} cache_path={cache_path!r} size_bytes={size_bytes!r}'
)
- if not device:
- raise Error('d3n_cache.device must be specified')
- if not mountpoint:
- raise Error('d3n_cache.mountpoint must be specified')
- if not cache_path:
- raise Error('d3n_cache.cache_path must be specified')
-
# Ensure mountpoint exists
os.makedirs(mountpoint, mode=0o755, exist_ok=True)
logger.debug(f'[D3N] checking filesystem on device {device}')
logger.debug(f'Formatting {device} with {fs_type} for D3N')
call_throws(ctx, ['mkfs', '-t', fs_type, device])
- # Persist the mount in /etc/fstab
+ # Ensure the mount is persistent across reboot by ensuring an /etc/fstab entry exists.
_ensure_fstab_entry(ctx, device, mountpoint, fs_type)
if not _is_mountpoint(ctx, mountpoint):
logger.debug(f'[D3N] mountpoint already mounted according to _is_mountpoint(): {mountpoint}')
if size_bytes is not None:
- if not isinstance(size_bytes, int) or size_bytes <= 0:
- raise Error(f'd3n_cache.size_bytes must be a positive integer, got {size_bytes!r}')
-
avail = _avail_bytes(ctx, mountpoint)
if avail < size_bytes:
raise Error(
def _has_filesystem(ctx: CephadmContext, device: str) -> bool:
    """Return True when blkid reports a filesystem TYPE on `device`.

    :raises Error: if `device` does not exist on the host.
    """
    if not os.path.exists(device):
        raise Error(f'D3N device does not exist: {device}')
    out, _, code = call(ctx, ['blkid', '-o', 'value', '-s', 'TYPE', device])
    detected = out.strip()
    return code == 0 and detected != ''
},
).run()
- # write conf
+    # write the config into the mon data dir
with write_new(mon_dir + '/config', owner=(uid, gid)) as f:
f.write(config)
else:
if ident.daemon_type == 'rgw':
config_json = fetch_configs(ctx)
- d3n_cache = config_json.get('d3n_cache')
+ d3n_cache: Any = config_json.get('d3n_cache')
if d3n_cache:
- prepare_d3n_cache(ctx, d3n_cache, uid, gid)
+ try:
+ d3n = D3NCache.from_json(d3n_cache)
+ except D3NCacheError as e:
+ raise Error(str(e))
+ prepare_d3n_cache(ctx, d3n, uid, gid)
+ try:
+ d3n_state(ctx, data_dir, D3NStateAction.WRITE, d3n, uid, gid)
+ except Exception as e:
+ logger.warning(f'[D3N] failed to persist D3N state in {data_dir}: {e}')
# only write out unit files and start daemon
# with systemd if this is not a reconfig
verbosity=CallVerbosity.DEBUG)
data_dir = ident.data_dir(ctx.data_dir)
+
+ if ident.daemon_type == 'rgw':
+ d3n_state(ctx, data_dir, action=D3NStateAction.CLEANUP)
+
if ident.daemon_type in ['mon', 'osd', 'prometheus'] and \
not ctx.force_delete_data:
# rename it out of the way -- do not delete
return {}
-def fetch_configs(ctx: CephadmContext) -> Dict[str, Any]:
+def fetch_configs(ctx: CephadmContext) -> Dict[str, str]:
"""Return a dict containing arbitrary configuration parameters.
This function filters out the key 'custom_config_files' which
must not be part of a deployment's configuration key-value pairs.
)
from orchestrator._interface import daemon_type_to_service
from cephadm import utils
+from ceph.cephadm.d3n import D3NCache, D3NCacheError, D3NCacheSpec
from .service_registry import register_cephadm_service
from cephadm.tlsobject_types import TLSObjectScope, TLSCredentials, EMPTY_TLS_CREDENTIALS
from cephadm.ssl_cert_utils import extract_ips_and_fqdns_from_cert
ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)
-_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$')
-
def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEntity:
"""
self.mgr.spec_store.save(spec)
self.mgr.trigger_connect_dashboard_rgw()
- def _size_to_bytes(self, v: str) -> int:
- if isinstance(v, int):
- return v
- if isinstance(v, str):
- m = _SIZE_RE.match(v)
- if not m:
- raise OrchestratorError(f'invalid size "{v}" (examples: 10737418240, 10G, 512M)')
- num = int(m.group(1))
- unit = (m.group(2) or '').upper()
- mult = {
- '': 1,
- 'K': 1024,
- 'M': 1024**2,
- 'G': 1024**3,
- 'T': 1024**4,
- 'P': 1024**5,
- }[unit]
- return num * mult
- raise OrchestratorError(f'invalid size type {type(v)} (expected int or str)')
-
def _d3n_parse_dev_from_path(self, p: str) -> Optional[str]:
# expected: /mnt/ceph-d3n/<fsid>/<dev>/rgw_datacache/...
if not p:
return None
- logger.info(f"p: {p}")
+
parts = [x for x in p.split('/') if x]
try:
i = parts.index('ceph-d3n')
for dd in rgw_daemons:
# skip daemons that belong to the same service
- try:
- other_service = dd.service_name()
+ other_service = dd.service_name()
- except Exception:
- continue
if other_service == service_name:
continue
if ret != 0:
continue
- p = (out or '').strip()
+ p = out or ''
+ p = p.strip()
if not p:
continue
f'(daemon {dd.name()}). Refuse to reuse across services.'
)
- def _d3n_get_host_devs(self, d3n: Dict[str, Any], host: str) -> Tuple[str, int, list[str]]:
- fs_type = d3n.get('filesystem', 'xfs')
- size_raw = d3n.get('size')
- devices_map = d3n.get('devices')
-
- if size_raw is None:
- raise OrchestratorError('"d3n_cache.size" is required')
- if fs_type not in ('xfs', 'ext4'):
- raise OrchestratorError(f'Invalid filesystem "{fs_type}" (supported: xfs, ext4)')
- if not isinstance(devices_map, dict):
- raise OrchestratorError('"d3n_cache.devices" must be a mapping of host -> [devices]')
-
- devs = devices_map.get(host)
- if not isinstance(devs, list) or not devs:
- raise OrchestratorError("no devices found")
- devs = sorted(devs)
-
- for d in devs:
- if not isinstance(d, str) or not d.startswith('/dev/'):
- raise OrchestratorError(f'invalid device path "{d}" in d3n_cache.devices for host "{host}"')
-
- size_bytes = self._size_to_bytes(size_raw)
+ def _d3n_get_host_devs(self, d3n: D3NCacheSpec, host: str) -> Tuple[str, int, list[str]]:
+ fs_type = d3n.filesystem
+ devs = d3n.devices_for_host(host)
+ size_bytes = utils._size_to_bytes(d3n.size)
return fs_type, size_bytes, devs
def _d3n_gc_and_prune_alloc(
)
def _d3n_get_allocator(self) -> Dict[Tuple[str, str], Dict[str, str]]:
+ """
+ Return the in-memory D3N device allocation map.
+
+ This is intentionally stored on the mgr module instance (self.mgr) as
+ ephemeral state to keep per-(service, host) device selections stable across
+ repeated scheduling cycles within the lifetime of the mgr process.
+
+ It is not persisted to disk/mon-store; it will be rebuilt after mgr restart
+ and also pruned via _d3n_gc_and_prune_alloc() based on currently running
+ daemons.
+ """
alloc_all = getattr(self.mgr, "_d3n_device_alloc", None)
if alloc_all is None:
alloc_all = {}
self,
daemon_spec: CephadmDaemonDeploySpec,
spec: RGWSpec,
- ) -> Optional[Dict[str, Any]]:
+ ) -> Optional[D3NCache]:
- d3n = getattr(spec, 'd3n_cache', None)
- if not d3n:
+ d3n_raw = getattr(spec, 'd3n_cache', None)
+ if not d3n_raw:
return None
- if not isinstance(d3n, dict):
- raise OrchestratorError('d3n_cache must be a mapping')
+ try:
+ d3n = D3NCacheSpec.from_json(d3n_raw)
+ except D3NCacheError as e:
+ raise OrchestratorError(str(e))
host = daemon_spec.host
if not host:
daemon_entity = f'client.rgw.{daemon_spec.daemon_id}'
cache_path = os.path.join(mountpoint, 'rgw_datacache', daemon_entity)
- return {
- 'device': device,
- 'filesystem': fs_type,
- 'size_bytes': size_bytes,
- 'mountpoint': mountpoint,
- 'cache_path': cache_path,
- }
+ return D3NCache(
+ device=device,
+ filesystem=fs_type,
+ size_bytes=size_bytes,
+ mountpoint=mountpoint,
+ cache_path=cache_path,
+ )
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
d3n_cache = daemon_spec.final_config.get('d3n_cache')
if d3n_cache:
- cache_path = d3n_cache.get('cache_path')
- size = d3n_cache.get('size_bytes')
+ try:
+ d3n = D3NCache.from_json(d3n_cache)
+ except D3NCacheError as e:
+ raise OrchestratorError(str(e))
+
+ cache_path = d3n.cache_path
+ size = d3n.size_bytes
self.mgr.check_mon_command({
'prefix': 'config set',
d3n_cache = self._compute_d3n_cache_for_daemon(daemon_spec, svc_spec)
if d3n_cache:
- config['d3n_cache'] = d3n_cache
+ config['d3n_cache'] = d3n_cache.to_json()
rgw_deps = parent_deps + self.get_dependencies(self.mgr, svc_spec)
return config, rgw_deps
or isinstance(node, ast.Pow)
or isinstance(node, ast.LShift)
or isinstance(node, ast.RShift)
- or isinstance(node, ast.ButOr)
+ or isinstance(node, ast.BitOr)
or isinstance(node, ast.BitXor)
or isinstance(node, ast.BitAnd)
or isinstance(node, ast.MatMult)
import logging
+import re
import json
import socket
from enum import Enum
from functools import wraps
-from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any, NamedTuple
+from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any, NamedTuple, Union
from orchestrator import OrchestratorError
import hashlib
# Used for _run_cephadm used for check-host etc that don't require an --image parameter
cephadmNoImage = CephadmNoImage.token
+_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$')
+
class ContainerInspectInfo(NamedTuple):
image_id: str
if not isinstance(value, str):
return ''
return value.lower() if lower else value
+
+
+def _size_to_bytes(v: Union[str, int]) -> int:
+ if isinstance(v, int):
+ return v
+ if isinstance(v, str):
+ m = _SIZE_RE.match(v)
+ if not m:
+ raise OrchestratorError(
+ f'invalid size "{v}" (examples: 10737418240, 10G, 512M)'
+ )
+ num = int(m.group(1))
+ unit = m.group(2) or ''
+ unit = unit.upper()
+ mult = {
+ '': 1,
+ 'K': 1024,
+ 'M': 1024**2,
+ 'G': 1024**3,
+ 'T': 1024**4,
+ 'P': 1024**5,
+ }[unit]
+ return num * mult
+ raise OrchestratorError(f'invalid size type {type(v)} (expected int or str)')
--- /dev/null
+from dataclasses import dataclass
+from typing import Any, Dict, Mapping, Optional, Union
+
+
class D3NCacheError(ValueError):
    """Raised when a d3n_cache spec section is malformed."""
    pass


@dataclass(frozen=True)
class D3NCacheSpec:
    """
    RGW spec's d3n_cache section:
    d3n_cache:
      filesystem: xfs|ext4 (optional, default xfs)
      size: 10G|512M|... (required)
      devices:
        host1: [/dev/nvme0n1, ...] (required)
        host2: [/dev/nvme1n1, ...]
    """

    filesystem: str
    size: Union[str, int]
    devices: Dict[str, list[str]]

    # un-annotated on purpose: plain class attributes, not dataclass fields
    _SUPPORTED_FILESYSTEMS = ("xfs", "ext4")
    _KNOWN_KEYS = {"filesystem", "size", "devices"}

    @classmethod
    def from_json(cls, obj: Mapping[str, Any]) -> "D3NCacheSpec":
        """Build a validated spec from a raw mapping (e.g. parsed YAML).

        :raises D3NCacheError: on unknown keys or malformed values.
        """
        if not isinstance(obj, Mapping):
            raise D3NCacheError(
                f"d3n_cache must be a dict, got {type(obj).__name__}"
            )

        unknown = set(obj.keys()) - cls._KNOWN_KEYS
        if unknown:
            raise D3NCacheError(
                f"Unknown d3n_cache field(s): {sorted(unknown)!r}"
            )

        filesystem = str(obj.get("filesystem", "xfs")).strip()
        size = obj.get("size", None)

        devices_raw = obj.get("devices", None)
        if not isinstance(devices_raw, Mapping):
            raise D3NCacheError(
                '"d3n_cache.devices" must be a mapping of host -> list of device paths'
            )

        devices: Dict[str, list[str]] = {}
        for host, devs in devices_raw.items():
            if not isinstance(host, str) or not host.strip():
                raise D3NCacheError(
                    "Invalid host key in d3n_cache.devices (must be non-empty string)"
                )
            if not isinstance(devs, list) or not devs:
                raise D3NCacheError(
                    f'"d3n_cache.devices[{host}]" must be a non-empty list of device paths'
                )

            norm: list[str] = []
            for d in devs:
                if not isinstance(d, str) or not d.startswith("/dev/"):
                    raise D3NCacheError(
                        f'Invalid device path "{d}" in d3n_cache.devices[{host}]'
                    )
                norm.append(d)

            # de-duplicate and keep a deterministic order
            devices[host] = sorted(set(norm))

        # size is type-checked in validate() so that directly-constructed
        # instances get the same checks as JSON-built ones
        spec = cls(filesystem=filesystem, size=size, devices=devices)
        spec.validate()
        return spec

    def validate(self) -> None:
        """Validate field values.

        :raises D3NCacheError: on invalid filesystem or size.
        """
        if self.filesystem not in self._SUPPORTED_FILESYSTEMS:
            raise D3NCacheError(
                f'Invalid filesystem "{self.filesystem}" in d3n_cache '
                f'(supported: {", ".join(self._SUPPORTED_FILESYSTEMS)})'
            )
        # bool is an int subclass: a YAML scalar like `size: yes` would
        # otherwise be accepted and later treated as a 1-byte size
        if isinstance(self.size, bool) or not isinstance(self.size, (str, int)) or (
            isinstance(self.size, str) and not self.size.strip()
        ):
            raise D3NCacheError(
                '"d3n_cache.size" must be an int (bytes) or a string like "10G"'
            )

    def devices_for_host(self, host: str) -> list[str]:
        """Return the configured device list for `host`.

        :raises D3NCacheError: if no devices are configured for `host`.
        """
        devs = self.devices.get(host)
        if not devs:
            raise D3NCacheError(
                f'no devices found for host "{host}" in d3n_cache.devices'
            )
        return devs
+
+
@dataclass(frozen=True)
class D3NCache:
    """Fully-resolved per-daemon D3N cache settings (device, mount and
    cache paths), carried in the deployment config under 'd3n_cache'."""

    device: str
    filesystem: str
    mountpoint: str
    cache_path: str
    size_bytes: Optional[int] = None

    # un-annotated on purpose: plain class attributes, not dataclass fields
    _SUPPORTED_FILESYSTEMS = ("xfs", "ext4")
    _KNOWN_KEYS = {
        "device",
        "filesystem",
        "mountpoint",
        "cache_path",
        "size_bytes",
    }

    @classmethod
    def from_json(cls, obj: Mapping[str, Any]) -> "D3NCache":
        """Build a validated instance from a raw mapping.

        :raises D3NCacheError: on unknown keys or malformed values.
        """
        if not isinstance(obj, Mapping):
            raise D3NCacheError(
                f"d3n_cache must be a dict, got {type(obj).__name__}"
            )

        unknown = set(obj.keys()) - cls._KNOWN_KEYS
        if unknown:
            raise D3NCacheError(
                f"Unknown d3n_cache field(s): {sorted(unknown)!r}"
            )

        d = cls(
            device=str(obj.get("device", "")).strip(),
            filesystem=str(obj.get("filesystem", "xfs")).strip(),
            mountpoint=str(obj.get("mountpoint", "")).strip(),
            cache_path=str(obj.get("cache_path", "")).strip(),
            size_bytes=obj.get("size_bytes"),
        )
        d.validate()
        return d

    def to_json(self) -> Dict[str, Any]:
        """Serialize; size_bytes is omitted when unset."""
        out: Dict[str, Any] = {
            "device": self.device,
            "filesystem": self.filesystem,
            "mountpoint": self.mountpoint,
            "cache_path": self.cache_path,
        }
        if self.size_bytes is not None:
            out["size_bytes"] = self.size_bytes
        return out

    def validate(self) -> None:
        """Validate field values.

        :raises D3NCacheError: on any invalid field.
        """
        if not self.device:
            raise D3NCacheError("d3n_cache.device must be a non-empty string")
        if not self.device.startswith("/"):
            raise D3NCacheError(
                f"d3n_cache.device must be an absolute path, got {self.device!r}"
            )

        if self.filesystem not in self._SUPPORTED_FILESYSTEMS:
            raise D3NCacheError(
                f"Invalid filesystem {self.filesystem!r} "
                f"(supported: {', '.join(self._SUPPORTED_FILESYSTEMS)})"
            )

        if not self.mountpoint:
            raise D3NCacheError(
                "d3n_cache.mountpoint must be a non-empty string"
            )
        if not self.mountpoint.startswith("/"):
            raise D3NCacheError(
                f"d3n_cache.mountpoint must be an absolute path, got {self.mountpoint!r}"
            )

        if not self.cache_path:
            raise D3NCacheError(
                "d3n_cache.cache_path must be a non-empty string"
            )
        if not self.cache_path.startswith("/"):
            raise D3NCacheError(
                f"d3n_cache.cache_path must be an absolute path, got {self.cache_path!r}"
            )

        if self.size_bytes is not None:
            # reject bool explicitly: it is an int subclass and True
            # would otherwise pass the `> 0` check as a 1-byte size
            if (
                isinstance(self.size_bytes, bool)
                or not isinstance(self.size_bytes, int)
                or self.size_bytes <= 0
            ):
                raise D3NCacheError(
                    "d3n_cache.size_bytes must be a positive integer"
                )
from ceph.deployment.utils import verify_positive_int, verify_non_negative_number
from ceph.deployment.utils import verify_boolean, verify_enum, verify_int
from ceph.deployment.utils import parse_combined_pem_file
+from ceph.cephadm.d3n import D3NCacheSpec, D3NCacheError
from ceph.utils import is_hex
from ceph.smb import constants as smbconst
from ceph.smb import network as smbnet
rgw_frontend_port: 1234
rgw_frontend_type: beast
rgw_frontend_ssl_certificate: ...
+ # Optional: enable D3N (L1 datacache) for RGW
+ d3n_cache:
+ filesystem: xfs # default: xfs
+ size: 10G # required; int bytes or string with K/M/G/T/P
+ devices: # required; per-host list of devices
+ host1:
+ - /dev/nvme0n1
+ host2:
+ - /dev/nvme1n1
+ - /dev/nvme2n1
See also: :ref:`orchestrator-cli-service-spec`
"""
)
if self.d3n_cache:
- if not isinstance(self.d3n_cache, dict):
- raise SpecValidationError("d3n_cache must be a mapping")
-
- filesystem = self.d3n_cache.get('filesystem', 'xfs')
- size = self.d3n_cache.get('size')
- devices = self.d3n_cache.get('devices')
-
- if not size:
- raise SpecValidationError('"d3n_cache.size" is required')
-
- if filesystem not in ('xfs', 'ext4'):
- raise SpecValidationError(
- f'Invalid filesystem "{filesystem}" in d3n_cache (supported: xfs, ext4)'
- )
-
- if not devices or not isinstance(devices, dict):
- raise SpecValidationError(
- '"d3n_cache.devices" must be a mapping of host -> list of devices'
- )
-
- for host, devs in devices.items():
- if not isinstance(host, str) or not host:
- raise SpecValidationError(
- 'Invalid host key in d3n_cache.devices (must be non-empty string)'
- )
-
- if not isinstance(devs, list) or not devs:
- raise SpecValidationError(
- f'"d3n_cache.devices[{host}]" must be a non-empty list of device paths'
- )
-
- for dev in devs:
- if not isinstance(dev, str) or not dev.startswith('/dev/'):
- raise SpecValidationError(
- f'Invalid device path "{dev}" in d3n_cache.devices[{host}]'
- )
+ try:
+ D3NCacheSpec.from_json(self.d3n_cache)
+ except D3NCacheError as e:
+ raise SpecValidationError(str(e))
yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)