class OSDMap(ceph_module.BasePyOSDMap):
- def get_epoch(self):
+ def get_epoch(self) -> int:
return self._get_epoch()
- def get_crush_version(self):
+ def get_crush_version(self) -> int:
return self._get_crush_version()
- def dump(self):
+ def dump(self) -> Dict[str, Any]:
return self._dump()
- def get_pools(self):
+ def get_pools(self) -> Dict[int, Dict[str, Any]]:
# FIXME: efficient implementation
d = self._dump()
return dict([(p['pool'], p) for p in d['pools']])
- def get_pools_by_name(self):
+ def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
# FIXME: efficient implementation
d = self._dump()
return dict([(p['pool_name'], p) for p in d['pools']])
- def new_incremental(self):
+ def new_incremental(self) -> 'OSDMapIncremental':
return self._new_incremental()
- def apply_incremental(self, inc):
+ def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
return self._apply_incremental(inc)
- def get_crush(self):
+ def get_crush(self) -> 'CRUSHMap':
return self._get_crush()
- def get_pools_by_take(self, take):
+ def get_pools_by_take(self, take: int) -> List[int]:
return self._get_pools_by_take(take).get('pools', [])
- def calc_pg_upmaps(self, inc,
- max_deviation=.01, max_iterations=10, pools=None):
+ def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
+ max_deviation: int,
+ max_iterations: int = 10,
+ pools: Optional[List[str]] = None) -> int:
if pools is None:
pools = []
return self._calc_pg_upmaps(
inc,
max_deviation, max_iterations, pools)
- def map_pool_pgs_up(self, poolid):
+ def map_pool_pgs_up(self, poolid: int) -> List[int]:
return self._map_pool_pgs_up(poolid)
- def pg_to_up_acting_osds(self, pool_id, ps):
+ def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
return self._pg_to_up_acting_osds(pool_id, ps)
- def pool_raw_used_rate(self, pool_id):
+ def pool_raw_used_rate(self, pool_id: int) -> float:
return self._pool_raw_used_rate(pool_id)
- def get_ec_profile(self, name):
+ def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
# FIXME: efficient implementation
d = self._dump()
return d['erasure_code_profiles'].get(name, None)
- def get_require_osd_release(self):
+ def get_require_osd_release(self) -> str:
d = self._dump()
return d['require_osd_release']
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
- def get_epoch(self):
+ def get_epoch(self) -> int:
return self._get_epoch()
- def dump(self):
+ def dump(self) -> Dict[str, Any]:
return self._dump()
- def set_osd_reweights(self, weightmap):
+ def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
"""
weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
"""
return self._set_osd_reweights(weightmap)
- def set_crush_compat_weight_set_weights(self, weightmap):
+ def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
"""
weightmap is a dict, int to float. devices only. e.g.,
{ 0: 3.4, 1: 3.3, 2: 3.334 }
ITEM_NONE = 0x7fffffff
DEFAULT_CHOOSE_ARGS = '-1'
- def dump(self):
+ def dump(self) -> Dict[str, Any]:
return self._dump()
- def get_item_weight(self, item):
+ def get_item_weight(self, item: int) -> Optional[int]:
return self._get_item_weight(item)
- def get_item_name(self, item):
+ def get_item_name(self, item: int) -> Optional[str]:
return self._get_item_name(item)
- def find_takes(self):
+ def find_takes(self) -> List[int]:
return self._find_takes().get('takes', [])
- def get_take_weight_osd_map(self, root):
+ def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
uglymap = self._get_take_weight_osd_map(root)
return {int(k): v for k, v in uglymap.get('weights', {}).items()}
@staticmethod
- def have_default_choose_args(dump):
+ def have_default_choose_args(dump: Dict[str, Any]) -> bool:
return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
@staticmethod
- def get_default_choose_args(dump):
- return dump.get('choose_args').get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
+ def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
+ choose_args = dump.get('choose_args')
+ assert isinstance(choose_args, dict)
+ return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
- def get_rule(self, rule_name):
+ def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
# TODO efficient implementation
for rule in self.dump()['rules']:
if rule_name == rule['rule_name']:
return None
- def get_rule_by_id(self, rule_id):
+ def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
for rule in self.dump()['rules']:
if rule['rule_id'] == rule_id:
return rule
return None
- def get_rule_root(self, rule_name):
+ def get_rule_root(self, rule_name: str) -> Optional[int]:
rule = self.get_rule(rule_name)
if rule is None:
return None
else:
return first_take['item']
- def get_osds_under(self, root_id):
+ def get_osds_under(self, root_id: int) -> List[int]:
# TODO don't abuse dump like this
d = self.dump()
buckets = dict([(b['id'], b) for b in d['buckets']])
osd_list = []
- def accumulate(b):
+ def accumulate(b: Dict[str, Any]) -> None:
for item in b['items']:
if item['id'] >= 0:
osd_list.append(item['id'])
return osd_list
- def device_class_counts(self):
+ def device_class_counts(self) -> Dict[str, int]:
result = defaultdict(int) # type: Dict[str, int]
# TODO don't abuse dump like this
d = self.dump()