import math
import random
import time
-from mgr_module import CLIReadCommand, CLICommand, MgrModule, CommandResult
+from mgr_module import CLIReadCommand, CLICommand, CommandResult, MgrModule, OSDMap
from threading import Event
-from typing import Sequence, Tuple, Optional
+from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, Union
from mgr_module import CRUSHMap
import datetime
"""
Plan with a preloaded MappingState member.
"""
- def __init__(self, name, mode, ms, pools):
+ def __init__(self, name: str, mode: str, ms: MappingState, pools: List[str]) -> None:
super(MsPlan, self).__init__(name, mode, ms.osdmap, pools)
self.initial = ms
- def final_state(self):
+ def final_state(self) -> MappingState:
self.inc.set_osd_reweights(self.osd_weights)
self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
return MappingState(self.initial.osdmap.apply_incremental(self.inc),
self.initial.raw_pool_stats,
'plan %s final' % self.name)
- def dump(self):
+ def dump(self) -> str:
return json.dumps(self.inc.dump(), indent=4, sort_keys=True)
- def show(self):
+ def show(self) -> str:
ls = []
ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
ls.append('# starting crush version %d' %
class Eval:
- def __init__(self, ms):
+ def __init__(self, ms: MappingState):
self.ms = ms
- self.root_ids = {} # root name -> id
- self.pool_name = {} # pool id -> pool name
- self.pool_id = {} # pool name -> id
- self.pool_roots = {} # pool name -> root name
- self.root_pools = {} # root name -> pools
- self.target_by_root = {} # root name -> target weight map
- self.count_by_pool = {}
- self.count_by_root = {}
- self.actual_by_pool = {} # pool -> by_* -> actual weight map
- self.actual_by_root = {} # pool -> by_* -> actual weight map
- self.total_by_pool = {} # pool -> by_* -> total
- self.total_by_root = {} # root -> by_* -> total
- self.stats_by_pool = {} # pool -> by_* -> stddev or avg -> value
- self.stats_by_root = {} # root -> by_* -> stddev or avg -> value
-
- self.score_by_pool = {}
- self.score_by_root = {}
+ self.root_ids: Dict[str, int] = {} # root name -> id
+ self.pool_name: Dict[str, str] = {} # pool id -> pool name
+ self.pool_id: Dict[str, int] = {} # pool name -> id
+ self.pool_roots: Dict[str, List[str]] = {} # pool name -> root name
+ self.root_pools: Dict[str, List[str]] = {} # root name -> pools
+ self.target_by_root: Dict[str, Dict[int, float]] = {} # root name -> target weight map
+ self.count_by_pool: Dict[str, dict] = {}
+ self.count_by_root: Dict[str, dict] = {}
+ self.actual_by_pool: Dict[str, dict] = {} # pool -> by_* -> actual weight map
+ self.actual_by_root: Dict[str, dict] = {} # pool -> by_* -> actual weight map
+ self.total_by_pool: Dict[str, dict] = {} # pool -> by_* -> total
+ self.total_by_root: Dict[str, dict] = {} # root -> by_* -> total
+ self.stats_by_pool: Dict[str, dict] = {} # pool -> by_* -> stddev or avg -> value
+ self.stats_by_root: Dict[str, dict] = {} # root -> by_* -> stddev or avg -> value
+
+ self.score_by_pool: Dict[str, float] = {}
+ self.score_by_root: Dict[str, Dict[str, float]] = {}
self.score = 0.0
- def show(self, verbose=False):
+ def show(self, verbose: bool = False) -> str:
if verbose:
r = self.ms.desc + '\n'
r += 'target_by_root %s\n' % self.target_by_root
def calc_stats(self, count, target, total):
num = max(len(target), 1)
- r = {}
+ r: Dict[str, Dict[str, Union[int, float]]] = {}
for t in ('pgs', 'objects', 'bytes'):
if total[t] == 0:
r[t] = {
active = False
run = True
- plans = {}
+ plans: Dict[str, Plan] = {}
mode = ''
optimizing = False
last_optimize_started = ''
success_string = 'Optimization plan created successfully'
in_progress_string = 'in progress'
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self.event = Event()
Note that empty list means all existing pools will be automatic balancing targets,
which is the default behaviour of balancer.
"""
- pool_ids = self.get_module_option('pool_ids')
+ pool_ids = cast(str, self.get_module_option('pool_ids'))
if pool_ids == '':
return (0, '', '')
- pool_ids = pool_ids.split(',')
- pool_ids = [int(p) for p in pool_ids]
+ pool_ids = [int(p) for p in pool_ids.split(',')]
pool_name_by_id = dict((p['pool'], p['pool_name']) for p in self.get_osdmap().dump().get('pools', []))
should_prune = False
- final_ids = []
+ final_ids: List[int] = []
final_names = []
for p in pool_ids:
if p in pool_name_by_id:
else:
should_prune = True
if should_prune: # some pools were gone, prune
- self.set_module_option('pool_ids', ','.join(final_ids))
+ self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
@CLICommand('balancer pool add')
invalid_names = [p for p in raw_names if p not in pool_id_by_name]
if invalid_names:
return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
- to_add = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
- existing = self.get_module_option('pool_ids')
- final = to_add
- if existing != '':
- existing = existing.split(',')
- final = set(to_add) | set(existing)
+ to_add = set(str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name)
+ pool_ids = cast(str, self.get_module_option('pool_ids'))
+ existing = set(pool_ids.split(',') if pool_ids else [])
+ final = to_add | existing
self.set_module_option('pool_ids', ','.join(final))
return (0, '', '')
Disable automatic balancing for specific pools
"""
raw_names = pools
- existing = self.get_module_option('pool_ids')
+ existing = cast(str, self.get_module_option('pool_ids'))
if existing == '': # for idempotence
return (0, '', '')
existing = existing.split(',')
self.get("pool_stats"),
f'plan "{plan.name}"')
else:
- ms = plan.final_state()
+ ms = cast(MsPlan, plan).final_state()
else:
# not a plan, does it look like a pool?
osdmap = self.get_osdmap()
self.plan_rm(plan)
return (r, '', detail)
- def shutdown(self):
+ def shutdown(self) -> None:
self.log.info('Stopping')
self.run = False
self.event.set()
- def time_permit(self):
+ def time_permit(self) -> bool:
local_time = time.localtime()
time_of_day = time.strftime('%H%M', local_time)
weekday = (local_time.tm_wday + 1) % 7 # be compatible with C
permit = False
- begin_time = self.get_module_option('begin_time')
- end_time = self.get_module_option('end_time')
+ begin_time = cast(str, self.get_module_option('begin_time'))
+ end_time = cast(str, self.get_module_option('end_time'))
if begin_time <= end_time:
permit = begin_time <= time_of_day < end_time
else:
begin_time, end_time, time_of_day)
return False
- begin_weekday = self.get_module_option('begin_weekday')
- end_weekday = self.get_module_option('end_weekday')
+ begin_weekday = cast(int, self.get_module_option('begin_weekday'))
+ end_weekday = cast(int, self.get_module_option('end_weekday'))
if begin_weekday <= end_weekday:
permit = begin_weekday <= weekday < end_weekday
else:
return True
- def serve(self):
+ def serve(self) -> None:
self.log.info('Starting')
while self.run:
- self.active = self.get_module_option('active')
- sleep_interval = self.get_module_option('sleep_interval')
+ self.active = cast(bool, self.get_module_option('active'))
+ sleep_interval = cast(float, self.get_module_option('sleep_interval'))
self.log.debug('Waking up [%s, now %s]',
"active" if self.active else "inactive",
time.strftime(TIME_FORMAT, time.localtime()))
self.log.debug('Running')
name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
osdmap = self.get_osdmap()
- allow = self.get_module_option('pool_ids')
- final = []
- if allow != '':
- allow = allow.split(',')
- valid = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
- final = set(allow) & set(valid)
+ pool_ids = cast(str, self.get_module_option('pool_ids'))
+ if pool_ids:
+ allow = [int(p) for p in pool_ids.split(',')]
+ else:
+ allow = []
+ final: List[str] = []
+ if allow:
+ pools = osdmap.dump().get('pools', [])
+ valid = [p['pool'] for p in pools]
+ ids = set(allow) & set(valid)
if set(allow) - set(valid): # some pools were gone, prune
- self.set_module_option('pool_ids', ','.join(final))
- pool_name_by_id = dict((p['pool'], p['pool_name']) for p in osdmap.dump().get('pools', []))
- final = [int(p) for p in final]
- final = [pool_name_by_id[p] for p in final if p in pool_name_by_id]
+ self.set_module_option('pool_ids', ','.join(str(p) for p in ids))
+ pool_name_by_id = dict((p['pool'], p['pool_name']) for p in pools)
+ final = [pool_name_by_id[p] for p in ids if p in pool_name_by_id]
plan = self.plan_create(name, osdmap, final)
self.optimizing = True
self.last_optimize_started = time.asctime(time.localtime())
self.event.wait(sleep_interval)
self.event.clear()
- def plan_create(self, name, osdmap, pools):
- mode = self.get_module_option('mode')
+ def plan_create(self, name: str, osdmap: OSDMap, pools: List[str]) -> Plan:
+ mode = cast(str, self.get_module_option('mode'))
if mode == 'upmap':
# drop unnecessary MS member for upmap mode.
# this way we could effectively eliminate the usage of a
pools)
return plan
- def calc_eval(self, ms, pools):
+ def calc_eval(self, ms: MappingState, pools: List[str]) -> Eval:
pe = Eval(ms)
pool_rule = {}
pool_info = {}
for a in ms.osdmap_dump.get('osds',[]) if a['weight'] > 0 }
# get expected distributions by root
- actual_by_root = {}
+ actual_by_root: Dict[str, Dict[str, dict]] = {}
rootids = ms.crush.find_takes()
roots = []
for rootid in rootids:
self.log.debug('score_by_root %s' % pe.score_by_root)
# get the list of score metrics, comma separated
- metrics = self.get_module_option('crush_compat_metrics').split(',')
+ metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
# total score is just average of normalized stddevs
pe.score = 0.0
pe.score /= len(metrics) * len(roots)
return pe
- def evaluate(self, ms, pools, verbose=False):
+ def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str:
pe = self.calc_eval(ms, pools)
return pe.show(verbose=verbose)
- def optimize(self, plan):
+ def optimize(self, plan: Plan) -> Tuple[int, str]:
self.log.info('Optimize plan %s' % plan.name)
- max_misplaced = self.get_ceph_option('target_max_misplaced_ratio')
+ max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
self.log.info('Mode %s, max misplaced %f' %
(plan.mode, max_misplaced))
if plan.mode == 'upmap':
return self.do_upmap(plan)
elif plan.mode == 'crush-compat':
- return self.do_crush_compat(plan)
+ return self.do_crush_compat(cast(MsPlan, plan))
elif plan.mode == 'none':
detail = 'Please do "ceph balancer mode" to choose a valid mode first'
self.log.info('Idle')
self.log.info(detail)
return -errno.EINVAL, detail
- def do_upmap(self, plan):
+ def do_upmap(self, plan: Plan) -> Tuple[int, str]:
self.log.info('do_upmap')
- max_optimizations = self.get_module_option('upmap_max_optimizations')
- max_deviation = self.get_module_option('upmap_max_deviation')
+ max_optimizations = cast(float, self.get_module_option('upmap_max_optimizations'))
+ max_deviation = cast(int, self.get_module_option('upmap_max_deviation'))
osdmap_dump = plan.osdmap_dump
if len(plan.pools):
'or distribution is already perfect'
return 0, ''
- def do_crush_compat(self, plan):
+ def do_crush_compat(self, plan: MsPlan) -> Tuple[int, str]:
self.log.info('do_crush_compat')
- max_iterations = self.get_module_option('crush_compat_max_iterations')
+ max_iterations = cast(int, self.get_module_option('crush_compat_max_iterations'))
if max_iterations < 1:
return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
- step = self.get_module_option('crush_compat_step')
+ step = cast(float, self.get_module_option('crush_compat_step'))
if step <= 0 or step >= 1.0:
return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
- max_misplaced = self.get_ceph_option('target_max_misplaced_ratio')
+ max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
min_pg_per_osd = 2
ms = plan.initial
osdmap = ms.osdmap
crush = osdmap.get_crush()
pe = self.calc_eval(ms, plan.pools)
- min_score_to_optimize = self.get_module_option('min_score')
+ min_score_to_optimize = cast(float, self.get_module_option('min_score'))
if pe.score <= min_score_to_optimize:
if pe.score == 0:
detail = 'Distribution is already perfect'
# can't proceed.
roots = list(pe.target_by_root.keys())
self.log.debug('roots %s', roots)
- visited = {}
- overlap = {}
- root_ids = {}
+ visited: Dict[int, str] = {}
+ overlap: Dict[int, List[str]] = {}
+ root_ids: Dict[str, int] = {}
for root, wm in pe.target_by_root.items():
for osd in wm:
if osd in visited:
return -errno.EOPNOTSUPP, detail
# rebalance by pgs, objects, or bytes
- metrics = self.get_module_option('crush_compat_metrics').split(',')
+ metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
key = metrics[0] # balancing using the first score metric
if key not in ['pgs', 'bytes', 'objects']:
self.log.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
left -= 1
# allow a small regression if we are phasing out osd weights
- fudge = 0
+ fudge = 0.0
if best_ow != orig_osd_weight:
fudge = .001
return -errno.EDOM, 'Unable to find further optimization, ' \
'change balancer mode and retry might help'
- def get_compat_weight_set_weights(self, ms):
+ def get_compat_weight_set_weights(self, ms: MappingState):
if not CRUSHMap.have_default_choose_args(ms.crush_dump):
# enable compat weight-set first
self.log.debug('ceph osd crush weight-set create-compat')
self.log.debug('weight_set weights %s' % weight_set)
return weight_set
- def do_crush(self):
+ def do_crush(self) -> None:
self.log.info('do_crush (not yet implemented)')
- def do_osd_weight(self):
+ def do_osd_weight(self) -> None:
self.log.info('do_osd_weight (not yet implemented)')
- def execute(self, plan):
+ def execute(self, plan: Plan) -> Tuple[int, str]:
self.log.info('Executing plan %s' % plan.name)
commands = []
# compat weight-set
- if len(plan.compat_ws) and \
- not CRUSHMap.have_default_choose_args(plan.initial.crush_dump):
- self.log.debug('ceph osd crush weight-set create-compat')
- result = CommandResult('')
- self.send_command(result, 'mon', '', json.dumps({
- 'prefix': 'osd crush weight-set create-compat',
- 'format': 'json',
- }), '')
- r, outb, outs = result.wait()
- if r != 0:
- self.log.error('Error creating compat weight-set')
- return r, outs
+ if len(plan.compat_ws):
+ ms_plan = cast(MsPlan, plan)
+ if not CRUSHMap.have_default_choose_args(ms_plan.initial.crush_dump):
+ self.log.debug('ceph osd crush weight-set create-compat')
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd crush weight-set create-compat',
+ 'format': 'json',
+ }), '')
+ r, outb, outs = result.wait()
+ if r != 0:
+ self.log.error('Error creating compat weight-set')
+ return r, outs
for osd, weight in plan.compat_ws.items():
self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
self.log.debug('done')
return 0, ''
- def gather_telemetry(self):
+ def gather_telemetry(self) -> Dict[str, Any]:
return {
'active': self.active,
'mode': self.mode,