@staticmethod
def get_version(container_id):
- # type(str) -> Optional[str]
+ # type: (str) -> Optional[str]
version = None
out, err, code = call(
[container_path, 'exec', container_id,
return version
def validate(self):
- # type () -> None
+ # type: () -> None
if not is_fsid(self.fsid):
raise Error('not an fsid: %s' % self.fsid)
if not self.daemon_id:
@staticmethod
def get_version(container_id):
- # type(str) -> Optional[str]
+ # type: (str) -> Optional[str]
version = None
out, err, code = call(
[container_path, 'exec', container_id,
return version
def validate(self):
- # type () -> None
+ # type: () -> None
if not is_fsid(self.fsid):
raise Error('not an fsid: %s' % self.fsid)
if not self.daemon_id:
@staticmethod
def configfs_mount_umount(data_dir, mount=True):
+ # type: (str, bool) -> List[str]
mount_path = os.path.join(data_dir, 'configfs')
if mount:
cmd = "if ! grep -qs {0} /proc/mounts; then " \
##################################
def get_supported_daemons():
+ # type: () -> List[str]
supported_daemons = list(Ceph.daemons)
supported_daemons.extend(Monitoring.components)
supported_daemons.append(NFSGanesha.daemon_type)
##################################
def attempt_bind(s, address, port):
- # type (str) -> None
+ # type: (socket.socket, str, int) -> None
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((address, port))
s.close()
def port_in_use(port_num):
- # type (int) -> bool
+ # type: (int) -> bool
"""Detect whether a port is in use on the local machine - IPv4 and IPv6"""
logger.info('Verifying port %d ...' % port_num)
try:
return False
def check_ip_port(ip, port):
+ # type: (str, int) -> None
if not args.skip_ping_check:
logger.info('Verifying IP %s port %d ...' % (ip, port))
if ip.startswith('[') or '::' in ip:
return os.path.abspath(p)
def get_file_timestamp(fn):
+ # type: (str) -> Optional[str]
try:
mt = os.path.getmtime(fn)
return datetime.datetime.fromtimestamp(
return None
def try_convert_datetime(s):
+ # type: (str) -> Optional[str]
# This is super irritating because
# 1) podman and docker use different formats
# 2) python's strptime can't parse either one
return None
def write_tmp(s, uid, gid):
+ # type: (str, int, int) -> Any
tmp_f = tempfile.NamedTemporaryFile(mode='w',
prefix='ceph-tmp')
os.fchown(tmp_f.fileno(), uid, gid)
##################################
def get_distro():
+ # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
distro = None
distro_version = None
distro_codename = None