]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephadm: stop start cluster 67388/head
authorTeoman ONAY <tonay@ibm.com>
Mon, 16 Feb 2026 13:58:00 +0000 (14:58 +0100)
committerTeoman ONAY <tonay@ibm.com>
Thu, 9 Apr 2026 13:56:14 +0000 (15:56 +0200)
Automates the procedure to shutdown and bring up an entire Ceph cluster
with one single command.

- cephadm cluster-{shutdown,start,status}

Fixes: https://tracker.ceph.com/issues/74581
Resolves: ISCE-2109

Signed-off-by: Teoman ONAY <tonay@ibm.com>
doc/cephadm/operations.rst
src/cephadm/cephadm.py
src/cephadm/cephadmlib/cluster_ops.py [new file with mode: 0644]
src/cephadm/cephadmlib/constants.py
src/cephadm/tests/test_cluster_ops.py [new file with mode: 0644]

index e941399d5f647f4eba881a74be12f52858ca25a9..dc4a3c625ed624af540adac0d3b7df0bb55e5748 100644 (file)
@@ -67,7 +67,146 @@ You can also do the same for all daemons for a service with:
     cases even data loss.
 
 
-Redeploying or Reconfiguring a Daemon
+Cluster-wide shutdown and startup
+----------------------------------
+
+Cephadm provides commands to gracefully shut down and start up an entire Ceph
+cluster. This is useful for planned maintenance such as datacenter power-downs,
+hardware upgrades, or disaster recovery scenarios.
+
+.. warning::
+
+   These commands will stop **all** Ceph services across **all** hosts in the
+   cluster. Ensure all clients have unmounted CephFS filesystems and disconnected
+   from RBD images and RGW before proceeding.
+
+Shutting down the cluster
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To shut down the entire cluster, run the following command from the admin host:
+
+.. prompt:: bash #
+
+   cephadm cluster-shutdown --fsid <fsid> --yes-i-really-mean-it
+
+The ``--yes-i-really-mean-it`` flag is required to confirm the operation.
+
+You can preview what would happen without making changes by using the dry-run mode:
+
+.. prompt:: bash #
+
+   cephadm cluster-shutdown --fsid <fsid> --dry-run
+
+The shutdown process performs the following steps in order:
+
+1. Checks cluster health and warns if the cluster is not healthy
+2. Pauses the cephadm orchestrator to prevent daemon restarts
+3. Sets the OSD ``noout`` flag to prevent rebalancing on startup
+4. Fails all CephFS filesystems to prevent client I/O
+5. Stops client-facing daemons (MDS, NFS, RGW, iSCSI)
+6. Saves cluster state to a local file for later startup
+7. Stops all hosts sequentially (admin host last)
+
+The cluster state is saved to ``/var/lib/ceph/<fsid>/cluster-shutdown-state.json``
+on the admin host. This file is required for the startup process.
+
+.. note::
+
+   During shutdown, the cephadm SSH private key is retrieved from the cluster
+   and cached locally on the admin host. This is necessary because the cluster
+   (and its key store) will be unavailable during startup. The cached key is
+   stored with restricted permissions (0600) and is automatically removed after
+   a successful cluster startup.
+
+If the cluster health check fails but you still want to proceed, you can use
+the ``--force`` flag:
+
+.. prompt:: bash #
+
+   cephadm cluster-shutdown --fsid <fsid> --yes-i-really-mean-it --force
+
+.. danger::
+
+   Using ``--force`` to shut down an unhealthy cluster is **dangerous** and may
+   result in data loss. The cluster may not be able to recover properly on
+   startup if there are degraded or undersized placement groups. Only use this
+   option in emergency situations when you fully understand the risks and have
+   verified that your data is safely replicated or backed up.
+
+Starting the cluster
+~~~~~~~~~~~~~~~~~~~~
+
+To start a previously shut-down cluster, run the following command from the
+admin host:
+
+.. prompt:: bash #
+
+   cephadm cluster-start --fsid <fsid>
+
+You can preview what would happen without making changes:
+
+.. prompt:: bash #
+
+   cephadm cluster-start --fsid <fsid> --dry-run
+
+The startup process performs the following steps:
+
+1. Loads the saved cluster state from the shutdown state file
+2. Starts all hosts in parallel (configurable)
+3. Waits for the cluster to become accessible
+4. Waits for all placement groups to become ``active+clean``
+5. Unsets the OSD ``noout`` flag
+6. Sets CephFS filesystems back to joinable
+7. Resumes the cephadm orchestrator
+8. Cleans up the state file
+
+By default, up to 5 hosts are started in parallel. You can adjust this with
+the ``--parallel`` option:
+
+.. prompt:: bash #
+
+   cephadm cluster-start --fsid <fsid> --parallel 10
+
+Checking cluster status
+~~~~~~~~~~~~~~~~~~~~~~~
+
+To check whether a cluster is in a shutdown state or running normally:
+
+.. prompt:: bash #
+
+   cephadm cluster-status --fsid <fsid>
+
+This command displays:
+
+- Whether a shutdown state file exists and when the shutdown occurred
+- The list of hosts and their status
+- Current cluster health (if the cluster is running)
+
+Troubleshooting
+~~~~~~~~~~~~~~~
+
+**Stale state file**: If you believe the cluster is actually running but a
+state file exists from a previous incomplete operation, you can rename or
+remove the state file and retry:
+
+.. prompt:: bash #
+
+   mv /var/lib/ceph/<fsid>/cluster-shutdown-state.json /var/lib/ceph/<fsid>/cluster-shutdown-state.json.bak
+
+**Hosts fail to start**: If some hosts fail to start, check:
+
+- SSH connectivity from the admin host to the failed hosts
+- That the ``ceph.target`` systemd unit exists on those hosts
+- System logs on the failed hosts (``journalctl -u ceph.target``)
+
+**Cluster not becoming healthy after startup**: After startup, if PGs don't
+become ``active+clean``, check:
+
+- That all OSDs have started (``ceph osd tree``)
+- OSD logs for errors (``journalctl -u ceph-<fsid>@osd.*``)
+- That the ``noout`` flag has been cleared (``ceph osd dump | grep flags``)
+
+Redeploying or reconfiguring a daemon
 -------------------------------------
 
 The container for a daemon can be stopped, recreated, and restarted with
