def call_timeout(command, timeout):
- #type (List[str], int) -> int
+ # type: (List[str], int) -> int
logger.debug('Running command (timeout=%s): %s'
% (timeout, ' '.join(command)))
raise TimeoutExpired(msg)
def call_timeout_py2(command, timeout):
- #type (List[str], int) -> int
+ # type: (List[str], int) -> int
proc = subprocess.Popen(command)
thread = Thread(target=proc.wait)
thread.start()
return proc.returncode
def call_timeout_py3(command, timeout):
- #type (List[str], int) -> int
+ # type: (List[str], int) -> int
try:
return subprocess.call(command, timeout=timeout)
except subprocess.TimeoutExpired as e:
##################################
def is_available(what, func):
- # type (str, func, Optional[int]) -> func
+ # type: (str, func, Optional[int]) -> func
"""
Wait for a service to become available
for i in range(10))
def normalize_container_id(i):
+ # type: (str) -> str
# docker adds the sha256: prefix, but AFAICS both
# docker (18.09.7 in bionic at least) and podman
# both always use sha256, so leave off the prefix
return name
def get_unit_name(fsid, daemon_type, daemon_id=None):
- # type (str, str, Optional[Union[int, str]]) -> str
+ # type: (str, str, Optional[Union[int, str]]) -> str
# accept either name or type + id
if daemon_id is not None:
return 'ceph-%s@%s.%s' % (fsid, daemon_type, daemon_id)