import ssl
from enum import Enum
-from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
+from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast
import re
import uuid
from urllib.request import urlopen
from pathlib import Path
+FuncT = TypeVar('FuncT', bound=Callable)
+
# Default container images -----------------------------------------------------
DEFAULT_IMAGE = 'quay.ceph.io/ceph-ci/ceph:master'
DEFAULT_IMAGE_IS_MASTER = True
class BaseConfig:
- def __init__(self):
+ def __init__(self) -> None:
self.image: str = ''
self.docker: bool = False
self.data_dir: str = DATA_DIR
self.container_init: bool = CONTAINER_INIT
self.container_engine: Optional[ContainerEngine] = None
- def set_from_args(self, args: argparse.Namespace):
+ def set_from_args(self, args: argparse.Namespace) -> None:
argdict: Dict[str, Any] = vars(args)
for k, v in argdict.items():
if hasattr(self, k):
class CephadmContext:
- def __init__(self):
+ def __init__(self) -> None:
self.__dict__['_args'] = None
self.__dict__['_conf'] = BaseConfig()
class ContainerEngine:
- def __init__(self):
+ def __init__(self) -> None:
self.path = find_program(self.EXE)
@property
class Podman(ContainerEngine):
EXE = 'podman'
- def __init__(self):
+ def __init__(self) -> None:
super().__init__()
- self._version = None
+ self._version: Optional[Tuple[int, ...]] = None
@property
- def version(self):
+ def version(self) -> Tuple[int, ...]:
if self._version is None:
raise RuntimeError('Please call `get_version` first')
return self._version
- def get_version(self, ctx: CephadmContext):
+ def get_version(self, ctx: CephadmContext) -> None:
out, _, _ = call_throws(ctx, [self.path, 'version', '--format', '{{.Client.Version}}'])
self._version = _parse_podman_version(out)
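# Illustrative sketch (not part of this patch): the Go template
# '{{.Client.Version}}' makes podman print a bare dotted version such as
# '3.2.1', which _parse_podman_version() is expected to turn into a tuple of
# ints so callers can compare versions element-wise, matching the
# Tuple[int, ...] annotation above:
#
#     _parse_podman_version('3.2.1')             ->  (3, 2, 1)
#     _parse_podman_version('3.2.1') >= (2, 0)   ->  True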
cname = '%s-%s' % (cname, desc)
return cname
- def extract_uid_gid_haproxy(self):
+ def extract_uid_gid_haproxy(self) -> Tuple[int, int]:
# better directory for this?
return extract_uid_gid(self.ctx, file_path='/var/lib')
'net.ipv4.ip_nonlocal_bind = 1',
]
- def extract_uid_gid_keepalived(self):
+ def extract_uid_gid_keepalived(self) -> Tuple[int, int]:
# better directory for this?
return extract_uid_gid(self.ctx, file_path='/var/lib')
seconds.
"""
- def __init__(self, lock_file):
+ def __init__(self, lock_file: str) -> None:
"""
"""
#: The path of the file lock.
self.lock_file = lock_file
return None
- def __str__(self):
+ def __str__(self) -> str:
temp = "The file lock '{}' could not be acquired."\
.format(self.lock_file)
return temp
class _Acquire_ReturnProxy(object):
- def __init__(self, lock):
+ def __init__(self, lock: 'FileLock') -> None:
self.lock = lock
return None
- def __enter__(self):
+ def __enter__(self) -> 'FileLock':
return self.lock
- def __exit__(self, exc_type, exc_value, traceback):
+ def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
self.lock.release()
return None
class FileLock(object):
- def __init__(self, ctx: CephadmContext, name, timeout=-1):
+ def __init__(self, ctx: CephadmContext, name: str, timeout: int = -1) -> None:
if not os.path.exists(LOCK_DIR):
os.mkdir(LOCK_DIR, 0o700)
self._lock_file = os.path.join(LOCK_DIR, name + '.lock')
return None
@property
- def is_locked(self):
+ def is_locked(self) -> bool:
return self._lock_file_fd is not None
- def acquire(self, timeout=None, poll_intervall=0.05):
+ def acquire(self, timeout: Optional[int] = None, poll_intervall: float = 0.05) -> _Acquire_ReturnProxy:
"""
Acquires the file lock or fails with a :exc:`Timeout` error.
.. code-block:: python
raise
return _Acquire_ReturnProxy(lock=self)
- def release(self, force=False):
+ def release(self, force: bool = False) -> None:
"""
Releases the file lock.
Please note that the lock is only completely released if the lock
return None
- def __enter__(self):
+ def __enter__(self) -> 'FileLock':
self.acquire()
return self
- def __exit__(self, exc_type, exc_value, traceback):
+ def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
self.release()
return None
- def __del__(self):
+ def __del__(self) -> None:
self.release(force=True)
return None
- def _acquire(self):
+ def _acquire(self) -> None:
open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
fd = os.open(self._lock_file, open_mode)
self._lock_file_fd = fd
return None
- def _release(self):
+ def _release(self) -> None:
# Do not remove the lockfile:
#
# https://github.com/benediktschmitt/py-filelock/issues/31
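
# Illustrative usage sketch (not part of this patch): FileLock is meant to be
# used as a context manager, so the lock is released even if the body raises;
# 'ctx' and the lock name below are placeholders:
#
#     with FileLock(ctx, fsid):
#         ...  # critical section; concurrent cephadm invocations block here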
on the number of spawned processes.
"""
- def __init__(self):
+ def __init__(self) -> None:
self._pid_counter = itertools.count(0)
self._threads = {}
desc: Optional[str] = None,
verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
timeout: Optional[int] = DEFAULT_TIMEOUT,
- **kwargs) -> Tuple[str, str, int]:
+ **kwargs: Any) -> Tuple[str, str, int]:
"""
Wrap subprocess.Popen to
desc: Optional[str] = None,
verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
timeout: Optional[int] = DEFAULT_TIMEOUT,
- **kwargs) -> Tuple[str, str, int]:
+ **kwargs: Any) -> Tuple[str, str, int]:
out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs)
if ret:
raise RuntimeError('Failed command: %s' % ' '.join(command))
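# Illustrative usage sketch (not part of this patch): call() returns a
# (stdout, stderr, returncode) tuple, while call_throws() raises on a non-zero
# return code so callers can unpack without checking 'ret'; 'ctx' is a
# placeholder CephadmContext and the commands are examples only:
#
#     out, err, ret = call(ctx, ['systemctl', 'is-active', 'chronyd'])
#     if ret:
#         logger.debug('chronyd not active: %s' % err)
#
#     out, _, _ = call_throws(ctx, ['ceph', '--version'])  # raises RuntimeError on failure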
##################################
-def json_loads_retry(cli_func):
+def json_loads_retry(cli_func: Callable[[], str]) -> Any:
for sleep_secs in [1, 4, 4]:
try:
return json.loads(cli_func())
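# A minimal, self-contained sketch (not cephadm code) of the retry shape used
# above: parse the command output as JSON, back off between attempts, and
# surface the last error only after the final try. Names here are illustrative.
import json
import time
from typing import Any, Callable

def _loads_with_retry(fetch: Callable[[], str]) -> Any:
    last_exc: Exception = ValueError('no output parsed')
    for attempt, sleep_secs in enumerate([1, 4, 4]):
        try:
            return json.loads(fetch())
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            last_exc = e
            if attempt < 2:  # do not sleep after the last attempt
                time.sleep(sleep_secs)
    raise last_exc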
def _parse_podman_version(version_str):
# type: (str) -> Tuple[int, ...]
- def to_int(val, org_e=None):
+ def to_int(val: str, org_e: Optional[Exception] = None) -> int:
if not val and org_e:
raise org_e
try:
return True
-def validate_fsid(func):
+def validate_fsid(func: FuncT) -> FuncT:
@wraps(func)
- def _validate_fsid(ctx: CephadmContext):
+ def _validate_fsid(ctx: CephadmContext) -> Any:
if 'fsid' in ctx and ctx.fsid:
if not is_fsid(ctx.fsid):
raise Error('not an fsid: %s' % ctx.fsid)
return func(ctx)
- return _validate_fsid
+ return cast(FuncT, _validate_fsid)
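# A minimal, self-contained sketch (not cephadm code) of the FuncT/cast
# pattern used by the decorators in this patch: without the cast(), mypy would
# type the decorated function as the inner wrapper (Callable[..., Any]) and
# lose its real signature.
from functools import wraps
from typing import Any, Callable, TypeVar, cast

_F = TypeVar('_F', bound=Callable)

def log_calls(func: _F) -> _F:
    @wraps(func)
    def _wrapper(*args: Any, **kwargs: Any) -> Any:
        print('calling %s' % func.__name__)
        return func(*args, **kwargs)
    return cast(_F, _wrapper)  # mypy keeps the wrapped signature

@log_calls
def add(a: int, b: int) -> int:
    return a + b

# add still type-checks as (int, int) -> int, e.g. add(1, 2) == 3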
-def infer_fsid(func):
+def infer_fsid(func: FuncT) -> FuncT:
"""
If we only find a single fsid in /var/lib/ceph/*, use that
"""
@infer_config
@wraps(func)
- def _infer_fsid(ctx: CephadmContext):
+ def _infer_fsid(ctx: CephadmContext) -> Any:
if 'fsid' in ctx and ctx.fsid:
logger.debug('Using specified fsid: %s' % ctx.fsid)
return func(ctx)
raise Error('Cannot infer an fsid, one must be specified: %s' % fsids)
return func(ctx)
- return _infer_fsid
+ return cast(FuncT, _infer_fsid)
-def infer_config(func):
+def infer_config(func: FuncT) -> FuncT:
"""
If we find a MON daemon, use the config from that container
"""
@wraps(func)
- def _infer_config(ctx: CephadmContext):
+ def _infer_config(ctx: CephadmContext) -> Any:
ctx.config = ctx.config if 'config' in ctx else None
if ctx.config:
logger.debug('Using specified config: %s' % ctx.config)
ctx.config = SHELL_DEFAULT_CONF
return func(ctx)
- return _infer_config
+ return cast(FuncT, _infer_config)
-def _get_default_image(ctx: CephadmContext):
+def _get_default_image(ctx: CephadmContext) -> str:
if DEFAULT_IMAGE_IS_MASTER:
warn = """This is a development version of cephadm.
For information regarding the latest stable release:
return DEFAULT_IMAGE
-def infer_image(func):
+def infer_image(func: FuncT) -> FuncT:
"""
Use the most recent ceph image
"""
@wraps(func)
- def _infer_image(ctx: CephadmContext):
+ def _infer_image(ctx: CephadmContext) -> Any:
if not ctx.image:
ctx.image = os.environ.get('CEPHADM_IMAGE')
if not ctx.image:
ctx.image = _get_default_image(ctx)
return func(ctx)
- return _infer_image
+ return cast(FuncT, _infer_image)
-def default_image(func):
+def default_image(func: FuncT) -> FuncT:
@wraps(func)
- def _default_image(ctx: CephadmContext):
+ def _default_image(ctx: CephadmContext) -> Any:
if not ctx.image:
if 'name' in ctx and ctx.name:
type_ = ctx.name.split('.', 1)[0]
return func(ctx)
- return _default_image
+ return cast(FuncT, _default_image)
-def get_last_local_ceph_image(ctx: CephadmContext, container_path: str):
+def get_last_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional[str]:
"""
:return: The most recent local ceph image (already pulled)
"""
# copied from distutils
-def find_executable(executable, path=None):
+def find_executable(executable: str, path: Optional[str] = None) -> Optional[str]:
"""Tries to find 'executable' in the directories listed in 'path'.
A string listing directories separated by 'os.pathsep'; defaults to
os.environ['PATH']. Returns the complete filename or None if not found.
return name
-def find_container_engine(ctx: CephadmContext):
+def find_container_engine(ctx: CephadmContext) -> Optional[ContainerEngine]:
if ctx.docker:
return Docker()
else:
return 'ceph-%s@%s' % (fsid, daemon_type)
-def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid, name):
+def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid: str, name: str) -> str:
daemon = get_daemon_description(ctx, fsid, name)
try:
return daemon['systemd_unit']
return fsid
-def should_log_to_journald(ctx):
+def should_log_to_journald(ctx: CephadmContext) -> bool:
if ctx.log_to_journald is not None:
return ctx.log_to_journald
return isinstance(ctx.container_engine, Podman) and \
else:
config_js = get_parm(ctx.config_json)
assert isinstance(config_js, dict)
+ assert isinstance(daemon_id, str)
cephadm_exporter = CephadmDaemon(ctx, fsid, daemon_id, port)
cephadm_exporter.deploy_daemon_unit(config_js)
+ (' &' if background else '') + '\n')
-def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str):
+def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str) -> None:
    # systemd may fail to clean up cgroups from a previously stopped unit, which will cause the next "systemctl start" to fail.
# see https://tracker.ceph.com/issues/50998
if not cg_path.exists():
return
- def cg_trim(path: Path):
+ def cg_trim(path: Path) -> None:
for p in path.iterdir():
if p.is_dir():
cg_trim(p)
return ret
def stop_cmd(self):
- # type () -> List[str]
+ # type: () -> List[str]
ret = [
str(self.ctx.container_engine.path),
'stop', self.cname,
return 0
-def normalize_image_digest(digest):
+def normalize_image_digest(digest: str) -> str:
# normal case:
# ceph/ceph -> docker.io/ceph/ceph
# edge cases that shouldn't ever come up:
fsid: str, mon_id: str,
bootstrap_keyring_path: str,
monmap_path: str
-):
+) -> Tuple[str, str]:
logger.info('Creating mon...')
create_daemon_dirs(ctx, fsid, 'mon', mon_id, uid, gid)
mon_dir = get_data_dir(fsid, ctx.data_dir, 'mon', mon_id)
ctx: CephadmContext,
mon_id: str, mon_dir: str,
admin_keyring_path: str, config_path: str
-):
+) -> None:
logger.info('Waiting for mon to start...')
c = CephContainer(
ctx,
{tmp.name: '/var/lib/ceph/user.conf:z'})
# wait for mgr to restart (after enabling a module)
- def wait_for_mgr_restart():
+ def wait_for_mgr_restart() -> None:
# first get latest mgrmap epoch from the mon. try newer 'mgr
# stat' command first, then fall back to 'mgr dump' if
# necessary
##################################
-def command_registry_login(ctx: CephadmContext):
+def command_registry_login(ctx: CephadmContext) -> int:
if ctx.registry_json:
logger.info('Pulling custom registry login info from %s.' % ctx.registry_json)
d = get_parm(ctx.registry_json)
return 0
-def registry_login(ctx: CephadmContext, url, username, password):
+def registry_login(ctx: CephadmContext, url: Optional[str], username: Optional[str], password: Optional[str]) -> None:
logger.info('Logging into custom registry.')
try:
engine = ctx.container_engine
return res
-def _list_ipv4_networks(ctx: CephadmContext):
+def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, List[str]]]:
execstr: Optional[str] = find_executable('ip')
if not execstr:
raise FileNotFoundError("unable to find 'ip' command")
return _parse_ipv4_route(out)
-def _parse_ipv4_route(out):
+def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, List[str]]]:
r = {} # type: Dict[str,Dict[str,List[str]]]
p = re.compile(r'^(\S+) dev (\S+) (.*)scope link (.*)src (\S+)')
for line in out.splitlines():
return r
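# Illustrative sketch (not part of this patch): a typical `ip route ls` line
# such as
#
#     10.1.2.0/24 dev eth0 proto kernel scope link src 10.1.2.3
#
# matches the regex above, and the function is expected to return a
# subnet -> interface -> addresses mapping, matching the annotation:
#
#     {'10.1.2.0/24': {'eth0': ['10.1.2.3']}}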
-def _list_ipv6_networks(ctx: CephadmContext):
+def _list_ipv6_networks(ctx: CephadmContext) -> Dict[str, Dict[str, List[str]]]:
execstr: Optional[str] = find_executable('ip')
if not execstr:
raise FileNotFoundError("unable to find 'ip' command")
return _parse_ipv6_route(routes, ips)
-def _parse_ipv6_route(routes, ips):
+def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, List[str]]]:
r = {} # type: Dict[str,Dict[str,List[str]]]
route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$')
ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$')
##################################
-def _zap(ctx, what):
+def _zap(ctx: CephadmContext, what: str) -> None:
mounts = get_container_mounts(ctx, ctx.fsid, 'clusterless-ceph-volume', None)
c = CephContainer(
ctx,
@infer_image
-def _zap_osds(ctx):
+def _zap_osds(ctx: CephadmContext) -> None:
# assume fsid lock already held
# list
logger.warning(f'Not zapping LVs (not implemented): {lv_names}')
-def command_zap_osds(ctx):
+def command_zap_osds(ctx: CephadmContext) -> None:
if not ctx.force:
raise Error('must pass --force to proceed: '
'this command may destroy precious data!')
class CustomValidation(argparse.Action):
- def _check_name(self, values):
+ def _check_name(self, values: str) -> None:
try:
(daemon_type, daemon_id) = values.split('.', 1)
except ValueError:
'name must declare the type of daemon e.g. '
'{}'.format(', '.join(daemons)))
- def __call__(self, parser, namespace, values, option_string=None):
+ def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Union[str, Sequence[Any], None],
+ option_string: Optional[str] = None) -> None:
+ assert isinstance(values, str)
if self.dest == 'name':
self._check_name(values)
setattr(namespace, self.dest, values)
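# A minimal, self-contained sketch (not cephadm code) showing how an
# argparse.Action subclass like CustomValidation is wired to an option; the
# option name, class name, and error text below are illustrative only.
import argparse
from typing import Any, Optional, Sequence, Union

class _RequireDottedName(argparse.Action):
    def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
                 values: Union[str, Sequence[Any], None],
                 option_string: Optional[str] = None) -> None:
        if not isinstance(values, str) or '.' not in values:
            raise argparse.ArgumentError(self, 'expected <daemon_type>.<daemon_id>')
        setattr(namespace, self.dest, values)

_p = argparse.ArgumentParser()
_p.add_argument('--name', action=_RequireDottedName)
assert _p.parse_args(['--name', 'mon.a']).name == 'mon.a'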
class Packager(object):
def __init__(self, ctx: CephadmContext,
- stable=None, version=None, branch=None, commit=None):
+ stable: Optional[str] = None, version: Optional[str] = None,
+ branch: Optional[str] = None, commit: Optional[str] = None):
assert \
(stable and not version and not branch and not commit) or \
(not stable and version and not branch and not commit) or \
self.branch = branch
self.commit = commit
- def add_repo(self):
+ def add_repo(self) -> None:
+ raise NotImplementedError
+
+ def rm_repo(self) -> None:
+ raise NotImplementedError
+
+ def install(self, ls: List[str]) -> None:
raise NotImplementedError
- def rm_repo(self):
+ def install_podman(self) -> None:
raise NotImplementedError
- def query_shaman(self, distro, distro_version, branch, commit):
+ def query_shaman(self, distro: str, distro_version: Any, branch: Optional[str], commit: Optional[str]) -> str:
# query shaman
logger.info('Fetching repo metadata from shaman and chacra...')
shaman_url = 'https://shaman.ceph.com/api/repos/ceph/{branch}/{sha1}/{distro}/{distro_version}/repo/?arch={arch}'.format(
raise Error('%s, failed to fetch %s' % (err, chacra_url))
return chacra_response.read().decode('utf-8')
- def repo_gpgkey(self):
+ def repo_gpgkey(self) -> Tuple[str, str]:
if self.ctx.gpg_url:
- return self.ctx.gpg_url
+ return self.ctx.gpg_url, 'manual'
if self.stable or self.version:
else:
return 'https://download.ceph.com/keys/autobuild.gpg', 'autobuild'
- def enable_service(self, service):
+ def enable_service(self, service: str) -> None:
"""
Start and enable the service (typically using systemd).
"""
}
def __init__(self, ctx: CephadmContext,
- stable, version, branch, commit,
- distro, distro_version, distro_codename):
+ stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str],
+ distro: Optional[str], distro_version: Optional[str], distro_codename: Optional[str]) -> None:
super(Apt, self).__init__(ctx, stable=stable, version=version,
branch=branch, commit=commit)
+ assert distro
self.ctx = ctx
self.distro = self.DISTRO_NAMES[distro]
self.distro_codename = distro_codename
self.distro_version = distro_version
- def repo_path(self):
+ def repo_path(self) -> str:
return '/etc/apt/sources.list.d/ceph.list'
- def add_repo(self):
+ def add_repo(self) -> None:
url, name = self.repo_gpgkey()
logger.info('Installing repo GPG key from %s...' % url)
self.update()
- def rm_repo(self):
+ def rm_repo(self) -> None:
for name in ['autobuild', 'release']:
p = '/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name
if os.path.exists(p):
if self.distro == 'ubuntu':
self.rm_kubic_repo()
- def install(self, ls):
+ def install(self, ls: List[str]) -> None:
logger.info('Installing packages %s...' % ls)
call_throws(self.ctx, ['apt-get', 'install', '-y'] + ls)
- def update(self):
+ def update(self) -> None:
logger.info('Updating package list...')
call_throws(self.ctx, ['apt-get', 'update'])
- def install_podman(self):
+ def install_podman(self) -> None:
if self.distro == 'ubuntu':
logger.info('Setting up repo for podman...')
self.add_kubic_repo()
logger.info('Podman did not work. Falling back to docker...')
self.install(['docker.io'])
- def kubic_repo_url(self):
+ def kubic_repo_url(self) -> str:
return 'https://download.opensuse.org/repositories/devel:/kubic:/' \
'libcontainers:/stable/xUbuntu_%s/' % self.distro_version
- def kubic_repo_path(self):
+ def kubic_repo_path(self) -> str:
return '/etc/apt/sources.list.d/devel:kubic:libcontainers:stable.list'
- def kubric_repo_gpgkey_url(self):
+ def kubric_repo_gpgkey_url(self) -> str:
return '%s/Release.key' % self.kubic_repo_url()
- def kubric_repo_gpgkey_path(self):
+ def kubric_repo_gpgkey_path(self) -> str:
return '/etc/apt/trusted.gpg.d/kubic.release.gpg'
- def add_kubic_repo(self):
+ def add_kubic_repo(self) -> None:
url = self.kubric_repo_gpgkey_url()
logger.info('Installing repo GPG key from %s...' % url)
try:
with open(self.kubic_repo_path(), 'w') as f:
f.write(content)
- def rm_kubic_repo(self):
+ def rm_kubic_repo(self) -> None:
keyring = self.kubric_repo_gpgkey_path()
if os.path.exists(keyring):
logger.info('Removing repo GPG key %s...' % keyring)
}
def __init__(self, ctx: CephadmContext,
- stable, version, branch, commit,
- distro, distro_version):
+ stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str],
+ distro: Optional[str], distro_version: Optional[str]) -> None:
super(YumDnf, self).__init__(ctx, stable=stable, version=version,
branch=branch, commit=commit)
+ assert distro
+ assert distro_version
self.ctx = ctx
self.major = int(distro_version.split('.')[0])
self.distro_normalized = self.DISTRO_NAMES[distro][0]
else:
self.tool = 'yum'
- def custom_repo(self, **kw):
+ def custom_repo(self, **kw: Any) -> str:
"""
Repo files need special care in that a whole line should not be present
if there is no value for it. Because we were using `format()` we could
return '\n'.join(lines)
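# Illustrative sketch (not part of this patch): custom_repo() is expected to
# emit a yum/dnf style .repo stanza, leaving out any line whose value was not
# supplied rather than writing an empty 'key='; release name and paths below
# are placeholders:
#
#     [ceph]
#     name=Ceph packages
#     baseurl=https://download.ceph.com/rpm-<release>/el8
#     enabled=1
#     gpgcheck=1
#     gpgkey=https://download.ceph.com/keys/release.gpg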
- def repo_path(self):
+ def repo_path(self) -> str:
return '/etc/yum.repos.d/ceph.repo'
- def repo_baseurl(self):
+ def repo_baseurl(self) -> str:
assert self.stable or self.version
if self.version:
return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.version,
return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.stable,
self.distro_code)
- def add_repo(self):
+ def add_repo(self) -> None:
if self.distro_code.startswith('fc'):
raise Error('Ceph team does not build Fedora specific packages and therefore cannot add repos for this distro')
if self.distro_code == 'el7':
logger.info('Enabling EPEL...')
call_throws(self.ctx, [self.tool, 'install', '-y', 'epel-release'])
- def rm_repo(self):
+ def rm_repo(self) -> None:
if os.path.exists(self.repo_path()):
os.unlink(self.repo_path())
- def install(self, ls):
+ def install(self, ls: List[str]) -> None:
logger.info('Installing packages %s...' % ls)
call_throws(self.ctx, [self.tool, 'install', '-y'] + ls)
- def install_podman(self):
+ def install_podman(self) -> None:
self.install(['podman'])
]
def __init__(self, ctx: CephadmContext,
- stable, version, branch, commit,
- distro, distro_version):
+ stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str],
+ distro: Optional[str], distro_version: Optional[str]) -> None:
super(Zypper, self).__init__(ctx, stable=stable, version=version,
branch=branch, commit=commit)
+ assert distro is not None
self.ctx = ctx
self.tool = 'zypper'
self.distro = 'opensuse'
if 'tumbleweed' not in distro and distro_version is not None:
self.distro_version = distro_version
- def custom_repo(self, **kw):
+ def custom_repo(self, **kw: Any) -> str:
"""
See YumDnf for format explanation.
"""
return '\n'.join(lines)
- def repo_path(self):
+ def repo_path(self) -> str:
return '/etc/zypp/repos.d/ceph.repo'
- def repo_baseurl(self):
+ def repo_baseurl(self) -> str:
assert self.stable or self.version
if self.version:
return '%s/rpm-%s/%s' % (self.ctx.repo_url,
return '%s/rpm-%s/%s' % (self.ctx.repo_url,
self.stable, self.distro)
- def add_repo(self):
+ def add_repo(self) -> None:
if self.stable or self.version:
content = ''
for n, t in {
with open(self.repo_path(), 'w') as f:
f.write(content)
- def rm_repo(self):
+ def rm_repo(self) -> None:
if os.path.exists(self.repo_path()):
os.unlink(self.repo_path())
- def install(self, ls):
+ def install(self, ls: List[str]) -> None:
logger.info('Installing packages %s...' % ls)
call_throws(self.ctx, [self.tool, 'in', '-y'] + ls)
- def install_podman(self):
+ def install_podman(self) -> None:
self.install(['podman'])
def create_packager(ctx: CephadmContext,
- stable=None, version=None, branch=None, commit=None):
+ stable: Optional[str] = None, version: Optional[str] = None,
+ branch: Optional[str] = None, commit: Optional[str] = None) -> Packager:
distro, distro_version, distro_codename = get_distro()
if distro in YumDnf.DISTRO_NAMES:
return YumDnf(ctx, stable=stable, version=version,
raise Error('Distro %s version %s not supported' % (distro, distro_version))
-def command_add_repo(ctx: CephadmContext):
+def command_add_repo(ctx: CephadmContext) -> None:
if ctx.version and ctx.release:
raise Error('you can specify either --release or --version but not both')
if not ctx.version and not ctx.release and not ctx.dev and not ctx.dev_commit:
logger.info('Completed adding repo.')
-def command_rm_repo(ctx: CephadmContext):
+def command_rm_repo(ctx: CephadmContext) -> None:
pkg = create_packager(ctx)
pkg.rm_repo()
-def command_install(ctx: CephadmContext):
+def command_install(ctx: CephadmContext) -> None:
pkg = create_packager(ctx)
pkg.install(ctx.packages)
def get_ipv4_address(ifname):
# type: (str) -> str
- def _extract(sock, offset):
+ def _extract(sock: socket.socket, offset: int) -> str:
return socket.inet_ntop(
socket.AF_INET,
fcntl.ioctl(
}
@property
- def selinux_enabled(self):
+ def selinux_enabled(self) -> bool:
return (self.kernel_security['type'] == 'SELinux') and \
(self.kernel_security['description'] != 'SELinux: Disabled')
##################################
-def command_gather_facts(ctx: CephadmContext):
+def command_gather_facts(ctx: CephadmContext) -> None:
"""gather_facts is intended to provide host releated metadata to the caller"""
host = HostFacts(ctx)
print(host.dump())
class CephadmCache:
task_types = ['disks', 'daemons', 'host', 'http_server']
- def __init__(self):
+ def __init__(self) -> None:
self.started_epoch_secs = time.time()
self.tasks = {
'daemons': 'inactive',
'host': 'inactive',
'http_server': 'inactive',
}
- self.errors = []
- self.disks = {}
- self.daemons = {}
- self.host = {}
+ self.errors: list = []
+ self.disks: dict = {}
+ self.daemons: dict = {}
+ self.host: dict = {}
self.lock = RLock()
@property
- def health(self):
+ def health(self) -> dict:
return {
'started_epoch_secs': self.started_epoch_secs,
'tasks': self.tasks,
'errors': self.errors,
}
- def to_json(self):
+ def to_json(self) -> dict:
return {
'health': self.health,
'host': self.host,
'disks': self.disks,
}
- def update_health(self, task_type, task_status, error_msg=None):
+ def update_health(self, task_type: str, task_status: str, error_msg: Optional[str] = None) -> None:
assert task_type in CephadmCache.task_types
with self.lock:
self.tasks[task_type] = task_status
if error_msg:
self.errors.append(error_msg)
- def update_task(self, task_type, content):
+ def update_task(self, task_type: str, content: dict) -> None:
assert task_type in CephadmCache.task_types
assert isinstance(content, dict)
with self.lock:
class Decorators:
@classmethod
- def authorize(cls, f):
+ def authorize(cls, f: Any) -> Any:
"""Implement a basic token check.
The token is installed at deployment time and must be provided to
ensure we only respond to callers who know our token i.e. mgr
"""
- def wrapper(self, *args, **kwargs):
+ def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
auth = self.headers.get('Authorization', None)
if auth != 'Bearer ' + self.server.token:
self.send_error(401)
return wrapper
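# Illustrative sketch (not part of this patch): a caller that knows the
# deployed token would authenticate against the exporter roughly like this;
# host, port, endpoint, token, and TLS handling are placeholders:
#
#     req = urllib.request.Request(
#         'https://<host>:<port>/<endpoint>',
#         headers={'Authorization': 'Bearer <token>'})
#     urllib.request.urlopen(req)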
- def _help_page(self):
+ def _help_page(self) -> str:
return """<!DOCTYPE html>
<html>
<head><title>cephadm metadata exporter</title></head>
</body>
</html>""".format(api_version=CephadmDaemonHandler.api_version)
- def _fetch_root(self):
+ def _fetch_root(self) -> None:
self.send_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
self.wfile.write(self._help_page().encode('utf-8'))
@Decorators.authorize
- def do_GET(self):
+ def do_GET(self) -> None:
"""Handle *all* GET requests"""
if self.path == '/':
self.end_headers()
self.wfile.write(json.dumps({'message': bad_request_msg}).encode('utf-8'))
- def log_message(self, format, *args):
+ def log_message(self, format: str, *args: Any) -> None:
rqst = ' '.join(str(a) for a in args)
logger.info(f'client:{self.address_string()} [{self.log_date_time_string()}] {rqst}')
loop_delay = 1
thread_check_interval = 5
- def __init__(self, ctx: CephadmContext, fsid, daemon_id=None, port=None):
+ def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Optional[str] = None, port: Optional[int] = None) -> None:
self.ctx = ctx
self.fsid = fsid
self.daemon_id = daemon_id
self.token = read_file([os.path.join(self.daemon_path, CephadmDaemon.token_name)])
@classmethod
- def validate_config(cls, config):
+ def validate_config(cls, config: dict) -> None:
reqs = ', '.join(CephadmDaemon.config_requirements)
errors = []
raise Error('Parameter errors : {}'.format(', '.join(errors)))
@property
- def port_active(self):
+ def port_active(self) -> bool:
return port_in_use(self.ctx, self.port)
@property
- def can_run(self):
+ def can_run(self) -> bool:
# if port is in use
if self.port_active:
self.errors.append(f'TCP port {self.port} already in use, unable to bind')
return len(self.errors) == 0
@staticmethod
- def _unit_name(fsid, daemon_id):
+ def _unit_name(fsid: str, daemon_id: str) -> str:
return '{}.service'.format(get_unit_name(fsid, CephadmDaemon.daemon_type, daemon_id))
@property
- def unit_name(self):
+ def unit_name(self) -> str:
+ assert self.daemon_id is not None
return CephadmDaemon._unit_name(self.fsid, self.daemon_id)
@property
- def daemon_path(self):
+ def daemon_path(self) -> str:
return os.path.join(
self.ctx.data_dir,
self.fsid,
)
@property
- def binary_path(self):
+ def binary_path(self) -> str:
path = os.path.realpath(__file__)
assert os.path.isfile(path)
return path
- def _handle_thread_exception(self, exc, thread_type):
+ def _handle_thread_exception(self, exc: Exception, thread_type: str) -> None:
e_msg = f'{exc.__class__.__name__} exception: {str(exc)}'
thread_info = getattr(self.cephadm_cache, thread_type)
errors = thread_info.get('scrape_errors', [])
}
)
- def _scrape_host_facts(self, refresh_interval=10):
+ def _scrape_host_facts(self, refresh_interval: int = 10) -> None:
ctr = 0
exception_encountered = False
ctr += CephadmDaemon.loop_delay
logger.info('host-facts thread stopped')
- def _scrape_ceph_volume(self, refresh_interval=15):
+ def _scrape_ceph_volume(self, refresh_interval: int = 15) -> None:
# we're invoking the ceph_volume command, so we need to set the args that it
# expects to use
self.ctx.command = 'inventory --format=json'.split()
logger.info('ceph-volume thread stopped')
- def _scrape_list_daemons(self, refresh_interval=20):
+ def _scrape_list_daemons(self, refresh_interval: int = 20) -> None:
ctr = 0
exception_encountered = False
while True:
ctr += CephadmDaemon.loop_delay
logger.info('list-daemons thread stopped')
- def _create_thread(self, target, name, refresh_interval=None):
+ def _create_thread(self, target: Any, name: str, refresh_interval: Optional[int] = None) -> Thread:
if refresh_interval:
t = Thread(target=target, args=(refresh_interval,))
else:
logger.info(f'{start_msg}')
return t
- def reload(self, *args):
+ def reload(self, *args: Any) -> None:
"""reload -HUP received
This is a placeholder function only, and serves to provide the hook that could
"""
logger.info('Reload request received - ignoring, no action needed')
- def shutdown(self, *args):
+ def shutdown(self, *args: Any) -> None:
logger.info('Shutdown request received')
self.stop = True
self.http_server.shutdown()
- def run(self):
+ def run(self) -> None:
logger.info(f"cephadm exporter starting for FSID '{self.fsid}'")
if not self.can_run:
logger.error('Unable to start the exporter daemon')
logger.info('Main http server thread stopped')
@property
- def unit_run(self):
+ def unit_run(self) -> str:
return """set -e
{py3} {bin_path} exporter --fsid {fsid} --id {daemon_id} --port {port} &""".format(
)
@property
- def unit_file(self):
+ def unit_file(self) -> str:
docker = isinstance(self.ctx.container_engine, Docker)
return """#generated by cephadm
[Unit]
docker_after=' docker.service' if docker else '',
docker_requires='Requires=docker.service\n' if docker else '')
- def deploy_daemon_unit(self, config=None):
+ def deploy_daemon_unit(self, config: Optional[dict] = None) -> None:
"""deploy a specific unit file for cephadm
The normal deploy_daemon_units doesn't apply for this
call_throws(self.ctx, ['systemctl', 'enable', '--now', self.unit_name])
@classmethod
- def uninstall(cls, ctx: CephadmContext, fsid, daemon_type, daemon_id):
+ def uninstall(cls, ctx: CephadmContext, fsid: str, daemon_type: str, daemon_id: str) -> None:
unit_name = CephadmDaemon._unit_name(fsid, daemon_id)
unit_path = os.path.join(ctx.unit_dir, unit_name)
unit_run = os.path.join(ctx.data_dir, fsid, f'{daemon_type}.{daemon_id}', 'unit.run')
stdout, stderr, rc = call(ctx, ['systemctl', 'daemon-reload'])
-def command_exporter(ctx: CephadmContext):
+def command_exporter(ctx: CephadmContext) -> None:
exporter = CephadmDaemon(ctx, ctx.fsid, daemon_id=ctx.id, port=ctx.port)
if ctx.fsid not in os.listdir(ctx.data_dir):
@infer_fsid
-def command_maintenance(ctx: CephadmContext):
+def command_maintenance(ctx: CephadmContext) -> str:
if not ctx.fsid:
raise Error('must pass --fsid to specify cluster')
return 'failed - unable to start the target'
else:
return f'success - systemd target {target} enabled and started'
+ return f'success - systemd target {target} enabled and started'
##################################
return parser
-def _parse_args(av):
+def _parse_args(av: List[str]) -> argparse.Namespace:
parser = _get_parser()
args = parser.parse_args(av)
return ctx
-def main():
+def main() -> None:
# root?
if os.geteuid() != 0: