VersionStatusUpdater,
)
from cephadmlib.container_lookup import infer_local_ceph_image, identify
-from cephadmlib.user_utils import setup_ssh_user
+from cephadmlib.user_utils import (
+ setup_ssh_user,
+ validate_user_exists,
+ setup_sudoers_restricted,
+ install_or_upgrade_cephadm
+)
FuncT = TypeVar('FuncT', bound=Callable)
##################################
+def command_prepare_host_sudo_hardening(ctx: CephadmContext) -> int:
+ """
+ Prepare host for sudo hardening by:
+ 1. Authorizing SSH public key for the user
+ 2. Installing/upgrading cephadm package to match cluster version (includes cephadm_invoker.py)
+ 3. Setting up sudoers with restricted permissions for cephadm_invoker.py
+ """
+ logger.info('Preparing host for sudo hardening...')
+
+ user = ctx.ssh_user if hasattr(ctx, 'ssh_user') and ctx.ssh_user else 'root'
+ ssh_pub_key = ctx.ssh_pub_key if hasattr(ctx, 'ssh_pub_key') else None
+ cephadm_version = ctx.cephadm_version if hasattr(ctx, 'cephadm_version') else None
+
+ has_failures = False
+ try:
+ validate_user_exists(user)
+ except Exception as e:
+ logger.exception('User %s does not exists. err: %s', user, e)
+ return 1
+ # Step 1: Authorize SSH public key for the user
+ if ssh_pub_key:
+ try:
+ logger.debug('Authorizing SSH key for user %s', user)
+ authorize_ssh_key(ssh_pub_key, user)
+ except Exception as e:
+ logger.exception('Failed to authorize SSH key for %s. err: %s', user, e)
+ has_failures = True
+ else:
+ logger.warning('SSH key authorization skipped (no key provided)')
+
+ # Step 2: Install/upgrade the cephadm package (includes cephadm_invoker.py)
+ success, message = install_or_upgrade_cephadm(ctx, cephadm_version)
+ if success:
+ logger.debug('Installed the cephadm package: %s', message)
+ else:
+ logger.error('Failed to install the cephadm package: %s', message)
+ has_failures = True
+
+ # Step 3: Setup sudoers with restricted permissions for cephadm_invoker.py
+ if user and user != 'root':
+ try:
+ setup_sudoers_restricted(ctx, user, '/usr/libexec/cephadm_invoker.py')
+ logger.debug('Sudoers configured for %s (restricted to cephadm_nvoker.py)', user)
+ except Exception as e:
+ logger.exception('Failed to setup sudoers for %s. err: %s', user, e)
+ has_failures = True
+ else:
+ logger.debug('Sudoers setup skipped (root user)')
+
+ logger.info('Successfully prepared host for sudo hardening')
+
+ # Raise error if any step failed
+ if has_failures:
+ raise Error('Failed to prepare host for sudo hardening')
+ return 0
+
+##################################
+
+
class ArgumentFacade:
def __init__(self) -> None:
self.defaults: Dict[str, Any] = {}
help='Set hostname')
parser_setup_ssh_user = subparsers.add_parser(
- 'setup-ssh-user', help='setup SSH user with passwordless sudo and SSH key')
+ 'setup-ssh-user', help='set up SSH user with passwordless sudo and key')
parser_setup_ssh_user.set_defaults(func=command_setup_ssh_user)
parser_setup_ssh_user.add_argument(
'--ssh-user',
parser_setup_ssh_user.add_argument(
'--ssh-pub-key',
required=True,
- help='SSH public key to add to user authorized_keys')
+ help='SSH public key to add to authorized_keys')
+
+ parser_prepare_host_sudo_hardening = subparsers.add_parser(
+ 'prepare-host-sudo-hardening',
+ help='prepare host by installing cephadm, configuring sudoers, and enabling sudo hardening')
+ parser_prepare_host_sudo_hardening.set_defaults(func=command_prepare_host_sudo_hardening)
+ parser_prepare_host_sudo_hardening.add_argument(
+ '--ssh-user',
+ help='SSH user to configure (default: root)')
+ parser_prepare_host_sudo_hardening.add_argument(
+ '--ssh-pub-key',
+ help='SSH public key to authorize for the user')
+ parser_prepare_host_sudo_hardening.add_argument(
+ '--cephadm-version',
+ help='Specific cephadm version to install')
parser_add_repo = subparsers.add_parser(
'add-repo', help='configure package repository')
import logging
import os
import pwd
-from typing import Tuple
+from typing import Tuple, Optional
from .call_wrappers import call, CallVerbosity
from .context import CephadmContext
-from .exceptions import Error
+from .exceptions import Error, TimeoutExpired
from .exe_utils import find_program
+from .file_utils import write_new
from .ssh import authorize_ssh_key
+from .packagers import create_packager
logger = logging.getLogger()
)
-def setup_sudoers(ctx: CephadmContext, username: str) -> None:
- """Setup passwordless sudo for a user.
- """
+def setup_sudoers(
+ ctx: CephadmContext, username: str, sudoers_content: Optional[str] = None
+) -> None:
+ """Setup sudoers for a user with custom or default permissions."""
sudoers_file = f'/etc/sudoers.d/{username}'
- sudoers_content = f'{username} ALL=(ALL) NOPASSWD: ALL\n'
+ if sudoers_content is None:
+ sudoers_content = f'{username} ALL=(ALL) NOPASSWD: ALL\n'
+
+ # Ensure content ends with newline
+ if not sudoers_content.endswith('\n'):
+ sudoers_content += '\n'
logger.info('Setting up sudoers for user %s', username)
try:
# Write sudoers file with proper permissions
- with open(sudoers_file, 'w') as f:
- f.write(sudoers_content)
- os.chmod(sudoers_file, 0o440)
- os.chown(sudoers_file, 0, 0)
+ with write_new(sudoers_file, owner=(0, 0), perms=0o440) as fh:
+ fh.write(sudoers_content)
# Validate sudoers syntax
visudo_cmd = find_program('visudo')
_out, _err, code = call(
ctx,
[visudo_cmd, '-c', '-f', sudoers_file],
- verbosity=CallVerbosity.DEBUG
+ verbosity=CallVerbosity.DEBUG,
)
if code != 0:
- # Clean up invalid file
try:
os.remove(sudoers_file)
except OSError:
raise Error(msg)
-def setup_ssh_user(ctx: CephadmContext, username: str, ssh_pub_key: str) -> None:
+def setup_sudoers_restricted(
+ ctx: CephadmContext, username: str, allowed_command: str
+) -> None:
+ """Setup sudoers with restricted permissions for a specific command.
+ Args:
+ ctx: CephadmContext
+ username: Username to configure sudoers for
+ allowed_command: Full path to the command that user can run with sudo
+ """
+ sudoers_content = f'{username} ALL=(ALL) NOPASSWD: {allowed_command}'
+ setup_sudoers(ctx, username, sudoers_content)
+
+
+def setup_ssh_user(
+ ctx: CephadmContext, username: str, ssh_pub_key: str
+) -> None:
"""Setup SSH user with passwordless sudo and SSH key authorization.
This function performs the following operations:
1. Validates that the user exists on the system
2. Sets up passwordless sudo for the user (skipped for root)
3. Authorizes the SSH public key for the user
"""
- # Verify we're running as root
- if os.geteuid() != 0:
- raise Error('This operation must be run as root')
-
if not ssh_pub_key or ssh_pub_key.isspace():
raise Error('SSH public key is required and cannot be empty')
raise Error(msg)
logger.info('Successfully configured SSH user %s on this host', username)
+
+
+def _check_cephadm_version(ctx: CephadmContext) -> Tuple[Optional[str], bool]:
+ """Check if cephadm is installed and return its version.
+ Returns:
+ Tuple of (version_string_or_none, is_available)
+ """
+ try:
+ result_stdout, result_stderr, result_code = call(
+ ctx, ['cephadm', 'version']
+ )
+ if result_code == 0:
+ version = result_stdout.strip()
+ logger.info('Found installed cephadm: %s', version)
+ return version, True
+ else:
+ logger.info('cephadm command failed, package not available')
+ return None, False
+ except (TimeoutExpired, FileNotFoundError) as e:
+ logger.exception('cephadm not found: %s', e)
+ return None, False
+
+
+def install_or_upgrade_cephadm(
+ ctx: CephadmContext, requested_version: Optional[str] = None
+) -> Tuple[bool, str]:
+ """Install or upgrade cephadm package to match a specific version.
+ Returns:
+ Tuple of (success, message)
+ """
+ try:
+ current_version, is_available = _check_cephadm_version(ctx)
+ if is_available and current_version:
+ if requested_version and requested_version not in current_version:
+ logger.info(
+ 'Version mismatch: installed=%s, requested=%s',
+ current_version,
+ requested_version,
+ )
+ needs_install = True
+ elif requested_version:
+ logger.debug(
+ 'cephadm version %s already installed', requested_version
+ )
+ return True, f'cephadm {requested_version} already installed'
+ else:
+ logger.info(
+ 'cephadm installed, no specific version requested'
+ )
+ return True, f'cephadm already installed: {current_version}'
+ else:
+ needs_install = True
+
+ # Install or upgrade if needed
+ if needs_install:
+ logger.info('Installing/upgrading cephadm package...')
+ pkg = create_packager(ctx, version=requested_version)
+ pkg.install(['cephadm'])
+
+ # Verify installation
+ new_version, install_success = _check_cephadm_version(ctx)
+ if install_success and new_version:
+ logger.info(f'Successfully installed cephadm: {new_version}')
+ # Check version match if requested
+ if requested_version and requested_version not in new_version:
+ logger.warning(
+ 'Installed version %s does not match requested %s',
+ new_version,
+ requested_version,
+ )
+ return (
+ True,
+ f'cephadm installed/upgraded: {new_version} (requested {requested_version} not available)',
+ )
+ return True, f'cephadm installed/upgraded: {new_version}'
+ else:
+ logger.warning(
+ 'cephadm package installed but verification failed'
+ )
+ return (
+ True,
+ 'cephadm package installed (verification unavailable)',
+ )
+ return True, 'cephadm operation completed'
+ except Exception as e:
+ error_msg = f'Failed to install/upgrade cephadm: {e}'
+ logger.exception(error_msg)
+ return False, error_msg
desc="Default IP address for RedFish API (OOB management)."
),
Option(
- 'ssh_hardening',
+ 'sudo_hardening',
type='bool',
default=False,
- desc='Enable SSH hardening by routing all command execution through invoker.py. '
+ desc='Enable sudo hardening by routing all command execution through invoker.py. '
'When enabled, cephadm and bash commands are validated and executed via '
'the secure invoker wrapper.'
),
else:
self.paused = False
- self.ssh_hardening = False
+ self.sudo_hardening = False
self.invoker_path = '/usr/libexec/cephadm_invoker.py'
# for mypy which does not run the code
self.oob_default_addr = ''
self.ssh_keepalive_interval = 0
self.ssh_keepalive_count_max = 0
- self.ssh_hardening = False
+ self.sudo_hardening = False
self.invoker_path = '/usr/libexec/cephadm_invoker.py'
self.certificate_duration_days = 0
self.certificate_renewal_threshold_days = 0
self.event.set()
return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
+ def _prepare_host_for_sudo_hardening(
+ self,
+ host: str,
+ cephadm_args: List[str],
+ addr: Optional[str] = None
+ ) -> Tuple[str, bool, str]:
+ """
+ Prepare a host for sudo hardening by executing 'cephadm prepare-host-sudo-hardening' command.
+ """
+ try:
+ self.log.debug('Preparing host %s for sudo hardening...', host)
+ addr = addr or (self.inventory.get_addr(host) if host in self.inventory else None)
+
+ with self.async_timeout_handler(host, 'cephadm prepare-host-sudo-hardening'):
+ out, err, code = self.wait_async(
+ CephadmServe(self)._run_cephadm(
+ host, cephadmNoImage, 'prepare-host-sudo-hardening', cephadm_args,
+ addr=addr, error_ok=True, no_fsid=True))
+ if code:
+ error_msg = '\n'.join(err) if err else 'Unknown error'
+ self.log.error('Failed to prepare host %s: %s', host, error_msg)
+ return (host, False, error_msg)
+ self.log.debug('Successfully prepared host %s', host)
+ return (host, True, '')
+ except Exception as e:
+ error_msg = str(e)
+ self.log.exception('Exception while preparing host %s: %s', host, error_msg)
+ return (host, False, error_msg)
+
+ @forall_hosts
+ def _prepare_hosts_for_sudo_hardening(
+ self,
+ host: str,
+ cephadm_args: List[str],
+ addr: Optional[str] = None
+ ) -> Tuple[str, bool, str]:
+ return self._prepare_host_for_sudo_hardening(host, cephadm_args, addr)
+
+ @CephadmCLICommand.Write('cephadm prepare-host-and-enable-sudo-hardening')
+ def _prepare_host_and_enable_sudo_hardening(
+ self,
+ user: str,
+ host_label: Optional[str] = None
+ ) -> Tuple[int, str, str]:
+ """
+ Prepare hosts and enable sudo hardening for the cluster.
+ This command performs a complete sudo hardening setup:
+ 1. Prepare each host for sudo hardening (install cephadm, configure sudoers) - executed on all hosts
+ 2. Set SSH user to the specified user for cluster operations (without pre-steps)
+ 3. Enable sudo hardening globally for the cluster
+ """
+ if not self.ssh_pub:
+ return 1, '', 'Error: No SSH public key configured. Run "ceph cephadm generate-key" first.'
+ if host_label:
+ hosts_to_prepare = [h for h in self.cache.get_hosts()
+ if self.inventory.has_label(h, host_label)]
+ else:
+ hosts_to_prepare = self.cache.get_hosts()
+ if not hosts_to_prepare:
+ return 1, '', 'Error: No hosts found'
+ self.log.debug('Preparing %s host(s) in the cluster: %s', len(hosts_to_prepare), ", ".join(hosts_to_prepare))
+
+ # Prepare arguments for the cephadm command
+ args = []
+ args.extend(['--ssh-user', user])
+ args.extend(['--ssh-pub-key', self.ssh_pub])
+ ceph_version = self._get_cephadm_version_for_host_prep()
+ if ceph_version:
+ args.extend(['--cephadm-version', ceph_version])
+
+ # Step 1: Prepare each host for sudo hardening (executed on all target hosts in parallel)
+ self.log.debug('Step 1: Preparing %s host(s) for sudo hardening in parallel', len(hosts_to_prepare))
+ host_args = [(host, args) for host in hosts_to_prepare]
+ try:
+ host_results = self._prepare_hosts_for_sudo_hardening(host_args)
+ except Exception as e:
+ self.log.exception('Failed to prepare hosts in parallel: %s', e)
+ return 1, '', f'Failed to prepare hosts: {str(e)}'
+ failed_hosts = []
+ for hostname, success, error_msg in host_results:
+ if not success:
+ failed_hosts.append(hostname)
+ if failed_hosts:
+ return 1, '', f'Failed to prepare {len(failed_hosts)} host(s): {", ".join(failed_hosts)}'
+ self.log.debug('All %s hosts prepared successfully', len(hosts_to_prepare))
+
+ # Step 2: Enable sudo hardening globally
+ self.log.debug('Step 2: Enabling sudo hardening...')
+ try:
+ if not self.sudo_hardening:
+ self.set_module_option('sudo_hardening', True)
+ self.sudo_hardening = True
+ except Exception as e:
+ error_msg = f'Failed to enable sudo hardening: {e}'
+ self.log.exception(error_msg)
+ return 1, '', error_msg
+
+ # Step 3: Set SSH user for cluster operations
+ self.log.debug('Step 3: Setting SSH user to %s for cluster operations', user)
+ try:
+ # Call set_ssh_user with skip_pre_steps=True since we've already prepared the hosts
+ current_user = self.ssh_user
+ if current_user != user:
+ retval, out_msg, err_msg = self.set_ssh_user(user, skip_pre_steps=True)
+ if retval != 0:
+ error_msg = f'Failed to set SSH user: {err_msg}'
+ self.log.error(error_msg)
+ return 1, '', error_msg
+ except Exception as e:
+ error_msg = f'Failed to set SSH user: {e}'
+ self.log.exception(error_msg)
+ return 1, '', error_msg
+
+ success_msg = (
+ f'Sudo hardening is now active for all cluster operations.\n'
+ f'Affected hosts: {", ".join(hosts_to_prepare)}\n'
+ )
+ self.log.debug(success_msg)
+ return 0, success_msg, ''
+
@CephadmCLICommand.Write(
prefix='cephadm set-extra-ceph-conf')
def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
raise OrchestratorError(str(e))
return ip_addr
+ def _get_cephadm_version_for_host_prep(self) -> Optional[str]:
+ """Extract cephadm version from cluster version string."""
+ try:
+ if self.version:
+ parts = self.version.split()
+ if len(parts) > 2 and parts[0] == 'ceph' and parts[1] == 'version':
+ return parts[2]
+ except Exception as e:
+ self.log.debug(f'Could not determine cluster version: {e}')
+ return None
+
+ def _cleanup_add_host_tracking(self, hostname: str) -> None:
+ """Clean up host from tracking sets and reset SSH connection."""
+ try:
+ self.hosts_being_added.discard(hostname)
+ except Exception as e:
+ self.log.debug(f'Failed to remove {hostname} from hosts_being_added: {e}')
+
+ try:
+ self.ssh.reset_con(hostname)
+ except Exception as e:
+ self.log.debug(f'Failed to reset SSH connection for {hostname}: {e}')
+
+ def _prepare_new_host_for_sudo_hardening(self, hostname: str, addr: str) -> None:
+ """
+ Prepare a new host for sudo hardening before adding to inventory.
+ Raises OrchestratorError on failure.
+ """
+ if not self.ssh_pub:
+ raise OrchestratorError('No SSH public key configured')
+ if not self.ssh_user:
+ raise OrchestratorError('No SSH user configured')
+
+ self.log.info('Preparing new host %s for sudo hardening with root', hostname)
+
+ # Build arguments
+ ceph_version = self._get_cephadm_version_for_host_prep()
+ if ceph_version:
+ self.log.debug('Will install cephadm version %s on %s', ceph_version, hostname)
+ cephadm_args = ['--ssh-user', self.ssh_user, '--ssh-pub-key', self.ssh_pub]
+ if ceph_version:
+ cephadm_args.extend(['--cephadm-version', ceph_version])
+
+ # Execute preparation
+ _, success, error_msg = self._prepare_host_for_sudo_hardening(
+ hostname, cephadm_args, addr
+ )
+ if not success:
+ raise OrchestratorError(
+ f'Failed to prepare host {hostname} for sudo hardening: {error_msg}'
+ )
+ self.log.info('Successfully prepared host %s for sudo hardening', hostname)
+
+ def _setup_user_on_new_host(self, hostname: str, addr: Optional[str]) -> None:
+ """
+ user setup for new hosts (when sudo hardening is disabled).
+ """
+ try:
+ assert self.ssh_pub
+ assert self.ssh_user
+ self._setup_user_on_host(hostname, self.ssh_user, self.ssh_pub, addr=addr)
+ except OrchestratorError as oe:
+ self.log.exception('Failed to setup user %s on %s: %s', self.ssh_user, hostname, oe)
+ self.log.warning('Please manually setup user %s on %s', hostname, self.ssh_user, hostname)
+ except Exception as e:
+ self.log.exception('Unexpected error setting up user %s on %s: %s', self.ssh_user, hostname, e)
+ self.log.warning('You may need to manually setup user %s on %s', self.ssh_user, hostname)
+
def _add_host(self, spec):
# type: (HostSpec) -> str
"""
if is_new_host and self.ssh_user and self.ssh_user != 'root':
try:
self.hosts_being_added.add(spec.hostname)
- self.log.info(f'Adding new host {spec.hostname}, will use root user temporarily for setup')
+ self.log.info('Adding new host %s, will use root user temporarily for setup', spec.hostname)
except Exception as e:
- self.log.warning(f'Failed to add {spec.hostname} to hosts_being_added tracking: {e}')
+ self.log.warning('Failed to add %s to hosts_being_added tracking: %s', spec.hostname, e)
try:
ip_addr = self._check_valid_addr(spec.hostname, spec.addr)
'args': [f'{k}={v}' for k, v in spec.location.items()],
})
+ # BEFORE adding host to inventory, prepare it for sudo hardening if needed
+ if is_new_host and self.sudo_hardening and self.ssh_user and self.ssh_user != 'root':
+ try:
+ self._prepare_new_host_for_sudo_hardening(spec.hostname, spec.addr)
+ except Exception as e:
+ self.log.exception('Sudo hardening preparation failed for %s: %s', spec.hostname, e)
+ raise OrchestratorError(
+ f'Failed to prepare host {spec.hostname} for sudo hardening. '
+ f'Host was not added to the cluster. Error: {e}'
+ )
+ elif is_new_host and self.ssh_user and self.ssh_user != 'root':
+ # Sudo hardening not enabled, just perform set user setup presteps
+ self._setup_user_on_new_host(spec.hostname, spec.addr)
+
if spec.hostname not in self.inventory:
self.cache.prime_empty_host(spec.hostname)
self.inventory.add_host(spec)
self.event.set() # refresh stray health check
self.log.info('Added host %s' % spec.hostname)
- # If this is a new host and using non-root user, setup the user now
- if is_new_host and self.ssh_user and self.ssh_user != 'root':
- self.log.info(f'Setting up user {self.ssh_user} on new host {spec.hostname}')
- try:
- assert self.ssh_pub
- self._setup_user_on_host(spec.hostname, self.ssh_user, self.ssh_pub, addr=spec.addr)
- self.log.info(f'Successfully set up user {self.ssh_user} on {spec.hostname}')
- except OrchestratorError as oe:
- # OrchestratorError from user setup (user doesn't exist, SSH failures, etc.)
- # Log warning but don't fail the add_host operation
- self.log.warning(f'Failed to setup user {self.ssh_user} on {spec.hostname}: {oe}')
- self.log.warning(
- f'Host {spec.hostname} added but user setup incomplete. '
- f'Please manually setup user {self.ssh_user} on {spec.hostname}')
- except Exception as e:
- # Unexpected error during user setup
- # Log warning but don't fail the add_host operation
- self.log.error(f'Unexpected error setting up user {self.ssh_user} on {spec.hostname}: {e}')
- self.log.warning(f'You may need to manually setup user {self.ssh_user} on {spec.hostname}')
- finally:
- # Always remove from hosts_being_added after setup attempt
- try:
- self.hosts_being_added.discard(spec.hostname)
- except Exception as discard_err:
- self.log.debug(f'Failed to remove {spec.hostname} from hosts_being_added: {discard_err}')
-
- # Reset SSH connection so next operations will use configured user
- try:
- self.ssh.reset_con(spec.hostname)
- self.log.debug(f'Reset SSH connection for {spec.hostname} to use configured user')
- except Exception as reset_err:
- # Connection reset failure is not critical - connection will be recreated on next use
- self.log.debug(f'Failed to reset SSH connection for {spec.hostname}: {reset_err}')
- elif is_new_host:
- # Root user, no need to track
- try:
- self.hosts_being_added.discard(spec.hostname)
- except Exception as e:
- self.log.debug(f'Failed to remove {spec.hostname} from hosts_being_added: {e}')
-
return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)
except Exception:
- # If anything fails in core add_host operations, cleanup the new tracking mechanisms
- if is_new_host and self.ssh_user and self.ssh_user != 'root':
- # Cleanup tracking set
- try:
- self.hosts_being_added.discard(spec.hostname)
- self.log.debug(f'Cleaned up hosts_being_added tracking for {spec.hostname} after error')
- except Exception as cleanup_err:
- self.log.debug(f'Failed to cleanup hosts_being_added for {spec.hostname}: {cleanup_err}')
-
- # Cleanup any stale SSH connection
- try:
- self.ssh.reset_con(spec.hostname)
- self.log.debug(f'Reset SSH connection for {spec.hostname} after error')
- except Exception as reset_err:
- self.log.debug(f'Failed to reset SSH connection for {spec.hostname}: {reset_err}')
raise
+ finally:
+ if is_new_host and self.ssh_user and self.ssh_user != 'root':
+ self.log.debug('Cleaning up %s ', spec.hostname)
+ self._cleanup_add_host_tracking(spec.hostname)
@handle_orch_error
def add_host(self, spec: HostSpec) -> str:
self.log.debug('stdin: %s' % stdin)
# If SSH hardening is enabled, call invoker directly without which python
- if self.mgr.ssh_hardening and self.mgr.invoker_path:
+ if self.mgr.sudo_hardening and self.mgr.invoker_path:
# For invoker, pass all args as a single string
cmd = ssh.RemoteCommand(
ssh.Executables.INVOKER,
host, cmd, stdin=stdin, addr=addr)
if code == 2 or code == 127:
# Use invoker to check file existence when SSH hardening is enabled
- if self.mgr.ssh_hardening and self.mgr.invoker_path:
+ if self.mgr.sudo_hardening and self.mgr.invoker_path:
check_cmd = ssh.RemoteCommand(
ssh.Executables.INVOKER,
- ['check_existence', self.mgr.cephadm_binary_path]
+ ['check_binary', self.mgr.cephadm_binary_path]
)
else:
check_cmd = ssh.RemoteCommand(
elif self.mgr.mode == 'cephadm-package':
try:
# Wrap with invoker if SSH hardening is enabled
- if self.mgr.ssh_hardening and self.mgr.invoker_path:
+ if self.mgr.sudo_hardening and self.mgr.invoker_path:
cmd = ssh.RemoteCommand(
ssh.Executables.INVOKER,
['run', str(CEPHADM_EXE), '--'] + final_args
async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
# Use tee (from coreutils) to create a copy of cephadm on the target machine
self.log.info(f"Deploying cephadm binary to {host}")
- if self.mgr.ssh_hardening and self.mgr.invoker_path:
+ if self.mgr.sudo_hardening and self.mgr.invoker_path:
# Use invoker for secure deployment when SSH hardening is enabled
await self.mgr.ssh._deploy_cephadm_binary_via_invoker(
host, self.mgr.cephadm_binary_path, self.mgr._cephadm, addr=addr)
with self.mgr.async_timeout_handler(host, f'ssh {host} (addr {addr})'):
return self.mgr.wait_async(self._remote_connection(host, addr))
- def _enforce_ssh_hardening(self,
- host: str,
- cmd_components: RemoteCommand) -> None:
+ def _enforce_sudo_hardening(
+ self,
+ host: str,
+ cmd_components: RemoteCommand
+ ) -> None:
"""
Enforce that commands are wrapped with invoker when SSH hardening
is enabled.
"""
- if not self.mgr.ssh_hardening:
+ if not self.mgr.sudo_hardening:
return
is_wrapped = (
logger.debug(f'Host {host} is being added, using root user without sudo')
# Enforce invoker usage if SSH hardening is enabled
- self._enforce_ssh_hardening(host, cmd_components)
+ self._enforce_sudo_hardening(host, cmd_components)
rcmd = RemoteSudoCommand.wrap(cmd_components, use_sudo=use_sudo)
try:
invoker_cmd = RemoteCommand(
Executables.INVOKER,
- ['deploy_cephadm_binary', remote_tmp_path, cephadm_path]
+ ['deploy_binary', remote_tmp_path, cephadm_path]
)
await self._execute_command(host, invoker_cmd, addr=addr)
--- /dev/null
+#!/usr/bin/env python3
+"""Tests for SSH hardening functionality in cephadm."""
+
+import pytest
+from unittest import mock
+try:
+ from unittest.mock import AsyncMock
+except ImportError:
+ from asyncmock import AsyncMock
+
+from orchestrator import OrchestratorError
+from cephadm.ssh import SSHManager, RemoteCommand, Executables
+from cephadm.tests.fixtures import with_host
+
+
+class TestSudoHardening:
+ """Test SSH hardening functionality."""
+
+ @pytest.fixture
+ def setup_sudo_hardening(self, cephadm_module):
+ """Common setup for SSH hardening tests."""
+ cephadm_module.sudo_hardening = True
+ cephadm_module.ssh_user = 'cephadm'
+ cephadm_module.cephadm_binary_path = '/var/lib/ceph/fsid/cephadm.abc123'
+ cephadm_module.invoker_path = '/usr/libexec/cephadm_invoker.py'
+ return cephadm_module
+
+ @pytest.fixture
+ def mock_connection(self):
+ """Create a mock SSH connection."""
+ mock_conn = AsyncMock()
+ mock_conn.run.return_value = mock.Mock(
+ stdout='', stderr='', returncode=0
+ )
+ return mock_conn
+
+ @pytest.mark.parametrize('command', [
+ lambda: RemoteCommand(RemoteCommand('python3'),
+ ['/var/lib/ceph/fsid/cephadm.abc123',
+ 'check-host', '--expect-hostname', 'test-host']),
+ lambda: RemoteCommand(Executables.LS, ['-la', '/tmp']),
+ lambda: RemoteCommand(RemoteCommand('/var/lib/ceph/fsid/cephadm.abc123'),
+ ['version']),
+ ])
+ def test_unwrapped_command_errors(self, setup_sudo_hardening,
+ mock_connection, command):
+ """Test that unwrapped commands error out when SSH hardening
+ is enabled."""
+ ssh_manager = SSHManager(setup_sudo_hardening)
+
+ with mock.patch.object(ssh_manager, '_remote_connection',
+ return_value=mock_connection):
+ cmd = command()
+ with pytest.raises(OrchestratorError,
+ match='command is not wrapped with invoker'):
+ setup_sudo_hardening.wait_async(
+ ssh_manager._execute_command('test-host', cmd)
+ )
+
+ def test_sudo_hardening_disabled_no_wrapping(
+ self,
+ cephadm_module,
+ mock_connection
+ ):
+ """Test that commands are not wrapped when SSH hardening
+ is disabled."""
+ cephadm_module.sudo_hardening = False
+ cephadm_module.ssh_user = 'cephadm'
+ cephadm_module.cephadm_binary_path = '/var/lib/ceph/fsid/cephadm.abc123'
+ ssh_manager = SSHManager(cephadm_module)
+
+ with mock.patch.object(ssh_manager, '_remote_connection',
+ return_value=mock_connection):
+ cmd = RemoteCommand(Executables.LS, ['-la', '/tmp'])
+ cephadm_module.wait_async(
+ ssh_manager._execute_command('test-host', cmd)
+ )
+
+ mock_connection.run.assert_called_once()
+ called_args = mock_connection.run.call_args[0][0]
+ assert called_args == 'sudo ls -la /tmp'
+
+ def test_wrapped_command_succeeds(self, setup_sudo_hardening,
+ mock_connection):
+ """Test that wrapped commands succeed when SSH hardening
+ is enabled."""
+ ssh_manager = SSHManager(setup_sudo_hardening)
+
+ with mock.patch.object(ssh_manager, '_remote_connection',
+ return_value=mock_connection):
+ cmd = RemoteCommand(
+ Executables.INVOKER,
+ ['run', '/var/lib/ceph/fsid/cephadm.abc123', '--help']
+ )
+ setup_sudo_hardening.wait_async(
+ ssh_manager._execute_command('test-host', cmd)
+ )
+
+ mock_connection.run.assert_called()
+ called_args = mock_connection.run.call_args[0][0]
+ assert called_args == ('sudo /usr/libexec/cephadm_invoker.py '
+ 'run /var/lib/ceph/fsid/cephadm.abc123 --help')
+
+ def test_check_host_with_sudo_hardening_integration(
+ self, setup_sudo_hardening, mock_connection):
+ """Integration test for check_host with SSH hardening enabled."""
+ with mock.patch.object(setup_sudo_hardening,
+ '_prepare_new_host_for_sudo_hardening'):
+ with mock.patch.object(setup_sudo_hardening.ssh, '_remote_connection',
+ return_value=mock_connection):
+ with with_host(setup_sudo_hardening, 'test-host'):
+ code, _, err = setup_sudo_hardening.check_host('test-host')
+ assert code == 0
+ assert err == ''
+
+ called_args = mock_connection.run.call_args[0][0]
+ assert '/usr/libexec/cephadm_invoker.py' in called_args
+ assert 'sudo' in called_args
+ assert 'run' in called_args
+ assert 'check-host' in called_args
+ assert '--expect-hostname test-host' in called_args
+
+ def test_check_host_without_sudo_hardening(
+ self,
+ cephadm_module,
+ mock_connection
+ ):
+ """Integration test for check_host with SSH hardening disabled."""
+ cephadm_module.sudo_hardening = False
+ cephadm_module.ssh_user = 'root'
+ cephadm_module.cephadm_binary_path = '/var/lib/ceph/fsid/cephadm.abc123'
+
+ with mock.patch.object(cephadm_module.ssh, '_remote_connection',
+ return_value=mock_connection):
+ with with_host(cephadm_module, 'test-host'):
+ code, _, err = cephadm_module.check_host('test-host')
+ assert code == 0
+ assert err == ''
+
+ called_args = mock_connection.run.call_args[0][0]
+ assert '/usr/libexec/cephadm_invoker.py' not in called_args
+ assert 'sudo' not in called_args
+ assert 'check-host' in called_args
+ assert '--expect-hostname test-host' in called_args
+
+ def test_execute_cephadm_exec_with_hardening(self, setup_sudo_hardening):
+ """Test cephadm exec with SSH hardening enabled."""
+ ssh_manager = SSHManager(setup_sudo_hardening)
+ mock_conn = AsyncMock()
+ mock_conn.run.return_value = mock.Mock(
+ stdout='success', stderr='', returncode=0
+ )
+
+ with mock.patch.object(setup_sudo_hardening.ssh, '_remote_connection',
+ return_value=mock_conn):
+ cmd = RemoteCommand(Executables.LS, ['-la', '/tmp'])
+ out, err, code = ssh_manager.execute_cephadm_exec('test-host', cmd)
+
+ called_args = mock_conn.run.call_args[0][0]
+ assert called_args.startswith('sudo /usr/libexec/cephadm_invoker.py')
+ assert setup_sudo_hardening.cephadm_binary_path in called_args
+ assert 'exec' in called_args and '--command' in called_args
+ assert (out, err, code) == ('success', '', 0)
+
+ def test_execute_cephadm_exec_without_hardening(self, cephadm_module):
+ """Test cephadm exec with SSH hardening disabled."""
+ cephadm_module.sudo_hardening = False
+ cephadm_module.ssh_user = 'cephadm'
+ cephadm_module.cephadm_binary_path = '/var/lib/ceph/fsid/cephadm.abc123'
+ ssh_manager = SSHManager(cephadm_module)
+
+ mock_conn = AsyncMock()
+ mock_conn.run.return_value = mock.Mock(
+ stdout='success', stderr='', returncode=0
+ )
+
+ with mock.patch.object(cephadm_module.ssh, '_remote_connection',
+ return_value=mock_conn):
+ cmd = RemoteCommand(Executables.LS, ['-la', '/tmp'])
+ out, err, code = ssh_manager.execute_cephadm_exec('test-host', cmd)
+
+ called_args = mock_conn.run.call_args[0][0]
+ # When hardening is disabled, cephadm exec should use cephadm binary directly (not invoker)
+ assert cephadm_module.cephadm_binary_path in called_args
+ assert '/usr/libexec/cephadm_invoker.py' not in called_args
+ assert 'exec' in called_args and '--command' in called_args
+ assert (out, err, code) == ('success', '', 0)