from . import utils
from . import exchange
+from . import ssh
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw', 'nvmeof']
+WHICH = ssh.RemoteExecutable('which')
+CEPHADM_EXE = ssh.RemoteExecutable('/usr/bin/cephadm')
+
class CephadmServe:
"""
if path == '/etc/ceph/ceph.conf':
continue
self.log.info(f'Removing {host}:{path}')
- cmd = ['rm', '-f', path]
+ cmd = ssh.RemoteCommand(ssh.Executables.RM, ['-f', path])
self.mgr.ssh.check_execute_command(host, cmd)
updated_files = True
self.mgr.cache.removed_client_file(host, path)
if stdin and 'agent' not in str(entity):
self.log.debug('stdin: %s' % stdin)
- cmd = ['which', 'python3']
+ cmd = ssh.RemoteCommand(WHICH, ['python3'])
python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
- cmd = [python, self.mgr.cephadm_binary_path] + final_args
+ # N.B. because the python3 executable is based on the results of the
+ # which command we can not know it ahead of time and must be converted
+ # into a RemoteExecutable.
+ cmd = ssh.RemoteCommand(
+ ssh.RemoteExecutable(python),
+ [self.mgr.cephadm_binary_path] + final_args
+ )
try:
out, err, code = await self.mgr.ssh._execute_command(
host, cmd, stdin=stdin, addr=addr)
if code == 2:
- ls_cmd = ['ls', self.mgr.cephadm_binary_path]
+ ls_cmd = ssh.RemoteCommand(
+ ssh.Executables.LS,
+ [self.mgr.cephadm_binary_path]
+ )
out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr,
log_command=log_output)
if code_ls == 2:
elif self.mgr.mode == 'cephadm-package':
try:
- cmd = ['/usr/bin/cephadm'] + final_args
+ cmd = ssh.RemoteCommand(CEPHADM_EXE, final_args)
out, err, code = await self.mgr.ssh._execute_command(
host, cmd, stdin=stdin, addr=addr)
except Exception as e:
+import enum
import logging
import os
import asyncio
"""
+class RemoteExecutable(str):
+ pass
+
+
+class RemoteCommand:
+ exe: RemoteExecutable
+ args: List[str]
+
+ def __init__(self, exe: RemoteExecutable, args: Optional[List[str]] = None) -> None:
+ self.exe = exe
+ self.args = args or []
+
+ def __iter__(self) -> Iterator[str]:
+ yield str(self.exe)
+ for arg in self.args:
+ yield arg
+
+ def quoted(self) -> Iterator[str]:
+ return (quote(a) for a in self)
+
+ def __str__(self) -> str:
+ return " ".join(self.quoted())
+
+ def __repr__(self) -> str:
+ # handy when debugging tests
+ return f'<RemoteCommand>({self.exe!r}, {self.args!r})'
+
+ def __eq__(self, other: object) -> bool:
+ # handy when working with unit tests
+ if not isinstance(other, self.__class__):
+ return NotImplemented
+ return other.exe == self.exe and other.args == self.args
+
+
+class RemoteSudoCommand(RemoteCommand):
+ use_sudo: bool = True
+
+ def __init__(
+ self, exe: RemoteExecutable, args: List[str], use_sudo: bool = True
+ ) -> None:
+ super().__init__(exe, args)
+ self.use_sudo = use_sudo
+
+ def __iter__(self) -> Iterator[str]:
+ if self.use_sudo:
+ yield 'sudo'
+ for a in super().__iter__():
+ yield a
+
+ @classmethod
+ def wrap(
+ cls, other: RemoteCommand, use_sudo: bool = True
+ ) -> 'RemoteSudoCommand':
+ return cls(other.exe, other.args, use_sudo)
+
+
+class Executables(RemoteExecutable, enum.Enum):
+ CHMOD = RemoteExecutable('chmod')
+ CHOWN = RemoteExecutable('chown')
+ LS = RemoteExecutable('ls')
+ MKDIR = RemoteExecutable('mkdir')
+ MV = RemoteExecutable('mv')
+ RM = RemoteExecutable('rm')
+ SYSCTL = RemoteExecutable('sysctl')
+ TOUCH = RemoteExecutable('touch')
+ TRUE = RemoteExecutable('true')
+
+ def __str__(self) -> str:
+ return self.value
+
+
class EventLoopThread(Thread):
def __init__(self) -> None:
async def _execute_command(self,
host: str,
- cmd_components: List[str],
+ cmd_components: RemoteCommand,
stdin: Optional[str] = None,
addr: Optional[str] = None,
log_command: Optional[bool] = True,
) -> Tuple[str, str, int]:
conn = await self._remote_connection(host, addr)
- sudo_prefix = "sudo " if self.mgr.ssh_user != 'root' else ""
- cmd = sudo_prefix + " ".join(quote(x) for x in cmd_components)
+ use_sudo = (self.mgr.ssh_user != 'root')
+ rcmd = RemoteSudoCommand.wrap(cmd_components, use_sudo=use_sudo)
try:
address = addr or self.mgr.inventory.get_addr(host)
except Exception:
address = host
if log_command:
- logger.debug(f'Running command: {cmd}')
+ logger.debug(f'Running command: {rcmd}')
try:
- r = await conn.run(f'{sudo_prefix}true', check=True, timeout=5) # host quick check
- r = await conn.run(cmd, input=stdin)
+ test_cmd = RemoteSudoCommand(
+ Executables.TRUE, [], use_sudo=use_sudo
+ )
+ r = await conn.run(str(test_cmd), check=True, timeout=5) # host quick check
+ r = await conn.run(str(rcmd), input=stdin)
# handle these Exceptions otherwise you might get a weird error like
# TypeError: __init__() missing 1 required positional argument: 'reason' (due to the asyncssh error interacting with raise_if_exception)
except asyncssh.ChannelOpenError as e:
self.mgr.offline_hosts.add(host)
raise HostConnectionError(f'Unable to reach remote host {host}. {str(e)}', host, address)
except asyncssh.ProcessError as e:
- msg = f"Cannot execute the command '{cmd}' on the {host}. {str(e.stderr)}."
+ msg = f"Cannot execute the command '{rcmd}' on the {host}. {str(e.stderr)}."
logger.debug(msg)
await self._reset_con(host)
self.mgr.offline_hosts.add(host)
raise HostConnectionError(msg, host, address)
except Exception as e:
- msg = f"Generic error while executing command '{cmd}' on the host {host}. {str(e)}."
+ msg = f"Generic error while executing command '{rcmd}' on the host {host}. {str(e)}."
logger.debug(msg)
await self._reset_con(host)
self.mgr.offline_hosts.add(host)
def execute_command(self,
host: str,
- cmd: List[str],
+ cmd: RemoteCommand,
stdin: Optional[str] = None,
addr: Optional[str] = None,
log_command: Optional[bool] = True
async def _check_execute_command(self,
host: str,
- cmd: List[str],
+ cmd: RemoteCommand,
stdin: Optional[str] = None,
addr: Optional[str] = None,
log_command: Optional[bool] = True
def check_execute_command(self,
host: str,
- cmd: List[str],
+ cmd: RemoteCommand,
stdin: Optional[str] = None,
addr: Optional[str] = None,
log_command: Optional[bool] = True,
try:
cephadm_tmp_dir = f"/tmp/cephadm-{self.mgr._cluster_fsid}"
dirname = os.path.dirname(path)
- await self._check_execute_command(host, ['mkdir', '-p', dirname], addr=addr)
- await self._check_execute_command(host, ['mkdir', '-p', cephadm_tmp_dir + dirname], addr=addr)
+ mkdir = RemoteCommand(Executables.MKDIR, ['-p', dirname])
+ await self._check_execute_command(host, mkdir, addr=addr)
+ mkdir2 = RemoteCommand(Executables.MKDIR, ['-p', cephadm_tmp_dir + dirname])
+ await self._check_execute_command(host, mkdir2, addr=addr)
tmp_path = cephadm_tmp_dir + path + '.new'
- await self._check_execute_command(host, ['touch', tmp_path], addr=addr)
+ touch = RemoteCommand(Executables.TOUCH, [tmp_path])
+ await self._check_execute_command(host, touch, addr=addr)
if self.mgr.ssh_user != 'root':
assert self.mgr.ssh_user
- await self._check_execute_command(host, ['chown', '-R', self.mgr.ssh_user, cephadm_tmp_dir], addr=addr)
- await self._check_execute_command(host, ['chmod', str(644), tmp_path], addr=addr)
+ chown = RemoteCommand(
+ Executables.CHOWN,
+ ['-R', self.mgr.ssh_user, cephadm_tmp_dir]
+ )
+ await self._check_execute_command(host, chown, addr=addr)
+ chmod = RemoteCommand(Executables.CHMOD, [str(644), tmp_path])
+ await self._check_execute_command(host, chmod, addr=addr)
with NamedTemporaryFile(prefix='cephadm-write-remote-file-') as f:
os.fchmod(f.fileno(), 0o600)
f.write(content)
await sftp.put(f.name, tmp_path)
if uid is not None and gid is not None and mode is not None:
# shlex quote takes str or byte object, not int
- await self._check_execute_command(host, ['chown', '-R', str(uid) + ':' + str(gid), tmp_path], addr=addr)
- await self._check_execute_command(host, ['chmod', oct(mode)[2:], tmp_path], addr=addr)
- await self._check_execute_command(host, ['mv', tmp_path, path], addr=addr)
+ chown = RemoteCommand(
+ Executables.CHOWN,
+ ['-R', str(uid) + ':' + str(gid), tmp_path]
+ )
+ await self._check_execute_command(host, chown, addr=addr)
+ chmod = RemoteCommand(Executables.CHMOD, [oct(mode)[2:], tmp_path])
+ await self._check_execute_command(host, chmod, addr=addr)
+ mv = RemoteCommand(Executables.MV, [tmp_path, path])
+ await self._check_execute_command(host, mv, addr=addr)
except Exception as e:
msg = f"Unable to write {host}:{path}: {e}"
logger.exception(msg)
from cephadm.inventory import TunedProfileStore
from ceph.utils import datetime_now
from ceph.deployment.service_spec import TunedProfileSpec, PlacementSpec
-from cephadm.ssh import SSHManager
+from cephadm.ssh import SSHManager, RemoteCommand, Executables
from orchestrator import HostSpec
from typing import List, Dict
tp = TunedProfileUtils(mgr)
tp._remove_stray_tuned_profiles('a', self.profiles_to_calls(tp, [self.tspec1, self.tspec2]))
calls = [
- mock.call('a', ['ls', SYSCTL_DIR], log_command=False),
- mock.call('a', ['rm', '-f', f'{SYSCTL_DIR}/p3-cephadm-tuned-profile.conf']),
- mock.call('a', ['rm', '-f', f'{SYSCTL_DIR}/who-cephadm-tuned-profile.conf']),
- mock.call('a', ['sysctl', '--system'])
+ mock.call(
+ 'a', RemoteCommand(Executables.LS, [SYSCTL_DIR]), log_command=False
+ ),
+ mock.call(
+ 'a',
+ RemoteCommand(
+ Executables.RM,
+ ['-f', f'{SYSCTL_DIR}/p3-cephadm-tuned-profile.conf']
+ )
+ ),
+ mock.call(
+ 'a',
+ RemoteCommand(
+ Executables.RM,
+ ['-f', f'{SYSCTL_DIR}/who-cephadm-tuned-profile.conf']
+ )
+ ),
+ mock.call(
+ 'a', RemoteCommand(Executables.SYSCTL, ['--system'])
+ ),
]
_check_execute_command.assert_has_calls(calls, any_order=True)
profiles)
tp = TunedProfileUtils(mgr)
tp._write_tuned_profiles('a', self.profiles_to_calls(tp, [self.tspec1, self.tspec2]))
- _check_execute_command.assert_called_with('a', ['sysctl', '--system'])
+ _check_execute_command.assert_called_with(
+ 'a', RemoteCommand(Executables.SYSCTL, ['--system'])
+ )
_write_remote_file.assert_called_with(
'a', f'{SYSCTL_DIR}/p2-cephadm-tuned-profile.conf', tp._profile_to_str(self.tspec2).encode('utf-8'))
from ceph.utils import datetime_now
from .schedule import HostAssignment
from ceph.deployment.service_spec import ServiceSpec, TunedProfileSpec
+from . import ssh
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
SYSCTL_DIR = '/etc/sysctl.d'
+SYSCTL_SYSTEM_CMD = ssh.RemoteCommand(ssh.Executables.SYSCTL, ['--system'])
+
class TunedProfileUtils():
def __init__(self, mgr: "CephadmOrchestrator") -> None:
"""
if self.mgr.cache.is_host_unreachable(host):
return
- cmd = ['ls', SYSCTL_DIR]
+ cmd = ssh.RemoteCommand(ssh.Executables.LS, [SYSCTL_DIR])
found_files = self.mgr.ssh.check_execute_command(host, cmd, log_command=self.mgr.log_refresh_metadata).split('\n')
found_files = [s.strip() for s in found_files]
profile_names: List[str] = sum([[*p] for p in profiles], []) # extract all profiles names
continue
if file not in expected_files:
logger.info(f'Removing stray tuned profile file {file}')
- cmd = ['rm', '-f', f'{SYSCTL_DIR}/{file}']
+ cmd = ssh.RemoteCommand(ssh.Executables.RM, ['-f', f'{SYSCTL_DIR}/{file}'])
self.mgr.ssh.check_execute_command(host, cmd)
updated = True
if updated:
- self.mgr.ssh.check_execute_command(host, ['sysctl', '--system'])
+ self.mgr.ssh.check_execute_command(host, SYSCTL_SYSTEM_CMD)
def _write_tuned_profiles(self, host: str, profiles: List[Dict[str, str]]) -> None:
if self.mgr.cache.is_host_unreachable(host):
self.mgr.ssh.write_remote_file(host, profile_filename, content.encode('utf-8'))
updated = True
if updated:
- self.mgr.ssh.check_execute_command(host, ['sysctl', '--system'])
+ self.mgr.ssh.check_execute_command(host, SYSCTL_SYSTEM_CMD)
self.mgr.cache.last_tuned_profile_update[host] = datetime_now()