import time
import errno
import ssl
-from typing import Dict, List, Tuple, Optional, Union, Any, Callable, IO, Sequence, TypeVar, cast, Iterable, TextIO
+from typing import Dict, List, Tuple, Optional, Union, Any, Callable, Sequence, TypeVar, cast, Iterable
import re
import uuid
)
from cephadmlib.systemd import check_unit, check_units
from cephadmlib import systemd_unit
+from cephadmlib import runscripts
from cephadmlib.container_types import (
CephContainer,
InitContainer,
call_throws(ctx, ['systemctl', 'restart', ident.unit_name])
-def _bash_cmd(
- fh: IO[str],
- cmd: List[str],
- check: bool = True,
- background: bool = False,
- stderr: bool = True,
-) -> None:
- line = ' '.join(shlex.quote(arg) for arg in cmd)
- if not check:
- line = f'! {line}'
- if not stderr:
- line = f'{line} 2> /dev/null'
- if background:
- line = f'{line} &'
- fh.write(line)
- fh.write('\n')
-
-
-def _write_container_cmd_to_bash(
- ctx: CephadmContext,
- file_obj: IO[str],
- container: 'CephContainer',
- comment: Optional[str] = None,
- background: Optional[bool] = False,
-) -> None:
- if comment:
- # Sometimes adding a comment, especially if there are multiple containers in one
- # unit file, makes it easier to read and grok.
- assert '\n' not in comment
- file_obj.write(f'# {comment}\n')
- # Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually
- _bash_cmd(
- file_obj, container.rm_cmd(old_cname=True), check=False, stderr=False
- )
- _bash_cmd(file_obj, container.rm_cmd(), check=False, stderr=False)
-
- # Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage`
- if isinstance(ctx.container_engine, Podman):
- _bash_cmd(
- file_obj,
- container.rm_cmd(storage=True),
- check=False,
- stderr=False,
- )
- _bash_cmd(
- file_obj,
- container.rm_cmd(old_cname=True, storage=True),
- check=False,
- stderr=False,
- )
-
- # container run command
- _bash_cmd(file_obj, container.run_cmd(), background=bool(background))
-
-
-def _write_init_container_cmds(
- ctx: CephadmContext,
- file_obj: IO[str],
- index: int,
- init_container: 'InitContainer',
-) -> None:
- file_obj.write(f'# init container {index}: {init_container.cname}\n')
- _bash_cmd(file_obj, init_container.run_cmd())
- _write_init_container_cmds_clean(ctx, file_obj, init_container, comment='')
-
-
-def _write_init_container_cmds_clean(
- ctx: CephadmContext,
- file_obj: IO[str],
- init_container: 'InitContainer',
- comment: str = 'init container cleanup',
-) -> None:
- if comment:
- assert '\n' not in comment
- file_obj.write(f'# {comment}\n')
- _bash_cmd(
- file_obj,
- init_container.rm_cmd(),
- check=False,
- stderr=False,
- )
- # Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage`
- if isinstance(ctx.container_engine, Podman):
- _bash_cmd(
- file_obj,
- init_container.rm_cmd(storage=True),
- check=False,
- stderr=False,
- )
-
-
def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str) -> None:
# systemd may fail to cleanup cgroups from previous stopped unit, which will cause next "systemctl start" to fail.
# see https://tracker.ceph.com/issues/50998
endpoints: Optional[List[EndPoint]] = None,
init_containers: Optional[List['InitContainer']] = None,
) -> None:
- # cmd
-
- # unpack values from ident because they're used very frequently
- fsid = ident.fsid
- daemon_type = ident.daemon_type
- daemon_id = ident.daemon_id
-
data_dir = ident.data_dir(ctx.data_dir)
- run_file_path = data_dir + '/unit.run'
- meta_file_path = data_dir + '/unit.meta'
- with write_new(run_file_path) as f, write_new(meta_file_path) as metaf:
-
- f.write('set -e\n')
-
- if daemon_type in ceph_daemons():
- install_path = find_program('install')
- f.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=fsid, uid=uid, gid=gid))
+ pre_start_commands: List[runscripts.Command] = []
+ post_stop_commands: List[runscripts.Command] = []
+
+ if ident.daemon_type in ceph_daemons():
+ install_path = find_program('install')
+ pre_start_commands.append('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=ident.fsid, uid=uid, gid=gid))
+ if ident.daemon_type == 'osd':
+ assert osd_fsid
+ pre_start_commands.extend(_osd_unit_run_commands(
+ ctx, ident, osd_fsid, data_dir, uid, gid
+ ))
+ post_stop_commands.extend(
+ _osd_unit_poststop_commands(ctx, ident, osd_fsid)
+ )
+ if ident.daemon_type == CephIscsi.daemon_type:
+ pre_start_commands.extend(_iscsi_unit_run_commands(ctx, ident, data_dir))
+ post_stop_commands.extend(_iscsi_unit_poststop_commands(ctx, ident, data_dir))
- # pre-start cmd(s)
- if daemon_type == 'osd':
- assert osd_fsid
- _write_osd_unit_run_commands(
- ctx, f, ident, osd_fsid, data_dir, uid, gid
- )
- elif daemon_type == CephIscsi.daemon_type:
- _write_iscsi_unit_run_commands(ctx, f, ident, data_dir)
- init_containers = init_containers or []
- if init_containers:
- _write_init_container_cmds_clean(ctx, f, init_containers[0])
- for idx, ic in enumerate(init_containers):
- _write_init_container_cmds(ctx, f, idx, ic)
-
- _write_container_cmd_to_bash(ctx, f, container, '%s.%s' % (daemon_type, str(daemon_id)))
-
- # some metadata about the deploy
- meta: Dict[str, Any] = fetch_meta(ctx)
- meta.update({
- 'memory_request': int(ctx.memory_request) if ctx.memory_request else None,
- 'memory_limit': int(ctx.memory_limit) if ctx.memory_limit else None,
- })
- if not meta.get('ports'):
- if endpoints:
- meta['ports'] = [e.port for e in endpoints]
- else:
- meta['ports'] = []
- metaf.write(json.dumps(meta, indent=4) + '\n')
-
- timeout = 30 if daemon_type == 'osd' else None
- # post-stop command(s)
- with write_new(data_dir + '/unit.poststop') as f:
- # this is a fallback to eventually stop any underlying container that was not stopped properly by unit.stop,
- # this could happen in very slow setups as described in the issue https://tracker.ceph.com/issues/58242.
- _write_stop_actions(ctx, cast(TextIO, f), container, timeout)
- if daemon_type == 'osd':
- assert osd_fsid
- _write_osd_unit_poststop_commands(ctx, f, ident, osd_fsid)
- elif daemon_type == CephIscsi.daemon_type:
- _write_iscsi_unit_poststop_commands(ctx, f, ident, data_dir)
-
- # post-stop command(s)
- with write_new(data_dir + '/unit.stop') as f:
- _write_stop_actions(ctx, cast(TextIO, f), container, timeout)
-
- if container:
- with write_new(data_dir + '/unit.image') as f:
- f.write(container.image + '\n')
+ runscripts.write_service_scripts(
+ ctx,
+ ident,
+ container=container,
+ init_containers=init_containers,
+ endpoints=endpoints,
+ pre_start_commands=pre_start_commands,
+ post_stop_commands=post_stop_commands,
+ timeout=30 if ident.daemon_type == 'osd' else None,
+ )
# sysctl
- install_sysctl(ctx, fsid, daemon_form_create(ctx, ident))
+ install_sysctl(ctx, ident.fsid, daemon_form_create(ctx, ident))
# systemd
systemd_unit.update_files(ctx, ident)
call_throws(ctx, ['systemctl', 'daemon-reload'])
- unit_name = get_unit_name(fsid, daemon_type, daemon_id)
+ unit_name = get_unit_name(ident.fsid, ident.daemon_type, ident.daemon_id)
call(ctx, ['systemctl', 'stop', unit_name],
verbosity=CallVerbosity.DEBUG)
call(ctx, ['systemctl', 'reset-failed', unit_name],
if enable:
call_throws(ctx, ['systemctl', 'enable', unit_name])
if start:
- clean_cgroup(ctx, fsid, unit_name)
+ clean_cgroup(ctx, ident.fsid, unit_name)
call_throws(ctx, ['systemctl', 'start', unit_name])
-def _write_stop_actions(
- ctx: CephadmContext, f: TextIO, container: 'CephContainer', timeout: Optional[int]
-) -> None:
- # following generated script basically checks if the container exists
- # before stopping it. Exit code will be success either if it doesn't
- # exist or if it exists and is stopped successfully.
- container_exists = f'{ctx.container_engine.path} inspect %s &>/dev/null'
- f.write(f'! {container_exists % container.old_cname} || {" ".join(container.stop_cmd(old_cname=True, timeout=timeout))} \n')
- f.write(f'! {container_exists % container.cname} || {" ".join(container.stop_cmd(timeout=timeout))} \n')
-
-
-def _write_osd_unit_run_commands(
+def _osd_unit_run_commands(
ctx: CephadmContext,
- f: IO,
ident: 'DaemonIdentity',
osd_fsid: str,
data_dir: str,
uid: int,
gid: int,
-) -> None:
+) -> List[runscripts.Command]:
+ cmds: List[runscripts.Command] = []
# osds have a pre-start step
simple_fn = os.path.join('/etc/ceph/osd',
'%s-%s.json.adopted-by-cephadm' % (ident.daemon_id, osd_fsid))
if os.path.exists(simple_fn):
- f.write('# Simple OSDs need chown on startup:\n')
+ cmds.append('# Simple OSDs need chown on startup:\n')
for n in ['block', 'block.db', 'block.wal']:
p = os.path.join(data_dir, n)
- f.write('[ ! -L {p} ] || chown {uid}:{gid} {p}\n'.format(p=p, uid=uid, gid=gid))
+ cmds.append('[ ! -L {p} ] || chown {uid}:{gid} {p}\n'.format(p=p, uid=uid, gid=gid))
else:
# if ceph-volume does not support 'ceph-volume activate', we must
# do 'ceph-volume lvm activate'.
bind_mounts=get_container_binds(ctx, ident),
cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
)
- _write_container_cmd_to_bash(ctx, f, prestart, 'LVM OSDs use ceph-volume lvm activate')
+ cmds.append(runscripts.ContainerCommand(prestart, comment='LVM OSDs use ceph-volume lvm activate'))
+ return cmds
-def _write_iscsi_unit_run_commands(
- ctx: CephadmContext, f: IO, ident: 'DaemonIdentity', data_dir: str
-) -> None:
- f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
+def _iscsi_unit_run_commands(
+ ctx: CephadmContext, ident: 'DaemonIdentity', data_dir: str
+) -> List[runscripts.Command]:
+ cmds: List[runscripts.Command] = []
+ cmds.append(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
ceph_iscsi = CephIscsi.init(ctx, ident.fsid, ident.daemon_id)
tcmu_container = ceph_iscsi.get_tcmu_runner_container()
- _write_container_cmd_to_bash(ctx, f, tcmu_container, 'iscsi tcmu-runner container', background=True)
+ cmds.append(runscripts.ContainerCommand(tcmu_container, comment='iscsi tcmu-runner container', background=True))
+ return cmds
-def _write_osd_unit_poststop_commands(
- ctx: CephadmContext, f: IO, ident: 'DaemonIdentity', osd_fsid: str
-) -> None:
+def _osd_unit_poststop_commands(
+ ctx: CephadmContext, ident: 'DaemonIdentity', osd_fsid: str
+) -> List[runscripts.Command]:
poststop = get_ceph_volume_container(
ctx,
args=[
bind_mounts=get_container_binds(ctx, ident),
cname='ceph-%s-%s.%s-deactivate' % (ident.fsid, ident.daemon_type, ident.daemon_id),
)
- _write_container_cmd_to_bash(ctx, f, poststop, 'deactivate osd')
+ return [runscripts.ContainerCommand(poststop, comment='deactivate osd')]
-def _write_iscsi_unit_poststop_commands(
- ctx: CephadmContext, f: IO, ident: 'DaemonIdentity', data_dir: str
-) -> None:
+def _iscsi_unit_poststop_commands(
+ ctx: CephadmContext, ident: 'DaemonIdentity', data_dir: str
+) -> List[runscripts.Command]:
# make sure we also stop the tcmu container
+ cmds: List[runscripts.Command] = []
runtime_dir = '/run'
ceph_iscsi = CephIscsi.init(ctx, ident.fsid, ident.daemon_id)
tcmu_container = ceph_iscsi.get_tcmu_runner_container()
- f.write('! ' + ' '.join(tcmu_container.stop_cmd()) + '\n')
- f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-pid' % (ident.fsid, ident.daemon_type, ident.daemon_id + '.tcmu') + '\n')
- f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-cid' % (ident.fsid, ident.daemon_type, ident.daemon_id + '.tcmu') + '\n')
- f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n')
+ cmds.append('! ' + ' '.join(tcmu_container.stop_cmd()) + '\n')
+ cmds.append('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-pid' % (ident.fsid, ident.daemon_type, ident.daemon_id + '.tcmu') + '\n')
+ cmds.append('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-cid' % (ident.fsid, ident.daemon_type, ident.daemon_id + '.tcmu') + '\n')
+ cmds.append(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n')
+ return cmds
##################################
--- /dev/null
+import contextlib
+import json
+import pathlib
+import shlex
+
+from typing import Any, Dict, Union, List, IO, TextIO, Optional, cast
+
+from .container_engines import Podman
+from .container_types import CephContainer, InitContainer
+from .context import CephadmContext
+from .context_getters import fetch_meta
+from .daemon_identity import DaemonIdentity
+from .file_utils import write_new
+from .net_utils import EndPoint
+
+
+# Ideally, all ContainerCommands would be converted to init containers. Until
+# that is done one can wrap a CephContainer in a ContainerCommand object and
+# pass that as a pre- or post- command to run arbitrary container based
+# commands in the script.
+class ContainerCommand:
+ def __init__(
+ self,
+ container: CephContainer,
+ comment: str = '',
+ background: bool = False,
+ ):
+ self.container = container
+ self.comment = comment
+ self.background = background
+
+
+Command = Union[List[str], str, ContainerCommand]
+
+
+def write_service_scripts(
+ ctx: CephadmContext,
+ ident: DaemonIdentity,
+ *,
+ container: CephContainer,
+ init_containers: Optional[List[InitContainer]] = None,
+ endpoints: Optional[List[EndPoint]] = None,
+ pre_start_commands: Optional[List[Command]] = None,
+ post_stop_commands: Optional[List[Command]] = None,
+ timeout: Optional[int] = None,
+) -> None:
+ """Write the scripts that systemd services will call in order to
+ start/stop/etc components of a cephadm managed daemon. Also writes some
+ metadata about the service getting deployed.
+ """
+ data_dir = pathlib.Path(ident.data_dir(ctx.data_dir))
+ run_file_path = data_dir / 'unit.run'
+ meta_file_path = data_dir / 'unit.meta'
+ post_stop_file_path = data_dir / 'unit.poststop'
+ stop_file_path = data_dir / 'unit.stop'
+ image_file_path = data_dir / 'unit.image'
+ # use an ExitStack to make writing the files an all-or-nothing affair. If
+ # any file fails to write then the write_new'd file will not get renamed
+ # into place
+ with contextlib.ExitStack() as estack:
+ # write out the main file to run (start) a service
+ runf = estack.enter_context(write_new(run_file_path))
+ runf.write('set -e\n')
+ for command in pre_start_commands or []:
+ _write_command(ctx, runf, command)
+ init_containers = init_containers or []
+ if init_containers:
+ _write_init_container_cmds_clean(ctx, runf, init_containers[0])
+ for idx, ic in enumerate(init_containers):
+ _write_init_container_cmds(ctx, runf, idx, ic)
+ _write_container_cmd_to_bash(ctx, runf, container, ident.daemon_name)
+
+ # some metadata about the deploy
+ metaf = estack.enter_context(write_new(meta_file_path))
+ meta: Dict[str, Any] = fetch_meta(ctx)
+ meta.update(
+ {
+ 'memory_request': int(ctx.memory_request)
+ if ctx.memory_request
+ else None,
+ 'memory_limit': int(ctx.memory_limit)
+ if ctx.memory_limit
+ else None,
+ }
+ )
+ if not meta.get('ports'):
+ if endpoints:
+ meta['ports'] = [e.port for e in endpoints]
+ else:
+ meta['ports'] = []
+ metaf.write(json.dumps(meta, indent=4) + '\n')
+
+ # post-stop command(s)
+ pstopf = estack.enter_context(write_new(post_stop_file_path))
+ # this is a fallback to eventually stop any underlying container that
+ # was not stopped properly by unit.stop, this could happen in very slow
+ # setups as described in the issue
+ # https://tracker.ceph.com/issues/58242.
+ _write_stop_actions(ctx, cast(TextIO, pstopf), container, timeout)
+ for command in post_stop_commands or []:
+ _write_command(ctx, pstopf, command)
+
+ # stop command(s)
+ stopf = estack.enter_context(write_new(stop_file_path))
+ _write_stop_actions(ctx, cast(TextIO, stopf), container, timeout)
+
+ if container:
+ imgf = estack.enter_context(write_new(image_file_path))
+ imgf.write(container.image + '\n')
+
+
+def _write_container_cmd_to_bash(
+ ctx: CephadmContext,
+ file_obj: IO[str],
+ container: 'CephContainer',
+ comment: Optional[str] = None,
+ background: Optional[bool] = False,
+) -> None:
+ if comment:
+ # Sometimes adding a comment, especially if there are multiple containers in one
+ # unit file, makes it easier to read and grok.
+ assert '\n' not in comment
+ file_obj.write(f'# {comment}\n')
+ # Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually
+ _bash_cmd(
+ file_obj, container.rm_cmd(old_cname=True), check=False, stderr=False
+ )
+ _bash_cmd(file_obj, container.rm_cmd(), check=False, stderr=False)
+
+ # Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage`
+ if isinstance(ctx.container_engine, Podman):
+ _bash_cmd(
+ file_obj,
+ container.rm_cmd(storage=True),
+ check=False,
+ stderr=False,
+ )
+ _bash_cmd(
+ file_obj,
+ container.rm_cmd(old_cname=True, storage=True),
+ check=False,
+ stderr=False,
+ )
+
+ # container run command
+ _bash_cmd(file_obj, container.run_cmd(), background=bool(background))
+
+
+def _write_init_container_cmds(
+ ctx: CephadmContext,
+ file_obj: IO[str],
+ index: int,
+ init_container: 'InitContainer',
+) -> None:
+ file_obj.write(f'# init container {index}: {init_container.cname}\n')
+ _bash_cmd(file_obj, init_container.run_cmd())
+ _write_init_container_cmds_clean(ctx, file_obj, init_container, comment='')
+
+
+def _write_init_container_cmds_clean(
+ ctx: CephadmContext,
+ file_obj: IO[str],
+ init_container: 'InitContainer',
+ comment: str = 'init container cleanup',
+) -> None:
+ if comment:
+ assert '\n' not in comment
+ file_obj.write(f'# {comment}\n')
+ _bash_cmd(
+ file_obj,
+ init_container.rm_cmd(),
+ check=False,
+ stderr=False,
+ )
+ # Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage`
+ if isinstance(ctx.container_engine, Podman):
+ _bash_cmd(
+ file_obj,
+ init_container.rm_cmd(storage=True),
+ check=False,
+ stderr=False,
+ )
+
+
+def _write_stop_actions(
+ ctx: CephadmContext, f: TextIO, container: 'CephContainer', timeout: Optional[int]
+) -> None:
+ # following generated script basically checks if the container exists
+ # before stopping it. Exit code will be success either if it doesn't
+ # exist or if it exists and is stopped successfully.
+ container_exists = f'{ctx.container_engine.path} inspect %s &>/dev/null'
+ f.write(f'! {container_exists % container.old_cname} || {" ".join(container.stop_cmd(old_cname=True, timeout=timeout))} \n')
+ f.write(f'! {container_exists % container.cname} || {" ".join(container.stop_cmd(timeout=timeout))} \n')
+
+
+def _bash_cmd(
+ fh: IO[str],
+ cmd: List[str],
+ check: bool = True,
+ background: bool = False,
+ stderr: bool = True,
+) -> None:
+ line = ' '.join(shlex.quote(arg) for arg in cmd)
+ if not check:
+ line = f'! {line}'
+ if not stderr:
+ line = f'{line} 2> /dev/null'
+ if background:
+ line = f'{line} &'
+ fh.write(line)
+ fh.write('\n')
+
+
+def _write_command(
+ ctx: CephadmContext,
+ fh: IO[str],
+ cmd: Command,
+) -> None:
+ """Wrapper func for turning a command list or string into something suitable
+ for appending to a run script.
+ """
+ if isinstance(cmd, list):
+ _bash_cmd(fh, cmd)
+ elif isinstance(cmd, ContainerCommand):
+ _write_container_cmd_to_bash(
+ ctx,
+ fh,
+ cmd.container,
+ comment=cmd.comment,
+ background=cmd.background,
+ )
+ else:
+ fh.write(cmd)
+ if not cmd.endswith('\n'):
+ fh.write('\n')