From: Kefu Chai Date: Sun, 27 Jun 2021 08:15:57 +0000 (+0800) Subject: pybind/ceph_argparse: add more type annotations X-Git-Tag: v17.1.0~1519^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=abce58d2034c3f4f89340e830747dc2f4966472a;p=ceph.git pybind/ceph_argparse: add more type annotations Signed-off-by: Kefu Chai --- diff --git a/src/pybind/ceph_argparse.py b/src/pybind/ceph_argparse.py index 45474e3901b5a..6e1ce7b72d31d 100644 --- a/src/pybind/ceph_argparse.py +++ b/src/pybind/ceph_argparse.py @@ -23,7 +23,7 @@ import threading import uuid from collections import abc -from typing import Any, Callable, Dict, Generic, List, Optional, Sequence, Tuple, Union +from typing import cast, Any, Callable, Dict, Generic, List, Optional, Sequence, Tuple, Union if sys.version_info >= (3, 8): from typing import get_args, get_origin @@ -337,20 +337,20 @@ class CephString(CephArgtype): [c for c in printable if re.match(goodchars, c)] ) - def valid(self, s, partial=False): + def valid(self, s: str, partial: bool = False) -> None: sset = set(s) if self.goodset and not sset <= self.goodset: raise ArgumentFormat("invalid chars {0} in {1}". format(''.join(sset - self.goodset), s)) self.val = s - def __str__(self): + def __str__(self) -> str: b = '' if self.goodchars: b += '(goodchars {0})'.format(self.goodchars) return ''.format(b) - def complete(self, s): + def complete(self, s) -> List[str]: if s == '': return [] else: @@ -366,13 +366,13 @@ class CephSocketpath(CephArgtype): """ Admin socket path; check that it's readable and S_ISSOCK """ - def valid(self, s, partial=False): + def valid(self, s: str, partial: bool = False) -> None: mode = os.stat(s).st_mode if not stat.S_ISSOCK(mode): raise ArgumentValid('socket path {0} is not a socket'.format(s)) self.val = s - def __str__(self): + def __str__(self) -> str: return '' @@ -435,7 +435,7 @@ class CephEntityAddr(CephIPAddr): """ EntityAddress, that is, IP address[/nonce] """ - def valid(self, s, partial=False): + def valid(self, s: str, partial: bool = False) -> None: nonce = None if '/' in s: ip, nonce = s.split('/') @@ -455,7 +455,7 @@ class CephEntityAddr(CephIPAddr): ) self.val = s - def __str__(self): + def __str__(self) -> str: return '' @@ -463,7 +463,7 @@ class CephPoolname(CephArgtype): """ Pool name; very little utility """ - def __str__(self): + def __str__(self) -> str: return '' @@ -472,7 +472,7 @@ class CephObjectname(CephArgtype): Object name. Maybe should be combined with Pool name as they're always present in pairs, and then could be checked for presence """ - def __str__(self): + def __str__(self) -> str: return '' @@ -702,14 +702,14 @@ class CephUUID(CephArgtype): """ CephUUID: pretty self-explanatory """ - def valid(self, s, partial=False): + def valid(self, s: str, partial: bool = False) -> None: try: uuid.UUID(s) except Exception as e: raise ArgumentFormat('invalid UUID {0}: {1}'.format(s, e)) self.val = s - def __str__(self): + def __str__(self) -> str: return '' @@ -717,10 +717,10 @@ class CephPrefix(CephArgtype): """ CephPrefix: magic type for "all the first n fixed strings" """ - def __init__(self, prefix=''): + def __init__(self, prefix: str = '') -> None: self.prefix = prefix - def valid(self, s, partial=False): + def valid(self, s: str, partial: bool = False) -> None: try: s = str(s) if isinstance(s, bytes): @@ -743,10 +743,10 @@ class CephPrefix(CephArgtype): raise ArgumentPrefix("no match for {0}".format(s)) - def __str__(self): + def __str__(self) -> str: return self.prefix - def complete(self, s): + def complete(self, s) -> List[str]: if self.prefix.startswith(s): return [self.prefix.rstrip(' ')] else: @@ -775,7 +775,7 @@ class argdesc(object): valid() will later be called with input to validate against it, and will store the validated value in self.instance.val for extraction. """ - def __init__(self, t, name=None, n=1, req=True, positional=True, **kwargs): + def __init__(self, t, name=None, n=1, req=True, positional=True, **kwargs) -> None: if isinstance(t, basestring): self.t = CephPrefix self.typeargs = {'prefix': t} @@ -908,7 +908,7 @@ def descsort_key(sh): return concise_sig(sh['sig']) -def parse_funcsig(sig: Sequence[Union[str, Dict[str, str]]]) -> List[argdesc]: +def parse_funcsig(sig: Sequence[Union[str, Dict[str, Any]]]) -> List[argdesc]: """ parse a single descriptor (array of strings or dicts) into a dict of function descriptor/validators (objects of CephXXX type) @@ -1005,7 +1005,12 @@ def parse_json_funcsigs(s: str, return sigdict -def validate_one(word, desc, is_kwarg, partial=False): +ArgValT = Union[bool, int, float, str, Tuple[str, str]] + +def validate_one(word: str, + desc, + is_kwarg: bool, + partial: bool = False) -> List[ArgValT]: """ validate_one(word, desc, is_kwarg, partial=False) @@ -1030,7 +1035,9 @@ def validate_one(word, desc, is_kwarg, partial=False): return vals -def matchnum(args, signature, partial=False): +def matchnum(args: List[str], + signature: List[argdesc], + partial: bool = False) -> int: """ matchnum(s, signature, partial=False) @@ -1078,7 +1085,7 @@ ValidatedArg = Union[bool, int, float, str, ValidatedArgs = Dict[str, ValidatedArg] -def store_arg(desc: argdesc, args: List[ValidatedArg], d: ValidatedArgs): +def store_arg(desc: argdesc, args: Sequence[ValidatedArg], d: ValidatedArgs): ''' Store argument described by, and held in, thanks to valid(), desc into the dictionary d, keyed by desc.name. Three cases: @@ -1106,7 +1113,7 @@ def store_arg(desc: argdesc, args: List[ValidatedArg], d: ValidatedArgs): def validate(args: List[str], signature: Sequence[argdesc], - flags: Optional[int] = 0, + flags: int = 0, partial: Optional[bool] = False) -> ValidatedArgs: """ validate(args, signature, flags=0, partial=False) @@ -1130,11 +1137,11 @@ def validate(args: List[str], mysig = copy.deepcopy(signature) reqsiglen = len([desc for desc in mysig if desc.req]) matchcnt = 0 - d = dict() + d: ValidatedArgs = dict() save_exception = None - arg_descs_by_name = dict([desc.name, desc] for desc in mysig - if desc.t != CephPrefix) + arg_descs_by_name: Dict[str, argdesc] = \ + dict((desc.name, desc) for desc in mysig if desc.t != CephPrefix) # Special case: detect "injectargs" (legacy way of modifying daemon # configs) and permit "--" string arguments if so. @@ -1146,7 +1153,7 @@ def validate(args: List[str], while desc.numseen < desc.n: if myargs: - myarg = myargs.pop(0) + myarg: Optional[str] = myargs.pop(0) else: myarg = None @@ -1250,6 +1257,7 @@ def validate(args: List[str], break # Have an arg; validate it + assert myarg is not None try: args = validate_one(myarg, desc, False) except ArgumentError as e: @@ -1292,7 +1300,7 @@ def validate(args: List[str], def validate_command(sigdict: Dict[str, Dict[str, Any]], - args: Sequence[str], + args: List[str], verbose: Optional[bool] = False) -> ValidatedArgs: """ Parse positional arguments into a parameter dict, according to @@ -1312,19 +1320,19 @@ def validate_command(sigdict: Dict[str, Dict[str, Any]], """ if verbose: print("validate_command: " + " ".join(args), file=sys.stderr) - found = [] + found: Optional[Dict[str, Any]] = None valid_dict = {} # look for best match, accumulate possibles in bestcmds # (so we can maybe give a more-useful error message) - best_match_cnt = 0 - bestcmds = [] + best_match_cnt = 0.0 + bestcmds: List[Dict[str, Any]] = [] for cmd in sigdict.values(): flags = cmd.get('flags', 0) if flags & Flag.OBSOLETE: continue sig = cmd['sig'] - matched = matchnum(args, sig, partial=True) + matched: float = matchnum(args, sig, partial=True) if (matched >= math.floor(best_match_cnt) and matched == matchnum(args, sig, partial=False)): # prefer those fully matched over partial patch @@ -1354,7 +1362,7 @@ def validate_command(sigdict: Dict[str, Dict[str, Any]], print("bestcmds_sorted: ", file=sys.stderr) pprint.PrettyPrinter(stream=sys.stderr).pprint(bestcmds_sorted) - ex = None + ex: Optional[ArgumentError] = None # for everything in bestcmds, look for a true match for cmd in bestcmds_sorted: sig = cmd['sig'] @@ -1402,7 +1410,7 @@ def validate_command(sigdict: Dict[str, Dict[str, Any]], return valid_dict -def find_cmd_target(childargs: List[str]) -> Tuple[str, str]: +def find_cmd_target(childargs: List[str]) -> Tuple[str, Optional[str]]: """ Using a minimal validation, figure out whether the command should be sent to a monitor or an osd. We do this before even @@ -1422,6 +1430,7 @@ def find_cmd_target(childargs: List[str]) -> Tuple[str, str]: # if this fails, something is horribly wrong, as it just # validated successfully above name.valid(valid_dict['target']) + assert name.nametype is not None return name.nametype, name.nameid sig = parse_funcsig(['tell', {'name': 'pgid', 'type': 'CephPgid'}]) @@ -1432,7 +1441,9 @@ def find_cmd_target(childargs: List[str]) -> Tuple[str, str]: else: if len(valid_dict) == 2: # pg doesn't need revalidation; the string is fine - return 'pg', valid_dict['pgid'] + pgid = valid_dict['pgid'] + assert isinstance(pgid, str) + return 'pg', pgid # If we reached this far it must mean that so far we've been unable to # obtain a proper target from childargs. This may mean that we are not @@ -1456,6 +1467,8 @@ def find_cmd_target(childargs: List[str]) -> Tuple[str, str]: # CephName.valid() raises on validation error; find_cmd_target()'s # caller should handle them name.valid(childargs[1]) + assert name.nametype is not None + assert name.nameid is not None return name.nametype, name.nameid sig = parse_funcsig(['pg', {'name': 'pgid', 'type': 'CephPgid'}]) @@ -1465,7 +1478,9 @@ def find_cmd_target(childargs: List[str]) -> Tuple[str, str]: pass else: if len(valid_dict) == 2: - return 'pg', valid_dict['pgid'] + pgid = valid_dict['pgid'] + assert isinstance(pgid, str) + return 'pg', pgid return 'mon', '' @@ -1485,8 +1500,8 @@ class RadosThread(threading.Thread): self.exception = e -def run_in_thread(func: Callable[[Any, Any], int], - *args: Any, **kwargs: Any) -> int: +def run_in_thread(func: Callable[[Any, Any], Tuple[int, bytes, str]], + *args: Any, **kwargs: Any) -> Tuple[int, bytes, str]: timeout = kwargs.pop('timeout', 0) if timeout == 0 or timeout is None: # python threading module will just get blocked if timeout is `None`, @@ -1534,8 +1549,8 @@ def send_command_retry(*args: Any, **kwargs: Any) -> Tuple[int, bytes, str]: def send_command(cluster, - target: Optional[Tuple[str, str]] = ('mon', ''), - cmd: Optional[List[str]] = None, + target: Tuple[str, Optional[str]] = ('mon', ''), + cmd: Optional[str] = None, inbuf: Optional[bytes] = b'', timeout: Optional[int] = 0, verbose: Optional[bool] = False) -> Tuple[int, bytes, str]: @@ -1550,10 +1565,10 @@ def send_command(cluster, If target is osd.N, send command to that osd (except for pgid cmds) """ - cmd = cmd or [] try: if target[0] == 'osd': osdid = target[1] + assert osdid is not None if verbose: print('submit {0} to osd.{1}'.format(cmd, osdid), @@ -1634,9 +1649,9 @@ def send_command(cluster, def json_command(cluster, - target: Optional[Tuple[str, str]] = ('mon', ''), + target: Tuple[str, Optional[str]] = ('mon', ''), prefix: Optional[str] = None, - argdict: Optional[Dict[str, str]] = None, + argdict: Optional[ValidatedArgs] = None, inbuf: Optional[bytes] = b'', timeout: Optional[int] = 0, verbose: Optional[bool] = False) -> Tuple[int, bytes, str]: @@ -1651,14 +1666,14 @@ def json_command(cluster, :param prefix: String to inject into command arguments as 'prefix' :param argdict: Command arguments """ - cmddict = {} + cmddict: ValidatedArgs = {} if prefix: cmddict.update({'prefix': prefix}) if argdict: cmddict.update(argdict) if 'target' in argdict: - target = argdict.get('target') + target = cast(Tuple[str, str], argdict['target']) try: if target[0] == 'osd': @@ -1666,7 +1681,7 @@ def json_command(cluster, osdtarget = '{0}.{1}'.format(*target) # prefer target from cmddict if present and valid if 'target' in cmddict: - osdtarget = cmddict.pop('target') + osdtarget = cast(str, cmddict.pop('target')) try: osdtarg.valid(osdtarget) target = ('osd', osdtarg.nameid) diff --git a/src/pybind/ceph_daemon.py b/src/pybind/ceph_daemon.py index 1a105eb3da07a..f15af91e8ae25 100644 --- a/src/pybind/ceph_daemon.py +++ b/src/pybind/ceph_daemon.py @@ -22,7 +22,7 @@ from prettytable import PrettyTable, HEADER from signal import signal, Signals, SIGWINCH from termios import TIOCGWINSZ from types import FrameType -from typing import Any, Callable, Dict, Optional, Sequence, TextIO, Tuple +from typing import Any, Callable, Dict, List, Optional, Sequence, TextIO, Tuple from ceph_argparse import parse_json_funcsigs, validate_command @@ -32,7 +32,7 @@ READ_CHUNK_SIZE = 4096 def admin_socket(asok_path: str, - cmd: Sequence[str], + cmd: List[str], format: Optional[str] = '') -> bytes: """ Send a daemon (--admin-daemon) command 'cmd'. asok_path is the