)
+def _ensure_fstab_entry(ctx: CephadmContext, device: str, mountpoint: str, fs_type: str) -> None:
+ out, _, code = call(ctx, ['blkid', '-s', 'UUID', '-o', 'value', device])
+ if code != 0 or not out.strip():
+ raise Error(f'Failed to get UUID for {device}')
+ uuid = out.strip()
+
+ entry = f'UUID={uuid} {mountpoint} {fs_type} defaults,noatime 0 2\n'
+
+ # check if mountpoint already present in fstab
+ with open('/etc/fstab', 'r') as f:
+ for line in f:
+ line = line.strip()
+ if not line or line.startswith('#'):
+ continue
+ parts = line.split()
+ if len(parts) >= 2 and parts[1] == mountpoint:
+ return
+
+ with open('/etc/fstab', 'a') as f:
+ f.write(entry)
+
+
+def prepare_d3n_cache(
+ ctx: CephadmContext,
+ d3n_cache: Dict[str, Any],
+ uid: int,
+ gid: int,
+) -> None:
+ device = d3n_cache.get('device')
+ fs_type = d3n_cache.get('filesystem', 'xfs')
+ mountpoint = d3n_cache.get('mountpoint')
+ cache_path = d3n_cache.get('cache_path')
+ size_bytes = d3n_cache.get('size_bytes')
+ logger.debug(
+ f'[D3N] prepare_d3n_cache: device={device!r} fs_type={fs_type!r} mountpoint={mountpoint!r} cache_path={cache_path!r} size_bytes={size_bytes!r}'
+ )
+
+ if not device:
+ raise Error('d3n_cache.device must be specified')
+ if not mountpoint:
+ raise Error('d3n_cache.mountpoint must be specified')
+ if not cache_path:
+ raise Error('d3n_cache.cache_path must be specified')
+
+ # Ensure mountpoint exists
+ os.makedirs(mountpoint, mode=0o755, exist_ok=True)
+ logger.debug(f'[D3N] checking filesystem on device {device}')
+
+ # Format the device if needed
+ if not _has_filesystem(ctx, device):
+ logger.debug(f'Formatting {device} with {fs_type} for D3N')
+ call_throws(ctx, ['mkfs', '-t', fs_type, device])
+
+ # Persist the mount in /etc/fstab
+ _ensure_fstab_entry(ctx, device, mountpoint, fs_type)
+
+ if not _is_mountpoint(ctx, mountpoint):
+ logger.debug(f'[D3N] mountpoint not mounted, running mount {mountpoint}')
+ call_throws(ctx, ['mount', mountpoint])
+ else:
+ logger.debug(f'[D3N] mountpoint already mounted according to _is_mountpoint(): {mountpoint}')
+
+ if size_bytes is not None:
+ if not isinstance(size_bytes, int) or size_bytes <= 0:
+ raise Error(f'd3n_cache.size_bytes must be a positive integer, got {size_bytes!r}')
+
+ avail = _avail_bytes(ctx, mountpoint)
+ if avail < size_bytes:
+ raise Error(
+ f'Not enough free space for D3N cache on {mountpoint}: '
+ f'need {size_bytes} bytes, have {avail} bytes'
+ )
+
+ # Create per-daemon cache directory
+ os.makedirs(cache_path, mode=0o755, exist_ok=True)
+ call_throws(ctx, ['chown', '-R', f'{uid}:{gid}', cache_path])
+
+
+def _has_filesystem(ctx: CephadmContext, device: str) -> bool:
+ if not os.path.exists(device):
+ return False
+ out, _, code = call(ctx, ['blkid', '-o', 'value', '-s', 'TYPE', device])
+ return code == 0 and bool(out.strip())
+
+
+def _is_mountpoint(ctx: CephadmContext, path: str) -> bool:
+ out, err, code = call(
+ ctx,
+ ['findmnt', '-n', '-o', 'TARGET,SOURCE,FSTYPE', '--mountpoint', path],
+ )
+ logger.debug(
+ f'[D3N] _is_mountpoint({path}): code={code} out={out.strip()!r} err={err.strip()!r}'
+ )
+ return code == 0
+
+
+def _avail_bytes(ctx: CephadmContext, path: str) -> int:
+ out, _, code = call(ctx, ['df', '-B1', '--output=avail', path])
+ if code != 0:
+ raise Error(f'Failed to check free space for {path}')
+ lines = [line.strip() for line in out.splitlines() if line.strip()]
+ if len(lines) < 2:
+ raise Error(f'Unexpected df output for {path}: {out!r}')
+ logger.debug(f'[D3N] df avail bytes for {path}: {lines[1]!r}')
+
+ return int(lines[1])
+
+
def deploy_daemon(
ctx: CephadmContext,
ident: 'DaemonIdentity',
# dirs, conf, keyring
create_daemon_dirs(ctx, ident, uid, gid, config, keyring)
+ if ident.daemon_type == 'rgw':
+ config_json = fetch_configs(ctx)
+ d3n_cache = config_json.get('d3n_cache')
+
+ if d3n_cache:
+ prepare_d3n_cache(ctx, d3n_cache, uid, gid)
+
# only write out unit files and start daemon
# with systemd if this is not a reconfig
if deployment_type != DeploymentType.RECONFIG:
import errno
+import re
+import os
import json
import logging
-import re
import socket
import time
from abc import ABCMeta, abstractmethod
ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)
+_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$')
+
def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEntity:
"""
self.mgr.spec_store.save(spec)
self.mgr.trigger_connect_dashboard_rgw()
+ def _size_to_bytes(self, v: str) -> int:
+ if isinstance(v, int):
+ return v
+ if isinstance(v, str):
+ m = _SIZE_RE.match(v)
+ if not m:
+ raise OrchestratorError(f'invalid size "{v}" (examples: 10737418240, 10G, 512M)')
+ num = int(m.group(1))
+ unit = (m.group(2) or '').upper()
+ mult = {
+ '': 1,
+ 'K': 1024,
+ 'M': 1024**2,
+ 'G': 1024**3,
+ 'T': 1024**4,
+ 'P': 1024**5,
+ }[unit]
+ return num * mult
+ raise OrchestratorError(f'invalid size type {type(v)} (expected int or str)')
+
+ def _d3n_parse_dev_from_path(self, p: str) -> Optional[str]:
+ # expected: /mnt/ceph-d3n/<fsid>/<dev>/rgw_datacache/...
+
+ if not p:
+ return None
+ logger.info(f"p: {p}")
+ parts = [x for x in p.split('/') if x]
+ try:
+ i = parts.index('ceph-d3n')
+ except ValueError:
+ return None
+ if len(parts) < i + 3:
+ return None
+ dev = parts[i + 2]
+ return f'/dev/{dev}' if dev else None
+
+    def _d3n_fail_if_devs_used_by_other_rgw_service(self, host: str, devs: List[str], service_name: str) -> None:
+        """Raise if any of ``devs`` already backs the D3N cache of an RGW
+        daemon on ``host`` that belongs to a *different* service.
+
+        Detection is best-effort: read each other daemon's configured
+        ``rgw_d3n_l1_datacache_persistent_path`` and parse the device name
+        back out of the path with _d3n_parse_dev_from_path().
+
+        :raises OrchestratorError: on a cross-service device conflict.
+        """
+        wanted = set(devs)
+
+        # check rgw daemons on this host
+        rgw_daemons = self.mgr.cache.get_daemons_by_type('rgw', host=host)
+
+        for dd in rgw_daemons:
+            # skip daemons that belong to the same service
+            try:
+                other_service = dd.service_name()
+
+            except Exception:
+                # NOTE(review): daemons whose service name cannot be derived
+                # are skipped silently -- confirm best-effort is intended
+                continue
+            if other_service == service_name:
+                continue
+
+            who = utils.name_to_config_section(dd.name())
+            ret, out, err = self.mgr.check_mon_command({
+                'prefix': 'config get',
+                'who': who,
+                'name': 'rgw_d3n_l1_datacache_persistent_path',
+            })
+            # NOTE(review): check_mon_command usually raises on failure, so
+            # this nonzero-ret branch may be unreachable -- confirm
+            if ret != 0:
+                continue
+
+            p = (out or '').strip()
+            if not p:
+                # no persistent cache path configured for that daemon
+                continue
+
+            used_dev = self._d3n_parse_dev_from_path(p)
+            if used_dev and used_dev in wanted:
+                raise OrchestratorError(
+                    f'D3N device conflict on host "{host}": device "{used_dev}" is already used by RGW service "{other_service}" '
+                    f'(daemon {dd.name()}). Refuse to reuse across services.'
+                )
+
+ def _d3n_get_host_devs(self, d3n: Dict[str, Any], host: str) -> Tuple[str, int, list[str]]:
+ fs_type = d3n.get('filesystem', 'xfs')
+ size_raw = d3n.get('size')
+ devices_map = d3n.get('devices')
+
+ if size_raw is None:
+ raise OrchestratorError('"d3n_cache.size" is required')
+ if fs_type not in ('xfs', 'ext4'):
+ raise OrchestratorError(f'Invalid filesystem "{fs_type}" (supported: xfs, ext4)')
+ if not isinstance(devices_map, dict):
+ raise OrchestratorError('"d3n_cache.devices" must be a mapping of host -> [devices]')
+
+ devs = devices_map.get(host)
+ if not isinstance(devs, list) or not devs:
+ raise OrchestratorError("no devices found")
+ devs = sorted(devs)
+
+ for d in devs:
+ if not isinstance(d, str) or not d.startswith('/dev/'):
+ raise OrchestratorError(f'invalid device path "{d}" in d3n_cache.devices for host "{host}"')
+
+ size_bytes = self._size_to_bytes(size_raw)
+ return fs_type, size_bytes, devs
+
+ def _d3n_gc_and_prune_alloc(
+ self,
+ alloc: Dict[str, str],
+ devs: list[str],
+ daemon_details: list[DaemonDescription],
+ current_daemon_id: str,
+ key: tuple[str, str],
+ ) -> None:
+
+ invalid = [did for did, dev in alloc.items() if dev not in devs]
+ for did in invalid:
+ del alloc[did]
+ logger.debug(f"[D3N][alloc] prune-invalid: removed={invalid} devs={devs} alloc_now={alloc}")
+ if not daemon_details:
+ if alloc:
+ logger.info(f"[D3N][alloc] clear-stale: key={key} alloc_was={alloc}")
+ alloc.clear()
+ return
+
+ live_daemon_ids: set[str] = set()
+ for dd in daemon_details:
+ if dd.daemon_id:
+ live_daemon_ids.add(dd.daemon_id)
+
+ if current_daemon_id:
+ live_daemon_ids.add(current_daemon_id)
+
+ stale = [did for did in list(alloc.keys()) if did not in live_daemon_ids]
+ for did in stale:
+ del alloc[did]
+ logger.debug(
+ f"gc: key={key} live={sorted(live_daemon_ids)} "
+ f"removed={stale} alloc_now={alloc}"
+ )
+
+ def _d3n_get_allocator(self) -> Dict[Tuple[str, str], Dict[str, str]]:
+ alloc_all = getattr(self.mgr, "_d3n_device_alloc", None)
+ if alloc_all is None:
+ alloc_all = {}
+ setattr(self.mgr, "_d3n_device_alloc", alloc_all)
+
+ assert isinstance(alloc_all, dict)
+ return alloc_all
+
+ def _d3n_choose_device_for_daemon(
+ self,
+ service_name: str,
+ host: str,
+ devs: list[str],
+ daemon_id: str,
+ daemon_details: list[DaemonDescription],
+ ) -> str:
+ alloc_all = self._d3n_get_allocator()
+ key = (service_name, host)
+ alloc = alloc_all.setdefault(key, {})
+ assert isinstance(alloc, dict)
+ reason = "unknown"
+
+ self._d3n_gc_and_prune_alloc(alloc, devs, daemon_details, daemon_id, key)
+
+ logger.info(
+ f"[D3N][alloc] pre-choose key={key} daemon={daemon_id} "
+ f"daemon_details={len(daemon_details)} devs={devs} alloc={alloc}"
+ )
+
+ if daemon_id in alloc:
+ logger.info(f"[D3N][alloc] reuse existing mapping daemon={daemon_id} dev={alloc[daemon_id]}")
+ return alloc[daemon_id]
+
+ used = set(alloc.values())
+ free = [d for d in devs if d not in used]
+ if free:
+ chosen = free[0] # 1:1 mapping whenever possible
+ else:
+ chosen = devs[len(alloc) % len(devs)] # share the device when unavoidable
+ reason = "round-robin/share"
+ alloc[daemon_id] = chosen
+ logger.info(f"[D3N][alloc] chosen={chosen} reason={reason} alloc_now={alloc}")
+ return chosen
+
+ def _compute_d3n_cache_for_daemon(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ spec: RGWSpec,
+ ) -> Optional[Dict[str, Any]]:
+
+ d3n = getattr(spec, 'd3n_cache', None)
+ if not d3n:
+ return None
+ if not isinstance(d3n, dict):
+ raise OrchestratorError('d3n_cache must be a mapping')
+
+ host = daemon_spec.host
+ if not host:
+ raise OrchestratorError("missing host in daemon_spec")
+
+ service_name = daemon_spec.service_name
+ daemon_details = self.mgr.cache.get_daemons_by_service(service_name)
+
+ fs_type, size_bytes, devs = self._d3n_get_host_devs(d3n, host)
+
+ self._d3n_fail_if_devs_used_by_other_rgw_service(host, devs, daemon_spec.service_name)
+
+ device = self._d3n_choose_device_for_daemon(
+ service_name=service_name,
+ host=host,
+ devs=devs,
+ daemon_id=daemon_spec.daemon_id,
+ daemon_details=daemon_details,
+ )
+
+ logger.info(
+ f"[D3N][alloc] service={service_name} host={host} daemon={daemon_spec.daemon_id} "
+ f"chosen={device} used=? devs={devs}"
+
+ )
+
+ if daemon_spec.daemon_id:
+ # warn if sharing is unavoidable
+ alloc_all = getattr(self.mgr, "_d3n_device_alloc", {})
+ alloc = alloc_all.get((service_name, host), {}) if isinstance(alloc_all, dict) else {}
+ if isinstance(alloc, dict) and len(alloc) > len(devs):
+ logger.warning(
+ f'D3N cache sub-optimal on host "{host}" for service "{service_name}": '
+ f'{len(alloc)} RGW daemons but only {len(devs)} devices; devices will be shared.'
+ )
+
+ fsid = self.mgr._cluster_fsid
+ device_key = os.path.basename(device)
+ mountpoint = f'/mnt/ceph-d3n/{fsid}/{device_key}'
+ daemon_entity = f'client.rgw.{daemon_spec.daemon_id}'
+ cache_path = os.path.join(mountpoint, 'rgw_datacache', daemon_entity)
+
+ return {
+ 'device': device,
+ 'filesystem': fs_type,
+ 'size_bytes': size_bytes,
+ 'mountpoint': mountpoint,
+ 'cache_path': cache_path,
+ }
+
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
self.register_for_certificates(daemon_spec)
daemon_spec.keyring = keyring
daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ d3n_cache = daemon_spec.final_config.get('d3n_cache')
+
+ if d3n_cache:
+ cache_path = d3n_cache.get('cache_path')
+ size = d3n_cache.get('size_bytes')
+
+ self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': daemon_name,
+ 'name': 'rgw_d3n_l1_local_datacache_enabled',
+ 'value': 'true',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': daemon_name,
+ 'name': 'rgw_d3n_l1_datacache_persistent_path',
+ 'value': cache_path,
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': daemon_name,
+ 'name': 'rgw_d3n_l1_datacache_size',
+ 'value': str(size),
+ })
+
return daemon_spec
def get_keyring(self, rgw_id: str) -> str:
'who': utils.name_to_config_section(daemon.name()),
'name': 'rgw_frontends',
})
+
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(daemon.name()),
+ 'name': 'rgw_d3n_l1_local_datacache_enabled',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(daemon.name()),
+ 'name': 'rgw_d3n_l1_datacache_persistent_path',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(daemon.name()),
+ 'name': 'rgw_d3n_l1_datacache_size',
+ })
self.mgr.check_mon_command({
'prefix': 'config rm',
'who': utils.name_to_config_section(daemon.name()),
'name': 'qat_compressor_enabled'
})
+
self.mgr.check_mon_command({
'prefix': 'config-key rm',
'key': f'rgw/cert/{daemon.name()}',
if svc_spec.qat:
config['qat'] = svc_spec.qat
+ d3n_cache = self._compute_d3n_cache_for_daemon(daemon_spec, svc_spec)
+ if d3n_cache:
+ config['d3n_cache'] = d3n_cache
+
rgw_deps = parent_deps + self.get_dependencies(self.mgr, svc_spec)
return config, rgw_deps
wildcard_enabled: Optional[bool] = False,
rgw_exit_timeout_secs: int = 120,
qat: Optional[Dict[str, str]] = None,
+ d3n_cache: Optional[Dict[str, Any]] = None,
):
assert service_type == 'rgw', service_type
self.rgw_exit_timeout_secs = rgw_exit_timeout_secs
self.qat = qat or {}
+ self.d3n_cache = d3n_cache or {}
def get_port_start(self) -> List[int]:
ports = self.get_port()
f"Invalid compression mode {compression}. Only 'sw' and 'hw' are allowed"
)
+ if self.d3n_cache:
+ if not isinstance(self.d3n_cache, dict):
+ raise SpecValidationError("d3n_cache must be a mapping")
+
+ filesystem = self.d3n_cache.get('filesystem', 'xfs')
+ size = self.d3n_cache.get('size')
+ devices = self.d3n_cache.get('devices')
+
+ if not size:
+ raise SpecValidationError('"d3n_cache.size" is required')
+
+ if filesystem not in ('xfs', 'ext4'):
+ raise SpecValidationError(
+ f'Invalid filesystem "{filesystem}" in d3n_cache (supported: xfs, ext4)'
+ )
+
+ if not devices or not isinstance(devices, dict):
+ raise SpecValidationError(
+ '"d3n_cache.devices" must be a mapping of host -> list of devices'
+ )
+
+ for host, devs in devices.items():
+ if not isinstance(host, str) or not host:
+ raise SpecValidationError(
+ 'Invalid host key in d3n_cache.devices (must be non-empty string)'
+ )
+
+ if not isinstance(devs, list) or not devs:
+ raise SpecValidationError(
+ f'"d3n_cache.devices[{host}]" must be a non-empty list of device paths'
+ )
+
+ for dev in devs:
+ if not isinstance(dev, str) or not dev.startswith('/dev/'):
+ raise SpecValidationError(
+ f'Invalid device path "{dev}" in d3n_cache.devices[{host}]'
+ )
+
yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)