"""
import copy
+import enum
import errno
import json
import math
import random
import time
-from mgr_module import MgrModule, CommandResult
+from mgr_module import CLICommand, CLIReadCommand, CommandResult, MgrModule
from threading import Event
+from typing import List, Optional, Sequence, Tuple
from mgr_module import CRUSHMap
import datetime
return float(misplaced) / float(num)
return 0.0
+
+class Mode(enum.Enum):
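+ """Balancer modes; the values are the strings accepted by "balancer mode"."""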
+ none = 'none'
+ crush_compat = 'crush-compat'
+ upmap = 'upmap'
+
+
class Plan(object):
def __init__(self, name, mode, osdmap, pools):
self.name = name
},
]
- COMMANDS = [
- {
- "cmd": "balancer status",
- "desc": "Show balancer status",
- "perm": "r",
- },
- {
- "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
- "desc": "Set balancer mode",
- "perm": "rw",
- },
- {
- "cmd": "balancer on",
- "desc": "Enable automatic balancing",
- "perm": "rw",
- },
- {
- "cmd": "balancer off",
- "desc": "Disable automatic balancing",
- "perm": "rw",
- },
- {
- "cmd": "balancer pool ls",
- "desc": "List automatic balancing pools. "
- "Note that empty list means all existing pools will be automatic balancing targets, "
- "which is the default behaviour of balancer.",
- "perm": "r",
- },
- {
- "cmd": "balancer pool add name=pools,type=CephString,n=N",
- "desc": "Enable automatic balancing for specific pools",
- "perm": "rw",
- },
- {
- "cmd": "balancer pool rm name=pools,type=CephString,n=N",
- "desc": "Disable automatic balancing for specific pools",
- "perm": "rw",
- },
- {
- "cmd": "balancer eval name=option,type=CephString,req=false",
- "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan",
- "perm": "r",
- },
- {
- "cmd": "balancer eval-verbose name=option,type=CephString,req=false",
- "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan (verbosely)",
- "perm": "r",
- },
- {
- "cmd": "balancer optimize name=plan,type=CephString name=pools,type=CephString,n=N,req=false",
- "desc": "Run optimizer to create a new plan",
- "perm": "rw",
- },
- {
- "cmd": "balancer show name=plan,type=CephString",
- "desc": "Show details of an optimization plan",
- "perm": "r",
- },
- {
- "cmd": "balancer rm name=plan,type=CephString",
- "desc": "Discard an optimization plan",
- "perm": "rw",
- },
- {
- "cmd": "balancer reset",
- "desc": "Discard all optimization plans",
- "perm": "rw",
- },
- {
- "cmd": "balancer dump name=plan,type=CephString",
- "desc": "Show an optimization plan",
- "perm": "r",
- },
- {
- "cmd": "balancer ls",
- "desc": "List all plans",
- "perm": "r",
- },
- {
- "cmd": "balancer execute name=plan,type=CephString",
- "desc": "Execute an optimization plan",
- "perm": "rw",
- },
- ]
active = False
run = True
plans = {}
super(Module, self).__init__(*args, **kwargs)
self.event = Event()
- def handle_command(self, inbuf, command):
- self.log.warning("Handling command: '%s'" % str(command))
- if command['prefix'] == 'balancer status':
- s = {
- 'plans': list(self.plans.keys()),
- 'active': self.active,
- 'last_optimize_started': self.last_optimize_started,
- 'last_optimize_duration': self.last_optimize_duration,
- 'optimize_result': self.optimize_result,
- 'mode': self.get_module_option('mode'),
- }
- return (0, json.dumps(s, indent=4, sort_keys=True), '')
- elif command['prefix'] == 'balancer mode':
- if command['mode'] == 'upmap':
- min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
- if min_compat_client < 'luminous': # works well because version is alphabetized..
- warn = 'min_compat_client "%s" ' \
- '< "luminous", which is required for pg-upmap. ' \
- 'Try "ceph osd set-require-min-compat-client luminous" ' \
- 'before enabling this mode' % min_compat_client
- return (-errno.EPERM, '', warn)
- elif command['mode'] == 'crush-compat':
- ms = MappingState(self.get_osdmap(),
- self.get("pg_stats"),
- self.get("pool_stats"),
- 'initialize compat weight-set')
- self.get_compat_weight_set_weights(ms) # ignore error
- self.set_module_option('mode', command['mode'])
- return (0, '', '')
- elif command['prefix'] == 'balancer on':
- if not self.active:
- self.set_module_option('active', 'true')
- self.active = True
- self.event.set()
- return (0, '', '')
- elif command['prefix'] == 'balancer off':
- if self.active:
- self.set_module_option('active', 'false')
- self.active = False
- self.event.set()
- return (0, '', '')
- elif command['prefix'] == 'balancer pool ls':
- pool_ids = self.get_module_option('pool_ids')
- if pool_ids == '':
- return (0, '', '')
- pool_ids = pool_ids.split(',')
- pool_ids = [int(p) for p in pool_ids]
- pool_name_by_id = dict((p['pool'], p['pool_name']) for p in self.get_osdmap().dump().get('pools', []))
- should_prune = False
- final_ids = []
- final_names = []
- for p in pool_ids:
- if p in pool_name_by_id:
- final_ids.append(p)
- final_names.append(pool_name_by_id[p])
- else:
- should_prune = True
- if should_prune: # some pools were gone, prune
- self.set_module_option('pool_ids', ','.join(final_ids))
- return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
- elif command['prefix'] == 'balancer pool add':
- raw_names = command['pools']
- pool_id_by_name = dict((p['pool_name'], p['pool']) for p in self.get_osdmap().dump().get('pools', []))
- invalid_names = [p for p in raw_names if p not in pool_id_by_name]
- if invalid_names:
- return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
- to_add = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
- existing = self.get_module_option('pool_ids')
- final = to_add
- if existing != '':
- existing = existing.split(',')
- final = set(to_add) | set(existing)
- self.set_module_option('pool_ids', ','.join(final))
+ @CLIReadCommand('balancer status')
+ def show_status(self) -> Tuple[int, str, str]:
+ """
+ Show balancer status
+ """
+ s = {
+ 'plans': list(self.plans.keys()),
+ 'active': self.active,
+ 'last_optimize_started': self.last_optimize_started,
+ 'last_optimize_duration': self.last_optimize_duration,
+ 'optimize_result': self.optimize_result,
+ 'mode': self.get_module_option('mode'),
+ }
+ return (0, json.dumps(s, indent=4, sort_keys=True), '')
+
+ @CLICommand('balancer mode')
+ def set_mode(self, mode: Mode) -> Tuple[int, str, str]:
+ """
+ Set balancer mode
+ """
+ if mode == Mode.upmap:
+ min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
+ if min_compat_client < 'luminous': # works because release names sort alphabetically
+ warn = ('min_compat_client "%s" '
+ '< "luminous", which is required for pg-upmap. '
+ 'Try "ceph osd set-require-min-compat-client luminous" '
+ 'before enabling this mode' % min_compat_client)
+ return (-errno.EPERM, '', warn)
+ elif mode == Mode.crush_compat:
+ ms = MappingState(self.get_osdmap(),
+ self.get("pg_stats"),
+ self.get("pool_stats"),
+ 'initialize compat weight-set')
+ self.get_compat_weight_set_weights(ms) # ignore error
+ self.set_module_option('mode', mode.value)
+ return (0, '', '')
+
+ @CLICommand('balancer on')
+ def on(self) -> Tuple[int, str, str]:
+ """
+ Enable automatic balancing
+ """
+ if not self.active:
+ self.set_module_option('active', 'true')
+ self.active = True
+ self.event.set()
+ return (0, '', '')
+
+ @CLICommand('balancer off')
+ def off(self) -> Tuple[int, str, str]:
+ """
+ Disable automatic balancing
+ """
+ if self.active:
+ self.set_module_option('active', 'false')
+ self.active = False
+ self.event.set()
+ return (0, '', '')
+
+ @CLIReadCommand('balancer pool ls')
+ def pool_ls(self) -> Tuple[int, str, str]:
+ """
+ List automatic balancing pools
+
+ Note that an empty list means all existing pools will be automatic balancing
+ targets, which is the default behaviour of the balancer.
+ """
+ pool_ids = self.get_module_option('pool_ids')
+ if pool_ids == '':
return (0, '', '')
- elif command['prefix'] == 'balancer pool rm':
- raw_names = command['pools']
- existing = self.get_module_option('pool_ids')
- if existing == '': # for idempotence
- return (0, '', '')
+ pool_ids = pool_ids.split(',')
+ pool_ids = [int(p) for p in pool_ids]
+ pool_name_by_id = dict((p['pool'], p['pool_name'])
+ for p in self.get_osdmap().dump().get('pools', []))
+ should_prune = False
+ final_ids = []
+ final_names = []
+ for p in pool_ids:
+ if p in pool_name_by_id:
+ final_ids.append(p)
+ final_names.append(pool_name_by_id[p])
+ else:
+ should_prune = True
+ if should_prune: # some pools were gone, prune
+ self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
+ return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
+
+ @CLICommand('balancer pool add')
+ def pool_add(self, pools: Sequence[str]) -> Tuple[int, str, str]:
+ """
+ Enable automatic balancing for specific pools
+ """
+ raw_names = pools
+ pool_id_by_name = dict((p['pool_name'], p['pool'])
+ for p in self.get_osdmap().dump().get('pools', []))
+ invalid_names = [p for p in raw_names if p not in pool_id_by_name]
+ if invalid_names:
+ return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
+ to_add = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
+ existing = self.get_module_option('pool_ids')
+ final = to_add
+ if existing != '':
existing = existing.split(',')
- osdmap = self.get_osdmap()
- pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
- pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
- final = [p for p in existing if p in pool_ids]
- to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
- final = set(final) - set(to_delete)
- self.set_module_option('pool_ids', ','.join(final))
+ final = set(to_add) | set(existing)
+ self.set_module_option('pool_ids', ','.join(final))
+ return (0, '', '')
+
+ @CLICommand('balancer pool rm')
+ def pool_rm(self, pools: Sequence[str]) -> Tuple[int, str, str]:
+ """
+ Disable automatic balancing for specific pools
+ """
+ raw_names = pools
+ existing = self.get_module_option('pool_ids')
+ if existing == '': # for idempotence
return (0, '', '')
- elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
- verbose = command['prefix'] == 'balancer eval-verbose'
- pools = []
- if 'option' in command:
- plan = self.plans.get(command['option'])
- if not plan:
- # not a plan, does it look like a pool?
- osdmap = self.get_osdmap()
- valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
- option = command['option']
- if option not in valid_pool_names:
- return (-errno.EINVAL, '', 'option "%s" not a plan or a pool' % option)
- pools.append(option)
- ms = MappingState(osdmap, self.get("pg_stats"), self.get("pool_stats"), 'pool "%s"' % option)
- else:
- pools = plan.pools
- if plan.mode == 'upmap':
- # Note that for upmap, to improve the efficiency,
- # we use a basic version of Plan without keeping the obvious
- # *redundant* MS member.
- # Hence ms might not be accurate here since we are basically
- # using an old snapshotted osdmap vs a fresh copy of pg_stats.
- # It should not be a big deal though..
- ms = MappingState(plan.osdmap,
- self.get("pg_stats"),
- self.get("pool_stats"),
- 'plan "%s"' % plan.name)
- else:
- ms = plan.final_state()
- else:
- ms = MappingState(self.get_osdmap(),
+ existing = existing.split(',')
+ osdmap = self.get_osdmap()
+ pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
+ pool_id_by_name = dict((p['pool_name'], p['pool'])
+ for p in osdmap.dump().get('pools', []))
+ final = [p for p in existing if p in pool_ids]
+ to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
+ final = set(final) - set(to_delete)
+ self.set_module_option('pool_ids', ','.join(final))
+ return (0, '', '')
+
+ def _state_from_option(self, option: Optional[str] = None) -> Tuple[MappingState, List[str]]:
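+ """
+ Resolve an eval target into a MappingState and a pool list:
+ no option means the whole cluster, a plan name selects that plan,
+ and anything else is treated as a pool name (ValueError otherwise).
+ """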
+ pools = []
+ if option is None:
+ ms = MappingState(self.get_osdmap(),
+ self.get("pg_stats"),
+ self.get("pool_stats"),
+ 'current cluster')
+ elif option in self.plans:
+ plan = self.plans.get(option)
+ assert plan
+ pools = plan.pools
+ if plan.mode == 'upmap':
+ # Note that for upmap, to improve the efficiency,
+ # we use a basic version of Plan without keeping the obvious
+ # *redundant* MS member.
+ # Hence ms might not be accurate here since we are basically
+ # using an old snapshotted osdmap vs a fresh copy of pg_stats.
+ # It should not be a big deal though..
+ ms = MappingState(plan.osdmap,
self.get("pg_stats"),
self.get("pool_stats"),
- 'current cluster')
- return (0, self.evaluate(ms, pools, verbose=verbose), '')
- elif command['prefix'] == 'balancer optimize':
- # The GIL can be release by the active balancer, so disallow when active
- if self.active:
- return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
- if self.optimizing:
- return (-errno.EINVAL, '', 'Balancer finishing up....try again')
- pools = []
- if 'pools' in command:
- pools = command['pools']
+ f'plan "{plan.name}"')
+ else:
+ ms = plan.final_state()
+ else:
+ # not a plan, does it look like a pool?
osdmap = self.get_osdmap()
valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
- invalid_pool_names = []
- for p in pools:
- if p not in valid_pool_names:
- invalid_pool_names.append(p)
- if len(invalid_pool_names):
- return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
- plan = self.plan_create(command['plan'], osdmap, pools)
- self.last_optimize_started = time.asctime(time.localtime())
- self.optimize_result = self.in_progress_string
- start = time.time()
- r, detail = self.optimize(plan)
- end = time.time()
- self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
- if r == 0:
- # Add plan if an optimization was created
- self.optimize_result = self.success_string
- self.plans[command['plan']] = plan
- else:
- self.optimize_result = detail
- return (r, '', detail)
- elif command['prefix'] == 'balancer rm':
- self.plan_rm(command['plan'])
- return (0, '', '')
- elif command['prefix'] == 'balancer reset':
- self.plans = {}
- return (0, '', '')
- elif command['prefix'] == 'balancer ls':
- return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')
- elif command['prefix'] == 'balancer dump':
- plan = self.plans.get(command['plan'])
- if not plan:
- return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
- return (0, plan.dump(), '')
- elif command['prefix'] == 'balancer show':
- plan = self.plans.get(command['plan'])
- if not plan:
- return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
- return (0, plan.show(), '')
- elif command['prefix'] == 'balancer execute':
- # The GIL can be release by the active balancer, so disallow when active
- if self.active:
- return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
- if self.optimizing:
- return (-errno.EINVAL, '', 'Balancer finishing up....try again')
- plan = self.plans.get(command['plan'])
- if not plan:
- return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
- r, detail = self.execute(plan)
- self.plan_rm(command['plan'])
- return (r, '', detail)
+ if option not in valid_pool_names:
+ raise ValueError(f'option "{option}" not a plan or a pool')
+ pools.append(option)
+ ms = MappingState(osdmap,
+ self.get("pg_stats"),
+ self.get("pool_stats"),
+ f'pool "{option}"')
+ return ms, pools
+
+ @CLIReadCommand('balancer eval-verbose')
+ def plan_eval_verbose(self, option: Optional[str] = None) -> Tuple[int, str, str]:
+ """
+ Evaluate data distribution for the current cluster, a specific pool, or a specific plan (verbose)
+ """
+ try:
+ ms, pools = self._state_from_option(option)
+ return (0, self.evaluate(ms, pools, verbose=True), '')
+ except ValueError as e:
+ return (-errno.EINVAL, '', str(e))
+
+ @CLIReadCommand('balancer eval')
+ def plan_eval_brief(self, option: Optional[str] = None) -> Tuple[int, str, str]:
+ """
+ Evaluate data distribution for the current cluster, a specific pool, or a specific plan
+ """
+ try:
+ ms, pools = self._state_from_option(option)
+ return (0, self.evaluate(ms, pools, verbose=False), '')
+ except ValueError as e:
+ return (-errno.EINVAL, '', str(e))
+
+ @CLICommand('balancer optimize')
+ def plan_optimize(self, plan: str, pools: List[str] = []) -> Tuple[int, str, str]:
+ """
+ Run optimizer to create a new plan
+ """
+ # The GIL can be released by the active balancer, so disallow when active
+ if self.active:
+ return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
+ if self.optimizing:
+ return (-errno.EINVAL, '', 'Balancer finishing up... try again')
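+ # validate the requested pool names against the current osdmap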
+ osdmap = self.get_osdmap()
+ valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
+ invalid_pool_names = [p for p in pools if p not in valid_pool_names]
+ if invalid_pool_names:
+ return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
+ plan_ = self.plan_create(plan, osdmap, pools)
+ self.last_optimize_started = time.asctime(time.localtime())
+ self.optimize_result = self.in_progress_string
+ start = time.time()
+ r, detail = self.optimize(plan_)
+ end = time.time()
+ self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
+ if r == 0:
+ # Add plan if an optimization was created
+ self.optimize_result = self.success_string
+ self.plans[plan] = plan_
+ else:
+ self.optimize_result = detail
+ return (r, '', detail)
+
+ @CLIReadCommand('balancer show')
+ def plan_show(self, plan: str) -> Tuple[int, str, str]:
+ """
+ Show details of an optimization plan
+ """
+ plan_ = self.plans.get(plan)
+ if not plan_:
+ return (-errno.ENOENT, '', f'plan {plan} not found')
+ return (0, plan_.show(), '')
+
+ @CLICommand('balancer rm')
+ def plan_rm(self, plan: str) -> Tuple[int, str, str]:
+ """
+ Discard an optimization plan
+ """
+ if plan in self.plans:
+ del self.plans[plan]
+ return (0, '', '')
+
+ @CLICommand('balancer reset')
+ def plan_reset(self) -> Tuple[int, str, str]:
+ """
+ Discard all optimization plans
+ """
+ self.plans = {}
+ return (0, '', '')
+
+ @CLIReadCommand('balancer dump')
+ def plan_dump(self, plan: str) -> Tuple[int, str, str]:
+ """
+ Show an optimization plan
+ """
+ plan_ = self.plans.get(plan)
+ if not plan_:
+ return (-errno.ENOENT, '', f'plan {plan} not found')
else:
- return (-errno.EINVAL, '',
- "Command not found '{0}'".format(command['prefix']))
+ return (0, plan_.dump(), '')
+
+ @CLIReadCommand('balancer ls')
+ def plan_ls(self) -> Tuple[int, str, str]:
+ """
+ List all plans
+ """
+ return (0, json.dumps(list(self.plans), indent=4, sort_keys=True), '')
+
+ @CLICommand('balancer execute')
+ def plan_execute(self, plan: str) -> Tuple[int, str, str]:
+ """
+ Execute an optimization plan
+ """
+ # The GIL can be released by the active balancer, so disallow when active
+ if self.active:
+ return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
+ if self.optimizing:
+ return (-errno.EINVAL, '', 'Balancer finishing up... try again')
+ plan_ = self.plans.get(plan)
+ if not plan_:
+ return (-errno.ENOENT, '', f'plan {plan} not found')
+ r, detail = self.execute(plan_)
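+ # plans are single-use: discard the plan once it has been executed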
+ self.plan_rm(plan)
+ return (r, '', detail)
def shutdown(self):
self.log.info('Stopping')
pools)
return plan
- def plan_rm(self, name):
- if name in self.plans:
- del self.plans[name]
-
def calc_eval(self, ms, pools):
pe = Eval(ms)
pool_rule = {}