cmd_args.extend(['--ulimit', 'nofile=1048576'])
if self.memory_request:
- cmd_args.extend(['-e', 'POD_MEMORY_REQUEST', str(self.memory_request)])
+ cmd_args.extend(
+ ['-e', 'POD_MEMORY_REQUEST', str(self.memory_request)]
+ )
if self.memory_limit:
- cmd_args.extend(['-e', 'POD_MEMORY_LIMIT', str(self.memory_limit)])
+ cmd_args.extend(
+ ['-e', 'POD_MEMORY_LIMIT', str(self.memory_limit)]
+ )
cmd_args.extend(['--memory', str(self.memory_limit)])
if self.network:
if self.entrypoint:
cmd_args.extend(['--entrypoint', self.entrypoint])
if self.privileged:
- cmd_args.extend([
- '--privileged',
- # let OSD etc read block devs that haven't been chowned
- '--group-add=disk'])
+ cmd_args.extend(
+ [
+ '--privileged',
+ # let OSD etc read block devs that haven't been chowned
+ '--group-add=disk',
+ ]
+ )
if self.ptrace and not self.privileged:
# if privileged, the SYS_PTRACE cap is already added
# in addition, --cap-add and --privileged are mutually
cmd_args.extend(['--name', self.cname])
envs: List[str] = [
- '-e', 'CONTAINER_IMAGE=%s' % self.image,
+ '-e',
+ 'CONTAINER_IMAGE=%s' % self.image,
]
if self.envs:
for env in self.envs:
+ self.args
)
- def build_rm_cmd(self, cname: str = '', storage: bool = False) -> List[str]:
+ def build_rm_cmd(
+ self, cname: str = '', storage: bool = False
+ ) -> List[str]:
cmd = [
self._container_engine,
'rm',
cmd.append(cname or self.cname)
return cmd
- def build_stop_cmd(self, cname: str = '', timeout: Optional[int] = None) -> List[str]:
+ def build_stop_cmd(
+ self, cname: str = '', timeout: Optional[int] = None
+ ) -> List[str]:
cmd = [self._container_engine, 'stop']
if timeout is not None:
cmd.extend(('-t', str(timeout)))
class CephContainer(BasicContainer):
- def __init__(self,
- ctx: CephadmContext,
- image: str,
- entrypoint: str,
- args: List[str] = [],
- volume_mounts: Dict[str, str] = {},
- identity: Optional['DaemonIdentity'] = None,
- cname: str = '',
- container_args: List[str] = [],
- envs: Optional[List[str]] = None,
- privileged: bool = False,
- ptrace: bool = False,
- bind_mounts: Optional[List[List[str]]] = None,
- init: Optional[bool] = None,
- host_network: bool = True,
- memory_request: Optional[str] = None,
- memory_limit: Optional[str] = None,
- ) -> None:
+ def __init__(
+ self,
+ ctx: CephadmContext,
+ image: str,
+ entrypoint: str,
+ args: List[str] = [],
+ volume_mounts: Dict[str, str] = {},
+ identity: Optional['DaemonIdentity'] = None,
+ cname: str = '',
+ container_args: List[str] = [],
+ envs: Optional[List[str]] = None,
+ privileged: bool = False,
+ ptrace: bool = False,
+ bind_mounts: Optional[List[List[str]]] = None,
+ init: Optional[bool] = None,
+ host_network: bool = True,
+ memory_request: Optional[str] = None,
+ memory_limit: Optional[str] = None,
+ ) -> None:
self.ctx = ctx
self.image = image
self.entrypoint = entrypoint
self.network = 'host' if self.host_network else ''
@classmethod
- def for_daemon(cls,
- ctx: CephadmContext,
- ident: 'DaemonIdentity',
- entrypoint: str,
- args: List[str] = [],
- volume_mounts: Dict[str, str] = {},
- container_args: List[str] = [],
- envs: Optional[List[str]] = None,
- privileged: bool = False,
- ptrace: bool = False,
- bind_mounts: Optional[List[List[str]]] = None,
- init: Optional[bool] = None,
- host_network: bool = True,
- memory_request: Optional[str] = None,
- memory_limit: Optional[str] = None,
- ) -> 'CephContainer':
+ def for_daemon(
+ cls,
+ ctx: CephadmContext,
+ ident: 'DaemonIdentity',
+ entrypoint: str,
+ args: List[str] = [],
+ volume_mounts: Dict[str, str] = {},
+ container_args: List[str] = [],
+ envs: Optional[List[str]] = None,
+ privileged: bool = False,
+ ptrace: bool = False,
+ bind_mounts: Optional[List[List[str]]] = None,
+ init: Optional[bool] = None,
+ host_network: bool = True,
+ memory_request: Optional[str] = None,
+ memory_limit: Optional[str] = None,
+ ) -> 'CephContainer':
return cls(
ctx,
image=ctx.image,
self.envs.insert(0, 'NODE_NAME=%s' % get_hostname())
return self.build_run_cmd()
- def rm_cmd(self, old_cname: bool = False, storage: bool = False) -> List[str]:
+ def rm_cmd(
+ self, old_cname: bool = False, storage: bool = False
+ ) -> List[str]:
return self.build_rm_cmd(
cname=self.old_cname if old_cname else self.cname,
storage=storage,
)
- def stop_cmd(self, old_cname: bool = False, timeout: Optional[int] = None) -> List[str]:
+ def stop_cmd(
+ self, old_cname: bool = False, timeout: Optional[int] = None
+ ) -> List[str]:
return self.build_stop_cmd(
cname=self.old_cname if old_cname else self.cname,
timeout=timeout,
'--ipc=host',
]
envs: List[str] = [
- '-e', 'CONTAINER_IMAGE=%s' % self.image,
- '-e', 'NODE_NAME=%s' % get_hostname(),
+ '-e',
+ 'CONTAINER_IMAGE=%s' % self.image,
+ '-e',
+ 'NODE_NAME=%s' % get_hostname(),
]
vols: List[str] = []
binds: List[str] = []
if self.ctx.no_hosts:
cmd_args.append('--no-hosts')
if self.privileged:
- cmd_args.extend([
- '--privileged',
- # let OSD etc read block devs that haven't been chowned
- '--group-add=disk',
- ])
+ cmd_args.extend(
+ [
+ '--privileged',
+ # let OSD etc read block devs that haven't been chowned
+ '--group-add=disk',
+ ]
+ )
if self.init:
cmd_args.append('--init')
if self.envs:
envs.extend(['-e', env])
vols = sum(
- [['-v', '%s:%s' % (host_dir, container_dir)]
- for host_dir, container_dir in self.volume_mounts.items()], [])
- binds = sum([['--mount', '{}'.format(','.join(bind))]
- for bind in self.bind_mounts], [])
+ [
+ ['-v', '%s:%s' % (host_dir, container_dir)]
+ for host_dir, container_dir in self.volume_mounts.items()
+ ],
+ [],
+ )
+ binds = sum(
+ [
+ ['--mount', '{}'.format(','.join(bind))]
+ for bind in self.bind_mounts
+ ],
+ [],
+ )
- return cmd_args + self.container_args + envs + vols + binds + [
- '--entrypoint', cmd[0],
- self.image,
- ] + cmd[1:]
+ return (
+ cmd_args
+ + self.container_args
+ + envs
+ + vols
+ + binds
+ + [
+ '--entrypoint',
+ cmd[0],
+ self.image,
+ ]
+ + cmd[1:]
+ )
def exec_cmd(self, cmd):
# type: (List[str]) -> List[str]
cname = get_running_container_name(self.ctx, self)
if not cname:
raise Error('unable to find container "{}"'.format(self.cname))
- return [
- str(self.ctx.container_engine.path),
- 'exec',
- ] + self.container_args + [
- self.cname,
- ] + cmd
+ return (
+ [
+ str(self.ctx.container_engine.path),
+ 'exec',
+ ]
+ + self.container_args
+ + [
+ self.cname,
+ ]
+ + cmd
+ )
- def run(self, timeout=DEFAULT_TIMEOUT, verbosity=CallVerbosity.VERBOSE_ON_FAILURE):
+ def run(
+ self,
+ timeout=DEFAULT_TIMEOUT,
+ verbosity=CallVerbosity.VERBOSE_ON_FAILURE,
+ ):
# type: (Optional[int], CallVerbosity) -> str
- out, _, _ = call_throws(self.ctx, self.run_cmd(),
- desc=self.entrypoint, timeout=timeout, verbosity=verbosity)
+ out, _, _ = call_throws(
+ self.ctx,
+ self.run_cmd(),
+ desc=self.entrypoint,
+ timeout=timeout,
+ verbosity=verbosity,
+ )
return out
return bool(get_running_container_name(ctx, c))
-def get_running_container_name(ctx: CephadmContext, c: 'CephContainer') -> Optional[str]:
+def get_running_container_name(
+ ctx: CephadmContext, c: 'CephContainer'
+) -> Optional[str]:
for name in [c.cname, c.old_cname]:
- out, err, ret = call(ctx, [
- ctx.container_engine.path, 'container', 'inspect',
- '--format', '{{.State.Status}}', name
- ])
+ out, err, ret = call(
+ ctx,
+ [
+ ctx.container_engine.path,
+ 'container',
+ 'inspect',
+ '--format',
+ '{{.State.Status}}',
+ name,
+ ],
+ )
if out.strip() == 'running':
return name
return None