]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr: annotate OSDMap and friends
authorKefu Chai <kchai@redhat.com>
Tue, 19 Jan 2021 06:53:11 +0000 (14:53 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 26 Jan 2021 10:02:24 +0000 (18:02 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/pybind/mgr/mgr_module.py

index 5a87e528de03c2c7e761465c64434d06e4d0cc36..a69b350e3f0b3b071331574d57d9ca2d981c32be 100644 (file)
@@ -124,78 +124,80 @@ class MonCommandFailed(RuntimeError): pass
 
 
 class OSDMap(ceph_module.BasePyOSDMap):
-    def get_epoch(self):
+    def get_epoch(self) -> int:
         return self._get_epoch()
 
-    def get_crush_version(self):
+    def get_crush_version(self) -> int:
         return self._get_crush_version()
 
-    def dump(self):
+    def dump(self) -> Dict[str, Any]:
         return self._dump()
 
-    def get_pools(self):
+    def get_pools(self) -> Dict[int, Dict[str, Any]]:
         # FIXME: efficient implementation
         d = self._dump()
         return dict([(p['pool'], p) for p in d['pools']])
 
-    def get_pools_by_name(self):
+    def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
         # FIXME: efficient implementation
         d = self._dump()
         return dict([(p['pool_name'], p) for p in d['pools']])
 
-    def new_incremental(self):
+    def new_incremental(self) -> 'OSDMapIncremental':
         return self._new_incremental()
 
-    def apply_incremental(self, inc):
+    def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
         return self._apply_incremental(inc)
 
-    def get_crush(self):
+    def get_crush(self) -> 'CRUSHMap':
         return self._get_crush()
 
-    def get_pools_by_take(self, take):
+    def get_pools_by_take(self, take: int) -> List[int]:
         return self._get_pools_by_take(take).get('pools', [])
 
-    def calc_pg_upmaps(self, inc,
-                       max_deviation=.01, max_iterations=10, pools=None):
+    def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
+                       max_deviation: int,
+                       max_iterations: int = 10,
+                       pools: Optional[List[str]] = None) -> int:
         if pools is None:
             pools = []
         return self._calc_pg_upmaps(
             inc,
             max_deviation, max_iterations, pools)
 
-    def map_pool_pgs_up(self, poolid):
+    def map_pool_pgs_up(self, poolid: int) -> List[int]:
         return self._map_pool_pgs_up(poolid)
 
-    def pg_to_up_acting_osds(self, pool_id, ps):
+    def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
         return self._pg_to_up_acting_osds(pool_id, ps)
 
-    def pool_raw_used_rate(self, pool_id):
+    def pool_raw_used_rate(self, pool_id: int) -> float:
         return self._pool_raw_used_rate(pool_id)
 
-    def get_ec_profile(self, name):
+    def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
         # FIXME: efficient implementation
         d = self._dump()
         return d['erasure_code_profiles'].get(name, None)
 
-    def get_require_osd_release(self):
+    def get_require_osd_release(self) -> str:
         d = self._dump()
         return d['require_osd_release']
 
 
 class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
-    def get_epoch(self):
+    def get_epoch(self) -> int:
         return self._get_epoch()
 
-    def dump(self):
+    def dump(self) -> Dict[str, Any]:
         return self._dump()
 
-    def set_osd_reweights(self, weightmap):
+    def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
         """
         weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
         """
         return self._set_osd_reweights(weightmap)
 
-    def set_crush_compat_weight_set_weights(self, weightmap):
+    def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
         """
         weightmap is a dict, int to float.  devices only.  e.g.,
         { 0: 3.4, 1: 3.3, 2: 3.334 }
@@ -207,31 +209,33 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
     ITEM_NONE = 0x7fffffff
     DEFAULT_CHOOSE_ARGS = '-1'
 
-    def dump(self):
+    def dump(self) -> Dict[str, Any]:
         return self._dump()
 
-    def get_item_weight(self, item):
+    def get_item_weight(self, item: int) -> Optional[int]:
         return self._get_item_weight(item)
 
-    def get_item_name(self, item):
+    def get_item_name(self, item: int) -> Optional[str]:
         return self._get_item_name(item)
 
-    def find_takes(self):
+    def find_takes(self) -> List[int]:
         return self._find_takes().get('takes', [])
 
-    def get_take_weight_osd_map(self, root):
+    def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
         uglymap = self._get_take_weight_osd_map(root)
         return {int(k): v for k, v in uglymap.get('weights', {}).items()}
 
     @staticmethod
-    def have_default_choose_args(dump):
+    def have_default_choose_args(dump: Dict[str, Any]) -> bool:
         return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
 
     @staticmethod
-    def get_default_choose_args(dump):
-        return dump.get('choose_args').get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
+    def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
+        choose_args = dump.get('choose_args')
+        assert isinstance(choose_args, dict)
+        return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
 
-    def get_rule(self, rule_name):
+    def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
         # TODO efficient implementation
         for rule in self.dump()['rules']:
             if rule_name == rule['rule_name']:
@@ -239,14 +243,14 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
 
         return None
 
-    def get_rule_by_id(self, rule_id):
+    def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
         for rule in self.dump()['rules']:
             if rule['rule_id'] == rule_id:
                 return rule
 
         return None
 
-    def get_rule_root(self, rule_name):
+    def get_rule_root(self, rule_name: str) -> Optional[int]:
         rule = self.get_rule(rule_name)
         if rule is None:
             return None
@@ -260,14 +264,14 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
         else:
             return first_take['item']
 
-    def get_osds_under(self, root_id):
+    def get_osds_under(self, root_id: int) -> List[int]:
         # TODO don't abuse dump like this
         d = self.dump()
         buckets = dict([(b['id'], b) for b in d['buckets']])
 
         osd_list = []
 
-        def accumulate(b):
+        def accumulate(b: Dict[str, Any]) -> None:
             for item in b['items']:
                 if item['id'] >= 0:
                     osd_list.append(item['id'])
@@ -281,7 +285,7 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
 
         return osd_list
 
-    def device_class_counts(self):
+    def device_class_counts(self) -> Dict[str, int]:
         result = defaultdict(int)  # type: Dict[str, int]
         # TODO don't abuse dump like this
         d = self.dump()