) -> None:
file_obj.write(f'# init container {index}: {init_container.cname}\n')
_bash_cmd(file_obj, init_container.run_cmd())
- _write_init_container_cmds_clean(ctx, file_obj, init_container, comment='')
+ _write_init_container_cmds_clean(
+ ctx, file_obj, init_container, comment=''
+ )
def _write_init_container_cmds_clean(
def _write_stop_actions(
- ctx: CephadmContext, f: TextIO, container: 'CephContainer', timeout: Optional[int]
+ ctx: CephadmContext,
+ f: TextIO,
+ container: 'CephContainer',
+ timeout: Optional[int],
) -> None:
# following generated script basically checks if the container exists
# before stopping it. Exit code will be success either if it doesn't
# exist or if it exists and is stopped successfully.
container_exists = f'{ctx.container_engine.path} inspect %s &>/dev/null'
- f.write(f'! {container_exists % container.old_cname} || {" ".join(container.stop_cmd(old_cname=True, timeout=timeout))} \n')
- f.write(f'! {container_exists % container.cname} || {" ".join(container.stop_cmd(timeout=timeout))} \n')
+ f.write(
+ f'! {container_exists % container.old_cname} || {" ".join(container.stop_cmd(old_cname=True, timeout=timeout))} \n'
+ )
+ f.write(
+ f'! {container_exists % container.cname} || {" ".join(container.stop_cmd(timeout=timeout))} \n'
+ )
def _bash_cmd(