]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/ceph_argparse: add more type annotations 42043/head
authorKefu Chai <kchai@redhat.com>
Sun, 27 Jun 2021 08:15:57 +0000 (16:15 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 30 Jun 2021 02:18:47 +0000 (10:18 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/pybind/ceph_argparse.py
src/pybind/ceph_daemon.py

index 45474e3901b5a77371d73ed3ede97ac5912c731b..6e1ce7b72d31d16522c9e0557e981304f38e58d0 100644 (file)
@@ -23,7 +23,7 @@ import threading
 import uuid
 
 from collections import abc
-from typing import Any, Callable, Dict, Generic, List, Optional, Sequence, Tuple, Union
+from typing import cast, Any, Callable, Dict, Generic, List, Optional, Sequence, Tuple, Union
 
 if sys.version_info >= (3, 8):
     from typing import get_args, get_origin
@@ -337,20 +337,20 @@ class CephString(CephArgtype):
             [c for c in printable if re.match(goodchars, c)]
         )
 
-    def valid(self, s, partial=False):
+    def valid(self, s: str, partial: bool = False) -> None:
         sset = set(s)
         if self.goodset and not sset <= self.goodset:
             raise ArgumentFormat("invalid chars {0} in {1}".
                                  format(''.join(sset - self.goodset), s))
         self.val = s
 
-    def __str__(self):
+    def __str__(self) -> str:
         b = ''
         if self.goodchars:
             b += '(goodchars {0})'.format(self.goodchars)
         return '<string{0}>'.format(b)
 
-    def complete(self, s):
+    def complete(self, s) -> List[str]:
         if s == '':
             return []
         else:
@@ -366,13 +366,13 @@ class CephSocketpath(CephArgtype):
     """
     Admin socket path; check that it's readable and S_ISSOCK
     """
-    def valid(self, s, partial=False):
+    def valid(self, s: str, partial: bool = False) -> None:
         mode = os.stat(s).st_mode
         if not stat.S_ISSOCK(mode):
             raise ArgumentValid('socket path {0} is not a socket'.format(s))
         self.val = s
 
-    def __str__(self):
+    def __str__(self) -> str:
         return '<admin-socket-path>'
 
 
@@ -435,7 +435,7 @@ class CephEntityAddr(CephIPAddr):
     """
     EntityAddress, that is, IP address[/nonce]
     """
-    def valid(self, s, partial=False):
+    def valid(self, s: str, partial: bool = False) -> None:
         nonce = None
         if '/' in s:
             ip, nonce = s.split('/')
@@ -455,7 +455,7 @@ class CephEntityAddr(CephIPAddr):
                 )
         self.val = s
 
-    def __str__(self):
+    def __str__(self) -> str:
         return '<EntityAddr>'
 
 
@@ -463,7 +463,7 @@ class CephPoolname(CephArgtype):
     """
     Pool name; very little utility
     """
-    def __str__(self):
+    def __str__(self) -> str:
         return '<poolname>'
 
 
@@ -472,7 +472,7 @@ class CephObjectname(CephArgtype):
     Object name.  Maybe should be combined with Pool name as they're always
     present in pairs, and then could be checked for presence
     """
-    def __str__(self):
+    def __str__(self) -> str:
         return '<objectname>'
 
 
@@ -702,14 +702,14 @@ class CephUUID(CephArgtype):
     """
     CephUUID: pretty self-explanatory
     """
-    def valid(self, s, partial=False):
+    def valid(self, s: str, partial: bool = False) -> None:
         try:
             uuid.UUID(s)
         except Exception as e:
             raise ArgumentFormat('invalid UUID {0}: {1}'.format(s, e))
         self.val = s
 
-    def __str__(self):
+    def __str__(self) -> str:
         return '<uuid>'
 
 
@@ -717,10 +717,10 @@ class CephPrefix(CephArgtype):
     """
     CephPrefix: magic type for "all the first n fixed strings"
     """
-    def __init__(self, prefix=''):
+    def __init__(self, prefix: str = '') -> None:
         self.prefix = prefix
 
-    def valid(self, s, partial=False):
+    def valid(self, s: str, partial: bool = False) -> None:
         try:
             s = str(s)
             if isinstance(s, bytes):
@@ -743,10 +743,10 @@ class CephPrefix(CephArgtype):
 
         raise ArgumentPrefix("no match for {0}".format(s))
 
-    def __str__(self):
+    def __str__(self) -> str:
         return self.prefix
 
-    def complete(self, s):
+    def complete(self, s) -> List[str]:
         if self.prefix.startswith(s):
             return [self.prefix.rstrip(' ')]
         else:
@@ -775,7 +775,7 @@ class argdesc(object):
     valid() will later be called with input to validate against it,
     and will store the validated value in self.instance.val for extraction.
     """
