import uuid
from collections import abc
-from typing import Any, Callable, Dict, Generic, List, Optional, Sequence, Tuple, Union
+from typing import cast, Any, Callable, Dict, Generic, List, Optional, Sequence, Tuple, Union
if sys.version_info >= (3, 8):
from typing import get_args, get_origin
[c for c in printable if re.match(goodchars, c)]
)
- def valid(self, s, partial=False):
+ def valid(self, s: str, partial: bool = False) -> None:
sset = set(s)
if self.goodset and not sset <= self.goodset:
raise ArgumentFormat("invalid chars {0} in {1}".
format(''.join(sset - self.goodset), s))
self.val = s
- def __str__(self):
+ def __str__(self) -> str:
b = ''
if self.goodchars:
b += '(goodchars {0})'.format(self.goodchars)
return '<string{0}>'.format(b)
- def complete(self, s):
+ def complete(self, s) -> List[str]:
if s == '':
return []
else:
"""
Admin socket path; check that it's readable and S_ISSOCK
"""
- def valid(self, s, partial=False):
+ def valid(self, s: str, partial: bool = False) -> None:
mode = os.stat(s).st_mode
if not stat.S_ISSOCK(mode):
raise ArgumentValid('socket path {0} is not a socket'.format(s))
self.val = s
- def __str__(self):
+ def __str__(self) -> str:
return '<admin-socket-path>'
"""
EntityAddress, that is, IP address[/nonce]
"""
- def valid(self, s, partial=False):
+ def valid(self, s: str, partial: bool = False) -> None:
nonce = None
if '/' in s:
ip, nonce = s.split('/')
)
self.val = s
- def __str__(self):
+ def __str__(self) -> str:
return '<EntityAddr>'
"""
Pool name; very little utility
"""
- def __str__(self):
+ def __str__(self) -> str:
return '<poolname>'
Object name. Maybe should be combined with Pool name as they're always
present in pairs, and then could be checked for presence
"""
- def __str__(self):
+ def __str__(self) -> str:
return '<objectname>'
"""
CephUUID: pretty self-explanatory
"""
- def valid(self, s, partial=False):
+ def valid(self, s: str, partial: bool = False) -> None:
try:
uuid.UUID(s)
except Exception as e:
raise ArgumentFormat('invalid UUID {0}: {1}'.format(s, e))
self.val = s
- def __str__(self):
+ def __str__(self) -> str:
return '<uuid>'
"""
CephPrefix: magic type for "all the first n fixed strings"
"""
- def __init__(self, prefix=''):
+ def __init__(self, prefix: str = '') -> None:
self.prefix = prefix
- def valid(self, s, partial=False):
+ def valid(self, s: str, partial: bool = False) -> None:
try:
s = str(s)
if isinstance(s, bytes):
raise ArgumentPrefix("no match for {0}".format(s))
- def __str__(self):
+ def __str__(self) -> str:
return self.prefix
- def complete(self, s):
+ def complete(self, s) -> List[str]:
if self.prefix.startswith(s):
return [self.prefix.rstrip(' ')]
else:
valid() will later be called with input to validate against it,
and will store the validated value in self.instance.val for extraction.
"""
- def __init__(self, t, name=None, n=1, req=True, positional=True, **kwargs):
+ def __init__(self, t, name=None, n=1, req=True, positional=True, **kwargs) -> None:
if isinstance(t, basestring):
self.t = CephPrefix
self.typeargs = {'prefix': t}
return concise_sig(sh['sig'])
-def parse_funcsig(sig: Sequence[Union[str, Dict[str, str]]]) -> List[argdesc]:
+def parse_funcsig(sig: Sequence[Union[str, Dict[str, Any]]]) -> List[argdesc]:
"""
parse a single descriptor (array of strings or dicts) into a
dict of function descriptor/validators (objects of CephXXX type)
return sigdict
-def validate_one(word, desc, is_kwarg, partial=False):
+ArgValT = Union[bool, int, float, str, Tuple[str, str]]
+
+def validate_one(word: str,
+ desc,
+ is_kwarg: bool,
+ partial: bool = False) -> List[ArgValT]:
"""
validate_one(word, desc, is_kwarg, partial=False)
return vals
-def matchnum(args, signature, partial=False):
+def matchnum(args: List[str],
+ signature: List[argdesc],
+ partial: bool = False) -> int:
"""
matchnum(s, signature, partial=False)
ValidatedArgs = Dict[str, ValidatedArg]
-def store_arg(desc: argdesc, args: List[ValidatedArg], d: ValidatedArgs):
+def store_arg(desc: argdesc, args: Sequence[ValidatedArg], d: ValidatedArgs):
'''
Store argument described by, and held in, thanks to valid(),
desc into the dictionary d, keyed by desc.name. Three cases:
def validate(args: List[str],
signature: Sequence[argdesc],
- flags: Optional[int] = 0,
+ flags: int = 0,
partial: Optional[bool] = False) -> ValidatedArgs:
"""
validate(args, signature, flags=0, partial=False)
mysig = copy.deepcopy(signature)
reqsiglen = len([desc for desc in mysig if desc.req])
matchcnt = 0
- d = dict()
+ d: ValidatedArgs = dict()
save_exception = None
- arg_descs_by_name = dict([desc.name, desc] for desc in mysig
- if desc.t != CephPrefix)
+ arg_descs_by_name: Dict[str, argdesc] = \
+ dict((desc.name, desc) for desc in mysig if desc.t != CephPrefix)
# Special case: detect "injectargs" (legacy way of modifying daemon
# configs) and permit "--" string arguments if so.
while desc.numseen < desc.n:
if myargs:
- myarg = myargs.pop(0)
+ myarg: Optional[str] = myargs.pop(0)
else:
myarg = None
break
# Have an arg; validate it
+ assert myarg is not None
try:
args = validate_one(myarg, desc, False)
except ArgumentError as e:
def validate_command(sigdict: Dict[str, Dict[str, Any]],
- args: Sequence[str],
+ args: List[str],
verbose: Optional[bool] = False) -> ValidatedArgs:
"""
Parse positional arguments into a parameter dict, according to
"""
if verbose:
print("validate_command: " + " ".join(args), file=sys.stderr)
- found = []
+ found: Optional[Dict[str, Any]] = None
valid_dict = {}
# look for best match, accumulate possibles in bestcmds
# (so we can maybe give a more-useful error message)
- best_match_cnt = 0
- bestcmds = []
+ best_match_cnt = 0.0
+ bestcmds: List[Dict[str, Any]] = []
for cmd in sigdict.values():
flags = cmd.get('flags', 0)
if flags & Flag.OBSOLETE:
continue
sig = cmd['sig']
- matched = matchnum(args, sig, partial=True)
+ matched: float = matchnum(args, sig, partial=True)
if (matched >= math.floor(best_match_cnt) and
matched == matchnum(args, sig, partial=False)):
# prefer those fully matched over partial patch
print("bestcmds_sorted: ", file=sys.stderr)
pprint.PrettyPrinter(stream=sys.stderr).pprint(bestcmds_sorted)
- ex = None
+ ex: Optional[ArgumentError] = None
# for everything in bestcmds, look for a true match
for cmd in bestcmds_sorted:
sig = cmd['sig']
return valid_dict
-def find_cmd_target(childargs: List[str]) -> Tuple[str, str]:
+def find_cmd_target(childargs: List[str]) -> Tuple[str, Optional[str]]:
"""
Using a minimal validation, figure out whether the command
should be sent to a monitor or an osd. We do this before even
# if this fails, something is horribly wrong, as it just
# validated successfully above
name.valid(valid_dict['target'])
+ assert name.nametype is not None
return name.nametype, name.nameid
sig = parse_funcsig(['tell', {'name': 'pgid', 'type': 'CephPgid'}])
else:
if len(valid_dict) == 2:
# pg doesn't need revalidation; the string is fine
- return 'pg', valid_dict['pgid']
+ pgid = valid_dict['pgid']
+ assert isinstance(pgid, str)
+ return 'pg', pgid
# If we reached this far it must mean that so far we've been unable to
# obtain a proper target from childargs. This may mean that we are not
# CephName.valid() raises on validation error; find_cmd_target()'s
# caller should handle them
name.valid(childargs[1])
+ assert name.nametype is not None
+ assert name.nameid is not None
return name.nametype, name.nameid
sig = parse_funcsig(['pg', {'name': 'pgid', 'type': 'CephPgid'}])
pass
else:
if len(valid_dict) == 2:
- return 'pg', valid_dict['pgid']
+ pgid = valid_dict['pgid']
+ assert isinstance(pgid, str)
+ return 'pg', pgid
return 'mon', ''
self.exception = e
-def run_in_thread(func: Callable[[Any, Any], int],
- *args: Any, **kwargs: Any) -> int:
+def run_in_thread(func: Callable[[Any, Any], Tuple[int, bytes, str]],
+ *args: Any, **kwargs: Any) -> Tuple[int, bytes, str]:
timeout = kwargs.pop('timeout', 0)
if timeout == 0 or timeout is None:
# python threading module will just get blocked if timeout is `None`,
def send_command(cluster,
- target: Optional[Tuple[str, str]] = ('mon', ''),
- cmd: Optional[List[str]] = None,
+ target: Tuple[str, Optional[str]] = ('mon', ''),
+ cmd: Optional[str] = None,
inbuf: Optional[bytes] = b'',
timeout: Optional[int] = 0,
verbose: Optional[bool] = False) -> Tuple[int, bytes, str]:
If target is osd.N, send command to that osd (except for pgid cmds)
"""
- cmd = cmd or []
try:
if target[0] == 'osd':
osdid = target[1]
+ assert osdid is not None
if verbose:
print('submit {0} to osd.{1}'.format(cmd, osdid),
def json_command(cluster,
- target: Optional[Tuple[str, str]] = ('mon', ''),
+ target: Tuple[str, Optional[str]] = ('mon', ''),
prefix: Optional[str] = None,
- argdict: Optional[Dict[str, str]] = None,
+ argdict: Optional[ValidatedArgs] = None,
inbuf: Optional[bytes] = b'',
timeout: Optional[int] = 0,
verbose: Optional[bool] = False) -> Tuple[int, bytes, str]:
:param prefix: String to inject into command arguments as 'prefix'
:param argdict: Command arguments
"""
- cmddict = {}
+ cmddict: ValidatedArgs = {}
if prefix:
cmddict.update({'prefix': prefix})
if argdict:
cmddict.update(argdict)
if 'target' in argdict:
- target = argdict.get('target')
+ target = cast(Tuple[str, str], argdict['target'])
try:
if target[0] == 'osd':
osdtarget = '{0}.{1}'.format(*target)
# prefer target from cmddict if present and valid
if 'target' in cmddict:
- osdtarget = cmddict.pop('target')
+ osdtarget = cast(str, cmddict.pop('target'))
try:
osdtarg.valid(osdtarget)
target = ('osd', osdtarg.nameid)