| [--log-dir LOG_DIR] [--logrotate-dir LOGROTATE_DIR]
| [--unit-dir UNIT_DIR] [--verbose] [--timeout TIMEOUT]
| [--retry RETRY] [--no-container-init]
-| {version,pull,inspect-image,ls,list-networks,list-rdma,adopt,rm-daemon,rm-cluster,remove-file,sysctl-dir,run,shell,enter,ceph-volume,unit,logs,bootstrap,deploy,check-host,check-online,prepare-host,prepare-host-sudo-hardening,setup-ssh-user,add-repo,rm-repo,install,list-images,update-osd-service}
+| {version,pull,inspect-image,ls,list-networks,list-rdma,adopt,rm-daemon,rm-cluster,remove-file,deploy-file,sysctl-dir,run,shell,enter,ceph-volume,unit,logs,bootstrap,deploy,check-host,check-online,prepare-host,prepare-host-sudo-hardening,setup-ssh-user,add-repo,rm-repo,install,list-images,update-osd-service}
| ...
| **cephadm** **remove-file** [-h] [--fsid FSID] --path PATH
+| **cephadm** **deploy-file** [-h] [--fsid FSID] --path PATH [--mode MODE]
+| [--uid UID] [--gid GID]
+
| **cephadm** **sysctl-dir** [-h] [--fsid FSID] (--list | --apply-system)
| **cephadm** **prepare-host**
* --path PATH absolute path of the file to remove (required)
+deploy-file
+-----------
+
+Write or replace a file on the local host. The **entire file body** is read from
+**standard input** as raw bytes (no encoding or line-ending translation).
+
+Arguments:
+
+* [--fsid FSID] cluster FSID (passed automatically when invoked by the orchestrator)
+* --path PATH absolute destination path for the file (required)
+* [--mode MODE] octal file mode (for example ``644`` or ``0644``)
+* [--uid UID] numeric owner user id (**must** be given together with ``--gid``)
+* [--gid GID] numeric owner group id (**must** be given together with ``--uid``)
+
+
sysctl-dir
----------
return 0
+@infer_fsid
+def command_deploy_file(ctx: CephadmContext) -> int:
+ """Write or replace a host file from raw stdin bytes (for mgr-driven config sync)."""
+ dest = Path(ctx.deploy_file_path).expanduser()
+ if not dest.is_absolute():
+ raise Error(f'deploy-file: destination must be an absolute path: {dest}')
+
+ uid = ctx.deploy_file_uid
+ gid = ctx.deploy_file_gid
+ if (uid is None) != (gid is None):
+ raise Error('deploy-file: --uid and --gid must be given together')
+
+ owner = (uid, gid) if uid is not None and gid is not None else None
+ perms = None
+ if ctx.deploy_file_mode is not None:
+ perms = int(str(ctx.deploy_file_mode), 8)
+
+ dest.parent.mkdir(parents=True, mode=0o755, exist_ok=True)
+ try:
+ with write_new(dest, owner=owner, perms=perms, binary=True) as fh:
+ fh.write(sys.stdin.buffer.read())
+ except Exception as e:
+ logger.exception('deploy-file: Failed to write file, exception: %s', e)
+ raise
+ return 0
+
+
def command_sysctl_dir(ctx: CephadmContext) -> int:
"""List basenames under sysctl.d or run sysctl --system"""
action = ctx.sysctl_dir_action
dest='remove_file_path',
help='absolute path of the file to remove')
+ parser_deploy_file = subparsers.add_parser(
+ 'deploy-file',
+ help='write or replace a host file from stdin (raw bytes)')
+ parser_deploy_file.set_defaults(func=command_deploy_file)
+ parser_deploy_file.add_argument(
+ '--fsid',
+ help='cluster FSID')
+ parser_deploy_file.add_argument(
+ '--path',
+ required=True,
+ dest='deploy_file_path',
+ help='absolute destination path for the file')
+ parser_deploy_file.add_argument(
+ '--mode',
+ dest='deploy_file_mode',
+ default=None,
+ help='octal mode for the file (e.g. 644 or 0644)')
+ parser_deploy_file.add_argument(
+ '--uid',
+ type=int,
+ dest='deploy_file_uid',
+ default=None,
+ help='numeric owner uid (requires --gid)')
+ parser_deploy_file.add_argument(
+ '--gid',
+ type=int,
+ dest='deploy_file_gid',
+ default=None,
+ help='numeric owner gid (requires --uid)')
+
parser_sysctl_dir = subparsers.add_parser(
'sysctl-dir',
help='list entries in sysctl.d or run sysctl --system')
owner: Optional[Tuple[int, int]] = None,
perms: Optional[int] = DEFAULT_MODE,
encoding: Optional[str] = None,
+ binary: bool = False,
) -> Generator[IO, None, None]:
"""Write a new file in a robust manner, optionally specifying the owner,
permissions, or encoding. This function takes care to never leave a file in
open_kwargs: Dict[str, Any] = {}
if encoding:
open_kwargs['encoding'] = encoding
+ file_mode = 'wb' if binary else 'w'
try:
- with open(tempname, 'w', **open_kwargs) as fh:
+ with open(tempname, file_mode, **open_kwargs) as fh:
yield fh
fh.flush()
os.fsync(fh.fileno())
_cephadm.command_remove_file(ctx)
assert cephadm_fs.exists(link)
+ def test_command_deploy_file(self, cephadm_fs):
+ import io
+ fsid = '00000000-0000-0000-0000-0000deadbeef'
+ dest = '/etc/ceph/kube.conf'
+ cephadm_fs.create_dir('/etc/ceph')
+ content = b'hello\xff'
+ stdin_mock = mock.Mock()
+ stdin_mock.buffer = io.BytesIO(content)
+ with mock.patch('sys.stdin', stdin_mock):
+ with with_cephadm_ctx(
+ ['deploy-file', '--fsid', fsid, '--path', dest, '--mode', '600']
+ ) as ctx:
+ assert _cephadm.command_deploy_file(ctx) == 0
+ assert cephadm_fs.exists(dest)
+ with open(dest, 'rb') as f:
+ assert f.read() == content
+
+ def test_command_deploy_file_rejects_relative_path(self, cephadm_fs):
+ import io
+ stdin_mock = mock.Mock()
+ stdin_mock.buffer = io.BytesIO(b'x')
+ with mock.patch('sys.stdin', stdin_mock):
+ with with_cephadm_ctx(
+ ['deploy-file', '--fsid', '00000000-0000-0000-0000-0000deadbeef',
+ '--path', 'relative/path.conf']
+ ) as ctx:
+ with pytest.raises(_cephadm.Error, match='absolute path'):
+ _cephadm.command_deploy_file(ctx)
+
+ def test_command_deploy_file_uid_gid_together(self, cephadm_fs):
+ import io
+ stdin_mock = mock.Mock()
+ stdin_mock.buffer = io.BytesIO(b'x')
+ with mock.patch('sys.stdin', stdin_mock):
+ with with_cephadm_ctx(
+ ['deploy-file', '--fsid', '00000000-0000-0000-0000-0000deadbeef',
+ '--path', '/etc/ceph/a', '--uid', '0']
+ ) as ctx:
+ with pytest.raises(_cephadm.Error, match='together'):
+ _cephadm.command_deploy_file(ctx)
+
def test_command_sysctl_dir_list(self, cephadm_fs, capsys):
from cephadmlib.constants import SYSCTL_DIR
cephadm_fs.create_dir(SYSCTL_DIR)
if match:
continue
self.log.info(f'Updating {host}:{path}')
- self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
+ with self.mgr.async_timeout_handler(host, f'cephadm deploy-file ({path})'):
+ self.mgr.wait_async(self._deploy_file_via_cephadm(
+ host, path, content, mode, uid, gid))
self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
updated_files = True
for path in old_files.keys():
command: Union[str, List[str]],
args: List[str],
addr: Optional[str] = "",
- stdin: Optional[str] = "",
+ stdin: Optional[Union[str, bytes]] = "",
no_fsid: Optional[bool] = False,
error_ok: Optional[bool] = False,
image: Optional[str] = "",
# agent has cephadm binary as an extra file which is
# therefore passed over stdin. Even for debug logs it's too much
if stdin and 'agent' not in str(entity):
- self.log.debug('stdin: %s' % stdin)
+ if isinstance(stdin, bytes):
+ self.log.debug('stdin: <binary len %d>', len(stdin))
+ else:
+ self.log.debug('stdin: %s', stdin)
# If SSH hardening is enabled, call invoker directly without which python
if self.mgr.sudo_hardening and self.mgr.invoker_path:
return f"Host {host} failed to login to all registries"
return None
+ async def _deploy_file_via_cephadm(
+ self,
+ host: str,
+ path: str,
+ content: bytes,
+ mode: Optional[int] = None,
+ uid: Optional[int] = None,
+ gid: Optional[int] = None,
+ addr: Optional[str] = None,
+ ) -> None:
+ """Write a host file using ``cephadm deploy-file`` (stdin = raw file bytes)."""
+ args: List[str] = ['--path', path]
+ if mode is not None:
+ args.extend(['--mode', oct(mode)[2:]])
+ if uid is not None and gid is not None:
+ args.extend(['--uid', str(uid), '--gid', str(gid)])
+ await self._run_cephadm(
+ host,
+ cephadmNoImage,
+ 'deploy-file',
+ args,
+ stdin=content,
+ addr=addr or '',
+ )
+
async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
# Use tee (from coreutils) to create a copy of cephadm on the target machine
self.log.info(f"Deploying cephadm binary to {host}")
from contextlib import contextmanager
from io import StringIO
from shlex import quote
-from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Iterator, TypeVar, Awaitable, Union
+from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Iterator, TypeVar, Awaitable, Union, Any
from orchestrator import OrchestratorError
try:
async def _execute_command(self,
host: str,
cmd_components: RemoteCommand,
- stdin: Optional[str] = None,
+ stdin: Optional[Union[str, bytes]] = None,
addr: Optional[str] = None,
log_command: Optional[bool] = True,
) -> Tuple[str, str, int]:
# Retry logic for transient connection/channel errors
for attempt in range(self.SSH_RETRY_COUNT):
try:
- r = await conn.run(str(rcmd), input=stdin)
+ run_kw: Dict[str, Any] = {}
+ if stdin is not None:
+ run_kw['input'] = stdin
+ # Bytes stdin: use encoding=None (else asyncssh expects str).
+ if isinstance(stdin, bytes):
+ run_kw['encoding'] = None
+ r = await conn.run(str(rcmd), **run_kw)
break # Success, exit retry loop
# Handle retryable exceptions (connection/channel errors)
# Note: handle these Exceptions otherwise you might get a weird error like
def execute_command(self,
host: str,
cmd: RemoteCommand,
- stdin: Optional[str] = None,
+ stdin: Optional[Union[str, bytes]] = None,
addr: Optional[str] = None,
log_command: Optional[bool] = True
) -> Tuple[str, str, int]:
async def _check_execute_command(self,
host: str,
cmd: RemoteCommand,
- stdin: Optional[str] = None,
+ stdin: Optional[Union[str, bytes]] = None,
addr: Optional[str] = None,
log_command: Optional[bool] = True
) -> str:
def check_execute_command(self,
host: str,
cmd: RemoteCommand,
- stdin: Optional[str] = None,
+ stdin: Optional[Union[str, bytes]] = None,
addr: Optional[str] = None,
log_command: Optional[bool] = True,
) -> str:
gid: Optional[int] = None,
addr: Optional[str] = None,
) -> None:
+ """This method will be used to only write cephadm binary when sudo_hardening is disbaled.
+ Other host files are written via ``cephadm deploy-file`` on the target host.
+ """
try:
cephadm_tmp_dir = f"/tmp/cephadm-{self.mgr._cluster_fsid}"
dirname = os.path.dirname(path)
except OSError:
pass
- def write_remote_file(self,
- host: str,
- path: str,
- content: bytes,
- mode: Optional[int] = None,
- uid: Optional[int] = None,
- gid: Optional[int] = None,
- addr: Optional[str] = None,
- ) -> None:
- with self.mgr.async_timeout_handler(host, f'writing file {path}'):
- self.mgr.wait_async(self._write_remote_file(
- host, path, content, mode, uid, gid, addr))
-
async def _reset_con(self, host: str) -> None:
conn = self.cons.get(host)
if conn:
@mock.patch("cephadm.ssh.SSHManager._remote_connection")
@mock.patch("cephadm.ssh.SSHManager._execute_command")
@mock.patch("cephadm.ssh.SSHManager._check_execute_command")
- @mock.patch("cephadm.ssh.SSHManager._write_remote_file")
- def test_etc_ceph(self, _write_file, check_execute_command, execute_command, remote_connection, cephadm_module):
- _write_file.side_effect = async_side_effect(None)
+ @mock.patch("cephadm.serve.CephadmServe._deploy_file_via_cephadm", new_callable=mock.AsyncMock)
+ def test_etc_ceph(self, _deploy_file_via_cephadm, check_execute_command, execute_command, remote_connection, cephadm_module):
+ _deploy_file_via_cephadm.side_effect = async_side_effect(None)
check_execute_command.side_effect = async_side_effect('')
execute_command.side_effect = async_side_effect(('{}', '', 0))
remote_connection.side_effect = async_side_effect(mock.Mock())
CephadmServe(cephadm_module)._write_all_client_files()
# Make sure both ceph conf locations (default and per fsid) are called
- _write_file.assert_has_calls([mock.call('test', '/etc/ceph/ceph.conf', b'',
- 0o644, 0, 0, None),
- mock.call('test', '/var/lib/ceph/fsid/config/ceph.conf', b'',
- 0o644, 0, 0, None)]
- )
+ _deploy_file_via_cephadm.assert_has_calls(
+ [
+ mock.call(
+ 'test',
+ '/etc/ceph/ceph.conf',
+ b'',
+ 0o644,
+ 0,
+ 0,
+ ),
+ mock.call(
+ 'test',
+ '/var/lib/ceph/fsid/config/ceph.conf',
+ b'',
+ 0o644,
+ 0,
+ 0,
+ ),
+ ],
+ )
ceph_conf_files = cephadm_module.cache.get_host_client_files('test')
assert len(ceph_conf_files) == 2
assert '/etc/ceph/ceph.conf' in ceph_conf_files
# set extra config and expect that we deploy another ceph.conf
cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
CephadmServe(cephadm_module)._write_all_client_files()
- _write_file.assert_has_calls([mock.call('test',
- '/etc/ceph/ceph.conf',
- b'[mon]\nk=v\n', 0o644, 0, 0, None),
- mock.call('test',
- '/var/lib/ceph/fsid/config/ceph.conf',
- b'[mon]\nk=v\n', 0o644, 0, 0, None)])
+ _deploy_file_via_cephadm.assert_has_calls(
+ [
+ mock.call(
+ 'test',
+ '/etc/ceph/ceph.conf',
+ b'[mon]\nk=v\n',
+ 0o644,
+ 0,
+ 0,
+ ),
+ mock.call(
+ 'test',
+ '/var/lib/ceph/fsid/config/ceph.conf',
+ b'[mon]\nk=v\n',
+ 0o644,
+ 0,
+ 0,
+ ),
+ ],
+ )
# reload
cephadm_module.cache.last_client_files = {}
cephadm_module.cache.load()
assert f1_before_digest != f1_after_digest
assert f2_before_digest != f2_after_digest
+ @mock.patch('cephadm.ssh.SSHManager._deploy_cephadm_binary_via_invoker',
+ new_callable=mock.AsyncMock)
+ @mock.patch('cephadm.ssh.SSHManager._write_remote_file', new_callable=mock.AsyncMock)
+ def test_deploy_cephadm_binary_uses_write_remote_file_without_sudo_hardening(
+ self, _write_remote_file, _deploy_via_invoker, cephadm_module):
+ """Without sudo hardening, the mgr stages the cephadm binary via _write_remote_file."""
+ cephadm_module.sudo_hardening = False
+ cephadm_module.invoker_path = ''
+ cephadm_module.cephadm_binary_path = (
+ f'/var/lib/ceph/{cephadm_module._cluster_fsid}/cephadm.deadbeef')
+ fake_bin = b'#!/usr/bin/fake-cephadm\n'
+ cephadm_module._cephadm = fake_bin
+
+ _write_remote_file.side_effect = async_side_effect(None)
+ cephadm_module.wait_async(
+ CephadmServe(cephadm_module)._deploy_cephadm_binary('testhost'))
+
+ _write_remote_file.assert_called_once()
+ _write_remote_file.assert_called_with(
+ 'testhost',
+ cephadm_module.cephadm_binary_path,
+ fake_bin,
+ addr=None,
+ mode=0o744,
+ )
+ _deploy_via_invoker.assert_not_called()
+
@mock.patch("cephadm.inventory.HostCache.get_host_client_files")
def test_dont_write_client_files_to_unreachable_hosts(self, _get_client_files, cephadm_module):
cephadm_module.inventory.add_host(HostSpec('host1', '1.2.3.1')) # online
assert _run_cephadm.call_count == 2
@mock.patch('cephadm.tuned_profiles.TunedProfileUtils._sysctl_dir_apply_system')
- @mock.patch('cephadm.ssh.SSHManager.write_remote_file')
- def test_write_tuned_profiles(self, _write_remote_file, _sysctl_dir_apply_system):
+ @mock.patch('cephadm.serve.CephadmServe._deploy_file_via_cephadm', new_callable=mock.AsyncMock)
+ def test_write_tuned_profiles(self, _deploy_file_via_cephadm, _sysctl_dir_apply_system):
profiles = {'p1': self.tspec1, 'p2': self.tspec2, 'p3': self.tspec3}
# for this test we will use host "a" and have it so host_needs_tuned_profile_update
# returns True for p2 and False for p1 (see FakeCache class). So we should see
# one write for p2 and sysctl-dir --apply-system via cephadm.
- _write_remote_file.return_value = 'success'
+ _deploy_file_via_cephadm.return_value = None
mgr = FakeMgr(['a', 'b', 'c'],
['a', 'b', 'c'],
[],
tp = TunedProfileUtils(mgr)
tp._write_tuned_profiles('a', self.profiles_to_calls(tp, [self.tspec1, self.tspec2]))
_sysctl_dir_apply_system.assert_called_once_with('a')
- _write_remote_file.assert_called_with(
+ _deploy_file_via_cephadm.assert_called_with(
'a', f'{SYSCTL_DIR}/p2-cephadm-tuned-profile.conf', tp._profile_to_str(self.tspec2).encode('utf-8'))
def test_dont_write_to_unreachable_hosts(self):
for profile_name, content in p.items():
if self.mgr.cache.host_needs_tuned_profile_update(host, profile_name):
logger.info(f'Writing tuned profile {profile_name} to host {host}')
- profile_filename: str = f'{SYSCTL_DIR}/{profile_name}-cephadm-tuned-profile.conf'
- self.mgr.ssh.write_remote_file(host, profile_filename, content.encode('utf-8'))
+ profile_filename: str = (
+ f'{SYSCTL_DIR}/{profile_name}-cephadm-tuned-profile.conf')
+ with self.mgr.async_timeout_handler(host, f'cephadm deploy-file ({profile_filename})'):
+ self.mgr.wait_async(
+ CephadmServe(self.mgr)._deploy_file_via_cephadm(
+ host, profile_filename, content.encode('utf-8')))
updated = True
if updated:
self._sysctl_dir_apply_system(host)