-    def __init__(self, t, name=None, n=1, req=True, positional=True, **kwargs):
+    def __init__(self, t, name=None, n=1, req=True, positional=True, **kwargs) -> None:
         if isinstance(t, basestring):
             self.t = CephPrefix
             self.typeargs = {'prefix': t}
@@ -908,7 +908,7 @@ def descsort_key(sh):
     return concise_sig(sh['sig'])
 
 
-def parse_funcsig(sig: Sequence[Union[str, Dict[str, str]]]) -> List[argdesc]:
+def parse_funcsig(sig: Sequence[Union[str, Dict[str, Any]]]) -> List[argdesc]:
     """
     parse a single descriptor (array of strings or dicts) into a
     dict of function descriptor/validators (objects of CephXXX type)
@@ -1005,7 +1005,12 @@ def parse_json_funcsigs(s: str,
     return sigdict
 
 
-def validate_one(word, desc, is_kwarg, partial=False):
+ArgValT = Union[bool, int, float, str, Tuple[str, str]]
+
+def validate_one(word: str,
+                 desc,
+                 is_kwarg: bool,
+                 partial: bool = False) -> List[ArgValT]:
     """
     validate_one(word, desc, is_kwarg, partial=False)
 
@@ -1030,7 +1035,9 @@ def validate_one(word, desc, is_kwarg, partial=False):
     return vals
 
 
-def matchnum(args, signature, partial=False):
+def matchnum(args: List[str],
+             signature: List[argdesc],
+             partial: bool = False) -> int:
     """
     matchnum(s, signature, partial=False)
 
@@ -1078,7 +1085,7 @@ ValidatedArg = Union[bool, int, float, str,
 ValidatedArgs = Dict[str, ValidatedArg]
 
 
-def store_arg(desc: argdesc, args: List[ValidatedArg], d: ValidatedArgs):
+def store_arg(desc: argdesc, args: Sequence[ValidatedArg], d: ValidatedArgs):
     '''
     Store argument described by, and held in, thanks to valid(),
     desc into the dictionary d, keyed by desc.name.  Three cases:
@@ -1106,7 +1113,7 @@ def store_arg(desc: argdesc, args: List[ValidatedArg], d: ValidatedArgs):
 
 def validate(args: List[str],
              signature: Sequence[argdesc],
-             flags: Optional[int] = 0,
+             flags: int = 0,
              partial: Optional[bool] = False) -> ValidatedArgs:
     """
     validate(args, signature, flags=0, partial=False)