index 0c632cf80a0b695d89ec56a002137d35e84298d6..257f2f8cab46ef4e9ded0d38ba291d9e67703e92 100755 (executable)
@@ -37,6 +37,7 @@ from cephadmlib.constants import (
     DEFAULT_IMAGE_IS_MAIN,
     DEFAULT_IMAGE_RELEASE,
     # other constant values
+    ADMIN_LABEL,
     CEPH_CONF,
     CEPH_CONF_DIR,
     CEPH_DEFAULT_CONF,
@@ -167,6 +168,30 @@ from cephadmlib.container_daemon_form import (
     daemon_to_container,
 )
 from cephadmlib.sysctl import install_sysctl, migrate_sysctl_dir
+from cephadmlib.cluster_ops import (
+    DEFAULT_PARALLEL_HOSTS,
+    cleanup_startup,
+    gather_shutdown_info,
+    get_cephadm_ssh_key,
+    load_cluster_state,
+    load_startup_info,
+    prepare_for_shutdown,
+    print_cluster_status_running,
+    print_cluster_status_shutdown,
+    print_section_footer,
+    print_section_header,
+    print_shutdown_complete,
+    print_shutdown_plan,
+    print_startup_complete,
+    print_startup_plan,
+    restore_cluster_services,
+    save_shutdown_state,
+    start_all_hosts,
+    stop_all_hosts,
+    stop_client_daemons,
+    validate_shutdown_preconditions,
+    wait_for_cluster_ready,
+)
 from cephadmlib.firewalld import Firewalld, update_firewalld
 from cephadmlib import templating
 from cephadmlib.daemons.ceph import get_ceph_mounts_for_type, ceph_daemons
@@ -2806,8 +2831,8 @@ def command_bootstrap(ctx):
     if ctx.output_config == CEPH_DEFAULT_CONF and not ctx.skip_admin_label and not ctx.no_minimize_config:
         logger.info('Enabling client.admin keyring and conf on hosts with "admin" label')
         try:
-            cli(['orch', 'client-keyring', 'set', 'client.admin', 'label:_admin'])
-            cli(['orch', 'host', 'label', 'add', get_hostname(), '_admin'])
+            cli(['orch', 'client-keyring', 'set', 'client.admin', f'label:{ADMIN_LABEL}'])
+            cli(['orch', 'host', 'label', 'add', get_hostname(), ADMIN_LABEL])
         except Exception:
             logger.info('Unable to set up "admin" label; assuming older version of Ceph')
 
@@ -4475,6 +4500,91 @@ def change_maintenance_mode(ctx: CephadmContext) -> str:
                     return f'success - systemd target {target} enabled and started'
         return f'success - systemd target {target} enabled and started'
 
+
+@infer_fsid
+@infer_image
+def command_cluster_shutdown(ctx: CephadmContext) -> int:
+    """Shut down the entire Ceph cluster in the correct order."""
+    try:
+        health_msg = validate_shutdown_preconditions(ctx)
+    except SystemExit:
+        return 0
+
+    info = gather_shutdown_info(ctx)
+    dry_run = ctx.dry_run
+
+    print_shutdown_plan(ctx, info, health_msg, dry_run)
+
+    ssh_key, flags_set = prepare_for_shutdown(ctx, info, dry_run)
+    stop_client_daemons(ctx, info, ssh_key, dry_run)
+
+    if not dry_run:
+        save_shutdown_state(ctx, info, flags_set)
+
+    failed_hosts = stop_all_hosts(ctx, info, ssh_key, dry_run)
+
+    if failed_hosts:
+        logger.error(f'Failed to stop hosts: {failed_hosts}')
+        if not ctx.force:
+            return 1
+
+    print_shutdown_complete(ctx, len(info['shutdown_order']), dry_run)
+    return 0
+
+
+@infer_fsid
+@infer_image
+def command_cluster_start(ctx: CephadmContext) -> int:
+    """Start the Ceph cluster after a shutdown."""
+    info = load_startup_info(ctx)
+    if not info:
+        return 0
+
+    if ctx.dry_run:
+        print_startup_plan(ctx, info)
+        return 0
+
+    ssh_key = get_cephadm_ssh_key(ctx)
+    if not ssh_key:
+        logger.warning('No cached SSH key found - remote operations may fail')
+
+    failed_hosts = start_all_hosts(ctx, info, ssh_key)
+
+    if not wait_for_cluster_ready(ctx):
+        return 1
+
+    restore_cluster_services(ctx, info['flags_set'])
+    cleanup_startup(ctx)
+    print_startup_complete(failed_hosts)
+
+    return 0 if not failed_hosts else 1
+
+
+@infer_fsid
+@infer_image
+def command_cluster_status(ctx: CephadmContext) -> int:
+    """Show cluster shutdown/startup status."""
+    if not ctx.fsid:
+        raise Error('Cannot determine FSID. Use --fsid to specify.')
+
+    current_host = get_hostname()
+
+    print_section_header('CLUSTER STATUS')
+    print(f'FSID: {ctx.fsid}')
+    print(f'Current host: {current_host}')
+
+    state = load_cluster_state(ctx.fsid)
+
+    if state:
+        print_cluster_status_shutdown(ctx, state, current_host)
+        result = 0
+    else:
+        result = print_cluster_status_running(ctx)
+
+    print_section_footer()
+    return result
+
+
 ##################################
 
 
@@ -5292,6 +5402,48 @@ def _get_parser():
     parser_update_service.add_argument('--osd-ids', required=True, help='Comma-separated OSD IDs')
     parser_update_service.add_argument('--service-name', required=True, help='OSD service name')
 
+    parser_cluster_shutdown = subparsers.add_parser(
+        'cluster-shutdown', help='Shut down the entire Ceph cluster')
+    parser_cluster_shutdown.set_defaults(func=command_cluster_shutdown)
+    parser_cluster_shutdown.add_argument(
+        '--fsid',
+        help='cluster FSID')
+    parser_cluster_shutdown.add_argument(
+        '--force',
+        action='store_true',
+        help=argparse.SUPPRESS)  # Hidden option to bypass health checks
+    parser_cluster_shutdown.add_argument(
+        '--dry-run',
+        action='store_true',
+        help='Show what would be done without actually doing it')
+    parser_cluster_shutdown.add_argument(
+        '--yes-i-really-mean-it',
+        action='store_true',
+        help='Required flag to confirm cluster shutdown')
+
+    parser_cluster_start = subparsers.add_parser(
+        'cluster-start', help='Start the Ceph cluster after a shutdown')
+    parser_cluster_start.set_defaults(func=command_cluster_start)
+    parser_cluster_start.add_argument(
+        '--fsid',
+        help='cluster FSID')
+    parser_cluster_start.add_argument(
+        '--dry-run',
+        action='store_true',
+        help='Show what would be done without actually doing it')
+    parser_cluster_start.add_argument(
+        '--parallel',
+        type=int,
+        default=DEFAULT_PARALLEL_HOSTS,
+        help=f'Max hosts to start in parallel (default: {DEFAULT_PARALLEL_HOSTS})')
+
+    parser_cluster_status = subparsers.add_parser(
+        'cluster-status', help='Show cluster shutdown/startup status')
+    parser_cluster_status.set_defaults(func=command_cluster_status)
+    parser_cluster_status.add_argument(
+        '--fsid',
+        help='cluster FSID')
+
     return parser
 
 
diff --git a/src/cephadm/cephadmlib/cluster_ops.py b/src/cephadm/cephadmlib/cluster_ops.py
new file mode 100644 (file)
index 0000000..259af31
--- /dev/null
@@ -0,0 +1,1350 @@
+"""
+Cluster shutdown/startup/status operations helper functions.
+"""
+
+import datetime
+import json
+import logging
+import os
+import time
+from concurrent.futures import (
+    CancelledError,
+    ThreadPoolExecutor,
+    as_completed,
+)
+from typing import Any, Callable, Dict, List, Optional, Tuple
+
+from .call_wrappers import call, CallVerbosity
+from .net_utils import get_hostname
+from .constants import (
+    ADMIN_LABEL,
+    CEPH_CONF,
+    CEPH_CONF_DIR,
+    CEPH_DEFAULT_CONF,
+    CEPH_DEFAULT_KEYRING,
+    CEPH_KEYRING,
+    DATA_DIR,
+)
+from .container_types import CephContainer
+from .context import CephadmContext
+from .daemon_identity import DaemonIdentity
+from .exceptions import Error
+
+
+logger = logging.getLogger()
+
+
+# Daemon type ordering for cluster shutdown operations.
+# Shutdown order: monitoring/mgmt first, then gateways, then storage,
+# then core last. This is the reverse of CEPH_UPGRADE_ORDER from
+# mgr/cephadm/utils.py
+DAEMON_SHUTDOWN_ORDER = [
+    # Management and monitoring (stop first)
+    'oauth2-proxy',
+    'mgmt-gateway',
+    'alloy',
+    'promtail',
+    'loki',
+    'grafana',
+    'alertmanager',
+    'prometheus',
+    'node-exporter',
+    # Gateways
+    'smb',
+    'nvmeof',
+    'nfs',
+    'iscsi',
+    # Ceph services (stop last - core services)
+    'ceph-exporter',
+    'cephfs-mirror',
+    'rbd-mirror',
+    'rgw',
+    'mds',
+    'osd',
+    'crash',
+    'mon',
+    'mgr',
+]
+
+# Daemon types that should be stopped individually before stopping hosts
+# These are client-facing services that should be gracefully stopped first
+# Order: gateways/clients first (they depend on core services)
+DAEMONS_TO_STOP_FIRST = [
+    'nvmeof',
+    'iscsi',
+    'nfs',
+    'smb',
+    'rgw',
+    'mds',
+]
+
+# OSD flags to set during shutdown (and unset during startup)
+SHUTDOWN_OSD_FLAGS = ['noout']
+
+# Maximum number of hosts to process in parallel during shutdown/startup
+DEFAULT_PARALLEL_HOSTS = 5
+
+# Timeout for parallel host operations (seconds)
+HOST_OPERATION_TIMEOUT = 300
+
+# Timeout waiting for cluster to become accessible (seconds)
+CLUSTER_ACCESSIBLE_TIMEOUT = 120
+
+# Timeout waiting for PGs to become clean (seconds)
+PG_CLEAN_TIMEOUT = 300
+
+
+def print_section_header(title: str, dry_run: bool = False) -> None:
+    """Print a section header with consistent formatting."""
+    suffix = ' DRY-RUN' if dry_run else ''
+    print('=' * 60)
+    print(f'{title}{suffix}')
+    print('=' * 60)
+
+
+def print_section_footer() -> None:
+    """Print a section footer."""
+    print('=' * 60)
+
+
+def print_host_list(
+    hosts: List[str],
+    host_daemon_map: Dict[str, List[str]],
+    admin_host: Optional[str],
+    title: str = 'Hosts',
+) -> None:
+    """Print a numbered list of hosts with their daemons."""
+    print(f'\n{title} ({len(hosts)}):')
+    for i, hostname in enumerate(hosts, 1):
+        daemon_types = host_daemon_map.get(hostname, [])
+        admin_marker = ' (admin)' if hostname == admin_host else ''
+        print(f'  {i}. {hostname}{admin_marker}')
+        if daemon_types:
+            daemons_str = ', '.join(sorted(daemon_types))
+        else:
+            daemons_str = 'none'
+        print(f'     Daemons: {daemons_str}')
+
+
+def wait_for_cluster_accessible(
+    ctx: 'CephadmContext',
+    timeout: int = CLUSTER_ACCESSIBLE_TIMEOUT,
+) -> bool:
+    """
+    Wait for cluster to become accessible with exponential backoff.
+
+    Returns:
+        True if cluster became accessible, False if timeout
+    """
+    waited = 0
+    sleep_interval = 2
+
+    while waited < timeout:
+        try:
+            run_ceph_command(ctx, ['ceph', 'status'])
+            logger.info('Cluster is accessible')
+            return True
+        except RuntimeError:
+            pass
+        time.sleep(sleep_interval)
+        waited += sleep_interval
+        logger.info(f'Waiting for cluster... ({waited}s)')
+        sleep_interval = min(sleep_interval * 2, 30)
+
+    logger.warning(
+        f'Timeout waiting for cluster to become accessible after {timeout}s'
+    )
+    return False
+
+
+def wait_for_pgs_clean(
+    ctx: 'CephadmContext',
+    timeout: int = PG_CLEAN_TIMEOUT,
+) -> bool:
+    """
+    Wait for all PGs to become active+clean.
+
+    Returns:
+        True if PGs became clean, False if timeout
+    """
+    waited = 0
+
+    while waited < timeout:
+        is_clean, pg_msg = check_pgs_clean(ctx)
+        if is_clean:
+            logger.info(pg_msg)
+            return True
+        logger.info(f'{pg_msg} ({waited}s)')
+        time.sleep(10)
+        waited += 10
+
+    logger.warning(
+        f'Timeout waiting for PGs to become clean after {timeout}s'
+    )
+    return False
+
+
+def set_osd_flags(
+    ctx: 'CephadmContext', flags: List[str], force: bool = False
+) -> List[str]:
+    """
+    Set OSD flags.
+
+    Returns:
+        List of flags that were successfully set
+    """
+    flags_set = []
+    for flag in flags:
+        try:
+            run_ceph_command(ctx, ['ceph', 'osd', 'set', flag])
+            flags_set.append(flag)
+            logger.info(f'Set OSD {flag} flag')
+        except RuntimeError as e:
+            if not force:
+                raise Error(f'Failed to set {flag} flag: {e}')
+            logger.warning(f'Failed to set {flag} flag: {e}')
+    return flags_set
+
+
+def unset_osd_flags(ctx: 'CephadmContext', flags: List[str]) -> None:
+    """Unset OSD flags."""
+    for flag in flags:
+        try:
+            run_ceph_command(ctx, ['ceph', 'osd', 'unset', flag])
+            logger.info(f'Unset OSD {flag} flag')
+        except RuntimeError as e:
+            logger.warning(f'Failed to unset {flag} flag: {e}')
+
+
+def get_cluster_state_file(fsid: str) -> str:
+    """Get the path to the cluster shutdown state file."""
+    return os.path.join(DATA_DIR, fsid, 'cluster-shutdown-state.json')
+
+
+def save_cluster_state(fsid: str, state: Dict[str, Any]) -> None:
+    """Save cluster state to a JSON file."""
+    state_file = get_cluster_state_file(fsid)
+    try:
+        os.makedirs(os.path.dirname(state_file), exist_ok=True)
+        with open(state_file, 'w') as f:
+            json.dump(state, f, indent=2)
+        logger.info(f'Cluster state saved to {state_file}')
+    except OSError as e:
+        raise Error(f'Failed to save cluster state to {state_file}: {e}')
+
+
+def load_cluster_state(fsid: str) -> Optional[Dict[str, Any]]:
+    """Load cluster state from JSON file."""
+    state_file = get_cluster_state_file(fsid)
+    if not os.path.exists(state_file):
+        logger.info(f'No cluster state file found for FSID {fsid}')
+        return None
+    try:
+        with open(state_file, 'r') as f:
+            return json.load(f)
+    except json.JSONDecodeError as e:
+        logger.warning(f'Corrupt state file {state_file}: {e}')
+        return None
+    except OSError as e:
+        logger.warning(f'Failed to read state file {state_file}: {e}')
+        return None
+
+
+def remove_cluster_state(fsid: str) -> None:
+    """Remove the cluster state file."""
+    state_file = get_cluster_state_file(fsid)
+    if os.path.exists(state_file):
+        os.remove(state_file)
+        logger.info(f'Cluster state file removed: {state_file}')
+
+
+def find_admin_keyring(ctx: CephadmContext) -> Optional[str]:
+    """
+    Find the admin keyring file path.
+
+    Returns:
+        Path to the keyring file, or None if not found.
+    """
+    keyring_file = f'{ctx.data_dir}/{ctx.fsid}/{CEPH_CONF_DIR}/{CEPH_KEYRING}'
+    if os.path.exists(keyring_file):
+        return keyring_file
+    if os.path.exists(CEPH_DEFAULT_KEYRING):
+        return CEPH_DEFAULT_KEYRING
+    return None
+
+
+def find_ceph_config(ctx: CephadmContext) -> Optional[str]:
+    """
+    Find the ceph config file path.
+
+    Returns:
+        Path to the config file, or None if not found.
+    """
+    config_file = f'{ctx.data_dir}/{ctx.fsid}/{CEPH_CONF_DIR}/{CEPH_CONF}'
+    if os.path.exists(config_file):
+        return config_file
+    if os.path.exists(CEPH_DEFAULT_CONF):
+        return CEPH_DEFAULT_CONF
+    return None
+
+
+def run_ceph_command(
+    ctx: CephadmContext, cmd: List[str], json_output: bool = False
+) -> Any:
+    """
+    Run a ceph command using CephContainer directly.
+
+    Args:
+        ctx: CephadmContext
+        cmd: Command to run (e.g., ['ceph', 'osd', 'set', 'noout'])
+        json_output: If True, parse output as JSON
+
+    Returns:
+        Command output as string, or parsed JSON if json_output=True
+
+    Raises:
+        RuntimeError: If the command fails
+    """
+    # Build volume mounts for config and keyring
+    mounts: Dict[str, str] = {}
+
+    keyring = find_admin_keyring(ctx)
+    if keyring:
+        mounts[keyring] = '/etc/ceph/ceph.client.admin.keyring:z'
+
+    config = find_ceph_config(ctx)
+    if config:
+        mounts[config] = '/etc/ceph/ceph.conf:z'
+
+    # Strip 'ceph' prefix if present (entrypoint is /usr/bin/ceph)
+    args = cmd[1:] if cmd and cmd[0] == 'ceph' else cmd
+
+    out = CephContainer(
+        ctx,
+        image=ctx.image,
+        entrypoint='/usr/bin/ceph',
+        args=args,
+        volume_mounts=mounts,
+    ).run(verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
+
+    if json_output and out.strip():
+        try:
+            return json.loads(out)
+        except json.JSONDecodeError:
+            logger.warning(f'Failed to parse JSON output: {out}')
+
+    return out
+
+
+def get_orch_hosts(ctx: CephadmContext) -> List[Dict[str, Any]]:
+    """Get list of hosts from orchestrator."""
+    try:
+        result = run_ceph_command(
+            ctx,
+            ['ceph', 'orch', 'host', 'ls', '--format', 'json'],
+            json_output=True,
+        )
+        return result if isinstance(result, list) else []
+    except RuntimeError as e:
+        raise Error(f'Failed to get host list: {e}')
+
+
+def get_orch_daemons(ctx: CephadmContext) -> List[Dict[str, Any]]:
+    """Get list of daemons from orchestrator."""
+    try:
+        result = run_ceph_command(
+            ctx, ['ceph', 'orch', 'ps', '--format', 'json'], json_output=True
+        )
+        return result if isinstance(result, list) else []
+    except RuntimeError as e:
+        raise Error(f'Failed to get daemon list: {e}')
+
+
+def get_admin_host(hosts: List[Dict[str, Any]]) -> Optional[str]:
+    """Find the admin host (host with _admin label)."""
+    for host in hosts:
+        labels = host.get('labels', [])
+        if ADMIN_LABEL in labels:
+            return host.get('hostname') or host.get('addr')
+    return None
+
+
+def build_host_daemon_map(
+    daemons: List[Dict[str, Any]]
+) -> Dict[str, List[str]]:
+    """Build a map of hostname -> list of daemon types."""
+    host_map: Dict[str, List[str]] = {}
+    for daemon in daemons:
+        hostname = daemon.get('hostname', '')
+        daemon_type = daemon.get('daemon_type', '')
+        if hostname and daemon_type:
+            if hostname not in host_map:
+                host_map[hostname] = []
+            if daemon_type not in host_map[hostname]:
+                host_map[hostname].append(daemon_type)
+    return host_map
+
+
+def get_daemon_type_priority(daemon_types: List[str]) -> int:
+    """
+    Get the shutdown priority for a list of daemon types.
+    Lower number = process earlier in shutdown.
+
+    Monitoring/gateways first (low number), core services last (high number).
+    """
+    min_priority = len(DAEMON_SHUTDOWN_ORDER)
+
+    for dtype in daemon_types:
+        if dtype in DAEMON_SHUTDOWN_ORDER:
+            priority = DAEMON_SHUTDOWN_ORDER.index(dtype)
+            min_priority = min(min_priority, priority)
+
+    return min_priority
+
+
+def order_hosts_for_shutdown(
+    host_daemon_map: Dict[str, List[str]],
+    admin_host: Optional[str],
+) -> List[str]:
+    """
+    Order hosts for shutdown based on their daemon types.
+
+    Monitoring hosts first, core (mon/mgr) hosts last, admin host very last.
+    """
+    hosts_with_priority = []
+    for hostname, daemon_types in host_daemon_map.items():
+        priority = get_daemon_type_priority(daemon_types)
+        hosts_with_priority.append((hostname, priority))
+
+    # Sort by priority
+    hosts_with_priority.sort(key=lambda x: x[1])
+    ordered_hosts = [h[0] for h in hosts_with_priority]
+
+    # Admin host is always last for shutdown
+    if admin_host and admin_host in ordered_hosts:
+        ordered_hosts.remove(admin_host)
+        ordered_hosts.append(admin_host)
+
+    return ordered_hosts
+
+
+def get_cephadm_ssh_key(ctx: CephadmContext) -> Optional[str]:
+    """
+    Get the cephadm SSH private key and cache it locally.
+
+    Returns:
+        Path to the cached SSH key file, or None if not available
+    """
+    cached_key_path = os.path.join(DATA_DIR, ctx.fsid, '.ssh_key')
+
+    # Check for cached key first
+    if os.path.exists(cached_key_path):
+        return cached_key_path
+
+    # Retrieve from config-key store
+    try:
+        out = run_ceph_command(
+            ctx, ['ceph', 'config-key', 'get', 'mgr/cephadm/ssh_identity_key']
+        )
+        if out.strip():
+            parent_dir = os.path.dirname(cached_key_path)
+            os.makedirs(parent_dir, exist_ok=True)
+            os.chmod(parent_dir, 0o700)
+            with open(cached_key_path, 'w') as f:
+                f.write(out)
+            os.chmod(cached_key_path, 0o600)
+            logger.debug('Cached SSH key for remote operations')
+            return cached_key_path
+    except RuntimeError as e:
+        logger.warning(f'Failed to retrieve SSH key: {e}')
+
+    return None
+
+
+def remove_cached_ssh_key(fsid: str) -> None:
+    """Remove the cached SSH key file."""
+    cached_key_path = os.path.join(DATA_DIR, fsid, '.ssh_key')
+    if os.path.exists(cached_key_path):
+        os.remove(cached_key_path)
+        logger.info('Removed cached SSH key')
+
+
+def remote_systemctl(
+    ctx: CephadmContext,
+    hostname: str,
+    action: str,
+    target: str,
+    is_local: bool = False,
+    ssh_key: Optional[str] = None,
+    timeout: Optional[int] = None,
+) -> Tuple[str, str, int]:
+    """
+    Run systemctl command on a remote host (or locally).
+
+    Args:
+        ctx: CephadmContext
+        hostname: Target hostname
+        action: systemctl action (stop, start, enable, disable, is-active)
+        target: systemd target/unit name
+        is_local: If True, run locally instead of via SSH
+        ssh_key: Path to SSH private key file
+        timeout: Command timeout in seconds
+
+    Returns:
+        Tuple of (stdout, stderr, return_code)
+    """
+    if is_local:
+        cmd = ['systemctl', action, target]
+    else:
+        cmd = ['ssh', '-o', 'StrictHostKeyChecking=accept-new']
+        if ssh_key:
+            cmd.extend(['-i', ssh_key])
+        cmd.extend([f'root@{hostname}', 'systemctl', action, target])
+
+    return call(ctx, cmd, verbosity=CallVerbosity.QUIET, timeout=timeout)
+
+
+def stop_daemons_by_type(
+    ctx: CephadmContext,
+    daemons: List[Dict[str, Any]],
+    daemon_types: List[str],
+    ssh_key: Optional[str] = None,
+) -> List[str]:
+    """
+    Stop daemons of specified types individually.
+
+    Args:
+        ctx: CephadmContext
+        daemons: List of daemon info dicts from orchestrator
+        daemon_types: List of daemon types to stop
+        ssh_key: Path to SSH private key file
+
+    Returns:
+        List of daemon names that failed to stop
+    """
+    current_host = get_hostname()
+    failed = []
+
+    for daemon_type in daemon_types:
+        # Find all daemons of this type
+        type_daemons = [
+            d for d in daemons if d.get('daemon_type') == daemon_type
+        ]
+        if not type_daemons:
+            continue
+
+        count = len(type_daemons)
+        logger.info(f'Stopping {count} {daemon_type} daemon(s)...')
+
+        for daemon in type_daemons:
+            hostname = daemon.get('hostname', '')
+            daemon_id = daemon.get('daemon_id', '')
+            daemon_name = f'{daemon_type}.{daemon_id}'
+
+            if not hostname or not daemon_id:
+                logger.warning(
+                    f'Skipping daemon with incomplete data: {daemon}'
+                )
+                continue
+
+            unit = DaemonIdentity(
+                ctx.fsid, daemon_type, daemon_id
+            ).service_name
+            is_local = hostname == current_host
+
+            logger.info(f'  Stopping {daemon_name} on {hostname}')
+            _, err, code = remote_systemctl(
+                ctx,
+                hostname,
+                'stop',
+                unit,
+                is_local,
+                ssh_key,
+                timeout=HOST_OPERATION_TIMEOUT,
+            )
+            if code != 0:
+                logger.warning(f'  Failed to stop {daemon_name}: {err}')
+                failed.append(daemon_name)
+            else:
+                logger.info(f'  Stopped {daemon_name}')
+
+    return failed
+
+
+def _apply_to_cephfs(
+    ctx: CephadmContext,
+    action: str,
+    cmd_builder: Callable[[str], List[str]],
+    success_msg: str,
+    failure_msg: str,
+) -> List[str]:
+    """
+    Apply an action to all CephFS filesystems.
+
+    Args:
+        ctx: CephadmContext
+        action: Description of action for logging
+        cmd_builder: Function that takes fs_name and returns command list
+        success_msg: Log message format for success ({fs_name} placeholder)
+        failure_msg: Log message format for failure ({fs_name} placeholder)
+
+    Returns:
+        List of filesystem names that failed
+    """
+    try:
+        fs_result = run_ceph_command(
+            ctx, ['ceph', 'fs', 'ls', '--format', 'json'], json_output=True
+        )
+    except RuntimeError as e:
+        logger.warning(f'Failed to list filesystems: {e}')
+        return []
+
+    if not isinstance(fs_result, list):
+        return []
+
+    failed = []
+    for fs in fs_result:
+        fs_name = fs.get('name')
+        if not fs_name:
+            continue
+        try:
+            run_ceph_command(ctx, cmd_builder(fs_name))
+            logger.info(success_msg.format(fs_name=fs_name))
+        except RuntimeError as e:
+            logger.warning(f'{failure_msg.format(fs_name=fs_name)}: {e}')
+            failed.append(fs_name)
+
+    return failed
+
+
+def fail_cephfs_filesystems(ctx: CephadmContext) -> List[str]:
+    """Fail all CephFS filesystems for graceful shutdown."""
+    return _apply_to_cephfs(
+        ctx,
+        action='fail',
+        cmd_builder=lambda fs_name: ['ceph', 'fs', 'fail', fs_name],
+        success_msg='Failed filesystem: {fs_name}',
+        failure_msg='Failed to fail filesystem {fs_name}',
+    )
+
+
+def set_cephfs_joinable(ctx: CephadmContext) -> List[str]:
+    """Set all CephFS filesystems to joinable for startup."""
+    return _apply_to_cephfs(
+        ctx,
+        action='set joinable',
+        cmd_builder=lambda fs_name: [
+            'ceph',
+            'fs',
+            'set',
+            fs_name,
+            'joinable',
+            'true',
+        ],
+        success_msg='Set filesystem {fs_name} to joinable',
+        failure_msg='Failed to set filesystem {fs_name} joinable',
+    )
+
+
+def _get_cluster_status(
+    ctx: CephadmContext,
+) -> Tuple[Optional[Dict[str, Any]], str]:
+    """
+    Get cluster status as JSON.
+
+    Returns:
+        Tuple of (status_dict or None, error_message)
+    """
+    try:
+        status_result = run_ceph_command(
+            ctx, ['ceph', 'status', '--format', 'json'], json_output=True
+        )
+    except RuntimeError as e:
+        return None, f'Failed to get cluster status: {e}'
+
+    if not isinstance(status_result, dict):
+        return None, 'Invalid status response from cluster'
+
+    return status_result, ''
+
+
+def _check_pgs_from_status(status: Dict[str, Any]) -> Tuple[bool, int, int]:
+    """
+    Check PG status from a cluster status dict.
+
+    Returns:
+        Tuple of (all_clean, active_clean_count, total_pgs)
+    """
+    pgmap = status.get('pgmap', {})
+    num_pgs = pgmap.get('num_pgs', 0)
+    pgs_by_state = pgmap.get('pgs_by_state', [])
+
+    if num_pgs == 0:
+        return True, 0, 0
+
+    active_clean = 0
+    for state_info in pgs_by_state:
+        state = state_info.get('state_name', '')
+        count = state_info.get('count', 0)
+        if state == 'active+clean':
+            active_clean = count
+            break
+
+    return active_clean == num_pgs, active_clean, num_pgs
+
+
+def check_cluster_health(ctx: CephadmContext) -> Tuple[bool, str]:
+    """
+    Check if cluster is healthy and all PGs are active+clean.
+
+    Returns:
+        Tuple of (is_healthy, message)
+    """
+    status, err = _get_cluster_status(ctx)
+    if status is None:
+        return False, err
+
+    # Check health status
+    health = status.get('health', {})
+    health_status = health.get('status', 'HEALTH_ERR')
+    if health_status != 'HEALTH_OK':
+        return False, f'Cluster is not healthy: {health_status}'
+
+    # Check PG status
+    pgs_clean, active_clean, num_pgs = _check_pgs_from_status(status)
+    if not pgs_clean:
+        return (
+            False,
+            f'Not all PGs are active+clean: {active_clean}/{num_pgs}',
+        )
+
+    return True, 'Cluster is healthy and all PGs are active+clean'
+
+
+def check_pgs_clean(ctx: CephadmContext) -> Tuple[bool, str]:
+    """
+    Check if all PGs are active+clean (ignores overall health status).
+
+    Returns:
+        Tuple of (all_clean, message)
+    """
+    status, err = _get_cluster_status(ctx)
+    if status is None:
+        return False, err
+
+    pgs_clean, active_clean, num_pgs = _check_pgs_from_status(status)
+
+    if num_pgs == 0:
+        return True, 'No PGs in cluster'
+
+    if pgs_clean:
+        return True, f'All PGs are active+clean ({num_pgs}/{num_pgs})'
+
+    return False, f'PGs not yet clean: {active_clean}/{num_pgs} active+clean'
+
+
+def check_admin_keyring(ctx: CephadmContext) -> None:
+    """
+    Check that the admin keyring exists and is accessible.
+    Raises Error if not found.
+    """
+    if find_admin_keyring(ctx) is not None:
+        return
+
+    raise Error(
+        f'Admin keyring not found. Checked:\n'
+        f'  - {ctx.data_dir}/{ctx.fsid}/{CEPH_CONF_DIR}/{CEPH_KEYRING}\n'
+        f'  - {CEPH_DEFAULT_KEYRING}\n'
+        f'Cannot execute ceph commands without admin keyring.'
+    )
+
+
+def _start_host(
+    ctx: CephadmContext,
+    hostname: str,
+    target: str,
+    current_host: str,
+    ssh_key: Optional[str],
+) -> Tuple[str, bool, str]:
+    """
+    Start a single host's Ceph services.
+
+    Returns:
+        Tuple of (hostname, success, error_message)
+    """
+    is_local = hostname == current_host
+
+    # Skip if this is the local host and we're already running
+    if is_local:
+        out, _, code = call(
+            ctx,
+            ['systemctl', 'is-active', target],
+            verbosity=CallVerbosity.QUIET,
+        )
+        if code == 0 and 'active' in out:
+            logger.info(f'{hostname} (local) is already running')
+            return (hostname, True, '')
+
+    logger.info(f'Starting {hostname}...')
+
+    _, err, code = remote_systemctl(
+        ctx,
+        hostname,
+        'enable',
+        target,
+        is_local,
+        ssh_key,
+        timeout=HOST_OPERATION_TIMEOUT,
+    )
+    if code != 0:
+        msg = f'Failed to enable target on {hostname}: {err}'
+        logger.warning(msg)
+        return (hostname, False, msg)
+
+    _, err, code = remote_systemctl(
+        ctx,
+        hostname,
+        'start',
+        target,
+        is_local,
+        ssh_key,
+        timeout=HOST_OPERATION_TIMEOUT,
+    )
+    if code != 0:
+        msg = f'Failed to start target on {hostname}: {err}'
+        logger.warning(msg)
+        return (hostname, False, msg)
+
+    logger.info(f'Started {hostname}')
+    return (hostname, True, '')
+
+
+def start_hosts_parallel(
+    ctx: CephadmContext,
+    hosts: List[str],
+    target: str,
+    current_host: str,
+    ssh_key: Optional[str],
+    max_parallel: int = DEFAULT_PARALLEL_HOSTS,
+) -> List[str]:
+    """
+    Start multiple hosts in parallel.
+
+    Args:
+        ctx: CephadmContext
+        hosts: List of hostnames to start
+        target: Systemd target name
+        current_host: The local hostname
+        ssh_key: SSH key for remote operations
+        max_parallel: Maximum number of hosts to start in parallel
+
+    Returns:
+        List of hostnames that failed to start
+    """
+    failed_hosts: List[str] = []
+
+    if not hosts:
+        return failed_hosts
+
+    with ThreadPoolExecutor(max_workers=max_parallel) as executor:
+        futures = {
+            executor.submit(
+                _start_host, ctx, hostname, target, current_host, ssh_key
+            ): hostname
+            for hostname in hosts
+        }
+
+        for future in as_completed(futures):
+            hostname = futures[future]
+            try:
+                _, success, _ = future.result()
+                if not success:
+                    failed_hosts.append(hostname)
+            except CancelledError:
+                logger.error(f'Operation cancelled for host {hostname}')
+                failed_hosts.append(hostname)
+            except Exception:
+                logger.exception(f'Unexpected error starting host {hostname}')
+                failed_hosts.append(hostname)
+
+    return failed_hosts
+
+
+# Shutdown helpers
+
+
+def validate_shutdown_preconditions(ctx: CephadmContext) -> str:
+    """Validate preconditions for shutdown. Returns health message."""
+    if not ctx.fsid:
+        raise Error('must pass --fsid to specify cluster')
+
+    existing_state = load_cluster_state(ctx.fsid)
+    if existing_state:
+        state_file = get_cluster_state_file(ctx.fsid)
+        shutdown_time = existing_state.get('timestamp', 'unknown')
+        logger.warning('A shutdown state file already exists.')
+        logger.warning(f'  State file: {state_file}')
+        logger.warning(f'  Shutdown timestamp: {shutdown_time}')
+        logger.warning('')
+        logger.warning('This indicates the cluster was previously shut down.')
+        logger.warning(
+            'If this is expected, use "cephadm cluster-start" '
+            'to start the cluster.'
+        )
+        logger.warning('')
+        logger.warning(
+            'If you believe the cluster is actually running '
+            'and this file is stale,'
+        )
+        logger.warning('rename or remove the state file and retry:')
+        logger.warning(f'  mv {state_file} {state_file}.bak')
+        raise SystemExit(0)
+
+    check_admin_keyring(ctx)
+
+    if not ctx.dry_run and not ctx.yes_i_really_mean_it:
+        raise Error(
+            'This will shut down the entire cluster! '
+            'Pass --yes-i-really-mean-it to proceed.'
+        )
+
+    logger.info('Checking cluster health...')
+    is_healthy, health_msg = check_cluster_health(ctx)
+    if not is_healthy:
+        if ctx.force:
+            logger.warning(f'Cluster health check failed: {health_msg}')
+            logger.warning('Proceeding anyway due to --force flag')
+        else:
+            raise Error(f'Cluster health check failed: {health_msg}')
+    else:
+        logger.info(health_msg)
+
+    return health_msg
+
+
+def gather_shutdown_info(ctx: CephadmContext) -> Dict[str, Any]:
+    """Gather cluster information for shutdown."""
+    logger.info('Gathering cluster information from orchestrator...')
+    hosts = get_orch_hosts(ctx)
+    daemons = get_orch_daemons(ctx)
+
+    if not hosts:
+        raise Error('No hosts found in the cluster')
+
+    admin_host = get_admin_host(hosts) or get_hostname()
+    if not get_admin_host(hosts):
+        logger.warning('No admin host found (no host with _admin label)')
+
+    host_daemon_map = build_host_daemon_map(daemons)
+    shutdown_order = order_hosts_for_shutdown(host_daemon_map, admin_host)
+
+    logger.info(f'Admin host: {admin_host}')
+    logger.info(f'Shutdown order: {shutdown_order}')
+
+    return {
+        'admin_host': admin_host,
+        'host_daemon_map': host_daemon_map,
+        'shutdown_order': shutdown_order,
+        'daemons': daemons,
+        'target': f'ceph-{ctx.fsid}.target',
+    }
+
+
+def print_shutdown_plan(
+    ctx: CephadmContext,
+    info: Dict[str, Any],
+    health_msg: str,
+    dry_run: bool,
+) -> None:
+    """Print the shutdown plan."""
+    print_section_header('CLUSTER SHUTDOWN', dry_run)
+    print(f'FSID: {ctx.fsid}')
+    print(f'\nCluster health: {health_msg}')
+    print(f'Admin host: {info["admin_host"]}')
+    print_host_list(
+        info['shutdown_order'],
+        info['host_daemon_map'],
+        info['admin_host'],
+        'Shutdown order',
+    )
+
+
+def set_orchestrator_state(ctx: CephadmContext, action: str) -> None:
+    """Pause or resume the cephadm orchestrator.
+
+    Args:
+        action: 'pause' or 'resume'
+    """
+    try:
+        run_ceph_command(ctx, ['ceph', 'orch', action])
+        logger.info(f'{action.capitalize()}d cephadm orchestrator')
+    except RuntimeError as e:
+        logger.warning(f'Failed to {action} orchestrator: {e}')
+
+
+def prepare_for_shutdown(
+    ctx: CephadmContext, info: Dict[str, Any], dry_run: bool
+) -> Tuple[Optional[str], List[str]]:
+    """Get SSH key, pause orch, set OSD flags."""
+    ssh_key = None
+    flags_set: List[str] = []
+
+    if not dry_run:
+        ssh_key = get_cephadm_ssh_key(ctx)
+        if not ssh_key:
+            raise Error(
+                'Could not retrieve SSH key from cluster. Cannot proceed.'
+            )
+
+    # Pause orchestrator to prevent it from restarting daemons we stop
+    action = '[DRY-RUN] Would pause' if dry_run else 'Pausing'
+    print(f'{action} cephadm orchestrator...')
+    if not dry_run:
+        set_orchestrator_state(ctx, 'pause')
+
+    flags_str = ', '.join(SHUTDOWN_OSD_FLAGS)
+    action = '[DRY-RUN] Would set' if dry_run else 'Setting'
+    print(f'{action} OSD safety flags: {flags_str}')
+    if not dry_run:
+        flags_set = set_osd_flags(ctx, SHUTDOWN_OSD_FLAGS, ctx.force)
+
+    action = '[DRY-RUN] Would fail' if dry_run else 'Failing'
+    print(f'{action} CephFS filesystems...')
+    if not dry_run:
+        fail_cephfs_filesystems(ctx)
+
+    return ssh_key, flags_set
+
+
+def stop_client_daemons(
+    ctx: CephadmContext,
+    info: Dict[str, Any],
+    ssh_key: Optional[str],
+    dry_run: bool,
+) -> None:
+    """Stop client-facing daemons before full shutdown."""
+    daemons_to_stop = [
+        d
+        for d in DAEMONS_TO_STOP_FIRST
+        if any(
+            d in info['host_daemon_map'].get(h, [])
+            for h in info['shutdown_order']
+        )
+    ]
+    if daemons_to_stop:
+        action = '[DRY-RUN] Would stop' if dry_run else 'Stopping'
+        services = ', '.join(daemons_to_stop)
+        print(f'{action} client-facing services: {services}...')
+        if not dry_run:
+            failed_daemons = stop_daemons_by_type(
+                ctx, info['daemons'], DAEMONS_TO_STOP_FIRST, ssh_key
+            )
+            if failed_daemons:
+                logger.warning(
+                    f'Some daemons failed to stop: {failed_daemons}'
+                )
+
+
+def save_shutdown_state(
+    ctx: CephadmContext, info: Dict[str, Any], flags_set: List[str]
+) -> None:
+    """Save shutdown state for later restart."""
+    state = {
+        'timestamp': datetime.datetime.utcnow().isoformat() + 'Z',
+        'fsid': ctx.fsid,
+        'admin_host': info['admin_host'],
+        'shutdown_order': info['shutdown_order'],
+        'hosts': info['host_daemon_map'],
+        'flags_set': flags_set,
+    }
+    save_cluster_state(ctx.fsid, state)
+
+
+def stop_all_hosts(
+    ctx: CephadmContext,
+    info: Dict[str, Any],
+    ssh_key: Optional[str],
+    dry_run: bool,
+) -> List[str]:
+    """Stop all hosts sequentially (admin last). Returns failed hosts."""
+    action = '[DRY-RUN] Would stop' if dry_run else 'Stopping'
+    host_count = len(info['shutdown_order'])
+    print(f'{action} {host_count} hosts (admin host last)...')
+
+    if dry_run:
+        return []
+
+    failed_hosts = []
+    current_host = get_hostname()
+    target = info['target']
+    admin_host = info['admin_host']
+
+    for hostname in info['shutdown_order']:
+        is_local = hostname == current_host
+        admin_suffix = ' (admin)' if hostname == admin_host else ''
+        logger.info(f'Stopping {hostname}{admin_suffix}...')
+
+        _, err, code = remote_systemctl(
+            ctx,
+            hostname,
+            'disable',
+            target,
+            is_local,
+            ssh_key,
+            timeout=HOST_OPERATION_TIMEOUT,
+        )
+        if code != 0:
+            logger.warning(f'Failed to disable target on {hostname}: {err}')
+            if not ctx.force:
+                failed_hosts.append(hostname)
+                continue
+
+        _, err, code = remote_systemctl(
+            ctx,
+            hostname,
+            'stop',
+            target,
+            is_local,
+            ssh_key,
+            timeout=HOST_OPERATION_TIMEOUT,
+        )
+        if code != 0:
+            logger.warning(f'Failed to stop target on {hostname}: {err}')
+            if not ctx.force:
+                failed_hosts.append(hostname)
+                continue
+
+        logger.info(f'Stopped {hostname}')
+
+    return failed_hosts
+
+
+def print_shutdown_complete(
+    ctx: CephadmContext, host_count: int, dry_run: bool
+) -> None:
+    """Print shutdown completion message."""
+    print_section_footer()
+    if dry_run:
+        print('DRY-RUN COMPLETE - No changes were made')
+    else:
+        print('CLUSTER SHUTDOWN COMPLETE')
+        print(f'\nAll {host_count} hosts have been stopped.')
+    print('\nTo restart the cluster later:')
+    print(f'  cephadm cluster-start --fsid {ctx.fsid}')
+    print_section_footer()
+
+
+# Startup helpers
+
+
+def load_startup_info(ctx: CephadmContext) -> Optional[Dict[str, Any]]:
+    """Load saved state and build startup info. Returns None if no state."""
+    if not ctx.fsid:
+        raise Error('must pass --fsid to specify cluster')
+
+    state = load_cluster_state(ctx.fsid)
+    if not state:
+        logger.info('No shutdown state file found - nothing to start.')
+        return None
+
+    admin_host = state.get('admin_host', get_hostname())
+    flags_set = state.get('flags_set', [])
+    host_daemon_map = state.get('hosts', {})
+
+    all_hosts = list(host_daemon_map.keys())
+    startup_order = [admin_host] if admin_host in all_hosts else []
+    startup_order.extend(h for h in all_hosts if h != admin_host)
+
+    logger.info(f'Admin host: {admin_host}')
+    logger.info(f'Hosts to start: {startup_order}')
+
+    return {
+        'admin_host': admin_host,
+        'flags_set': flags_set,
+        'host_daemon_map': host_daemon_map,
+        'startup_order': startup_order,
+        'target': f'ceph-{ctx.fsid}.target',
+    }
+
+
+def print_startup_plan(ctx: CephadmContext, info: Dict[str, Any]) -> None:
+    """Print the startup plan for dry-run."""
+    print_section_header('CLUSTER STARTUP', dry_run=True)
+    print(f'FSID: {ctx.fsid}')
+    print(f'\nAdmin host: {info["admin_host"]}')
+    print_host_list(
+        info['startup_order'],
+        info['host_daemon_map'],
+        info['admin_host'],
+        'Hosts to start',
+    )
+    print(f'\nFlags to unset: {info["flags_set"]}')
+    print_section_footer()
+
+
+def start_all_hosts(
+    ctx: CephadmContext, info: Dict[str, Any], ssh_key: Optional[str]
+) -> List[str]:
+    """Start all hosts (admin first, then parallel). Returns failed hosts."""
+    target = info['target']
+    admin_host = info['admin_host']
+    current_host = get_hostname()
+    max_parallel = getattr(ctx, 'parallel', DEFAULT_PARALLEL_HOSTS)
+    non_admin_hosts = [h for h in info['startup_order'] if h != admin_host]
+
+    logger.info(
+        f'Starting {len(info["startup_order"])} hosts '
+        f'(max {max_parallel} in parallel, admin first)...'
+    )
+
+    failed_hosts = []
+
+    if admin_host:
+        logger.info(f'Starting admin host {admin_host} first...')
+        admin_failed = start_hosts_parallel(
+            ctx, [admin_host], target, current_host, ssh_key, max_parallel=1
+        )
+        failed_hosts.extend(admin_failed)
+
+    if non_admin_hosts:
+        other_failed = start_hosts_parallel(
+            ctx, non_admin_hosts, target, current_host, ssh_key, max_parallel
+        )
+        failed_hosts.extend(other_failed)
+
+    if failed_hosts:
+        logger.error(f'Failed to start hosts: {failed_hosts}')
+
+    return failed_hosts
+
+
+def wait_for_cluster_ready(ctx: CephadmContext) -> bool:
+    """Wait for cluster to be accessible and PGs clean."""
+    logger.info('Waiting for cluster to become accessible...')
+    if not wait_for_cluster_accessible(ctx):
+        logger.error(
+            'Cluster is not responding. Cannot proceed with startup.'
+        )
+        logger.error('Please check cluster status manually.')
+        return False
+
+    logger.info('Waiting for all PGs to be active+clean...')
+    if not wait_for_pgs_clean(ctx):
+        logger.warning('PGs did not become clean within timeout.')
+        logger.warning('Proceeding with flag removal anyway.')
+
+    return True
+
+
+def restore_cluster_services(
+    ctx: CephadmContext, flags_set: List[str]
+) -> None:
+    """Unset safety flags, restore CephFS, and resume orchestrator."""
+    logger.info('Unsetting cluster safety flags...')
+    unset_osd_flags(ctx, flags_set)
+
+    logger.info('Setting CephFS filesystems to joinable...')
+    set_cephfs_joinable(ctx)
+
+    logger.info('Resuming cephadm orchestrator...')
+    set_orchestrator_state(ctx, 'resume')
+
+    logger.info('Checking cluster health...')
+    is_healthy, health_msg = check_cluster_health(ctx)
+    if is_healthy:
+        logger.info(health_msg)
+    else:
+        logger.warning(f'Cluster health: {health_msg}')
+        logger.warning('Check cluster status manually: ceph -s')
+
+
+def cleanup_startup(ctx: CephadmContext) -> None:
+    """Clean up state file and SSH key after startup."""
+    remove_cluster_state(ctx.fsid)
+    remove_cached_ssh_key(ctx.fsid)
+
+
+def print_startup_complete(failed_hosts: List[str]) -> None:
+    """Print startup completion message."""
+    print_section_header('CLUSTER STARTUP COMPLETE')
+    if failed_hosts:
+        print(f'\nWarning: Some hosts failed to start: {failed_hosts}')
+    print('\nMonitor cluster health with: ceph -s')
+    print_section_footer()
+
+
+# Status helpers
+
+
+def print_cluster_status_shutdown(
+    ctx: CephadmContext,
+    state: Dict[str, Any],
+    current_host: str,
+) -> None:
+    """Print status for a cluster in shutdown state."""
+    fsid = ctx.fsid
+    print('Cluster state: SHUTDOWN')
+    print(f'State file: {get_cluster_state_file(fsid)}')
+    print(f'Shutdown time: {state.get("timestamp", "unknown")}')
+    print(f'Admin host: {state.get("admin_host", "unknown")}')
+
+    flags_set = state.get('flags_set', [])
+    if flags_set:
+        print(f'Flags set: {", ".join(flags_set)}')
+
+    hosts = state.get('hosts', {})
+    if hosts:
+        print(f'\nHosts ({len(hosts)}):')
+
+        ssh_key = get_cephadm_ssh_key(ctx)
+        target = f'ceph-{fsid}.target'
+
+        for hostname, daemon_types in hosts.items():
+            is_local = hostname == current_host
+            _, _, code = remote_systemctl(
+                ctx, hostname, 'is-active', target, is_local, ssh_key
+            )
+            status = 'RUNNING' if code == 0 else 'STOPPED'
+
+            is_admin = hostname == state.get('admin_host')
+            admin_marker = ' (admin)' if is_admin else ''
+            print(f'  {hostname}{admin_marker}: {status}')
+            if daemon_types:
+                print(f'    Daemons: {", ".join(sorted(daemon_types))}')
+
+    print(f'\nTo start the cluster: cephadm cluster-start --fsid {fsid}')
+
+
+def print_cluster_status_running(ctx: CephadmContext) -> int:
+    """Print status for a running cluster. Returns 0 on success, 1 on err."""
+    print('Cluster state: RUNNING (no shutdown state file found)')
+
+    try:
+        check_admin_keyring(ctx)
+    except Error as e:
+        print(f'\n{e}')
+        return 1
+
+    try:
+        hosts_result = run_ceph_command(
+            ctx,
+            ['ceph', 'orch', 'host', 'ls', '--format', 'json'],
+            json_output=True,
+        )
+        if isinstance(hosts_result, list):
+            print(f'\nHosts ({len(hosts_result)}):')
+            for host in hosts_result:
+                hostname = host.get('hostname', 'unknown')
+                labels = host.get('labels', [])
+                status = host.get('status', '')
+                admin_marker = ' (admin)' if ADMIN_LABEL in labels else ''
+                print(f'  {hostname}{admin_marker}: {status or "OK"}')
+
+        health_result = run_ceph_command(ctx, ['ceph', 'health'])
+        print(f'\nCluster health: {health_result.strip()}')
+
+    except RuntimeError as e:
+        print(f'\nCannot query cluster: {e}')
+        print('Cluster may be starting up or not accessible.')
+
+    return 0
index 51582c2fa1e6e8478e9caa8026a6aea027cd546f..874c28742e3be1a207a3b2e3c94de49ceab8db7f 100644 (file)
@@ -37,3 +37,6 @@ NO_DEPRECATED = False
 UID_NOBODY = 65534
 GID_NOGROUP = 65534
 DAEMON_FAILED_ERROR = 17
+
+# Host labels
+ADMIN_LABEL = '_admin'
diff --git a/src/cephadm/tests/test_cluster_ops.py b/src/cephadm/tests/test_cluster_ops.py
new file mode 100644 (file)
index 0000000..0548ee4
--- /dev/null
@@ -0,0 +1,334 @@
+# type: ignore
+# flake8: noqa: F811
+"""Tests for cluster shutdown, start, and status operations."""
+
+import json
+import mock
+import os
+
+from .fixtures import (
+    cephadm_fs,  # noqa: F401  # type: ignore[unused-import]
+    with_cephadm_ctx,
+)
+
+from cephadmlib import cluster_ops
+from cephadmlib import constants
+
+
+SAMPLE_FSID = '00000000-0000-0000-0000-0000deadbeef'
+
+
+class TestClusterStateFile:
+    """Tests for cluster state file operations."""
+
+    def test_get_cluster_state_file(self):
+        """Test that state file path is correctly generated."""
+        path = cluster_ops.get_cluster_state_file(SAMPLE_FSID)
+        assert path == f'/var/lib/ceph/{SAMPLE_FSID}/cluster-shutdown-state.json'
+
+    def test_save_and_load_cluster_state(self, cephadm_fs):
+        """Test saving and loading cluster state."""
+        state = {
+            'timestamp': '2024-01-01T00:00:00Z',
+            'fsid': SAMPLE_FSID,
+            'admin_host': 'node-1',
+            'shutdown_order': ['node-2', 'node-1'],
+            'hosts': {'node-1': ['mon', 'mgr'], 'node-2': ['osd']},
+            'flags_set': ['noout'],
+        }
+
+        # Create the directory
+        os.makedirs(f'/var/lib/ceph/{SAMPLE_FSID}', exist_ok=True)
+
+        # Save state
+        cluster_ops.save_cluster_state(SAMPLE_FSID, state)
+
+        # Verify file exists
+        state_file = cluster_ops.get_cluster_state_file(SAMPLE_FSID)
+        assert os.path.exists(state_file)
+
+        # Load and verify state
+        loaded_state = cluster_ops.load_cluster_state(SAMPLE_FSID)
+        assert loaded_state == state
+
+    def test_load_cluster_state_not_found(self, cephadm_fs):
+        """Test loading state when file doesn't exist."""
+        state = cluster_ops.load_cluster_state(SAMPLE_FSID)
+        assert state is None
+
+    def test_remove_cluster_state(self, cephadm_fs):
+        """Test removing cluster state file."""
+        # Create state file
+        os.makedirs(f'/var/lib/ceph/{SAMPLE_FSID}', exist_ok=True)
+        state_file = cluster_ops.get_cluster_state_file(SAMPLE_FSID)
+        with open(state_file, 'w') as f:
+            json.dump({'test': 'data'}, f)
+
+        assert os.path.exists(state_file)
+
+        # Remove state
+        cluster_ops.remove_cluster_state(SAMPLE_FSID)
+
+        assert not os.path.exists(state_file)
+
+
+class TestCheckClusterHealth:
+    """Tests for cluster health check."""
+
+    def test_check_cluster_health_ok(self):
+        """Test health check when cluster is healthy."""
+        with with_cephadm_ctx(['--image=quay.io/ceph/ceph:latest', 'shell']) as ctx:
+            ctx.fsid = SAMPLE_FSID
+
+            # Mock run_ceph_command to return healthy status
+            with mock.patch.object(cluster_ops, 'run_ceph_command') as mock_cmd:
+                # Single call to ceph status --format json
+                mock_cmd.return_value = {
+                    'health': {'status': 'HEALTH_OK'},
+                    'pgmap': {
+                        'num_pgs': 100,
+                        'pgs_by_state': [{'state_name': 'active+clean', 'count': 100}]
+                    }
+                }
+
+                is_healthy, msg = cluster_ops.check_cluster_health(ctx)
+
+                assert is_healthy is True
+                assert 'healthy' in msg.lower()
+
+    def test_check_cluster_health_not_ok(self):
+        """Test health check when cluster is not healthy."""
+        with with_cephadm_ctx(['--image=quay.io/ceph/ceph:latest', 'shell']) as ctx:
+            ctx.fsid = SAMPLE_FSID
+
+            with mock.patch.object(cluster_ops, 'run_ceph_command') as mock_cmd:
+                mock_cmd.return_value = {
+                    'health': {'status': 'HEALTH_WARN'},
+                    'pgmap': {'num_pgs': 100, 'pgs_by_state': []}
+                }
+
+                is_healthy, msg = cluster_ops.check_cluster_health(ctx)
+
+                assert is_healthy is False
+                assert 'HEALTH_WARN' in msg
+
+    def test_check_cluster_health_pgs_not_clean(self):
+        """Test health check when PGs are not all active+clean."""
+        with with_cephadm_ctx(['--image=quay.io/ceph/ceph:latest', 'shell']) as ctx:
+            ctx.fsid = SAMPLE_FSID
+
+            with mock.patch.object(cluster_ops, 'run_ceph_command') as mock_cmd:
+                mock_cmd.return_value = {
+                    'health': {'status': 'HEALTH_OK'},
+                    'pgmap': {
+                        'num_pgs': 100,
+                        'pgs_by_state': [
+                            {'state_name': 'active+clean', 'count': 90},
+                            {'state_name': 'active+recovering', 'count': 10}
+                        ]
+                    }
+                }
+
+                is_healthy, msg = cluster_ops.check_cluster_health(ctx)
+
+                assert is_healthy is False
+                assert '90/100' in msg
+
+
+class TestDaemonOrdering:
+    """Tests for daemon and host ordering."""
+
+    def test_daemon_shutdown_order(self):
+        """Test that shutdown order has monitoring/mgmt first, core last."""
+        order = cluster_ops.DAEMON_SHUTDOWN_ORDER
+        # Monitoring types should come before core types
+        assert order.index('prometheus') < order.index('mon')
+        assert order.index('grafana') < order.index('osd')
+        # Gateways should come before core types
+        assert order.index('nvmeof') < order.index('mon')
+        # Core types should be at the end
+        assert order.index('mon') > order.index('rgw')
+        assert order.index('mgr') == len(order) - 1
+
+    def test_daemons_to_stop_first(self):
+        """Test that DAEMONS_TO_STOP_FIRST contains expected daemon types."""
+        expected = ['nvmeof', 'iscsi', 'nfs', 'smb', 'rgw', 'mds']
+        assert cluster_ops.DAEMONS_TO_STOP_FIRST == expected
+
+    def test_shutdown_osd_flags(self):
+        """Test that SHUTDOWN_OSD_FLAGS contains expected flags."""
+        assert 'noout' in cluster_ops.SHUTDOWN_OSD_FLAGS
+        assert isinstance(cluster_ops.SHUTDOWN_OSD_FLAGS, list)
+
+    def test_default_parallel_hosts(self):
+        """Test that DEFAULT_PARALLEL_HOSTS is set to a reasonable value."""
+        assert cluster_ops.DEFAULT_PARALLEL_HOSTS == 5
+        assert isinstance(cluster_ops.DEFAULT_PARALLEL_HOSTS, int)
+        assert cluster_ops.DEFAULT_PARALLEL_HOSTS > 0
+
+    def test_get_daemon_type_priority(self):
+        """Test daemon type priority for shutdown ordering."""
+        # In shutdown order: rgw < osd < mon (rgw stops first, mon last)
+        rgw_priority = cluster_ops.get_daemon_type_priority(['rgw'])
+        osd_priority = cluster_ops.get_daemon_type_priority(['osd'])
+        mon_priority = cluster_ops.get_daemon_type_priority(['mon'])
+
+        assert rgw_priority < osd_priority
+        assert osd_priority < mon_priority
+
+    def test_order_hosts_for_shutdown(self):
+        """Test host ordering for shutdown (admin host last)."""
+        host_daemon_map = {
+            'node-1': ['mon', 'mgr', 'osd'],  # admin
+            'node-2': ['mon', 'osd', 'rgw'],
+            'node-3': ['osd'],
+        }
+        admin_host = 'node-1'
+
+        ordered = cluster_ops.order_hosts_for_shutdown(host_daemon_map, admin_host)
+
+        # Admin host should be last
+        assert ordered[-1] == admin_host
+        # All hosts should be included
+        assert set(ordered) == set(host_daemon_map.keys())
+
+
+class TestRemoteSystemctl:
+    """Tests for remote systemctl operations."""
+
+    def test_remote_systemctl_local(self):
+        """Test systemctl on local host."""
+        with with_cephadm_ctx(['shell'], mock_cephadm_call_fn=False) as ctx:
+            ctx.fsid = SAMPLE_FSID
+
+            with mock.patch('cephadmlib.cluster_ops.call', return_value=('', '', 0)) as mock_call:
+                out, err, code = cluster_ops.remote_systemctl(
+                    ctx, 'node-1', 'stop', 'ceph.target', is_local=True
+                )
+
+                assert code == 0
+                # Should use local systemctl command
+                call_args = mock_call.call_args[0]
+                cmd = call_args[1]
+                assert cmd == ['systemctl', 'stop', 'ceph.target']
+
+    def test_remote_systemctl_remote(self):
+        """Test systemctl on remote host via SSH."""
+        with with_cephadm_ctx(['shell'], mock_cephadm_call_fn=False) as ctx:
+            ctx.fsid = SAMPLE_FSID
+
+            with mock.patch('cephadmlib.cluster_ops.call', return_value=('', '', 0)) as mock_call:
+                out, err, code = cluster_ops.remote_systemctl(
+                    ctx, 'node-2', 'stop', 'ceph.target', is_local=False, ssh_key='/tmp/key'
+                )
+
+                assert code == 0
+                call_args = mock_call.call_args[0]
+                cmd = call_args[1]
+                # Should use ssh with key
+                assert 'ssh' in cmd
+                assert '-i' in cmd
+                assert '/tmp/key' in cmd
+                assert 'root@node-2' in cmd
+
+
+class TestBuildHostDaemonMap:
+    """Tests for building host to daemon type mapping."""
+
+    def test_build_host_daemon_map(self):
+        """Test building host daemon map from daemon list."""
+        daemons = [
+            {'hostname': 'node-1', 'daemon_type': 'mon', 'daemon_id': 'node-1'},
+            {'hostname': 'node-1', 'daemon_type': 'mgr', 'daemon_id': 'node-1.abc'},
+            {'hostname': 'node-1', 'daemon_type': 'osd', 'daemon_id': '0'},
+            {'hostname': 'node-2', 'daemon_type': 'mon', 'daemon_id': 'node-2'},
+            {'hostname': 'node-2', 'daemon_type': 'osd', 'daemon_id': '1'},
+            {'hostname': 'node-2', 'daemon_type': 'rgw', 'daemon_id': 'rgw.zone'},
+        ]
+
+        result = cluster_ops.build_host_daemon_map(daemons)
+
+        assert 'node-1' in result
+        assert 'node-2' in result
+        assert set(result['node-1']) == {'mon', 'mgr', 'osd'}
+        assert set(result['node-2']) == {'mon', 'osd', 'rgw'}
+
+
+class TestGetAdminHost:
+    """Tests for identifying admin host."""
+
+    def test_get_admin_host(self):
+        """Test finding admin host from host list."""
+        hosts = [
+            {'hostname': 'node-1', 'labels': [constants.ADMIN_LABEL, 'mon']},
+            {'hostname': 'node-2', 'labels': ['mon']},
+            {'hostname': 'node-3', 'labels': ['osd']},
+        ]
+
+        admin = cluster_ops.get_admin_host(hosts)
+        assert admin == 'node-1'
+
+    def test_get_admin_host_not_found(self):
+        """Test when no admin host exists."""
+        hosts = [
+            {'hostname': 'node-1', 'labels': ['mon']},
+            {'hostname': 'node-2', 'labels': ['osd']},
+        ]
+
+        admin = cluster_ops.get_admin_host(hosts)
+        assert admin is None
+
+
+class TestSSHKeyHandling:
+    """Tests for SSH key retrieval and caching."""
+
+    def test_get_cephadm_ssh_key_cached(self, cephadm_fs):
+        """Test getting SSH key when it's already cached."""
+        with with_cephadm_ctx(['shell']) as ctx:
+            ctx.fsid = SAMPLE_FSID
+
+            # Create cached key
+            key_dir = f'/var/lib/ceph/{SAMPLE_FSID}'
+            os.makedirs(key_dir, exist_ok=True)
+            key_path = os.path.join(key_dir, '.ssh_key')
+            with open(key_path, 'w') as f:
+                f.write('-----BEGIN RSA PRIVATE KEY-----\ntest\n-----END RSA PRIVATE KEY-----')
+
+            result = cluster_ops.get_cephadm_ssh_key(ctx)
+            assert result == key_path
+
+    def test_get_cephadm_ssh_key_from_store(self, cephadm_fs):
+        """Test retrieving SSH key from config-key store."""
+        with with_cephadm_ctx(['shell']) as ctx:
+            ctx.fsid = SAMPLE_FSID
+
+            # Ensure directory exists but no cached key
+            key_dir = f'/var/lib/ceph/{SAMPLE_FSID}'
+            os.makedirs(key_dir, exist_ok=True)
+
+            test_key = '-----BEGIN RSA PRIVATE KEY-----\ntest\n-----END RSA PRIVATE KEY-----'
+
+            with mock.patch.object(cluster_ops, 'run_ceph_command') as mock_cmd:
+                mock_cmd.return_value = test_key
+
+                result = cluster_ops.get_cephadm_ssh_key(ctx)
+
+                assert result is not None
+                assert os.path.exists(result)
+                with open(result, 'r') as f:
+                    assert f.read() == test_key
+
+    def test_remove_cached_ssh_key(self, cephadm_fs):
+        """Test removing cached SSH key."""
+        # Create cached key
+        key_dir = f'/var/lib/ceph/{SAMPLE_FSID}'
+        os.makedirs(key_dir, exist_ok=True)
+        key_path = os.path.join(key_dir, '.ssh_key')
+        with open(key_path, 'w') as f:
+            f.write('test key')
+
+        assert os.path.exists(key_path)
+
+        cluster_ops.remove_cached_ssh_key(SAMPLE_FSID)
+
+        assert not os.path.exists(key_path)