import sys
import tempfile
import time
+try:
+ from typing import Tuple, Optional
+except ImportError:
+ pass
import uuid
from distutils.spawn import find_executable
from tempfile import TemporaryDirectory # py3
except ImportError:
# define a minimal (but sufficient) equivalent for <= py 3.2
- class TemporaryDirectory(object):
+ class TemporaryDirectory(object): # type: ignore
def __init__(self):
self.name = tempfile.mkdtemp()
##################################
-def pathify(p):
+def pathify(p): # type: (str) -> str
if not p.startswith('/'):
return os.path.join(os.getcwd(), p)
return p
-def get_hostname():
+def get_hostname(): # type: () -> str
return socket.gethostname()
-def get_fqdn():
+def get_fqdn(): # type: () -> str
return socket.getfqdn() or socket.gethostname()
-def generate_password():
+def generate_password(): # type: () -> str
return ''.join(random.choice(string.ascii_lowercase + string.digits)
for i in range(10))
-def make_fsid():
+def make_fsid(): # type: () -> str
return str(uuid.uuid1())
-def is_fsid(s):
+def is_fsid(s): # type: (str) -> bool
try:
uuid.UUID(s)
except ValueError:
return False
return True
-def makedirs(dir, uid, gid, mode):
+def makedirs(dir, uid, gid, mode): # type: (str, int, int, int) -> None
if not os.path.exists(dir):
os.makedirs(dir, mode=mode)
else:
os.chown(dir, uid, gid)
os.chmod(dir, mode) # the above is masked by umask...
-def get_data_dir(fsid, t, n):
+def get_data_dir(fsid, t, n): # type: (str, str, int) -> str
return os.path.join(args.data_dir, fsid, '%s.%s' % (t, n))
-def get_log_dir(fsid):
+def get_log_dir(fsid): # type: (str) -> str
return os.path.join(args.log_dir, fsid)
-def make_data_dir_base(fsid, uid, gid):
+def make_data_dir_base(fsid, uid, gid): # type: (str, int, int) -> str
data_dir_base = os.path.join(args.data_dir, fsid)
makedirs(data_dir_base, uid, gid, DATA_DIR_MODE)
makedirs(os.path.join(data_dir_base, 'crash'), uid, gid, DATA_DIR_MODE)
DATA_DIR_MODE)
return data_dir_base
-def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None):
- if not uid:
+def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None): # type: (str, str, int, int, int) -> str
+ if not uid or not gid:
(uid, gid) = extract_uid_gid()
make_data_dir_base(fsid, uid, gid)
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
makedirs(data_dir, uid, gid, DATA_DIR_MODE)
return data_dir
-def make_log_dir(fsid, uid=None, gid=None):
- if not uid:
+def make_log_dir(fsid, uid=None, gid=None): # type: (str, int, int) -> str
+ if not uid or not gid:
(uid, gid) = extract_uid_gid()
log_dir = get_log_dir(fsid)
makedirs(log_dir, uid, gid, LOG_DIR_MODE)
return log_dir
-def find_program(filename):
+def find_program(filename): # type: (str) -> str
name = find_executable(filename)
if name is None:
raise ValueError('%s not found' % filename)
return name
-def get_unit_name(fsid, daemon_type, daemon_id=None):
+def get_unit_name(fsid, daemon_type, daemon_id=None): # type (str, str, int) -> str
# accept either name or type + id
if daemon_id is not None:
return 'ceph-%s@%s.%s' % (fsid, daemon_type, daemon_id)
else:
return 'ceph-%s@%s' % (fsid, daemon_type)
-def check_unit(unit_name):
+def check_unit(unit_name): # type: (str) -> Tuple[bool, str]
# NOTE: we ignore the exit code here because systemctl outputs
# various exit codes based on the state of the service, but the
# string result is more explicit (and sufficient).
state = 'unknown'
return (enabled, state)
-def get_legacy_config_fsid(cluster, legacy_dir=None):
+def get_legacy_config_fsid(cluster, legacy_dir=None): # type: (str, str) -> Optional[str]
config_file = '/etc/ceph/%s.conf' % cluster
if legacy_dir is not None:
config_file = os.path.abspath(legacy_dir + config_file)
cname='ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id),
)
-def extract_uid_gid():
+def extract_uid_gid(): # type: () -> Tuple[int, int]
out = CephContainer(
image=args.image,
entrypoint='/usr/bin/grep',
if __name__ == "__main__":
# allow argv to be injected
try:
- av = injected_argv
+ av = injected_argv # type: ignore
except NameError:
av = sys.argv[1:]
parser = _get_parser()