List,
Mapping,
NamedTuple,
+ no_type_check,
Optional,
Sequence,
Set,
return f, extra_args
-class CLICommand(object):
- COMMANDS = {} # type: Dict[str, CLICommand]
-
+class CLICommandBase(object):
def __init__(self,
prefix: str,
perm: str = 'rw',
def _register_handler(self, func: HandlerFuncType) -> HandlerFuncType:
self.store_func_metadata(func)
+ self.COMMANDS[self.prefix] = self # type: ignore[attr-defined]
self.func = func
- self.COMMANDS[self.prefix] = self
return self.func
def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
special_args = set()
kwargs_switch = False
for index, (name, tp) in enumerate(self.arg_spec.items()):
- if name in CLICommand.KNOWN_ARGS:
+ if name in self.KNOWN_ARGS:
special_args.add(name)
continue
assert self.first_default >= 0
}
@classmethod
- def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
- return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
-
+ def Read(cls, prefix: str, poll: bool = False) -> 'CLICommandBase':
+ return cls(prefix, "r", poll)
-def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
- return CLICommand(prefix, "r", poll)
+ @classmethod
+ def Write(cls, prefix: str, poll: bool = False) -> 'CLICommandBase':
+ return cls(prefix, "w", poll)
+ @classmethod
+ def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
+ return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()] # type: ignore[attr-defined]
-def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
- return CLICommand(prefix, "w", poll)
+ @classmethod
+ @no_type_check
+ def make_registry_subtype(cls, name) -> 'CLICommandBase':
+ return type(name, (cls,), {
+ 'COMMANDS': {},
+ })
def CLICheckNonemptyFileInput(desc: str) -> Callable[[HandlerFuncType], HandlerFuncType]:
from their active peer), and to configuration settings (read only).
"""
- MODULE_OPTIONS: List[Option] = []
- MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
-
def __init__(self, module_name: str, capsule: Any):
super(MgrStandbyModule, self).__init__(capsule)
self.module_name = module_name
- # see also MgrModule.__init__()
- for o in self.MODULE_OPTIONS:
- if 'default' in o:
- if 'type' in o:
- self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
- else:
- self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
-
# mock does not return a str
mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
log_level = cast(str, self.get_module_option("log_level"))
self._logger = self.getLogger()
@classmethod
+ @no_type_check
def _register_options(cls, module_name: str) -> None:
+ if not hasattr(cls, 'MODULE_OPTIONS'):
+ cls.MODULE_OPTIONS = []
+
+ if not hasattr(cls, 'MODULE_OPTION_DEFAULTS'):
+ cls.MODULE_OPTION_DEFAULTS = {}
+
cls.MODULE_OPTIONS.append(
Option(name='log_level', type='str', default="", runtime=True,
enum_allowed=['info', 'debug', 'critical', 'error',
enum_allowed=['info', 'debug', 'critical', 'error',
'warning', '']))
+ for o in cls.MODULE_OPTIONS:
+ if 'default' in o:
+ if 'type' in o:
+ cls.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
+ else:
+ cls.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
+
@property
def log(self) -> logging.Logger:
return self._logger
"""
r = self._ceph_get_module_option(key)
if r is None:
- return self.MODULE_OPTION_DEFAULTS.get(key, default)
+ return self.MODULE_OPTION_DEFAULTS.get(key, default) # type: ignore[attr-defined]
else:
return r
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
r = self._ceph_get_module_option(key, self.get_mgr_id())
if r is None:
- return self.MODULE_OPTION_DEFAULTS.get(key, default)
+ return self.MODULE_OPTION_DEFAULTS.get(key, default) # type: ignore[attr-defined]
else:
return r
class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
MGR_POOL_NAME = ".mgr"
- COMMANDS = [] # type: List[Any]
- MODULE_OPTIONS: List[Option] = []
- MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
-
- # Database Schema
- SCHEMA = None # type: Optional[List[str]]
- SCHEMA_VERSIONED = None # type: Optional[List[List[str]]]
-
# Priority definitions for perf counters
PRIO_CRITICAL = 10
PRIO_INTERESTING = 8
self.module_name = module_name
super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
- for o in self.MODULE_OPTIONS:
- if 'default' in o:
- if 'type' in o:
- # we'll assume the declared type matches the
- # supplied default value's type.
- self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
- else:
- # module not declaring it's type, so normalize the
- # default value to be a string for consistent behavior
- # with default and user-supplied option values.
- self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
-
mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
log_level = cast(str, self.get_module_option("log_level"))
cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
self._db_lock = threading.Lock()
@classmethod
+ @no_type_check
def _register_options(cls, module_name: str) -> None:
+ if not hasattr(cls, 'MODULE_OPTIONS'):
+ cls.MODULE_OPTIONS = []
+
+ if not hasattr(cls, 'MODULE_OPTION_DEFAULTS'):
+ cls.MODULE_OPTION_DEFAULTS = {}
+
cls.MODULE_OPTIONS.append(
Option(name='log_level', type='str', default="", runtime=True,
enum_allowed=['info', 'debug', 'critical', 'error',
enum_allowed=['info', 'debug', 'critical', 'error',
'warning', '']))
+ for o in cls.MODULE_OPTIONS:
+ if 'default' in o:
+ if 'type' in o:
+ # we'll assume the declared type matches the
+ # supplied default value's type.
+ cls.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
+ else:
+ # module not declaring it's type, so normalize the
+ # default value to be a string for consistent behavior
+ # with default and user-supplied option values.
+ cls.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
+
@classmethod
+ @no_type_check
def _register_commands(cls, module_name: str) -> None:
- cls.COMMANDS.extend(CLICommand.dump_cmd_list())
+ if not hasattr(cls, 'COMMANDS'):
+ cls.COMMANDS = []
+
+ cls.COMMANDS.extend(cls.CLICommand.dump_cmd_list())
@property
def log(self) -> logging.Logger:
def maybe_upgrade(self, db: sqlite3.Connection, version: int) -> None:
if version <= 0:
+ assert hasattr(self, 'SCHEMA')
self.log.info(f"creating main.db for {self.module_name}")
assert self.SCHEMA is not None
for sql in self.SCHEMA:
db.execute(sql)
self.update_schema_version(db, 1)
else:
+ assert hasattr(self, 'SCHEMA_VERSIONED')
assert self.SCHEMA_VERSIONED is not None
latest = len(self.SCHEMA_VERSIONED)
if latest < version:
inbuf: str,
cmd: Dict[str, Any]) -> Union[HandleCommandResult,
Tuple[int, str, str]]:
- if cmd['prefix'] not in CLICommand.COMMANDS:
+ if cmd['prefix'] not in self.CLICommand.COMMANDS: # type: ignore[attr-defined]
return self.handle_command(inbuf, cmd)
- return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
+ return self.CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf) # type: ignore[attr-defined]
def handle_command(self,
inbuf: str,
access config options that they didn't declare
in their schema.
"""
- if key not in [o['name'] for o in self.MODULE_OPTIONS]:
+ if key not in [o['name'] for o in self.MODULE_OPTIONS]: # type: ignore[attr-defined]
raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
format(key, self.__class__.__name__))
r = self._ceph_get_module_option(self.module_name, key,
localized_prefix)
if r is None:
- return self.MODULE_OPTION_DEFAULTS.get(key, default)
+ return self.MODULE_OPTION_DEFAULTS.get(key, default) # type: ignore[attr-defined]
else:
return r