cephadm_version = ctx.cephadm_version if hasattr(ctx, 'cephadm_version') else None
has_failures = False
- try:
- validate_user_exists(user)
- except Exception as e:
- logger.exception('User %s does not exists. err: %s', user, e)
+ if not validate_user_exists(user):
+ logger.error('User %s does not exists on this host.', user)
return 1
# Step 1: Authorize SSH public key for the user
if ssh_pub_key:
sys.exit(1)
-def verify_and_execute_cephadm_binary(binary_path: str, cephadm_args: List[str]) -> None:
+def verify_and_execute_cephadm_binary(binary_path: str, cephadm_args: List[str]) -> int:
"""
verify, and execute cephadm binary with hash validation.
"""
expected_hash = extract_hash_from_path(binary_path)
if not expected_hash:
logger.error('Could not extract hash from binary path: %s', binary_path)
- sys.exit(1)
+ return 1
fh = open(binary_path, 'rb')
# Disable CLOEXEC so the FD stays open across exec
disable_cloexec(fh)
execute_cephadm(fh.fileno(), [binary_path] + cephadm_args)
- sys.exit(0)
+ return 0
if actual_hash is None:
logger.error('Failed to read or hash binary at: %s', binary_path)
- sys.exit(2)
+ return 2
else:
# Hash mismatch - backup the corrupted binary
logger.error('Binary hash mismatch at: %s', binary_path)
logger.error('Could not backup corrupted binary: %s', e)
logger.info('Returning exit code 2 to trigger binary redeployment')
- sys.exit(2)
+ return 2
except (IOError, OSError) as e:
logger.error('Error opening cephadm binary at %s: %s', binary_path, e)
- sys.exit(2)
+ return 2
finally:
if fh is not None:
try:
"""
Run cephadm binary with arguments after hash verification.
"""
- verify_and_execute_cephadm_binary(args.binary, args.args)
- return 0
+ ret = verify_and_execute_cephadm_binary(args.binary, args.args)
+ return ret
def command_deploy_binary(args: argparse.Namespace) -> int:
logger = logging.getLogger()
-def validate_user_exists(username: str) -> Tuple[int, int, str]:
- """Validate that a user exists and return their uid, gid, and home directory.
- Args:
- username: The username to validate
- Returns:
- Tuple of (uid, gid, home_directory)
- Raises:
- Error: If the user does not exist
- """
+def validate_user_exists(username: str) -> bool:
+ """Validate that a user exists"""
try:
- pwd_entry = pwd.getpwnam(username)
- return pwd_entry.pw_uid, pwd_entry.pw_gid, pwd_entry.pw_dir
+ pwd.getpwnam(username)
+ return True
except KeyError:
- raise Error(
- f'User {username} does not exist on this host. '
- f'Please create the user first: useradd -m -s /bin/bash {username}'
- )
+ return False
def setup_sudoers(
logger.info('Setting up SSH user %s on this host', username)
# Validate user exists (will raise Error if not)
- validate_user_exists(username)
+ if not validate_user_exists(username):
+ raise Error(
+ f'User {username} does not exist on this host. '
+ f'Please create the user first: useradd -m -s /bin/bash {username}'
+ )
# Setup sudoers (skip for root user)
if username != 'root':
# Tests for cephadm_invoker.py - secure wrapper for executing cephadm commands
#
import hashlib
-import io
-import os
-import sys
-import tempfile
from pathlib import Path
from unittest import mock
import pytest
monkeypatch.setattr('sys.argv', ['cephadm_invoker.py', 'run', str(test_file), 'ls'])
with mock.patch('os.execve') as mock_execve:
- with pytest.raises(SystemExit) as exc_info:
- invoker.main()
- assert exc_info.value.code == 0
+ assert invoker.main() == 0
mock_execve.assert_called_once()
def test_run_command_hash_mismatch(self, monkeypatch, tmp_path):
wrong_hash = 'wronghash123'
test_file = tmp_path / f'cephadm.{wrong_hash}'
test_file.write_bytes(content)
-
+
monkeypatch.setattr('sys.argv', ['cephadm_invoker.py', 'run', str(test_file), 'ls'])
-
- with pytest.raises(SystemExit) as exc_info:
- invoker.main()
- assert exc_info.value.code == 2
+
+ assert invoker.main() == 2
def test_run_command_nonexistent(self, monkeypatch, tmp_path):
"""Test 'run' command with nonexistent binary."""
nonexistent = tmp_path / 'nonexistent'
monkeypatch.setattr('sys.argv', ['cephadm_invoker.py', 'run', str(nonexistent), 'ls'])
-
- with pytest.raises(SystemExit) as exc_info:
- invoker.main()
- assert exc_info.value.code == 1
+
+ assert invoker.main() == 1
def test_deploy_command_success(self, monkeypatch, tmp_path):
"""Test 'deploy_binary' command."""
"""Test 'deploy_binary' with nonexistent temp file."""
temp_file = tmp_path / 'nonexistent'
final_path = tmp_path / 'cephadm'
-
+
monkeypatch.setattr('sys.argv', [
'cephadm_invoker.py',
'deploy_binary',
str(temp_file),
str(final_path)
])
-
+
result = invoker.main()
assert result == 1
"""Test 'check_binary' command when file exists."""
test_file = tmp_path / 'test_file'
test_file.write_text('content')
-
+
monkeypatch.setattr('sys.argv', [
- 'cephadm_invoker.py',
+ 'cephadm_invoker.py',
'check_binary',
str(test_file)
])
-
+
result = invoker.main()
assert result == 0