os.environ['PATH'] = '{}:{}'.format(os.path.abspath('../../../../build/bin'),
os.environ['PATH'])
- from tests import mock
+ from tests import mock # type: ignore
mgr = mock.Mock()
mgr.get_frontend_path.side_effect = lambda: os.path.abspath("./frontend/dist")
key = self.secret_key.encode('utf-8')
msg = canonical_string.encode('utf-8')
else:
- key = self.secret_key
+ key = self.secret_key # type: ignore
msg = canonical_string
h = hmac.new(key, msg, digestmod=sha)
return encodestring(h.digest()).strip()
It's evaluated lazily on render.
"""
try:
- self._uuid4
+ self._uuid4 # type: ignore
except AttributeError:
# evaluate on first access
self._uuid4 = uuid.uuid4()
from ..services.auth import AuthManager, JwtManager
from ..plugins import PLUGIN_MANAGER
+try:
+ from typing import Any, List, Optional
+except ImportError:
+ pass # For typing only
+
def EndpointDoc(description="", group="", parameters=None, responses=None): # noqa: N802
if not isinstance(description, str):
return splitted
def _split_list(data, nested):
- splitted = []
+ splitted = [] # type: List[Any]
for item in data:
splitted.extend(_split_parameters(item, nested))
return splitted
# nested = True means parameters are inside a dict or array
def _split_parameters(data, nested=False):
- param_list = []
+ param_list = [] # type: List[Any]
if isinstance(data, dict):
param_list.extend(_split_dict(data, nested))
elif isinstance(data, (list, tuple)):
return controllers
-ENDPOINT_MAP = collections.defaultdict(list)
+ENDPOINT_MAP = collections.defaultdict(list) # type: dict
def generate_controller_routes(endpoint, mapper, base_url):
def __init__(self):
logger = logging.getLogger('controller')
logger.info('Initializing controller: %s -> %s',
- self.__class__.__name__, self._cp_path_)
+ self.__class__.__name__, self._cp_path_) # type: ignore
super(BaseController, self).__init__()
def _has_permissions(self, permissions, scope=None):
- if not self._cp_config['tools.authenticate.on']:
+ if not self._cp_config['tools.authenticate.on']: # type: ignore
raise Exception("Cannot verify permission in non secured "
"controllers")
def get_path_param_names(cls, path_extension=None):
if path_extension is None:
path_extension = ""
- full_path = cls._cp_path_[1:] + path_extension
+ full_path = cls._cp_path_[1:] + path_extension # type: ignore
path_params = []
for step in full_path.split('/'):
param = None
@classmethod
def get_path(cls):
- return cls._cp_path_
+ return cls._cp_path_ # type: ignore
@classmethod
def endpoints(cls):
# to specify a composite id (two parameters) use '/'. e.g., "param1/param2".
# If subclasses don't override this property we try to infer the structure
# of the resource ID.
- RESOURCE_ID = None
+ RESOURCE_ID = None # type: Optional[str]
_permission_map = {
'GET': Permission.READ,
permission = None
if func.__name__ in cls._method_mapping:
- meth = cls._method_mapping[func.__name__]
+ meth = cls._method_mapping[func.__name__] # type: dict
if meth['resource']:
if not res_id_params:
"mds_mem.ino"
]
- result = {}
+ result = {} # type: dict
mds_names = self._get_mds_names(fs_id)
for mds_name in mds_names:
# pylint: disable=too-many-statements,too-many-branches
def fs_status(self, fs_id):
- mds_versions = defaultdict(list)
+ mds_versions = defaultdict(list) # type: dict
fsmap = mgr.get("fs_map")
filesystem = None
from ..exceptions import DashboardException
from ..tools import str_to_bool, TaskManager
+try:
+ from typing import Any, Dict, List, no_type_check
+except ImportError:
+ no_type_check = object() # Just for type checking
+
@UiApiController('/iscsi', Scope.ISCSI)
class IscsiUi(BaseController):
@Endpoint()
@ReadPermission
+ @no_type_check
def status(self):
status = {'available': False}
try:
code='disk_control_invalid_max',
component='iscsi')
- initiators = []
+ initiators = [] # type: List[Any]
for group in groups:
initiators = initiators + group['members']
if len(initiators) != len(set(initiators)):
@staticmethod
def _get_portals_by_host(portals):
- portals_by_host = {}
+ # type: (List[dict]) -> Dict[str, List[str]]
+ portals_by_host = {} # type: Dict[str, List[str]]
for portal in portals:
host = portal['host']
ip = portal['ip']
"""
Identify a device by switching on the device light for N seconds.
:param hostname: The hostname of the device to process.
- :param device: The device identifier to process, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
+ :param device: The device identifier to process, e.g. ``/dev/dm-0`` or
+ ``ABC1234DEF567-1R1234_ABC8DE0Q``.
:param duration: The duration in seconds how long the LED should flash.
"""
orch = OrchClient.instance()
@staticmethod
def get_osd_map(svc_id=None):
- # type: (Union[int, None]) -> Dict[int, Union[Dict[str, Any], Any]]
+ # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
def add_id(osd):
osd['id'] = osd['osd']
return osd
from ..services.exception import handle_rados_error, handle_rbd_error, \
serialize_dashboard_exception
+try:
+ from typing import no_type_check
+except ImportError:
+ no_type_check = object() # Just for type checking
+
logger = logging.getLogger('controllers.rbd_mirror')
for daemon in daemons:
for _, pool_data in daemon['status'].items():
- stats = pool_stats.get(pool_data['name'], None)
+ stats = pool_stats.get(pool_data['name'], None) # type: ignore
if stats is None:
continue
@ViewCache()
+@no_type_check
def _get_pool_datum(pool_name):
data = {}
logger.debug("Constructing IOCtx %s", pool_name)
from ..services.rgw_client import RgwClient
from ..tools import json_str_to_object
+try:
+ from typing import List
+except ImportError:
+ pass # Just for type checking
+
logger = logging.getLogger('controllers.rgw')
raise RequestException(msg)
status['available'] = True
except (RequestException, LookupError) as ex:
- status['message'] = str(ex)
+ status['message'] = str(ex) # type: ignore
return status
class RgwDaemon(RESTController):
def list(self):
+ # type: () -> List[dict]
daemons = []
for hostname, server in CephService.get_service_map('rgw').items():
for service in server['services']:
return sorted(daemons, key=lambda k: k['id'])
def get(self, svc_id):
+ # type: (str) -> dict
daemon = {
'rgw_metadata': [],
'rgw_id': svc_id,
@staticmethod
def strip_tenant_from_bucket_name(bucket_name):
- # type (str) => str
+ # type (str) -> str
"""
>>> RgwBucket.strip_tenant_from_bucket_name('tenant/bucket-name')
'bucket-name'
@staticmethod
def get_s3_bucket_name(bucket_name, tenant=None):
- # type (str, str) => str
+ # type (str, str) -> str
"""
>>> RgwBucket.get_s3_bucket_name('bucket-name', 'tenant')
'tenant:bucket-name'
return bucket_name
def list(self):
+ # type: () -> List[str]
return self.proxy('GET', 'bucket')
def get(self, bucket):
+ # type: (str) -> dict
result = self.proxy('GET', 'bucket', {'bucket': bucket})
result['versioning'] =\
return user
def list(self):
- users = []
+ # type: () -> List[str]
+ users = [] # type: List[str]
marker = None
while True:
- params = {}
+ params = {} # type: dict
if marker:
params['marker'] = marker
result = self.proxy('GET', 'user?list', params)
return users
def get(self, uid):
+ # type: (str) -> dict
result = self.proxy('GET', 'user', {'uid': uid})
return self._append_uid(result)
@Endpoint()
@ReadPermission
def get_emails(self):
+ # type: () -> List[str]
emails = []
- for uid in json.loads(self.list()):
- user = json.loads(self.get(uid))
+ for uid in json.loads(self.list()): # type: ignore
+ user = json.loads(self.get(uid)) # type: ignore
if user["email"]:
emails.append(user["email"])
return emails
:returns our URI
"""
- server_addr = self.get_localized_module_option(
+ server_addr = self.get_localized_module_option( # type: ignore
'server_addr', get_default_addr())
- ssl = self.get_localized_module_option('ssl', True)
+ ssl = self.get_localized_module_option('ssl', True) # type: ignore
if not ssl:
- server_port = self.get_localized_module_option('server_port', 8080)
+ server_port = self.get_localized_module_option('server_port', 8080) # type: ignore
else:
- server_port = self.get_localized_module_option('ssl_server_port', 8443)
+ server_port = self.get_localized_module_option('ssl_server_port', 8443) # type: ignore
if server_addr is None:
raise ServerConfigException(
'no server_addr configured; '
'try "ceph config set mgr mgr/{}/{}/server_addr <ip>"'
- .format(self.module_name, self.get_mgr_id()))
- self.log.info('server: ssl=%s host=%s port=%d', 'yes' if ssl else 'no',
+ .format(self.module_name, self.get_mgr_id())) # type: ignore
+ self.log.info('server: ssl=%s host=%s port=%d', 'yes' if ssl else 'no', # type: ignore
server_addr, server_port)
# Initialize custom handlers.
if ssl:
# SSL initialization
- cert = self.get_store("crt")
+ cert = self.get_store("crt") # type: ignore
if cert is not None:
self.cert_tmp = tempfile.NamedTemporaryFile()
self.cert_tmp.write(cert.encode('utf-8'))
self.cert_tmp.flush() # cert_tmp must not be gc'ed
cert_fname = self.cert_tmp.name
else:
- cert_fname = self.get_localized_module_option('crt_file')
+ cert_fname = self.get_localized_module_option('crt_file') # type: ignore
- pkey = self.get_store("key")
+ pkey = self.get_store("key") # type: ignore
if pkey is not None:
self.pkey_tmp = tempfile.NamedTemporaryFile()
self.pkey_tmp.write(pkey.encode('utf-8'))
self.pkey_tmp.flush() # pkey_tmp must not be gc'ed
pkey_fname = self.pkey_tmp.name
else:
- pkey_fname = self.get_localized_module_option('key_file')
+ pkey_fname = self.get_localized_module_option('key_file') # type: ignore
verify_tls_files(cert_fname, pkey_fname)
self.update_cherrypy_config(config)
- self._url_prefix = prepare_url_prefix(self.get_module_option('url_prefix',
- default=''))
+ self._url_prefix = prepare_url_prefix(self.get_module_option( # type: ignore
+ 'url_prefix', default=''))
uri = "{0}://{1}:{2}{3}/".format(
'https' if ssl else 'http',
try:
uri = self._configure()
except ServerConfigException as e:
- self.log.info("Config not ready to serve, waiting: {0}".format(
- e
- ))
+ self.log.info( # type: ignore
+ "Config not ready to serve, waiting: {0}".format(e)
+ )
# Poll until a non-errored config is present
self._stopping.wait(5)
else:
- self.log.info("Configured CherryPy, starting engine...")
+ self.log.info("Configured CherryPy, starting engine...") # type: ignore
return uri
MODULE_OPTIONS.extend(options)
__pool_stats = collections.defaultdict(lambda: collections.defaultdict(
- lambda: collections.deque(maxlen=10)))
+ lambda: collections.deque(maxlen=10))) # type: dict
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
from . import interfaces as I # noqa: E741,N812
from .plugin import SimplePlugin as SP
+try:
+ from typing import no_type_check
+except ImportError:
+ no_type_check = object() # Just for type checking
+
class Actions(Enum):
ENABLE = 'enable'
)
]
+ @no_type_check
def handler(self, action):
ret = 0
msg = ''
from ..controllers.cephfs import CephFS
from ..controllers.rgw import Rgw, RgwDaemon, RgwBucket, RgwUser
+try:
+ from typing import no_type_check, Set
+except ImportError:
+ no_type_check = object() # Just for type checking
+
class Features(Enum):
RBD = 'rbd'
RGW = 'rgw'
-PREDISABLED_FEATURES = set()
+PREDISABLED_FEATURES = set() # type: Set[str]
Feature2Controller = {
self.Controller2Feature = {
controller: feature
for feature, controllers in Feature2Controller.items()
- for controller in controllers}
+ for controller in controllers} # type: ignore
@PM.add_hook
def get_options(self):
return ret, '\n'.join(msg), ''
return {'handle_command': cmd}
+ @no_type_check # https://github.com/python/mypy/issues/7806
def _get_feature_from_request(self, request):
try:
return self.Controller2Feature[
return None
@ttl_cache(ttl=CACHE_TTL, maxsize=CACHE_MAX_SIZE)
+ @no_type_check # https://github.com/python/mypy/issues/7806
def _is_feature_enabled(self, feature):
return self.mgr.get_module_option(self.OPTION_FMT.format(feature.value))
class CanMgr(Mixin):
from .. import mgr
- mgr = mgr
+ mgr = mgr # type: ignore
class CanCherrypy(Mixin):
TODO: Once this becomes available in the above distros, this file should be
REMOVED, and the fully featured python-pluggy should be used instead.
"""
+try:
+ from typing import DefaultDict
+except ImportError:
+ pass # For typing only
class HookspecMarker(object):
"""
def __init__(self):
from collections import defaultdict
- self._registry = defaultdict(list)
+ self._registry = defaultdict(list) # type: DefaultDict[str, list]
def __getattr__(self, hook_name):
return lambda *args, **kwargs: [
from . import PLUGIN_MANAGER as PM
from . import interfaces as I # noqa: E741,N812
+try:
+ from typing import no_type_check
+except ImportError:
+ no_type_check = object() # Just for type checking
+
class SimplePlugin(I.CanMgr, I.HasOptions, I.HasCommands):
"""
@PM.add_hook
def get_options(self):
- return self.OPTIONS
+ return self.OPTIONS # type: ignore
@PM.final
+ @no_type_check # https://github.com/python/mypy/issues/7806
def get_option(self, option):
return self.mgr.get_module_option(option)
@PM.final
+ @no_type_check # https://github.com/python/mypy/issues/7806
def set_option(self, option, value):
self.mgr.set_module_option(option, value)
@PM.add_hook
+ @no_type_check # https://github.com/python/mypy/issues/7806
def register_commands(self):
for cmd in self.COMMANDS:
cmd.register(instance=self)
from threading import RLock
from time import time
+try:
+ from typing import Tuple
+except ImportError:
+ pass # For typing only
+
def ttl_cache(ttl, maxsize=128, typed=False):
if typed is not False:
raise NotImplementedError("typed caching not supported")
def decorating_function(function):
- cache = OrderedDict()
+ cache = OrderedDict() # type: OrderedDict[object, Tuple[bool, float]]
stats = [0, 0, 0]
rlock = RLock()
setattr(function, 'cache_info', lambda:
try:
from requests.packages.urllib3.exceptions import SSLError
except ImportError:
- from urllib3.exceptions import SSLError
+ from urllib3.exceptions import SSLError # type: ignore
+
+try:
+ from typing import List
+except ImportError:
+ pass # Just for type checking
logger = logging.getLogger('rest_client')
level_next = path[path_sep + 1:].strip()
else:
path_sep = len(path)
- level_next = None
+ level_next = None # type: ignore
key = path[:path_sep].strip()
if key == '*':
continue
elif key == '': # check all keys
- for k in resp.keys():
+ for k in resp.keys(): # type: ignore
_ResponseValidator._validate_key(k, level_next, resp)
else:
_ResponseValidator._validate_key(key, level_next, resp)
@staticmethod
def _parse_level_paths(level):
+ # type: (str) -> List[str]
level = level.strip()
if level[0] == '(':
level = level[1:]
match = re.match(r'.*: \[Errno (-?\d+)\] (.+)',
ex.args[0].reason.args[0])
except AttributeError:
- match = False
+ match = None
if match:
errno = match.group(1)
strerror = match.group(2)
return False
def permissions_dict(self):
- perms = {}
+ # type: () -> dict
+ perms = {} # type: dict
for role in self.roles:
for scope, perms_list in role.scopes_permissions.items():
if scope in perms:
db.check_and_update_db()
return db
- db = json.loads(json_db)
+ dict_db = json.loads(json_db)
roles = {rn: Role.from_dict(r)
- for rn, r in db.get('roles', {}).items()}
+ for rn, r in dict_db.get('roles', {}).items()}
users = {un: User.from_dict(u, dict(roles, **SYSTEM_ROLES))
- for un, u in db.get('users', {}).items()}
- return cls(db['version'], users, roles)
+ for un, u in dict_db.get('users', {}).items()}
+ return cls(dict_db['version'], users, roles)
def load_access_control_db():
@classmethod
def init(cls):
- cls.logger = logging.getLogger('jwt')
+ cls.logger = logging.getLogger('jwt') # type: ignore
# generate a new secret if it does not exist
secret = mgr.get_store('jwt_secret')
if secret is None:
'iat': now,
'username': username
}
- return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM)
+ return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM) # type: ignore
@classmethod
def decode_token(cls, token):
if not cls._secret:
cls.init()
- return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM)
+ return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM) # type: ignore
@classmethod
def get_token_from_header(cls):
user = AuthManager.get_user(dtoken['username'])
if user.last_update <= dtoken['iat']:
return user
- cls.logger.debug("user info changed after token was issued, iat=%s last_update=%s",
- dtoken['iat'], user.last_update)
+ cls.logger.debug( # type: ignore
+ "user info changed after token was issued, iat=%s last_update=%s",
+ dtoken['iat'], user.last_update
+ )
else:
- cls.logger.debug('Token is black-listed')
- except jwt.exceptions.ExpiredSignatureError:
- cls.logger.debug("Token has expired")
- except jwt.exceptions.InvalidTokenError:
- cls.logger.debug("Failed to decode token")
+ cls.logger.debug('Token is black-listed') # type: ignore
+ except jwt.ExpiredSignatureError:
+ cls.logger.debug("Token has expired") # type: ignore
+ except jwt.InvalidTokenError:
+ cls.logger.debug("Failed to decode token") # type: ignore
except UserDoesNotExist:
- cls.logger.debug("Invalid token: user %s does not exist", dtoken['username'])
+ cls.logger.debug( # type: ignore
+ "Invalid token: user %s does not exist", dtoken['username']
+ )
return None
@classmethod
@classmethod
def get_user(cls, username):
- return cls.AUTH_PROVIDER.get_user(username)
+ return cls.AUTH_PROVIDER.get_user(username) # type: ignore
@classmethod
def authenticate(cls, username, password):
- return cls.AUTH_PROVIDER.authenticate(username, password)
+ return cls.AUTH_PROVIDER.authenticate(username, password) # type: ignore
@classmethod
def authorize(cls, username, scope, permissions):
- return cls.AUTH_PROVIDER.authorize(username, scope, permissions)
+ return cls.AUTH_PROVIDER.authorize(username, scope, permissions) # type: ignore
class AuthManagerTool(cherrypy.Tool):
from .. import mgr
try:
- from typing import Dict, Any # pylint: disable=unused-import
+ from typing import Dict # pylint: disable=unused-import
except ImportError:
pass # For typing only
@classmethod
def get_service_map(cls, service_name):
- service_map = {} # type: Dict[str, Dict[str, Any]]
+ service_map = {} # type: Dict[str, dict]
for server in mgr.list_servers():
for service in server['services']:
if service['type'] == service_name:
def get_smart_data_by_host(hostname):
# type: (str) -> dict
devices = CephService.get_devices_by_host(hostname)
- smart_data = {}
+ smart_data = {} # type: dict
if devices:
for device in devices:
if device['devid'] not in smart_data:
if include_http_status:
out['status'] = getattr(e, 'status', 500)
if task:
- out['task'] = dict(name=task.name, metadata=task.metadata)
+ out['task'] = dict(name=task.name, metadata=task.metadata) # type: ignore
return out
class IscsiClient(RestClient):
_CLIENT_NAME = 'iscsi'
- _instances = {}
+ _instances = {} # type: dict
service_url = None
gateway_name = None
@staticmethod
def _load_config_from_orchestrator():
- config = {'gateways': {}}
+ config = {'gateways': {}} # type: dict
try:
instances = OrchClient.instance().services.list("iscsi")
for instance in instances:
from .. import mgr
from ..tools import wraps
-
logger = logging.getLogger('orchestrator')
class OrchestratorAPI(OrchestratorClientMixin):
def __init__(self):
super(OrchestratorAPI, self).__init__()
- self.set_mgr(mgr)
+ self.set_mgr(mgr) # type: ignore
def status(self):
try:
logger.info("is orchestrator available: %s, %s", status, desc)
return dict(available=status, description=desc)
except (RuntimeError, OrchestratorError, ImportError):
- return dict(available=False,
- description='Orchestrator is unavailable for unknown reason')
+ return dict(
+ available=False,
+ description='Orchestrator is unavailable for unknown reason')
def orchestrator_wait(self, completions):
return self._orchestrator_wait(completions)
self.api.orchestrator_wait([completion])
raise_if_exception(completion)
return completion.result
+
return inner
class HostManger(ResourceManager):
-
@wait_api_result
def list(self):
return self.api.get_hosts()
class InventoryManager(ResourceManager):
-
@wait_api_result
def list(self, hosts=None, refresh=False):
host_filter = InventoryFilter(hosts=hosts) if hosts else None
class ServiceManager(ResourceManager):
-
@wait_api_result
def list(self, service_type=None, service_id=None, host_name=None):
return self.api.list_daemons(service_type, service_id, host_name)
if not isinstance(service_ids, list):
service_ids = [service_ids]
- completion_list = [self.api.service_action('reload', service_type,
- service_name, service_id)
- for service_name, service_id in service_ids]
+ completion_list = [
+ self.api.service_action('reload', service_type, service_name,
+ service_id)
+ for service_name, service_id in service_ids
+ ]
self.api.orchestrator_wait(completion_list)
for c in completion_list:
raise_if_exception(c)
class OsdManager(ResourceManager):
-
@wait_api_result
def create(self, drive_group):
return self.api.create_osds([drive_group])
@wait_api_result
def blink_device_light(self, hostname, device, ident_fault, on):
# type: (str, str, str, bool) -> Completion
- return self.api.blink_device_light(ident_fault, on, [DeviceLightLoc(hostname, device)])
+ return self.api.blink_device_light(
+ ident_fault, on, [DeviceLightLoc(hostname, device, device)])
from ..tools import ViewCache
from .ceph_service import CephService
+try:
+ from typing import List
+except ImportError:
+ pass # For typing only
+
RBD_FEATURES_NAME_MAPPING = {
rbd.RBD_FEATURE_LAYERING: "layering",
def __init__(self, pool_name='', namespace='', image_name='', pool_ioctx=None,
image_ioctx=None):
- # type: (str, str, object, object) -> None
+ # type: (str, str, str, object, object) -> None
assert bool(pool_name) != bool(pool_ioctx) # xor
self._pool_name = pool_name
self._namespace = namespace if namespace is not None else ''
return option if option.startswith('conf_') else 'conf_' + option
def list(self):
- # type: () -> [dict]
+ # type: () -> List[dict]
def _list(ioctx):
if self._image_name: # image config
with rbd.Image(ioctx, self._image_name) as image:
pool_ioctx = self._pool_ioctx
if self._pool_name: # open ioctx
pool_ioctx = mgr.rados.open_ioctx(self._pool_name)
- pool_ioctx.__enter__()
- pool_ioctx.set_namespace(self._namespace)
+ pool_ioctx.__enter__() # type: ignore
+ pool_ioctx.set_namespace(self._namespace) # type: ignore
image_ioctx = self._image_ioctx
if self._image_name:
image_ioctx = rbd.Image(pool_ioctx, self._image_name)
- image_ioctx.__enter__()
+ image_ioctx.__enter__() # type: ignore
if image_ioctx:
- image_ioctx.metadata_set(option_name, option_value)
+ image_ioctx.metadata_set(option_name, option_value) # type: ignore
else:
self._rbd.pool_metadata_set(pool_ioctx, option_name, option_value)
if self._image_name: # Name provided, so we opened it and now have to close it
- image_ioctx.__exit__(None, None, None)
+ image_ioctx.__exit__(None, None, None) # type: ignore
if self._pool_name:
- pool_ioctx.__exit__(None, None, None)
+ pool_ioctx.__exit__(None, None, None) # type: ignore
def remove(self, option_name):
"""
from .. import mgr
try:
- from typing import Any, Dict, List # pylint: disable=unused-import
+ from typing import Dict, List, Optional # pylint: disable=unused-import
except ImportError:
pass # For typing only
return port, ssl
if option_name in ['endpoint', 'ssl_endpoint']:
match = re.search(r'([\d.]+|\[.+\])(:(\d+))?',
- match.group(2))
+ match.group(2)) # type: ignore
if match:
port = int(match.group(3)) if \
match.group(2) is not None else 443 if \
80
ssl = option_name == 'ssl_endpoint'
return port, ssl
- if match.group(1) == 'civetweb':
+ if match.group(1) == 'civetweb': # type: ignore
match = re.search(r'port=(.*:)?(\d+)(s)?', config)
if match:
port = int(match.group(2))
_host = None
_port = None
_ssl = None
- _user_instances = {}
+ _user_instances = {} # type: Dict[str, RgwClient]
_rgw_settings_snapshot = None
@staticmethod
# Append the instance to the internal map.
RgwClient._user_instances[RgwClient._SYSTEM_USERID] = instance
- def _get_daemon_zone_info(self): # type: () -> Dict[str, Any]
+ def _get_daemon_zone_info(self): # type: () -> dict
return json_str_to_object(self.proxy('GET', 'config?type=zone', None, None))
- def _get_daemon_zonegroup_map(self): # type: () -> List[Dict[str, Any]]
+ def _get_daemon_zonegroup_map(self): # type: () -> List[dict]
zonegroups = json_str_to_object(
self.proxy('GET', 'config?type=zonegroup-map', None, None)
)
@staticmethod
def instance(userid):
+ # type: (Optional[str]) -> RgwClient
# Discard all cached instances if any rgw setting has changed
if RgwClient._rgw_settings_snapshot != RgwClient._rgw_settings():
RgwClient._rgw_settings_snapshot = RgwClient._rgw_settings()
userid))
# Create an instance and append it to the internal map.
- RgwClient._user_instances[userid] = RgwClient(userid,
+ RgwClient._user_instances[userid] = RgwClient(userid, # type: ignore
keys['access_key'],
keys['secret_key'])
- return RgwClient._user_instances[userid]
+ return RgwClient._user_instances[userid] # type: ignore
@staticmethod
def admin_instance():
super(RgwClient, self).__init__(host, port, 'RGW', ssl, s3auth, ssl_verify=ssl_verify)
# If user ID is not set, then try to get it via the RGW Admin Ops API.
- self.userid = userid if userid else self._get_user_id(self.admin_path)
+ self.userid = userid if userid else self._get_user_id(self.admin_path) # type: str
logger.info("Created new connection for user: %s", self.userid)
return request(data=data)
- def get_placement_targets(self): # type: () -> Dict[str, Any]
+ def get_placement_targets(self): # type: () -> dict
zone = self._get_daemon_zone_info()
# A zone without realm id can only belong to default zonegroup.
zonegroup_name = 'default'
from .. import mgr
from ..tools import prepare_url_prefix
+
if six.PY2:
FileNotFoundError = IOError # pylint: disable=redefined-builtin
-
logger = logging.getLogger('sso')
try:
db.check_and_update_db()
return db
- db = json.loads(json_db)
- return cls(db['version'], db.get('protocol'), Saml2.from_dict(db.get('saml2')))
+ dict_db = json.loads(json_db) # type: dict
+ return cls(dict_db['version'], dict_db.get('protocol'),
+ Saml2.from_dict(dict_db.get('saml2')))
def load_sso_db():
from dashboard.services.ceph_service import CephService
from .. import mgr
+try:
+ from typing import Dict
+except ImportError:
+ pass # Just for type checking
+
+
SERVICE_TYPE = 'tcmu-runner'
# pylint: disable=too-many-nested-blocks
@staticmethod
def get_iscsi_info():
- daemons = {}
- images = {}
+ daemons = {} # type: Dict[str, dict]
+ images = {} # type: Dict[str, dict]
for service in CephService.get_service_list(SERVICE_TYPE):
metadata = service['metadata']
if metadata is None:
from .services.auth import JwtManager
try:
- from typing import Any, AnyStr, Dict, List # noqa pylint: disable=unused-import
+ from typing import Any, AnyStr, Callable, DefaultDict, Deque,\
+ Dict, List, Set, Tuple, Union # noqa pylint: disable=unused-import
except ImportError:
pass # For typing only
rvc = ViewCache.RemoteViewCache(self.timeout)
self.cache_by_args[args] = rvc
return rvc.run(fn, args, kwargs)
- wrapper.reset = self.reset
+ wrapper.reset = self.reset # type: ignore
return wrapper
def reset(self):
class NotificationQueue(threading.Thread):
_ALL_TYPES_ = '__ALL__'
- _listeners = collections.defaultdict(set)
+ _listeners = collections.defaultdict(set) # type: DefaultDict[str, Set[Tuple[int, Callable]]]
_lock = threading.Lock()
_cond = threading.Condition()
- _queue = collections.deque()
+ _queue = collections.deque() # type: Deque[Tuple[str, Any]]
_running = False
_instance = None
return
cls._running = True
cls._instance = NotificationQueue()
- cls.logger = logging.getLogger('notification_queue')
- cls.logger.debug("starting notification queue")
+ cls.logger = logging.getLogger('notification_queue') # type: ignore
+ cls.logger.debug("starting notification queue") # type: ignore
cls._instance.start()
@classmethod
cls._running = False
with cls._cond:
cls._cond.notify()
- cls.logger.debug("waiting for notification queue to finish")
+ cls.logger.debug("waiting for notification queue to finish") # type: ignore
instance.join()
- cls.logger.debug("notification queue stopped")
+ cls.logger.debug("notification queue stopped") # type: ignore
@classmethod
def _registered_handler(cls, func, n_types):
for ev_type in n_types:
if not cls._registered_handler(func, ev_type):
cls._listeners[ev_type].add((priority, func))
- cls.logger.debug("function %s was registered for events of"
- " type %s", func, ev_type)
+ cls.logger.debug( # type: ignore
+ "function %s was registered for events of type %s",
+ func, ev_type
+ )
@classmethod
def deregister(cls, func, n_types=None):
+ # type: (Callable, Union[str, list, None]) -> None
"""Removes the listener function from this notification queue
If the second parameter `n_types` is omitted, the function is removed
break
if to_remove:
listeners.discard(to_remove)
- cls.logger.debug("function %s was deregistered for events "
- "of type %s", func, ev_type)
+ cls.logger.debug( # type: ignore
+ "function %s was deregistered for events of type %s",
+ func, ev_type
+ )
@classmethod
def new_notification(cls, notify_type, notify_value):
+ # type: (str, Any) -> None
with cls._cond:
cls._queue.append((notify_type, notify_value))
cls._cond.notify()
listener[1](notify_value)
def run(self):
- self.logger.debug("notification queue started")
+ self.logger.debug("notification queue started") # type: ignore
while self._running:
private_buffer = []
- self.logger.debug("processing queue: %s", len(self._queue))
+ self.logger.debug("processing queue: %s", len(self._queue)) # type: ignore
try:
while True:
private_buffer.append(self._queue.popleft())
while self._running and not self._queue:
self._cond.wait()
# flush remaining events
- self.logger.debug("flush remaining events: %s", len(self._queue))
+ self.logger.debug("flush remaining events: %s", len(self._queue)) # type: ignore
self._notify_listeners(self._queue)
self._queue.clear()
- self.logger.debug("notification queue finished")
+ self.logger.debug("notification queue finished") # type: ignore
# pylint: disable=too-many-arguments, protected-access
VALUE_DONE = "done"
VALUE_EXECUTING = "executing"
- _executing_tasks = set()
- _finished_tasks = []
+ _executing_tasks = set() # type: Set[Task]
+ _finished_tasks = [] # type: List[Task]
_lock = threading.Lock()
_task_local_data = threading.local()
@classmethod
def init(cls):
- cls.logger = logging.getLogger('taskmgr')
+ cls.logger = logging.getLogger('taskmgr') # type: ignore
NotificationQueue.register(cls._handle_finished_task, 'cd_task_finished')
@classmethod
def _handle_finished_task(cls, task):
- cls.logger.info("finished %s", task)
+ cls.logger.info("finished %s", task) # type: ignore
with cls._lock:
cls._executing_tasks.remove(task)
cls._finished_tasks.append(task)
exception_handler)
with cls._lock:
if task in cls._executing_tasks:
- cls.logger.debug("task already executing: %s", task)
+ cls.logger.debug("task already executing: %s", task) # type: ignore
for t in cls._executing_tasks:
if t == task:
return t
- cls.logger.debug("created %s", task)
+ cls.logger.debug("created %s", task) # type: ignore
cls._executing_tasks.add(task)
- cls.logger.info("running %s", task)
+ cls.logger.info("running %s", task) # type: ignore
task._run()
return task
def start(self):
self.logger.debug("executing task %s", self.task)
try:
- self.task.fn(*self.task.fn_args, **self.task.fn_kwargs)
+ self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) # type: ignore
except Exception as ex:
self.logger.exception("Error while calling %s", self.task)
self.finish(None, ex)
self.logger.debug("successfully finished task: %s", self.task)
else:
self.logger.debug("task finished with exception: %s", self.task)
- self.task._complete(ret_value, exception)
+ self.task._complete(ret_value, exception) # type: ignore
# pylint: disable=protected-access
TaskManager._task_local_data.task = self.task
try:
self.logger.debug("executing task %s", self.task)
- val = self.task.fn(*self.task.fn_args, **self.task.fn_kwargs)
+ val = self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) # type: ignore
except Exception as ex:
self.logger.exception("Error while calling %s", self.task)
self.finish(None, ex)
self.end_time = now
self.ret_value = ret_value
self.exception = exception
- self.duration = now - self.begin_time
+ self.duration = now - self.begin_time # type: ignore
if not self.exception:
self.set_progress(100, True)
NotificationQueue.new_notification('cd_task_finished', self)
raise Exception("Progress delta value must be a positive integer")
if not in_lock:
self.lock.acquire()
- prog = self.progress + delta
+ prog = self.progress + delta # type: ignore
self.progress = prog if prog <= 100 else 100
if not in_lock:
self.lock.release()
try:
# json.loads accepts binary input from version >=3.6
- value = value.decode('utf-8')
+ value = value.decode('utf-8') # type: ignore
except AttributeError:
pass
:return: A dictionary containing the parameters.
:rtype: dict
"""
- params = {}
+ params = {} # type: dict
if request.method not in request.methods_with_bodies:
return params
commands = mypy --config-file=../../mypy.ini \
cephadm/module.py \
mgr_module.py \
+ dashboard/module.py \
mgr_util.py \
orchestrator/__init__.py \
progress/module.py \