| [--log-dir LOG_DIR] [--logrotate-dir LOGROTATE_DIR]
| [--unit-dir UNIT_DIR] [--verbose] [--timeout TIMEOUT]
| [--retry RETRY] [--no-container-init]
-| {version,pull,inspect-image,ls,list-networks,list-rdma,adopt,rm-daemon,rm-cluster,run,shell,enter,ceph-volume,unit,logs,bootstrap,deploy,check-host,prepare-host,prepare-host-sudo-hardening,setup-ssh-user,add-repo,rm-repo,install,list-images,update-osd-service,exec}
+| {version,pull,inspect-image,ls,list-networks,list-rdma,adopt,rm-daemon,rm-cluster,run,shell,enter,ceph-volume,unit,logs,bootstrap,deploy,check-host,prepare-host,prepare-host-sudo-hardening,setup-ssh-user,add-repo,rm-repo,install,list-images,update-osd-service}
| ...
* [--expect-hostname EXPECT_HOSTNAME] Set hostname
-exec
-----
-
-Execute a shell command on a cluster host::
-
- cephadm exec -- ls -la
-
-Positional arguments:
-
-* [command] command to execute
-
-Arguments:
-
-* [--timeout TIMEOUT] timeout in seconds (default: None)
-
-
prepare-host-sudo-hardening
--------------------------
from cephadmlib.exceptions import (
ClusterAlreadyExists,
Error,
- TimeoutExpired,
UnauthorizedRegistryError,
DaemonStartException,
)
##################################
-def command_exec(ctx: CephadmContext) -> int:
- """
- Execute a shell command on the host
- Return Codes:
- - `0`: Command executed successfully
- - `124`: Command timed out
- - `1`: Error during execution
- - Other: Return code from the executed command
- """
- if not ctx.command:
- raise Error('No command provided to execute')
- cmd = ctx.command
- logger.debug('Executing command: %s' % ' '.join(cmd))
- try:
- stdout, stderr, returncode = call(
- ctx,
- cmd,
- verbosity=CallVerbosity.SILENT,
- timeout=ctx.timeout
- )
- if stdout:
- sys.stdout.write(stdout)
- if stderr:
- sys.stderr.write(stderr)
- return returncode
- except TimeoutExpired:
- logger.exception('Command timed out after %s seconds' % ctx.timeout)
- return 124
- except Exception as e:
- logger.exception('Error executing command: %s' % str(e))
- return 1
-
-##################################
-
-
def command_list_networks(ctx):
# type: (CephadmContext) -> None
r = list_networks(ctx)
'command', nargs='*',
help='additional journalctl args')
- parser_exec = subparsers.add_parser(
- 'exec', help='execute a shell command on the host')
- parser_exec.set_defaults(func=command_exec)
- parser_exec.add_argument(
- '--command', nargs=argparse.REMAINDER,
- help='command to execute')
- parser_exec.add_argument(
- '--fsid',
- help='cluster FSID')
-
parser_bootstrap = subparsers.add_parser(
'bootstrap', help='bootstrap a cluster (mon + mgr daemons)')
parser_bootstrap.set_defaults(func=command_bootstrap)
assert os.path.exists('/etc/ceph/ceph.conf')
assert os.path.isdir('/etc/ceph/ceph.conf')
-
-
-class TestExec(object):
- """Test cases for the 'cephadm exec' command"""
-
- @mock.patch.object(_cephadm, 'call')
- @mock.patch('cephadm.logger')
- def test_exec_non_zero_exit(self, _logger, mock_call):
- """Test command execution with non-zero exit code"""
- mock_call.return_value = ('', 'command not found\n', 127)
-
- cmd = ['exec', '--command', 'nonexistent_command']
- with with_cephadm_ctx(cmd, mock_cephadm_call_fn=False) as ctx:
- ctx.command = ['nonexistent_command']
- ctx.timeout = 300
- retval = _cephadm.command_exec(ctx)
- assert retval == 127
-
- @mock.patch('subprocess.run')
- @mock.patch('cephadm.logger')
- def test_exec_empty_command_list(self, _logger, mock_run):
- """Test command_exec with empty command list"""
- cmd = ['exec', '--command']
- with with_cephadm_ctx(cmd) as ctx:
- ctx.command = []
- ctx.timeout = 300
- with pytest.raises(_cephadm.Error, match='No command provided to execute'):
- _cephadm.command_exec(ctx)
-
- @mock.patch.object(_cephadm, 'call')
- @mock.patch('cephadm.logger')
- @mock.patch('sys.stdout')
- @mock.patch('sys.stderr')
- def test_exec_with_stdout_and_stderr(self, mock_stderr, mock_stdout, _logger, mock_call):
- """Test command execution with both stdout and stderr"""
- mock_call.return_value = ('Standard output\n', 'Standard error\n', 2)
-
- cmd = ['exec', '--command', 'test_command']
- with with_cephadm_ctx(cmd, mock_cephadm_call_fn=False) as ctx:
- ctx.command = ['test_command']
- ctx.timeout = 300
- retval = _cephadm.command_exec(ctx)
- assert retval == 2
- mock_stdout.write.assert_called_once_with('Standard output\n')
- mock_stderr.write.assert_called_once_with('Standard error\n')
-
- @mock.patch.object(_cephadm, 'call')
- @mock.patch('cephadm.logger')
- def test_exec_general_exception(self, _logger, mock_call):
- """Test command execution with general exception"""
- mock_call.side_effect = Exception('Unexpected error')
- cmd = ['exec', '--command', 'test']
- with with_cephadm_ctx(cmd, mock_cephadm_call_fn=False) as ctx:
- ctx.command = ['test']
- ctx.timeout = 300
- retval = _cephadm.command_exec(ctx)
- assert retval == 1
if host not in self.mgr.offline_hosts:
try:
rcmd = ssh.RemoteCommand(ssh.Executables.TRUE)
- self.mgr.ssh.check_execute_cephadm_exec(host, rcmd, log_command=self.mgr.log_refresh_metadata)
+ self.mgr.ssh.check_execute_command(host, rcmd, log_command=self.mgr.log_refresh_metadata)
except Exception:
logger.debug(f'OfflineHostDetector: detected {host} to be offline')
# kick serve loop in case corrective action must be taken for offline host
continue
self.log.info(f'Removing {host}:{path}')
cmd = ssh.RemoteCommand(ssh.Executables.RM, ['-f', path])
- self.mgr.ssh.check_execute_cephadm_exec(host, cmd)
+ self.mgr.ssh.check_execute_command(host, cmd)
updated_files = True
self.mgr.cache.removed_client_file(host, path)
if updated_files:
host, self.mgr.cephadm_binary_path, self.mgr._cephadm, addr=addr)
else:
await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
- self.mgr._cephadm, addr=addr, mode=0o744,
- bypass_cephadm_exec=True)
-
- async def run_cephadm_exec(self,
- host: str,
- cmd: List[str],
- addr: Optional[str] = None,
- stdin: Optional[str] = None,
- log_output: Optional[bool] = True,
- timeout: Optional[int] = None,
- ) -> Tuple[str, str, int]:
- """
- Execute a bash command on the remote host via 'cephadm exec --command <bash command>'
- """
- self.log.debug(f"run_cephadm_exec: Executing command on {host}: {cmd}")
-
- exec_args = ['--command'] + cmd
- try:
- out, err, code = await self._run_cephadm(
- host=host,
- entity=cephadmNoImage,
- command='exec',
- args=exec_args,
- addr=addr,
- stdin=stdin,
- no_fsid=True, # exec doesn't need fsid
- error_ok=True, # We'll handle errors at a higher level
- log_output=log_output,
- timeout=timeout
- )
- stdout = out[0] if out else ''
- stderr = err[0] if err else ''
- if log_output:
- self.log.debug(f"run_cephadm_exec result: code={code}, stdout={stdout}, stderr={stderr}")
- return stdout, stderr, code
-
- except Exception as e:
- self.log.exception(f"Error executing command via cephadm exec on {host}: {e}")
- return '', str(e), 1
+ self.mgr._cephadm, addr=addr, mode=0o744)
def _retry_failed_operations(self) -> None:
self.log.debug('_retry_failed_operations')
addr: Optional[str] = None,
log_command: Optional[bool] = True,
) -> Tuple[str, str, int]:
+
conn = await self._remote_connection(host, addr)
# Enforce invoker usage if SSH hardening is enabled
self._enforce_sudo_hardening(host, cmd_components)
with self.mgr.async_timeout_handler(host, " ".join(cmd)):
return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command))
- async def _execute_cephadm_exec(self,
- host: str,
- cmd_components: RemoteCommand,
- stdin: Optional[str] = None,
- addr: Optional[str] = None,
- log_command: Optional[bool] = True,
- ) -> Tuple[str, str, int]:
- """
- Execute a command on the remote host via 'cephadm exec --command <bash command>'
- This routes the command through CephadmServe.run_cephadm_exec
- """
- if log_command:
- logger.debug(f'Executing command via cephadm exec: {cmd_components}')
-
- cmd_list = list(cmd_components)
-
- from cephadm.serve import CephadmServe
-
- out, err, code = await CephadmServe(self.mgr).run_cephadm_exec(
- host=host,
- cmd=cmd_list,
- addr=addr,
- stdin=stdin,
- log_output=log_command
- )
- return out, err, code
-
- def execute_cephadm_exec(self,
- host: str,
- cmd: RemoteCommand,
- stdin: Optional[str] = None,
- addr: Optional[str] = None,
- log_command: Optional[bool] = True
- ) -> Tuple[str, str, int]:
- with self.mgr.async_timeout_handler(host, " ".join(cmd)):
- return self.mgr.wait_async(self._execute_cephadm_exec(host, cmd, stdin, addr, log_command))
-
- async def _check_execute_cephadm_exec(self,
- host: str,
- cmd: RemoteCommand,
- stdin: Optional[str] = None,
- addr: Optional[str] = None,
- log_command: Optional[bool] = True
- ) -> str:
- """Execute a command via cephadm exec and raise error if it fails"""
- out, err, code = await self._execute_cephadm_exec(host, cmd, stdin, addr, log_command)
- if code != 0:
- msg = f'Command {cmd} failed. {err}'
- logger.debug(msg)
- raise OrchestratorError(msg)
- return out
-
- def check_execute_cephadm_exec(self,
- host: str,
- cmd: RemoteCommand,
- stdin: Optional[str] = None,
- addr: Optional[str] = None,
- log_command: Optional[bool] = True,
- ) -> str:
- with self.mgr.async_timeout_handler(host, " ".join(cmd)):
- return self.mgr.wait_async(self._check_execute_cephadm_exec(host, cmd, stdin, addr, log_command))
-
async def _write_remote_file(self,
host: str,
path: str,
uid: Optional[int] = None,
gid: Optional[int] = None,
addr: Optional[str] = None,
- bypass_cephadm_exec: Optional[bool] = False,
) -> None:
- """
- Write a file to a remote host.
- """
try:
cephadm_tmp_dir = f"/tmp/cephadm-{self.mgr._cluster_fsid}"
dirname = os.path.dirname(path)
-
- # Choose execution method based on bypass flag
- execute_method = self._check_execute_command if bypass_cephadm_exec else self._check_execute_cephadm_exec
mkdir = RemoteCommand(Executables.MKDIR, ['-p', dirname])
- await execute_method(host, mkdir, addr=addr)
+ await self._check_execute_command(host, mkdir, addr=addr)
mkdir2 = RemoteCommand(Executables.MKDIR, ['-p', cephadm_tmp_dir + dirname])
- await execute_method(host, mkdir2, addr=addr)
+ await self._check_execute_command(host, mkdir2, addr=addr)
tmp_path = cephadm_tmp_dir + path + '.new'
touch = RemoteCommand(Executables.TOUCH, [tmp_path])
- await execute_method(host, touch, addr=addr)
+ await self._check_execute_command(host, touch, addr=addr)
if self.mgr.ssh_user != 'root':
assert self.mgr.ssh_user
chown = RemoteCommand(
Executables.CHOWN,
['-R', self.mgr.ssh_user, cephadm_tmp_dir]
)
- await execute_method(host, chown, addr=addr)
+ await self._check_execute_command(host, chown, addr=addr)
chmod = RemoteCommand(Executables.CHMOD, [str(644), tmp_path])
- await execute_method(host, chmod, addr=addr)
+ await self._check_execute_command(host, chmod, addr=addr)
with NamedTemporaryFile(prefix='cephadm-write-remote-file-') as f:
os.fchmod(f.fileno(), 0o600)
f.write(content)
Executables.CHOWN,
['-R', str(uid) + ':' + str(gid), tmp_path]
)
- await execute_method(host, chown, addr=addr)
+ await self._check_execute_command(host, chown, addr=addr)
if mode is not None:
chmod = RemoteCommand(Executables.CHMOD, [oct(mode)[2:], tmp_path])
- await execute_method(host, chmod, addr=addr)
+ await self._check_execute_command(host, chmod, addr=addr)
mv = RemoteCommand(Executables.MV, ['-Z', tmp_path, path])
- await execute_method(host, mv, addr=addr)
+ await self._check_execute_command(host, mv, addr=addr)
except Exception as e:
msg = f"Unable to write {host}:{path}: {e}"
logger.exception(msg)
uid: Optional[int] = None,
gid: Optional[int] = None,
addr: Optional[str] = None,
- bypass_cephadm_exec: Optional[bool] = False,
) -> None:
with self.mgr.async_timeout_handler(host, f'writing file {path}'):
self.mgr.wait_async(self._write_remote_file(
- host, path, content, mode, uid, gid, addr, bypass_cephadm_exec))
+ host, path, content, mode, uid, gid, addr))
async def _reset_con(self, host: str) -> None:
conn = self.cons.get(host)
CephadmServe(cephadm_module)._write_all_client_files()
# Make sure both ceph conf locations (default and per fsid) are called
_write_file.assert_has_calls([mock.call('test', '/etc/ceph/ceph.conf', b'',
- 0o644, 0, 0, None, False),
+ 0o644, 0, 0, None),
mock.call('test', '/var/lib/ceph/fsid/config/ceph.conf', b'',
- 0o644, 0, 0, None, False)]
+ 0o644, 0, 0, None)]
)
ceph_conf_files = cephadm_module.cache.get_host_client_files('test')
assert len(ceph_conf_files) == 2
CephadmServe(cephadm_module)._write_all_client_files()
_write_file.assert_has_calls([mock.call('test',
'/etc/ceph/ceph.conf',
- b'[mon]\nk=v\n', 0o644, 0, 0, None, False),
+ b'[mon]\nk=v\n', 0o644, 0, 0, None),
mock.call('test',
'/var/lib/ceph/fsid/config/ceph.conf',
- b'[mon]\nk=v\n', 0o644, 0, 0, None, False)])
+ b'[mon]\nk=v\n', 0o644, 0, 0, None)])
# reload
cephadm_module.cache.last_client_files = {}
cephadm_module.cache.load()
assert 'sudo' not in called_args
assert 'check-host' in called_args
assert '--expect-hostname test-host' in called_args
-
- def test_execute_cephadm_exec_with_hardening(self, setup_sudo_hardening):
- """Test cephadm exec with SSH hardening enabled."""
- ssh_manager = SSHManager(setup_sudo_hardening)
- mock_conn = AsyncMock()
- mock_conn.run.return_value = mock.Mock(
- stdout='success', stderr='', returncode=0
- )
-
- with mock.patch.object(setup_sudo_hardening.ssh, '_remote_connection',
- return_value=mock_conn):
- cmd = RemoteCommand(Executables.LS, ['-la', '/tmp'])
- out, err, code = ssh_manager.execute_cephadm_exec('test-host', cmd)
-
- called_args = mock_conn.run.call_args[0][0]
- assert called_args.startswith('sudo /usr/libexec/cephadm_invoker.py')
- assert setup_sudo_hardening.cephadm_binary_path in called_args
- assert 'exec' in called_args and '--command' in called_args
- assert (out, err, code) == ('success', '', 0)
-
- def test_execute_cephadm_exec_without_hardening(self, cephadm_module):
- """Test cephadm exec with SSH hardening disabled."""
- cephadm_module.sudo_hardening = False
- cephadm_module.ssh_user = 'cephadm'
- cephadm_module.cephadm_binary_path = '/var/lib/ceph/fsid/cephadm.abc123'
- ssh_manager = SSHManager(cephadm_module)
-
- mock_conn = AsyncMock()
- mock_conn.run.return_value = mock.Mock(
- stdout='success', stderr='', returncode=0
- )
-
- with mock.patch.object(cephadm_module.ssh, '_remote_connection',
- return_value=mock_conn):
- cmd = RemoteCommand(Executables.LS, ['-la', '/tmp'])
- out, err, code = ssh_manager.execute_cephadm_exec('test-host', cmd)
-
- called_args = mock_conn.run.call_args[0][0]
- # When hardening is disabled, cephadm exec should use cephadm binary directly (not invoker)
- assert cephadm_module.cephadm_binary_path in called_args
- assert '/usr/libexec/cephadm_invoker.py' not in called_args
- assert 'exec' in called_args and '--command' in called_args
- assert (out, err, code) == ('success', '', 0)
{'y': 'y'}).to_json()})
return ''
- def async_timeout_handler(self, host='', cmd='', timeout=None):
- """Mock async_timeout_handler for tests"""
- from contextlib import contextmanager
-
- @contextmanager
- def handler():
- yield
- return handler()
-
- def wait_async(self, coro, timeout=None):
- """Mock wait_async for tests - just run the coroutine"""
- import asyncio
- loop = asyncio.new_event_loop()
- try:
- return loop.run_until_complete(coro)
- finally:
- loop.close()
-
class TestTunedProfiles:
tspec1 = TunedProfileSpec('p1',
]
_write_profiles.assert_has_calls(calls, any_order=True)
- @mock.patch('cephadm.ssh.SSHManager.check_execute_cephadm_exec')
- def test_rm_stray_tuned_profiles(self, _check_execute_cephadm_exec):
+ @mock.patch('cephadm.ssh.SSHManager.check_execute_command')
+ def test_rm_stray_tuned_profiles(self, _check_execute_command):
profiles = {'p1': self.tspec1, 'p2': self.tspec2, 'p3': self.tspec3}
# for this test, going to use host "a" and put 4 cephadm generated
# profiles "p1" "p2", "p3" and "who" only two of which should be there ("p1", "p2")
# as well as a file not generated by cephadm. Only the "p3" and "who"
# profiles should be removed from the host. This should total to 4
- # calls to check_execute_cephadm_exec, 1 "ls", 2 "rm", and 1 "sysctl --system"
- _check_execute_cephadm_exec.return_value = '\n'.join(['p1-cephadm-tuned-profile.conf',
- 'p2-cephadm-tuned-profile.conf',
- 'p3-cephadm-tuned-profile.conf',
- 'who-cephadm-tuned-profile.conf',
- 'dont-touch-me'])
+ # calls to check_execute_command, 1 "ls", 2 "rm", and 1 "sysctl --system"
+ _check_execute_command.return_value = '\n'.join(['p1-cephadm-tuned-profile.conf',
+ 'p2-cephadm-tuned-profile.conf',
+ 'p3-cephadm-tuned-profile.conf',
+ 'who-cephadm-tuned-profile.conf',
+ 'dont-touch-me'])
mgr = FakeMgr(['a', 'b', 'c'],
['a', 'b', 'c'],
[],
'a', RemoteCommand(Executables.SYSCTL, ['--system'])
),
]
- _check_execute_cephadm_exec.assert_has_calls(calls, any_order=True)
+ _check_execute_command.assert_has_calls(calls, any_order=True)
- @mock.patch('cephadm.ssh.SSHManager.check_execute_cephadm_exec')
+ @mock.patch('cephadm.ssh.SSHManager.check_execute_command')
@mock.patch('cephadm.ssh.SSHManager.write_remote_file')
- def test_write_tuned_profiles(self, _write_remote_file, _check_execute_cephadm_exec):
+ def test_write_tuned_profiles(self, _write_remote_file, _check_execute_command):
profiles = {'p1': self.tspec1, 'p2': self.tspec2, 'p3': self.tspec3}
# for this test we will use host "a" and have it so host_needs_tuned_profile_update
# returns True for p2 and False for p1 (see FakeCache class). So we should see
# 2 ssh calls, one to write p2, one to run sysctl --system
- _check_execute_cephadm_exec.return_value = 'success'
+ _check_execute_command.return_value = 'success'
_write_remote_file.return_value = 'success'
mgr = FakeMgr(['a', 'b', 'c'],
['a', 'b', 'c'],
profiles)
tp = TunedProfileUtils(mgr)
tp._write_tuned_profiles('a', self.profiles_to_calls(tp, [self.tspec1, self.tspec2]))
- _check_execute_cephadm_exec.assert_called_with(
+ _check_execute_command.assert_called_with(
'a', RemoteCommand(Executables.SYSCTL, ['--system'])
)
_write_remote_file.assert_called_with(
if self.mgr.cache.is_host_unreachable(host):
return
cmd = ssh.RemoteCommand(ssh.Executables.LS, [SYSCTL_DIR])
- found_files = self.mgr.ssh.check_execute_cephadm_exec(host, cmd, log_command=self.mgr.log_refresh_metadata).split('\n')
+ found_files = self.mgr.ssh.check_execute_command(host, cmd, log_command=self.mgr.log_refresh_metadata).split('\n')
found_files = [s.strip() for s in found_files]
profile_names: List[str] = sum([[*p] for p in profiles], []) # extract all profiles names
profile_names = list(set(profile_names)) # remove duplicates
if file not in expected_files:
logger.info(f'Removing stray tuned profile file {file}')
cmd = ssh.RemoteCommand(ssh.Executables.RM, ['-f', f'{SYSCTL_DIR}/{file}'])
- self.mgr.ssh.check_execute_cephadm_exec(host, cmd)
+ self.mgr.ssh.check_execute_command(host, cmd)
updated = True
if updated:
- self.mgr.ssh.check_execute_cephadm_exec(host, SYSCTL_SYSTEM_CMD)
+ self.mgr.ssh.check_execute_command(host, SYSCTL_SYSTEM_CMD)
def _write_tuned_profiles(self, host: str, profiles: List[Dict[str, str]]) -> None:
if self.mgr.cache.is_host_unreachable(host):
self.mgr.ssh.write_remote_file(host, profile_filename, content.encode('utf-8'))
updated = True
if updated:
- self.mgr.ssh.check_execute_cephadm_exec(host, SYSCTL_SYSTEM_CMD)
+ self.mgr.ssh.check_execute_command(host, SYSCTL_SYSTEM_CMD)
self.mgr.cache.last_tuned_profile_update[host] = datetime_now()