@@ -1130,11 +1137,11 @@ def validate(args: List[str],
     mysig = copy.deepcopy(signature)
     reqsiglen = len([desc for desc in mysig if desc.req])
     matchcnt = 0
-    d = dict()
+    d: ValidatedArgs = dict()
     save_exception = None
 
-    arg_descs_by_name = dict([desc.name, desc] for desc in mysig
-                             if desc.t != CephPrefix)
+    arg_descs_by_name: Dict[str, argdesc] = \
+        dict((desc.name, desc) for desc in mysig if desc.t != CephPrefix)
 
     # Special case: detect "injectargs" (legacy way of modifying daemon
     # configs) and permit "--" string arguments if so.
@@ -1146,7 +1153,7 @@ def validate(args: List[str],
 
         while desc.numseen < desc.n:
             if myargs:
-                myarg = myargs.pop(0)
+                myarg: Optional[str] = myargs.pop(0)
             else:
                 myarg = None
 
@@ -1250,6 +1257,7 @@ def validate(args: List[str],
                 break
 
             # Have an arg; validate it
+            assert myarg is not None
             try:
                 args = validate_one(myarg, desc, False)
             except ArgumentError as e:
@@ -1292,7 +1300,7 @@ def validate(args: List[str],
 
 
 def validate_command(sigdict: Dict[str, Dict[str, Any]],
-                     args: Sequence[str],
+                     args: List[str],
                      verbose: Optional[bool] = False) -> ValidatedArgs:
     """
     Parse positional arguments into a parameter dict, according to
@@ -1312,19 +1320,19 @@ def validate_command(sigdict: Dict[str, Dict[str, Any]],
     """
     if verbose:
         print("validate_command: " + " ".join(args), file=sys.stderr)
-    found = []
+    found: Optional[Dict[str, Any]] = None
     valid_dict = {}
 
     # look for best match, accumulate possibles in bestcmds
     # (so we can maybe give a more-useful error message)
-    best_match_cnt = 0
-    bestcmds = []
+    best_match_cnt = 0.0
+    bestcmds: List[Dict[str, Any]] = []
     for cmd in sigdict.values():
         flags = cmd.get('flags', 0)
         if flags & Flag.OBSOLETE:
             continue
         sig = cmd['sig']
-        matched = matchnum(args, sig, partial=True)
+        matched: float = matchnum(args, sig, partial=True)
         if (matched >= math.floor(best_match_cnt) and
             matched == matchnum(args, sig, partial=False)):
             # prefer those fully matched over partial patch
@@ -1354,7 +1362,7 @@ def validate_command(sigdict: Dict[str, Dict[str, Any]],
         print("bestcmds_sorted: ", file=sys.stderr)
         pprint.PrettyPrinter(stream=sys.stderr).pprint(bestcmds_sorted)
 
-    ex = None
+    ex: Optional[ArgumentError] = None
     # for everything in bestcmds, look for a true match
     for cmd in bestcmds_sorted:
         sig = cmd['sig']
@@ -1402,7 +1410,7 @@ def validate_command(sigdict: Dict[str, Dict[str, Any]],
     return valid_dict
 
 
-def find_cmd_target(childargs: List[str]) -> Tuple[str, str]:
+def find_cmd_target(childargs: List[str]) -> Tuple[str, Optional[str]]:
     """
     Using a minimal validation, figure out whether the command
     should be sent to a monitor or an osd.  We do this before even
@@ -1422,6 +1430,7 @@ def find_cmd_target(childargs: List[str]) -> Tuple[str, str]:
             # if this fails, something is horribly wrong, as it just
             # validated successfully above
             name.valid(valid_dict['target'])
+            assert name.nametype is not None
             return name.nametype, name.nameid
 
     sig = parse_funcsig(['tell', {'name': 'pgid', 'type': 'CephPgid'}])
@@ -1432,7 +1441,9 @@ def find_cmd_target(childargs: List[str]) -> Tuple[str, str]:
     else:
         if len(valid_dict) == 2:
             # pg doesn't need revalidation; the string is fine
-            return 'pg', valid_dict['pgid']
+            pgid = valid_dict['pgid']
+            assert isinstance(pgid, str)
+            return 'pg', pgid
 
     # If we reached this far it must mean that so far we've been unable to
     # obtain a proper target from childargs.  This may mean that we are not
@@ -1456,6 +1467,8 @@ def find_cmd_target(childargs: List[str]) -> Tuple[str, str]:
         # CephName.valid() raises on validation error; find_cmd_target()'s
         # caller should handle them
         name.valid(childargs[1])
+        assert name.nametype is not None
+        assert name.nameid is not None
         return name.nametype, name.nameid
 
     sig = parse_funcsig(['pg', {'name': 'pgid', 'type': 'CephPgid'}])
@@ -1465,7 +1478,9 @@ def find_cmd_target(childargs: List[str]) -> Tuple[str, str]:
         pass
     else:
         if len(valid_dict) == 2:
-            return 'pg', valid_dict['pgid']
+            pgid = valid_dict['pgid']
+            assert isinstance(pgid, str)
+            return 'pg', pgid
 
     return 'mon', ''
 
@@ -1485,8 +1500,8 @@ class RadosThread(threading.Thread):
             self.exception = e
 
 
-def run_in_thread(func: Callable[[Any, Any], int],
-                  *args: Any, **kwargs: Any) -> int:
+def run_in_thread(func: Callable[[Any, Any], Tuple[int, bytes, str]],
+                  *args: Any, **kwargs: Any) -> Tuple[int, bytes, str]:
     timeout = kwargs.pop('timeout', 0)
     if timeout == 0 or timeout is None:
         # python threading module will just get blocked if timeout is `None`,
@@ -1534,8 +1549,8 @@ def send_command_retry(*args: Any, **kwargs: Any) -> Tuple[int, bytes, str]:
 
 
 def send_command(cluster,
-                 target: Optional[Tuple[str, str]] = ('mon', ''),
-                 cmd: Optional[List[str]] = None,
+                 target: Tuple[str, Optional[str]] = ('mon', ''),
+                 cmd: Optional[str] = None,
                  inbuf: Optional[bytes] = b'',
                  timeout: Optional[int] = 0,
                  verbose: Optional[bool] = False) -> Tuple[int, bytes, str]:
@@ -1550,10 +1565,10 @@ def send_command(cluster,
 
     If target is osd.N, send command to that osd (except for pgid cmds)
     """
-    cmd = cmd or []
     try:
         if target[0] == 'osd':
             osdid = target[1]
+            assert osdid is not None
 
             if verbose:
                 print('submit {0} to osd.{1}'.format(cmd, osdid),
@@ -1634,9 +1649,9 @@ def send_command(cluster,
 
 
 def json_command(cluster,
-                 target: Optional[Tuple[str, str]] = ('mon', ''),
+                 target: Tuple[str, Optional[str]] = ('mon', ''),
                  prefix: Optional[str] = None,
-                 argdict: Optional[Dict[str, str]] = None,
+                 argdict: Optional[ValidatedArgs] = None,
                  inbuf: Optional[bytes] = b'',
                  timeout: Optional[int] = 0,
                  verbose: Optional[bool] = False) -> Tuple[int, bytes, str]:
@@ -1651,14 +1666,14 @@ def json_command(cluster,
     :param prefix: String to inject into command arguments as 'prefix'
     :param argdict: Command arguments
     """
-    cmddict = {}
+    cmddict: ValidatedArgs = {}
     if prefix:
         cmddict.update({'prefix': prefix})
 
     if argdict:
         cmddict.update(argdict)
         if 'target' in argdict:
-            target = argdict.get('target')
+            target = cast(Tuple[str, str], argdict['target'])
 
     try:
         if target[0] == 'osd':
@@ -1666,7 +1681,7 @@ def json_command(cluster,
             osdtarget = '{0}.{1}'.format(*target)
             # prefer target from cmddict if present and valid
             if 'target' in cmddict:
-                osdtarget = cmddict.pop('target')
+                osdtarget = cast(str, cmddict.pop('target'))
             try:
                 osdtarg.valid(osdtarget)
                 target = ('osd', osdtarg.nameid)
index 1a105eb3da07ae31a7b51208d40f3e48d8a08226..f15af91e8ae2544401000fc3e25f8622d67ffe2c 100644 (file)
@@ -22,7 +22,7 @@ from prettytable import PrettyTable, HEADER
 from signal import signal, Signals, SIGWINCH
 from termios import TIOCGWINSZ
 from types import FrameType
-from typing import Any, Callable, Dict, Optional, Sequence, TextIO, Tuple
+from typing import Any, Callable, Dict, List, Optional, Sequence, TextIO, Tuple
 
 from ceph_argparse import parse_json_funcsigs, validate_command
 
@@ -32,7 +32,7 @@ READ_CHUNK_SIZE = 4096
 
 
 def admin_socket(asok_path: str,
-                 cmd: Sequence[str],
+                 cmd: List[str],
                  format: Optional[str] = '') -> bytes:
     """
     Send a daemon (--admin-daemon) command 'cmd'.  asok_path is the