From 376939d8d8f1c462bc715b979344d39bbbd58490 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Thu, 4 Feb 2021 15:22:47 +0800 Subject: [PATCH] pybind/mgr/balancer: add type annotations Signed-off-by: Kefu Chai --- src/pybind/mgr/balancer/module.py | 199 +++++++++++++++--------------- src/pybind/mgr/tox.ini | 1 + 2 files changed, 101 insertions(+), 99 deletions(-) diff --git a/src/pybind/mgr/balancer/module.py b/src/pybind/mgr/balancer/module.py index 70c6693f11ed7..cb062220645ec 100644 --- a/src/pybind/mgr/balancer/module.py +++ b/src/pybind/mgr/balancer/module.py @@ -9,9 +9,9 @@ import json import math import random import time -from mgr_module import CLIReadCommand, CLICommand, MgrModule, CommandResult +from mgr_module import CLIReadCommand, CLICommand, CommandResult, MgrModule, OSDMap from threading import Event -from typing import Sequence, Tuple, Optional +from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, Union from mgr_module import CRUSHMap import datetime @@ -72,11 +72,11 @@ class MsPlan(Plan): """ Plan with a preloaded MappingState member. """ - def __init__(self, name, mode, ms, pools): + def __init__(self, name: str, mode: str, ms: MappingState, pools: List[str]) -> None: super(MsPlan, self).__init__(name, mode, ms.osdmap, pools) self.initial = ms - def final_state(self): + def final_state(self) -> MappingState: self.inc.set_osd_reweights(self.osd_weights) self.inc.set_crush_compat_weight_set_weights(self.compat_ws) return MappingState(self.initial.osdmap.apply_incremental(self.inc), @@ -84,10 +84,10 @@ class MsPlan(Plan): self.initial.raw_pool_stats, 'plan %s final' % self.name) - def dump(self): + def dump(self) -> str: return json.dumps(self.inc.dump(), indent=4, sort_keys=True) - def show(self): + def show(self) -> str: ls = [] ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch()) ls.append('# starting crush version %d' % @@ -114,29 +114,29 @@ class MsPlan(Plan): class Eval: - def __init__(self, ms): + def __init__(self, ms: MappingState): self.ms = ms - self.root_ids = {} # root name -> id - self.pool_name = {} # pool id -> pool name - self.pool_id = {} # pool name -> id - self.pool_roots = {} # pool name -> root name - self.root_pools = {} # root name -> pools - self.target_by_root = {} # root name -> target weight map - self.count_by_pool = {} - self.count_by_root = {} - self.actual_by_pool = {} # pool -> by_* -> actual weight map - self.actual_by_root = {} # pool -> by_* -> actual weight map - self.total_by_pool = {} # pool -> by_* -> total - self.total_by_root = {} # root -> by_* -> total - self.stats_by_pool = {} # pool -> by_* -> stddev or avg -> value - self.stats_by_root = {} # root -> by_* -> stddev or avg -> value - - self.score_by_pool = {} - self.score_by_root = {} + self.root_ids: Dict[str, int] = {} # root name -> id + self.pool_name: Dict[str, str] = {} # pool id -> pool name + self.pool_id: Dict[str, int] = {} # pool name -> id + self.pool_roots: Dict[str, List[str]] = {} # pool name -> root name + self.root_pools: Dict[str, List[str]] = {} # root name -> pools + self.target_by_root: Dict[str, Dict[int, float]] = {} # root name -> target weight map + self.count_by_pool: Dict[str, dict] = {} + self.count_by_root: Dict[str, dict] = {} + self.actual_by_pool: Dict[str, dict] = {} # pool -> by_* -> actual weight map + self.actual_by_root: Dict[str, dict] = {} # pool -> by_* -> actual weight map + self.total_by_pool: Dict[str, dict] = {} # pool -> by_* -> total + self.total_by_root: Dict[str, dict] = {} # root -> by_* -> total + self.stats_by_pool: Dict[str, dict] = {} # pool -> by_* -> stddev or avg -> value + self.stats_by_root: Dict[str, dict] = {} # root -> by_* -> stddev or avg -> value + + self.score_by_pool: Dict[str, float] = {} + self.score_by_root: Dict[str, Dict[str, float]] = {} self.score = 0.0 - def show(self, verbose=False): + def show(self, verbose: bool = False) -> str: if verbose: r = self.ms.desc + '\n' r += 'target_by_root %s\n' % self.target_by_root @@ -156,7 +156,7 @@ class Eval: def calc_stats(self, count, target, total): num = max(len(target), 1) - r = {} + r: Dict[str, Dict[str, Union[int, float]]] = {} for t in ('pgs', 'objects', 'bytes'): if total[t] == 0: r[t] = { @@ -339,7 +339,7 @@ class Module(MgrModule): active = False run = True - plans = {} + plans: Dict[str, Plan] = {} mode = '' optimizing = False last_optimize_started = '' @@ -348,7 +348,7 @@ class Module(MgrModule): success_string = 'Optimization plan created successfully' in_progress_string = 'in progress' - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super(Module, self).__init__(*args, **kwargs) self.event = Event() @@ -419,14 +419,13 @@ class Module(MgrModule): Note that empty list means all existing pools will be automatic balancing targets, which is the default behaviour of balancer. """ - pool_ids = self.get_module_option('pool_ids') + pool_ids = cast(str, self.get_module_option('pool_ids')) if pool_ids == '': return (0, '', '') - pool_ids = pool_ids.split(',') - pool_ids = [int(p) for p in pool_ids] + pool_ids = [int(p) for p in pool_ids.split(',')] pool_name_by_id = dict((p['pool'], p['pool_name']) for p in self.get_osdmap().dump().get('pools', [])) should_prune = False - final_ids = [] + final_ids: List[int] = [] final_names = [] for p in pool_ids: if p in pool_name_by_id: @@ -435,7 +434,7 @@ class Module(MgrModule): else: should_prune = True if should_prune: # some pools were gone, prune - self.set_module_option('pool_ids', ','.join(final_ids)) + self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids)) return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '') @CLICommand('balancer pool add') @@ -449,12 +448,10 @@ class Module(MgrModule): invalid_names = [p for p in raw_names if p not in pool_id_by_name] if invalid_names: return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names) - to_add = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name] - existing = self.get_module_option('pool_ids') - final = to_add - if existing != '': - existing = existing.split(',') - final = set(to_add) | set(existing) + to_add = set(str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name) + pool_ids = cast(str, self.get_module_option('pool_ids')) + existing = set(pool_ids.split(',') if pool_ids else []) + final = to_add | existing self.set_module_option('pool_ids', ','.join(final)) return (0, '', '') @@ -464,7 +461,7 @@ class Module(MgrModule): Disable automatic balancing for specific pools """ raw_names = pools - existing = self.get_module_option('pool_ids') + existing = cast(str, self.get_module_option('pool_ids')) if existing == '': # for idempotence return (0, '', '') existing = existing.split(',') @@ -500,7 +497,7 @@ class Module(MgrModule): self.get("pool_stats"), f'plan "{plan.name}"') else: - ms = plan.final_state() + ms = cast(MsPlan, plan).final_state() else: # not a plan, does it look like a pool? osdmap = self.get_osdmap() @@ -631,19 +628,19 @@ class Module(MgrModule): self.plan_rm(plan) return (r, '', detail) - def shutdown(self): + def shutdown(self) -> None: self.log.info('Stopping') self.run = False self.event.set() - def time_permit(self): + def time_permit(self) -> bool: local_time = time.localtime() time_of_day = time.strftime('%H%M', local_time) weekday = (local_time.tm_wday + 1) % 7 # be compatible with C permit = False - begin_time = self.get_module_option('begin_time') - end_time = self.get_module_option('end_time') + begin_time = cast(str, self.get_module_option('begin_time')) + end_time = cast(str, self.get_module_option('end_time')) if begin_time <= end_time: permit = begin_time <= time_of_day < end_time else: @@ -653,8 +650,8 @@ class Module(MgrModule): begin_time, end_time, time_of_day) return False - begin_weekday = self.get_module_option('begin_weekday') - end_weekday = self.get_module_option('end_weekday') + begin_weekday = cast(int, self.get_module_option('begin_weekday')) + end_weekday = cast(int, self.get_module_option('end_weekday')) if begin_weekday <= end_weekday: permit = begin_weekday <= weekday < end_weekday else: @@ -666,11 +663,11 @@ class Module(MgrModule): return True - def serve(self): + def serve(self) -> None: self.log.info('Starting') while self.run: - self.active = self.get_module_option('active') - sleep_interval = self.get_module_option('sleep_interval') + self.active = cast(bool, self.get_module_option('active')) + sleep_interval = cast(float, self.get_module_option('sleep_interval')) self.log.debug('Waking up [%s, now %s]', "active" if self.active else "inactive", time.strftime(TIME_FORMAT, time.localtime())) @@ -678,17 +675,20 @@ class Module(MgrModule): self.log.debug('Running') name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime()) osdmap = self.get_osdmap() - allow = self.get_module_option('pool_ids') - final = [] - if allow != '': - allow = allow.split(',') - valid = [str(p['pool']) for p in osdmap.dump().get('pools', [])] - final = set(allow) & set(valid) + pool_ids = cast(str, self.get_module_option('pool_ids')) + if pool_ids: + allow = [int(p) for p in pool_ids.split(',')] + else: + allow = [] + final: List[str] = [] + if allow: + pools = osdmap.dump().get('pools', []) + valid = [p['pool'] for p in pools] + ids = set(allow) & set(valid) if set(allow) - set(valid): # some pools were gone, prune - self.set_module_option('pool_ids', ','.join(final)) - pool_name_by_id = dict((p['pool'], p['pool_name']) for p in osdmap.dump().get('pools', [])) - final = [int(p) for p in final] - final = [pool_name_by_id[p] for p in final if p in pool_name_by_id] + self.set_module_option('pool_ids', ','.join(str(p) for p in ids)) + pool_name_by_id = dict((p['pool'], p['pool_name']) for p in pools) + final = [pool_name_by_id[p] for p in ids if p in pool_name_by_id] plan = self.plan_create(name, osdmap, final) self.optimizing = True self.last_optimize_started = time.asctime(time.localtime()) @@ -707,8 +707,8 @@ class Module(MgrModule): self.event.wait(sleep_interval) self.event.clear() - def plan_create(self, name, osdmap, pools): - mode = self.get_module_option('mode') + def plan_create(self, name: str, osdmap: OSDMap, pools: List[str]) -> Plan: + mode = cast(str, self.get_module_option('mode')) if mode == 'upmap': # drop unnecessary MS member for upmap mode. # this way we could effectively eliminate the usage of a @@ -725,7 +725,7 @@ class Module(MgrModule): pools) return plan - def calc_eval(self, ms, pools): + def calc_eval(self, ms: MappingState, pools: List[str]) -> Eval: pe = Eval(ms) pool_rule = {} pool_info = {} @@ -751,7 +751,7 @@ class Module(MgrModule): for a in ms.osdmap_dump.get('osds',[]) if a['weight'] > 0 } # get expected distributions by root - actual_by_root = {} + actual_by_root: Dict[str, Dict[str, dict]] = {} rootids = ms.crush.find_takes() roots = [] for rootid in rootids: @@ -920,7 +920,7 @@ class Module(MgrModule): self.log.debug('score_by_root %s' % pe.score_by_root) # get the list of score metrics, comma separated - metrics = self.get_module_option('crush_compat_metrics').split(',') + metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',') # total score is just average of normalized stddevs pe.score = 0.0 @@ -931,13 +931,13 @@ class Module(MgrModule): pe.score /= len(metrics) * len(roots) return pe - def evaluate(self, ms, pools, verbose=False): + def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str: pe = self.calc_eval(ms, pools) return pe.show(verbose=verbose) - def optimize(self, plan): + def optimize(self, plan: Plan) -> Tuple[int, str]: self.log.info('Optimize plan %s' % plan.name) - max_misplaced = self.get_ceph_option('target_max_misplaced_ratio') + max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio')) self.log.info('Mode %s, max misplaced %f' % (plan.mode, max_misplaced)) @@ -970,7 +970,7 @@ class Module(MgrModule): if plan.mode == 'upmap': return self.do_upmap(plan) elif plan.mode == 'crush-compat': - return self.do_crush_compat(plan) + return self.do_crush_compat(cast(MsPlan, plan)) elif plan.mode == 'none': detail = 'Please do "ceph balancer mode" to choose a valid mode first' self.log.info('Idle') @@ -980,10 +980,10 @@ class Module(MgrModule): self.log.info(detail) return -errno.EINVAL, detail - def do_upmap(self, plan): + def do_upmap(self, plan: Plan) -> Tuple[int, str]: self.log.info('do_upmap') - max_optimizations = self.get_module_option('upmap_max_optimizations') - max_deviation = self.get_module_option('upmap_max_deviation') + max_optimizations = cast(float, self.get_module_option('upmap_max_optimizations')) + max_deviation = cast(int, self.get_module_option('upmap_max_deviation')) osdmap_dump = plan.osdmap_dump if len(plan.pools): @@ -1046,22 +1046,22 @@ class Module(MgrModule): 'or distribution is already perfect' return 0, '' - def do_crush_compat(self, plan): + def do_crush_compat(self, plan: MsPlan) -> Tuple[int, str]: self.log.info('do_crush_compat') - max_iterations = self.get_module_option('crush_compat_max_iterations') + max_iterations = cast(int, self.get_module_option('crush_compat_max_iterations')) if max_iterations < 1: return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1' - step = self.get_module_option('crush_compat_step') + step = cast(float, self.get_module_option('crush_compat_step')) if step <= 0 or step >= 1.0: return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)' - max_misplaced = self.get_ceph_option('target_max_misplaced_ratio') + max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio')) min_pg_per_osd = 2 ms = plan.initial osdmap = ms.osdmap crush = osdmap.get_crush() pe = self.calc_eval(ms, plan.pools) - min_score_to_optimize = self.get_module_option('min_score') + min_score_to_optimize = cast(float, self.get_module_option('min_score')) if pe.score <= min_score_to_optimize: if pe.score == 0: detail = 'Distribution is already perfect' @@ -1087,9 +1087,9 @@ class Module(MgrModule): # can't proceed. roots = list(pe.target_by_root.keys()) self.log.debug('roots %s', roots) - visited = {} - overlap = {} - root_ids = {} + visited: Dict[int, str] = {} + overlap: Dict[int, List[str]] = {} + root_ids: Dict[str, int] = {} for root, wm in pe.target_by_root.items(): for osd in wm: if osd in visited: @@ -1104,7 +1104,7 @@ class Module(MgrModule): return -errno.EOPNOTSUPP, detail # rebalance by pgs, objects, or bytes - metrics = self.get_module_option('crush_compat_metrics').split(',') + metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',') key = metrics[0] # balancing using the first score metric if key not in ['pgs', 'bytes', 'objects']: self.log.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key) @@ -1221,7 +1221,7 @@ class Module(MgrModule): left -= 1 # allow a small regression if we are phasing out osd weights - fudge = 0 + fudge = 0.0 if best_ow != orig_osd_weight: fudge = .001 @@ -1240,7 +1240,7 @@ class Module(MgrModule): return -errno.EDOM, 'Unable to find further optimization, ' \ 'change balancer mode and retry might help' - def get_compat_weight_set_weights(self, ms): + def get_compat_weight_set_weights(self, ms: MappingState): if not CRUSHMap.have_default_choose_args(ms.crush_dump): # enable compat weight-set first self.log.debug('ceph osd crush weight-set create-compat') @@ -1290,30 +1290,31 @@ class Module(MgrModule): self.log.debug('weight_set weights %s' % weight_set) return weight_set - def do_crush(self): + def do_crush(self) -> None: self.log.info('do_crush (not yet implemented)') - def do_osd_weight(self): + def do_osd_weight(self) -> None: self.log.info('do_osd_weight (not yet implemented)') - def execute(self, plan): + def execute(self, plan: Plan) -> Tuple[int, str]: self.log.info('Executing plan %s' % plan.name) commands = [] # compat weight-set - if len(plan.compat_ws) and \ - not CRUSHMap.have_default_choose_args(plan.initial.crush_dump): - self.log.debug('ceph osd crush weight-set create-compat') - result = CommandResult('') - self.send_command(result, 'mon', '', json.dumps({ - 'prefix': 'osd crush weight-set create-compat', - 'format': 'json', - }), '') - r, outb, outs = result.wait() - if r != 0: - self.log.error('Error creating compat weight-set') - return r, outs + if len(plan.compat_ws): + ms_plan = cast(MsPlan, plan) + if not CRUSHMap.have_default_choose_args(ms_plan.initial.crush_dump): + self.log.debug('ceph osd crush weight-set create-compat') + result = CommandResult('') + self.send_command(result, 'mon', '', json.dumps({ + 'prefix': 'osd crush weight-set create-compat', + 'format': 'json', + }), '') + r, outb, outs = result.wait() + if r != 0: + self.log.error('Error creating compat weight-set') + return r, outs for osd, weight in plan.compat_ws.items(): self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f', @@ -1400,7 +1401,7 @@ class Module(MgrModule): self.log.debug('done') return 0, '' - def gather_telemetry(self): + def gather_telemetry(self) -> Dict[str, Any]: return { 'active': self.active, 'mode': self.mode, diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini index 15e1b502fb899..b9965b9861693 100644 --- a/src/pybind/mgr/tox.ini +++ b/src/pybind/mgr/tox.ini @@ -56,6 +56,7 @@ deps = commands = mypy --config-file=../../mypy.ini \ -m alerts \ + -m balancer \ -m cephadm \ -m crash \ -m dashboard \ -- 2.39.5