From: Kefu Chai
Date: Wed, 3 Feb 2021 05:44:10 +0000 (+0800)
Subject: pybind/mgr/balancer: define commands using CLICommand
X-Git-Tag: v17.1.0~3038^2~5
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=46f00b3da28fc1f552eff733aa543b8dcedec068;p=ceph-ci.git

pybind/mgr/balancer: define commands using CLICommand

Signed-off-by: Kefu Chai
---
diff --git a/src/pybind/mgr/balancer/module.py b/src/pybind/mgr/balancer/module.py
index acdf936b65e..70c6693f11e 100644
--- a/src/pybind/mgr/balancer/module.py
+++ b/src/pybind/mgr/balancer/module.py
@@ -3,13 +3,15 @@ Balance PG distribution across OSDs.
 """
 import copy
+import enum
 import errno
 import json
 import math
 import random
 import time
-from mgr_module import MgrModule, CommandResult
+from mgr_module import CLIReadCommand, CLICommand, MgrModule, CommandResult
 from threading import Event
+from typing import List, Optional, Sequence, Tuple
 from mgr_module import CRUSHMap
 import datetime
@@ -47,6 +49,13 @@ class MappingState:
             return float(misplaced) / float(num)
         return 0.0
 
+
+class Mode(enum.Enum):
+    none = 'none'
+    crush_compat = 'crush-compat'
+    upmap = 'upmap'
+
+
 class Plan(object):
     def __init__(self, name, mode, osdmap, pools):
         self.name = name
@@ -328,90 +337,6 @@ class Module(MgrModule):
         },
     ]
 
-    COMMANDS = [
-        {
-            "cmd": "balancer status",
-            "desc": "Show balancer status",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
-            "desc": "Set balancer mode",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer on",
-            "desc": "Enable automatic balancing",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer off",
-            "desc": "Disable automatic balancing",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer pool ls",
-            "desc": "List automatic balancing pools. "
-                    "Note that empty list means all existing pools will be automatic balancing targets, "
-                    "which is the default behaviour of balancer.",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer pool add name=pools,type=CephString,n=N",
-            "desc": "Enable automatic balancing for specific pools",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer pool rm name=pools,type=CephString,n=N",
-            "desc": "Disable automatic balancing for specific pools",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer eval name=option,type=CephString,req=false",
-            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer eval-verbose name=option,type=CephString,req=false",
-            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan (verbosely)",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer optimize name=plan,type=CephString name=pools,type=CephString,n=N,req=false",
-            "desc": "Run optimizer to create a new plan",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer show name=plan,type=CephString",
-            "desc": "Show details of an optimization plan",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer rm name=plan,type=CephString",
-            "desc": "Discard an optimization plan",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer reset",
-            "desc": "Discard all optimization plans",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer dump name=plan,type=CephString",
-            "desc": "Show an optimization plan",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer ls",
-            "desc": "List all plans",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer execute name=plan,type=CephString",
-            "desc": "Execute an optimization plan",
-            "perm": "rw",
-        },
-    ]
-
     active = False
     run = True
     plans = {}
@@ -427,193 +352,284 @@ class Module(MgrModule):
         super(Module, self).__init__(*args, **kwargs)
         self.event = Event()
 
-    def handle_command(self, inbuf, command):
-        self.log.warning("Handling command: '%s'" % str(command))
-        if command['prefix'] == 'balancer status':
-            s = {
-                'plans': list(self.plans.keys()),
-                'active': self.active,
-                'last_optimize_started': self.last_optimize_started,
-                'last_optimize_duration': self.last_optimize_duration,
-                'optimize_result': self.optimize_result,
-                'mode': self.get_module_option('mode'),
-            }
-            return (0, json.dumps(s, indent=4, sort_keys=True), '')
-        elif command['prefix'] == 'balancer mode':
-            if command['mode'] == 'upmap':
-                min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
-                if min_compat_client < 'luminous':  # works well because version is alphabetized..
-                    warn = 'min_compat_client "%s" ' \
-                           '< "luminous", which is required for pg-upmap. ' \
-                           'Try "ceph osd set-require-min-compat-client luminous" ' \
-                           'before enabling this mode' % min_compat_client
-                    return (-errno.EPERM, '', warn)
-            elif command['mode'] == 'crush-compat':
-                ms = MappingState(self.get_osdmap(),
-                                  self.get("pg_stats"),
-                                  self.get("pool_stats"),
-                                  'initialize compat weight-set')
-                self.get_compat_weight_set_weights(ms)  # ignore error
-            self.set_module_option('mode', command['mode'])
-            return (0, '', '')
-        elif command['prefix'] == 'balancer on':
-            if not self.active:
-                self.set_module_option('active', 'true')
-                self.active = True
-            self.event.set()
-            return (0, '', '')
-        elif command['prefix'] == 'balancer off':
-            if self.active:
-                self.set_module_option('active', 'false')
-                self.active = False
-            self.event.set()
-            return (0, '', '')
-        elif command['prefix'] == 'balancer pool ls':
-            pool_ids = self.get_module_option('pool_ids')
-            if pool_ids == '':
-                return (0, '', '')
-            pool_ids = pool_ids.split(',')
-            pool_ids = [int(p) for p in pool_ids]
-            pool_name_by_id = dict((p['pool'], p['pool_name']) for p in self.get_osdmap().dump().get('pools', []))
-            should_prune = False
-            final_ids = []
-            final_names = []
-            for p in pool_ids:
-                if p in pool_name_by_id:
-                    final_ids.append(p)
-                    final_names.append(pool_name_by_id[p])
-                else:
-                    should_prune = True
-            if should_prune:  # some pools were gone, prune
-                self.set_module_option('pool_ids', ','.join(final_ids))
-            return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
-        elif command['prefix'] == 'balancer pool add':
-            raw_names = command['pools']
-            pool_id_by_name = dict((p['pool_name'], p['pool']) for p in self.get_osdmap().dump().get('pools', []))
-            invalid_names = [p for p in raw_names if p not in pool_id_by_name]
-            if invalid_names:
-                return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
-            to_add = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
-            existing = self.get_module_option('pool_ids')
-            final = to_add
-            if existing != '':
-                existing = existing.split(',')
-                final = set(to_add) | set(existing)
-            self.set_module_option('pool_ids', ','.join(final))
+    @CLIReadCommand('balancer status')
+    def show_status(self) -> Tuple[int, str, str]:
+        """
+        Show balancer status
+        """
+        s = {
+            'plans': list(self.plans.keys()),
+            'active': self.active,
+            'last_optimize_started': self.last_optimize_started,
+            'last_optimize_duration': self.last_optimize_duration,
+            'optimize_result': self.optimize_result,
+            'mode': self.get_module_option('mode'),
+        }
+        return (0, json.dumps(s, indent=4, sort_keys=True), '')
+
+    @CLICommand('balancer mode')
+    def set_mode(self, mode: Mode) -> Tuple[int, str, str]:
+        """
+        Set balancer mode
+        """
+        if mode == Mode.upmap:
+            min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
+            if min_compat_client < 'luminous':  # works because release names are alphabetically ordered
+                warn = ('min_compat_client "%s" '
+                        '< "luminous", which is required for pg-upmap. '
+                        'Try "ceph osd set-require-min-compat-client luminous" '
+                        'before enabling this mode' % min_compat_client)
+                return (-errno.EPERM, '', warn)
+        elif mode == Mode.crush_compat:
+            ms = MappingState(self.get_osdmap(),
+                              self.get("pg_stats"),
+                              self.get("pool_stats"),
+                              'initialize compat weight-set')
+            self.get_compat_weight_set_weights(ms)  # ignore error
+        self.set_module_option('mode', mode.value)
+        return (0, '', '')
+
+    @CLICommand('balancer on')
+    def on(self) -> Tuple[int, str, str]:
+        """
+        Enable automatic balancing
+        """
+        if not self.active:
+            self.set_module_option('active', 'true')
+            self.active = True
+        self.event.set()
+        return (0, '', '')
+
+    @CLICommand('balancer off')
+    def off(self) -> Tuple[int, str, str]:
+        """
+        Disable automatic balancing
+        """
+        if self.active:
+            self.set_module_option('active', 'false')
+            self.active = False
+        self.event.set()
+        return (0, '', '')
+
+    @CLIReadCommand('balancer pool ls')
+    def pool_ls(self) -> Tuple[int, str, str]:
+        """
+        List automatic balancing pools
+
+        Note that an empty list means all existing pools will be automatic balancing targets,
+        which is the default behaviour of the balancer.
+        """
+        pool_ids = self.get_module_option('pool_ids')
+        if pool_ids == '':
             return (0, '', '')
+        pool_ids = pool_ids.split(',')
+        pool_ids = [int(p) for p in pool_ids]
+        pool_name_by_id = dict((p['pool'], p['pool_name']) for p in self.get_osdmap().dump().get('pools', []))
+        should_prune = False
+        final_ids = []
+        final_names = []
+        for p in pool_ids:
+            if p in pool_name_by_id:
+                final_ids.append(p)
+                final_names.append(pool_name_by_id[p])
+            else:
+                should_prune = True
+        if should_prune:  # some pools were gone, prune
+            self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
+        return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
+
+    @CLICommand('balancer pool add')
+    def pool_add(self, pools: Sequence[str]) -> Tuple[int, str, str]:
+        """
+        Enable automatic balancing for specific pools
+        """
+        raw_names = pools
+        pool_id_by_name = dict((p['pool_name'], p['pool'])
+                               for p in self.get_osdmap().dump().get('pools', []))
+        invalid_names = [p for p in raw_names if p not in pool_id_by_name]
+        if invalid_names:
+            return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
+        to_add = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
+        existing = self.get_module_option('pool_ids')
+        final = to_add
+        if existing != '':
             existing = existing.split(',')
-            osdmap = self.get_osdmap()
-            pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
-            pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
-            final = [p for p in existing if p in pool_ids]
-            to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
-            final = set(final) - set(to_delete)
-            self.set_module_option('pool_ids', ','.join(final))
+            final = set(to_add) | set(existing)
+        self.set_module_option('pool_ids', ','.join(final))
+        return (0, '', '')
+
+    @CLICommand('balancer pool rm')
+    def pool_rm(self, pools: Sequence[str]) -> Tuple[int, str, str]:
+        """
+        Disable automatic balancing for specific pools
+        """
+        raw_names = pools
+        existing = self.get_module_option('pool_ids')
+        if existing == '':  # for idempotence
             return (0, '', '')
-        elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
-            verbose = command['prefix'] == 'balancer eval-verbose'
-            pools = []
-            if 'option' in command:
-                plan = self.plans.get(command['option'])
-                if not plan:
-                    # not a plan, does it look like a pool?
-                    osdmap = self.get_osdmap()
-                    valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
-                    option = command['option']
-                    if option not in valid_pool_names:
-                        return (-errno.EINVAL, '', 'option "%s" not a plan or a pool' % option)
-                    pools.append(option)
-                    ms = MappingState(osdmap, self.get("pg_stats"), self.get("pool_stats"), 'pool "%s"' % option)
-                else:
-                    pools = plan.pools
-                    if plan.mode == 'upmap':
-                        # Note that for upmap, to improve the efficiency,
-                        # we use a basic version of Plan without keeping the obvious
-                        # *redundant* MS member.
-                        # Hence ms might not be accurate here since we are basically
-                        # using an old snapshotted osdmap vs a fresh copy of pg_stats.
-                        # It should not be a big deal though..
-                        ms = MappingState(plan.osdmap,
-                                          self.get("pg_stats"),
-                                          self.get("pool_stats"),
-                                          'plan "%s"' % plan.name)
-                    else:
-                        ms = plan.final_state()
-            else:
-                ms = MappingState(self.get_osdmap(),
+            existing = existing.split(',')
+        osdmap = self.get_osdmap()
+        pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
+        pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
+        final = [p for p in existing if p in pool_ids]
+        to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
+        final = set(final) - set(to_delete)
+        self.set_module_option('pool_ids', ','.join(final))
+        return (0, '', '')
+
+    def _state_from_option(self, option: Optional[str] = None) -> Tuple[MappingState, List[str]]:
+        pools = []
+        if option is None:
+            ms = MappingState(self.get_osdmap(),
+                              self.get("pg_stats"),
+                              self.get("pool_stats"),
+                              'current cluster')
+        elif option in self.plans:
+            plan = self.plans.get(option)
+            assert plan
+            pools = plan.pools
+            if plan.mode == 'upmap':
+                # Note that for upmap, to improve efficiency,
+                # we use a basic version of Plan without keeping the obviously
+                # redundant MS member.
+                # Hence ms might not be accurate here, since we are basically
+                # using an old snapshotted osdmap against a fresh copy of pg_stats.
+                # It should not be a big deal though.
+                ms = MappingState(plan.osdmap,
                                   self.get("pg_stats"),
                                   self.get("pool_stats"),
-                                  'current cluster')
-            return (0, self.evaluate(ms, pools, verbose=verbose), '')
-        elif command['prefix'] == 'balancer optimize':
-            # The GIL can be release by the active balancer, so disallow when active
-            if self.active:
-                return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
-            if self.optimizing:
-                return (-errno.EINVAL, '', 'Balancer finishing up....try again')
-            pools = []
-            if 'pools' in command:
-                pools = command['pools']
+                                  f'plan "{plan.name}"')
+            else:
+                ms = plan.final_state()
+        else:
+            # not a plan, does it look like a pool?
             osdmap = self.get_osdmap()
             valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
-            invalid_pool_names = []
-            for p in pools:
-                if p not in valid_pool_names:
-                    invalid_pool_names.append(p)
-            if len(invalid_pool_names):
-                return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
-            plan = self.plan_create(command['plan'], osdmap, pools)
-            self.last_optimize_started = time.asctime(time.localtime())
-            self.optimize_result = self.in_progress_string
-            start = time.time()
-            r, detail = self.optimize(plan)
-            end = time.time()
-            self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
-            if r == 0:
-                # Add plan if an optimization was created
-                self.optimize_result = self.success_string
-                self.plans[command['plan']] = plan
-            else:
-                self.optimize_result = detail
-            return (r, '', detail)
-        elif command['prefix'] == 'balancer rm':
-            self.plan_rm(command['plan'])
-            return (0, '', '')
-        elif command['prefix'] == 'balancer reset':
-            self.plans = {}
-            return (0, '', '')
-        elif command['prefix'] == 'balancer ls':
-            return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')
-        elif command['prefix'] == 'balancer dump':
-            plan = self.plans.get(command['plan'])
-            if not plan:
-                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
-            return (0, plan.dump(), '')
-        elif command['prefix'] == 'balancer show':
-            plan = self.plans.get(command['plan'])
-            if not plan:
-                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
-            return (0, plan.show(), '')
-        elif command['prefix'] == 'balancer execute':
-            # The GIL can be release by the active balancer, so disallow when active
-            if self.active:
-                return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
-            if self.optimizing:
-                return (-errno.EINVAL, '', 'Balancer finishing up....try again')
-            plan = self.plans.get(command['plan'])
-            if not plan:
-                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
-            r, detail = self.execute(plan)
-            self.plan_rm(command['plan'])
-            return (r, '', detail)
+            if option not in valid_pool_names:
+                raise ValueError(f'option "{option}" not a plan or a pool')
+            pools.append(option)
+            ms = MappingState(osdmap,
+                              self.get("pg_stats"),
+                              self.get("pool_stats"),
+                              f'pool "{option}"')
+        return ms, pools
+
+    @CLIReadCommand('balancer eval-verbose')
+    def plan_eval_verbose(self, option: Optional[str] = None) -> Tuple[int, str, str]:
+        """
+        Evaluate data distribution for the current cluster or specific pool or specific plan (verbosely)
+        """
+        try:
+            ms, pools = self._state_from_option(option)
+            return (0, self.evaluate(ms, pools, verbose=True), '')
+        except ValueError as e:
+            return (-errno.EINVAL, '', str(e))
+
+    @CLIReadCommand('balancer eval')
+    def plan_eval_brief(self, option: Optional[str] = None) -> Tuple[int, str, str]:
+        """
+        Evaluate data distribution for the current cluster or specific pool or specific plan
+        """
+        try:
+            ms, pools = self._state_from_option(option)
+            return (0, self.evaluate(ms, pools, verbose=False), '')
+        except ValueError as e:
+            return (-errno.EINVAL, '', str(e))
+
+    @CLICommand('balancer optimize')
+    def plan_optimize(self, plan: str, pools: List[str] = []) -> Tuple[int, str, str]:
+        """
+        Run optimizer to create a new plan
+        """
+        # The GIL can be released by the active balancer, so disallow when active
+        if self.active:
+            return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
+        if self.optimizing:
+            return (-errno.EINVAL, '', 'Balancer finishing up....try again')
+        osdmap = self.get_osdmap()
+        valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
+        invalid_pool_names = []
+        for p in pools:
+            if p not in valid_pool_names:
+                invalid_pool_names.append(p)
+        if len(invalid_pool_names):
+            return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
+        plan_ = self.plan_create(plan, osdmap, pools)
+        self.last_optimize_started = time.asctime(time.localtime())
+        self.optimize_result = self.in_progress_string
+        start = time.time()
+        r, detail = self.optimize(plan_)
+        end = time.time()
+        self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
+        if r == 0:
+            # Add plan if an optimization was created
+            self.optimize_result = self.success_string
+            self.plans[plan] = plan_
+        else:
+            self.optimize_result = detail
+        return (r, '', detail)
+
+    @CLIReadCommand('balancer show')
+    def plan_show(self, plan: str) -> Tuple[int, str, str]:
+        """
+        Show details of an optimization plan
+        """
+        plan_ = self.plans.get(plan)
+        if not plan_:
+            return (-errno.ENOENT, '', f'plan {plan} not found')
+        return (0, plan_.show(), '')
+
+    @CLICommand('balancer rm')
+    def plan_rm(self, plan: str) -> Tuple[int, str, str]:
+        """
+        Discard an optimization plan
+        """
+        if plan in self.plans:
+            del self.plans[plan]
+        return (0, '', '')
+
+    @CLICommand('balancer reset')
+    def plan_reset(self) -> Tuple[int, str, str]:
+        """
+        Discard all optimization plans
+        """
+        self.plans = {}
+        return (0, '', '')
+
+    @CLIReadCommand('balancer dump')
+    def plan_dump(self, plan: str) -> Tuple[int, str, str]:
+        """
+        Show an optimization plan
+        """
+        plan_ = self.plans.get(plan)
+        if not plan_:
+            return (-errno.ENOENT, '', f'plan {plan} not found')
+        else:
-            return (-errno.EINVAL, '',
-                    "Command not found '{0}'".format(command['prefix']))
+            return (0, plan_.dump(), '')
+
+    @CLIReadCommand('balancer ls')
+    def plan_ls(self) -> Tuple[int, str, str]:
+        """
+        List all plans
+        """
+        return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')
+
+    @CLICommand('balancer execute')
+    def plan_execute(self, plan: str) -> Tuple[int, str, str]:
+        """
+        Execute an optimization plan
+        """
+        # The GIL can be released by the active balancer, so disallow when active
+        if self.active:
+            return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
+        if self.optimizing:
+            return (-errno.EINVAL, '', 'Balancer finishing up....try again')
+        plan_ = self.plans.get(plan)
+        if not plan_:
+            return (-errno.ENOENT, '', f'plan {plan} not found')
+        r, detail = self.execute(plan_)
+        self.plan_rm(plan)
+        return (r, '', detail)
 
     def shutdown(self):
         self.log.info('Stopping')
@@ -709,10 +725,6 @@ class Module(MgrModule):
                                 pools)
         return plan
 
-    def plan_rm(self, name):
-        if name in self.plans:
-            del self.plans[name]
-
     def calc_eval(self, ms, pools):
         pe = Eval(ms)
         pool_rule = {}
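
For context on the pattern this patch adopts: the CLICommand/CLIReadCommand decorators replace the static COMMANDS table and the hand-written handle_command() if/elif dispatcher with per-method registration, where (broadly speaking) mgr_module derives each command's argument spec from the method signature and its description from the docstring. The following self-contained sketch illustrates only the registration-and-dispatch idea; cli_command, _COMMANDS, Handler, and dispatch are hypothetical names invented for illustration, not the mgr_module API.

    from typing import Callable, Dict, Tuple

    Handler = Callable[..., Tuple[int, str, str]]
    _COMMANDS: Dict[str, Handler] = {}  # command prefix -> handler

    def cli_command(prefix: str, perm: str = 'rw'):
        """Register a method as the handler for a CLI command prefix."""
        def decorator(func: Handler) -> Handler:
            func._perm = perm         # illustrative only; attribute name is made up
            _COMMANDS[prefix] = func  # record the handler under its prefix
            return func
        return decorator

    class Balancer:
        @cli_command('balancer status', perm='r')
        def show_status(self) -> Tuple[int, str, str]:
            """Show balancer status"""
            return 0, '{"active": false}', ''

    def dispatch(obj: Balancer, prefix: str, **kwargs) -> Tuple[int, str, str]:
        # What handle_command() used to do with a long if/elif chain:
        # route by registered prefix via a table lookup.
        return _COMMANDS[prefix](obj, **kwargs)

    print(dispatch(Balancer(), 'balancer status'))  # (0, '{"active": false}', '')

One practical consequence of the lookup-table design, visible throughout the patch, is that each command's permission lives next to its handler (CLIReadCommand for read-only commands, CLICommand for read-write ones), so a mismatch between a command and its perm is much easier to spot in review than in the old monolithic COMMANDS list.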