pybind/mgr/balancer: rough framework
author Sage Weil <sage@redhat.com>
Thu, 3 Aug 2017 20:23:08 +0000 (16:23 -0400)
committer Sage Weil <sage@redhat.com>
Wed, 6 Sep 2017 20:46:40 +0000 (16:46 -0400)
Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/balancer/module.py
src/vstart.sh

index c95f88de0591d8d88b643c481dece276b3ecaec6..6ce774c22516034c055298f1aa6ab1a3f8740512 100644 (file)
 Balance PG distribution across OSDs.
 """
 
+import errno
 import json
+import math
 import random
 import time
 from mgr_module import MgrModule, CommandResult
 from threading import Event
 
 # available modes: 'none', 'crush', 'crush-compat', 'upmap', 'osd_weight'
-default_mode = 'upmap'
-default_sleep_interval = 30   # seconds
+default_mode = 'none'
+default_sleep_interval = 60   # seconds
 default_max_misplaced = .03   # max ratio of pgs replaced at a time
 
+TIME_FORMAT = '%Y-%m-%d %H:%M:%S %Z'
+
+
+class MappingState:
+    def __init__(self, osdmap, pg_dump, desc=''):
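+        # capture a self-contained snapshot of the osdmap, crush map and pg
+        # stats so a plan can be evaluated without re-querying the cluster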
+        self.desc = desc
+        self.osdmap = osdmap
+        self.osdmap_dump = self.osdmap.dump()
+        self.crush = osdmap.get_crush()
+        self.crush_dump = self.crush.dump()
+        self.pg_dump = pg_dump
+        self.pg_stat = {
+            i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
+        }
+
+
+class Plan:
+    def __init__(self, name, ms):
+        self.mode = 'unknown'
+        self.name = name
+        self.initial = ms
+
+        self.osd_weights = {}
+        self.compat_ws = {}
+        self.inc = ms.osdmap.new_incremental()
+
+    def final_state(self):
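+        # project the plan's pending changes onto the starting osdmap and
+        # return the resulting end state as a MappingState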
+        self.inc.set_osd_reweights(self.osd_weights)
+        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
+        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
+                            self.initial.pg_dump,
+                            'plan %s final' % self.name)
+
+    def dump(self):
+        return json.dumps(self.inc.dump(), indent=4)
+
+    def show(self):
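+        # render the plan as the equivalent sequence of 'ceph' CLI commands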
+        ls = []
+        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
+        ls.append('# starting crush version %d' %
+                  self.initial.osdmap.get_crush_version())
+        ls.append('# mode %s' % self.mode)
+        if len(self.compat_ws) and \
+           '-1' not in self.initial.crush_dump.get('choose_args', {}):
+            ls.append('ceph osd crush weight-set create-compat')
+        for osd, weight in self.compat_ws.iteritems():
+            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
+                      (osd, weight))
+        for osd, weight in self.osd_weights.iteritems():
+            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
+        incdump = self.inc.dump()
+        for pgid in incdump.get('old_pg_upmap_items', []):
+            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
+        for item in incdump.get('new_pg_upmap_items', []):
+            osdlist = []
+            for m in item['mappings']:
+                osdlist += [m['from'], m['to']]
+            ls.append('ceph osd pg-upmap-items %s %s' %
+                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
+        return '\n'.join(ls)
+
+
+class Eval:
+    def __init__(self, ms):
+        self.ms = ms
+        # keep all state per-instance; class-level dicts would be shared
+        # across Eval objects
+        self.pool_name = {}       # pool id -> pool name
+        self.pool_id = {}         # pool name -> id
+        self.pool_roots = {}      # pool name -> root names
+        self.target_by_root = {}  # root name -> target weight map
+        self.count_by_pool = {}
+        self.count_by_root = {}
+        self.actual_by_pool = {}  # pool -> by_* -> actual weight map
+        self.actual_by_root = {}  # root -> by_* -> actual weight map
+        self.total_by_pool = {}   # pool -> by_* -> total
+        self.total_by_root = {}   # root -> by_* -> total
+        self.stats_by_pool = {}   # pool -> by_* -> stddev or avg -> value
+        self.stats_by_root = {}   # root -> by_* -> stddev or avg -> value
+
+        self.score_by_pool = {}
+        self.score_by_root = {}
+
+        self.score = 0.0
+
+    def show(self):
+        r = self.ms.desc + '\n'
+        r += 'target_by_root %s\n' % self.target_by_root
+        r += 'actual_by_pool %s\n' % self.actual_by_pool
+        r += 'actual_by_root %s\n' % self.actual_by_root
+        r += 'count_by_pool %s\n' % self.count_by_pool
+        r += 'count_by_root %s\n' % self.count_by_root
+        r += 'total_by_pool %s\n' % self.total_by_pool
+        r += 'total_by_root %s\n' % self.total_by_root
+        r += 'stats_by_root %s\n' % self.stats_by_root
+        r += 'score_by_pool %s\n' % self.score_by_pool
+        r += 'score_by_root %s\n' % self.score_by_root
+        r += 'score %f (lower is better)\n' % self.score
+        return r
+
+    def calc_stats(self, count, target, total):
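+        # for each metric (pgs/objects/bytes), compute the mean per-OSD load
+        # and the stddev of each OSD's load normalized by its target weight;
+        # a lower stddev means a better-balanced distribution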
+        num = max(len(target), 1)
+        r = {}
+        for t in ('pgs', 'objects', 'bytes'):
+            avg = float(total[t]) / float(num)
+            dev = 0.0
+            for k, v in count[t].iteritems():
+                # adjust/normalize by weight
+                adjusted = float(v) / target[k] / float(num)
+                dev += (avg - adjusted) * (avg - adjusted)
+            stddev = math.sqrt(dev / float(max(num - 1, 1)))
+            r[t] = {
+                'avg': avg,
+                'stddev': stddev,
+            }
+        return r
+
 class Module(MgrModule):
+    COMMANDS = [
+        {
+            "cmd": "balancer status",
+            "desc": "Show balancer status",
+            "perm": "r",
+        },
+        {
+            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
+            "desc": "Set balancer mode",
+            "perm": "rw",
+        },
+        {
+            "cmd": "balancer on",
+            "desc": "Enable automatic balancing",
+            "perm": "rw",
+        },
+        {
+            "cmd": "balancer off",
+            "desc": "Disable automatic balancing",
+            "perm": "rw",
+        },
+        {
+            "cmd": "balancer eval name=plan,type=CephString,req=false",
+            "desc": "Evaluate data distribution for the current cluster or specific plan",
+            "perm": "r",
+        },
+        {
+            "cmd": "balancer optimize name=plan,type=CephString",
+            "desc": "Run optimizer to create a new plan",
+            "perm": "rw",
+        },
+        {
+            "cmd": "balancer show name=plan,type=CephString",
+            "desc": "Show details of an optimization plan",
+            "perm": "r",
+        },
+        {
+            "cmd": "balancer rm name=plan,type=CephString",
+            "desc": "Discard an optimization plan",
+            "perm": "rw",
+        },
+        {
+            "cmd": "balancer reset",
+            "desc": "Discard all optimization plans",
+            "perm": "rw",
+        },
+        {
+            "cmd": "balancer dump name=plan,type=CephString",
+            "desc": "Show an optimization plan",
+            "perm": "r",
+        },
+        {
+            "cmd": "balancer execute name=plan,type=CephString",
+            "desc": "Execute an optimization plan",
+            "perm": "r",
+        },
+    ]
+    active = False
     run = True
+    plans = {}
+    mode = ''
 
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
         self.event = Event()
 
     def handle_command(self, command):
-        return (-errno.EINVAL, '',
-                "Command not found '{0}'".format(command['prefix']))
+        self.log.warn("Handling command: '%s'" % str(command))
+        if command['prefix'] == 'balancer status':
+            s = {
+                'plans': self.plans.keys(),
+                'active': self.active,
+                'mode': self.get_config('mode', default_mode),
+            }
+            return (0, json.dumps(s, indent=4), '')
+        elif command['prefix'] == 'balancer mode':
+            self.set_config('mode', command['mode'])
+            return (0, '', '')
+        elif command['prefix'] == 'balancer on':
+            if not self.active:
+                self.set_config('active', '1')
+                self.active = True
+            self.event.set()
+            return (0, '', '')
+        elif command['prefix'] == 'balancer off':
+            if self.active:
+                self.set_config('active', '')
+                self.active = False
+            self.event.set()
+            return (0, '', '')
+        elif command['prefix'] == 'balancer eval':
+            if 'plan' in command:
+                plan = self.plans.get(command['plan'])
+                if not plan:
+                    return (-errno.ENOENT, '', 'plan %s not found' %
+                            command['plan'])
+                ms = plan.final_state()
+            else:
+                ms = MappingState(self.get_osdmap(),
+                                  self.get("pg_dump"),
+                                  'current cluster')
+            return (0, self.evaluate(ms), '')
+        elif command['prefix'] == 'balancer optimize':
+            plan = self.plan_create(command['plan'])
+            self.optimize(plan)
+            return (0, '', '')
+        elif command['prefix'] == 'balancer rm':
+            self.plan_rm(command['plan'])
+            return (0, '', '')
+        elif command['prefix'] == 'balancer reset':
+            self.plans = {}
+            return (0, '', '')
+        elif command['prefix'] == 'balancer dump':
+            plan = self.plans.get(command['plan'])
+            if not plan:
+                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
+            return (0, plan.dump(), '')
+        elif command['prefix'] == 'balancer show':
+            plan = self.plans.get(command['plan'])
+            if not plan:
+                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
+            return (0, plan.show(), '')
+        elif command['prefix'] == 'balancer execute':
+            plan = self.plans.get(command['plan'])
+            if not plan:
+                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
+            self.execute(plan)
+            self.plan_rm(plan.name)
+            return (0, '', '')
+        else:
+            return (-errno.EINVAL, '',
+                    "Command not found '{0}'".format(command['prefix']))
 
     def shutdown(self):
         self.log.info('Stopping')
@@ -33,41 +274,252 @@ class Module(MgrModule):
     def serve(self):
         self.log.info('Starting')
         while self.run:
-            mode = self.get_config('mode', default_mode)
+            self.log.debug('Waking up')
+            self.active = self.get_config('active', '') != ''
             sleep_interval = float(self.get_config('sleep_interval',
                                                    default_sleep_interval))
-            max_misplaced = float(self.get_config('max_misplaced',
-                                                  default_max_misplaced))
-            self.log.info('Mode %s, sleep interval %f, max misplaced %f' %
-                          (mode, sleep_interval, max_misplaced))
-
-            info = self.get('pg_status')
-            if info.get('unknown_ratio', 0.0) + info.get('degraded_ratio', 0.0) + info.get('inactive_ratio', 0.0) > 0.0:
-                self.log.info('Some PGs are unknown, degraded, or inactive; waiting')
-            elif info.get('misplaced_ratio', 0.0) >= max_misplaced:
-                self.log.info('Too many PGs (%f > max %f) are misplaced; waiting' % (info.get('misplaced_ratio', 0.0), max_misplaced))
-            else:
-                if mode == 'upmap':
-                    self.do_upmap()
-                elif mode == 'crush':
-                    self.do_crush(compat=False)
-                elif mode == 'crush-compat':
-                    self.do_crush(compat=True)
-                elif mode == 'osd_weight':
-                    self.osd_weight()
-                elif mode == 'none':
-                    self.log.info('Idle')
-                else:
-                    self.log.info('Unrecognized mode %s' % mode)
+            if self.active:
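+                # active mode: build a scratch plan and run the optimizer;
+                # applying the result automatically is left commented out for now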
+                self.log.debug('Running')
+                plan = self.plan_create('auto-foo')
+                self.optimize(plan)
+                #self.plan_apply(plan)
+            self.log.debug('Sleeping for %d', sleep_interval)
             self.event.wait(sleep_interval)
+            self.event.clear()
+
+    def plan_create(self, name):
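+        # snapshot the current osdmap and pg stats as the plan's starting state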
+        plan = Plan(name, MappingState(self.get_osdmap(),
+                                       self.get("pg_dump"),
+                                       'plan %s initial' % name))
+        self.plans[name] = plan
+        return plan
+
+    def plan_rm(self, name):
+        if name in self.plans:
+            del self.plans[name]
+
+    def calc_eval(self, ms):
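+        # build an Eval describing how pgs/objects/bytes are distributed
+        # across OSDs, per pool and per CRUSH root, for the given MappingState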
+        pe = Eval(ms)
+        pool_rule = {}
+        for p in ms.osdmap_dump.get('pools',[]):
+            pe.pool_name[p['pool']] = p['pool_name']
+            pe.pool_id[p['pool_name']] = p['pool']
+            pool_rule[p['pool_name']] = p['crush_rule']
+            pe.pool_roots[p['pool_name']] = []
+        pools = pe.pool_id.keys()
+        if len(pools) == 0:
+            return pe
+        self.log.debug('pool_name %s' % pe.pool_name)
+        self.log.debug('pool_id %s' % pe.pool_id)
+        self.log.debug('pools %s' % pools)
+        self.log.debug('pool_rule %s' % pool_rule)
+
+        # get expected distributions by root
+        actual_by_root = {}
+        roots = ms.crush.find_takes()
+        for root in roots:
+            rname = ms.crush.get_item_name(root)
+            ls = ms.osdmap.get_pools_by_take(root)
+            for poolid in ls:
+                pe.pool_roots[pe.pool_name[poolid]].append(rname)
+            pe.target_by_root[rname] = ms.crush.get_take_weight_osd_map(root)
+            actual_by_root[rname] = {
+                'pgs': {},
+                'objects': {},
+                'bytes': {},
+            }
+            for osd in pe.target_by_root[rname].iterkeys():
+                actual_by_root[rname]['pgs'][osd] = 0
+                actual_by_root[rname]['objects'][osd] = 0
+                actual_by_root[rname]['bytes'][osd] = 0
+            pe.total_by_root[rname] = {
+                'pgs': 0,
+                'objects': 0,
+                'bytes': 0,
+            }
+        self.log.debug('pool_roots %s' % pe.pool_roots)
+        self.log.debug('target_by_root %s' % pe.target_by_root)
+
+        # pool and root actual
+        for pool in pools:
+            pi = [p for p in ms.osdmap_dump.get('pools',[])
+                  if p['pool_name'] == pool][0]
+            poolid = pi['pool']
+            pm = ms.osdmap.map_pool_pgs_up(poolid)
+            pgs = 0
+            objects = 0
+            bytes = 0
+            pgs_by_osd = {}
+            objects_by_osd = {}
+            bytes_by_osd = {}
+            for root in pe.pool_roots[pool]:
+                for osd in pe.target_by_root[root].iterkeys():
+                    pgs_by_osd[osd] = 0
+                    objects_by_osd[osd] = 0
+                    bytes_by_osd[osd] = 0
+            for pgid, up in pm.iteritems():
+                for osd in [int(osd) for osd in up]:
+                    pgs_by_osd[osd] += 1
+                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
+                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
+                    # pick a root to associate this pg instance with.
+                    # note that this is imprecise if the roots have
+                    # overlapping children.
+                    # FIXME: divide bytes by k for EC pools.
+                    for root in pe.pool_roots[pool]:
+                        if osd in pe.target_by_root[root]:
+                            actual_by_root[root]['pgs'][osd] += 1
+                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
+                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
+                            pgs += 1
+                            objects += ms.pg_stat[pgid]['num_objects']
+                            bytes += ms.pg_stat[pgid]['num_bytes']
+                            pe.total_by_root[root]['pgs'] += 1
+                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
+                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
+                            break
+            pe.count_by_pool[pool] = {
+                'pgs': {
+                    k: v
+                    for k, v in pgs_by_osd.iteritems()
+                },
+                'objects': {
+                    k: v
+                    for k, v in objects_by_osd.iteritems()
+                },
+                'bytes': {
+                    k: v
+                    for k, v in bytes_by_osd.iteritems()
+                },
+            }
+            pe.actual_by_pool[pool] = {
+                'pgs': {
+                    k: float(v) / float(max(pgs, 1))
+                    for k, v in pgs_by_osd.iteritems()
+                },
+                'objects': {
+                    k: float(v) / float(max(objects, 1))
+                    for k, v in objects_by_osd.iteritems()
+                },
+                'bytes': {
+                    k: float(v) / float(max(bytes, 1))
+                    for k, v in bytes_by_osd.iteritems()
+                },
+            }
+            pe.total_by_pool[pool] = {
+                'pgs': pgs,
+                'objects': objects,
+                'bytes': bytes,
+            }
+        for root, m in pe.total_by_root.iteritems():
+            pe.count_by_root[root] = {
+                'pgs': {
+                    k: float(v)
+                    for k, v in actual_by_root[root]['pgs'].iteritems()
+                },
+                'objects': {
+                    k: float(v)
+                    for k, v in actual_by_root[root]['objects'].iteritems()
+                },
+                'bytes': {
+                    k: float(v)
+                    for k, v in actual_by_root[root]['bytes'].iteritems()
+                },
+            }
+            pe.actual_by_root[root] = {
+                'pgs': {
+                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
+                    for k, v in actual_by_root[root]['pgs'].iteritems()
+                },
+                'objects': {
+                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
+                    for k, v in actual_by_root[root]['objects'].iteritems()
+                },
+                'bytes': {
+                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
+                    for k, v in actual_by_root[root]['bytes'].iteritems()
+                },
+            }
+        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
+        self.log.debug('actual_by_root %s' % pe.actual_by_root)
+
+        # average and stddev
+        pe.stats_by_root = {
+            a: pe.calc_stats(
+                b,
+                pe.target_by_root[a],
+                pe.total_by_root[a]
+            ) for a, b in pe.count_by_root.iteritems()
+        }
+
+        # aggregate score (normalize the stddev by count)
+        pe.score_by_root = {
+            r: {
+                'pgs': pe.stats_by_root[r]['pgs']['stddev'] / max(1, pe.total_by_root[r]['pgs']),
+                'objects': pe.stats_by_root[r]['objects']['stddev'] / max(1, pe.total_by_root[r]['objects']),
+                'bytes': pe.stats_by_root[r]['bytes']['stddev'] / max(1, pe.total_by_root[r]['bytes']),
+            } for r in pe.total_by_root.keys()
+        }
+
+        # total score is just average of normalized stddevs
+        pe.score = 0.0
+        for r, vs in pe.score_by_root.iteritems():
+            for k, v in vs.iteritems():
+                pe.score += v
+        pe.score /= 3 * len(roots)
+        return pe
 
-    def do_upmap(self):
+    def evaluate(self, ms):
+        pe = self.calc_eval(ms)
+        return pe.show()
+
+    def optimize(self, plan):
+        self.log.info('Optimize plan %s' % plan.name)
+        plan.mode = self.get_config('mode', default_mode)
+        max_misplaced = float(self.get_config('max_misplaced',
+                                              default_max_misplaced))
+        self.log.info('Mode %s, max misplaced %f' %
+                      (plan.mode, max_misplaced))
+
+        info = self.get('pg_status')
+        unknown = info.get('unknown_pgs_ratio', 0.0)
+        degraded = info.get('degraded_ratio', 0.0)
+        inactive = info.get('inactive_pgs_ratio', 0.0)
+        misplaced = info.get('misplaced_ratio', 0.0)
+        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
+                       unknown, degraded, inactive, misplaced)
+        if unknown > 0.0:
+            self.log.info('Some PGs (%f) are unknown; waiting', unknown)
+        elif degraded > 0.0:
+            self.log.info('Some PGs (%f) are degraded; waiting', degraded)
+        elif inactive > 0.0:
+            self.log.info('Some PGs (%f) are inactive; waiting', inactive)
+        elif misplaced >= max_misplaced:
+            self.log.info('Too many PGs (%f > %f) are misplaced; waiting',
+                          misplaced, max_misplaced)
+        else:
+            if plan.mode == 'upmap':
+                self.do_upmap(plan)
+            elif plan.mode == 'crush':
+                self.do_crush()
+            elif plan.mode == 'crush-compat':
+                self.do_crush_compat(plan)
+            elif plan.mode == 'osd_weight':
+                self.do_osd_weight()
+            elif plan.mode == 'none':
+                self.log.info('Idle')
+            else:
+                self.log.info('Unrecognized mode %s' % plan.mode)
+
+        ##
+
+    def do_upmap(self, plan):
         self.log.info('do_upmap')
         max_iterations = self.get_config('upmap_max_iterations', 10)
         max_deviation = self.get_config('upmap_max_deviation', .01)
 
-        osdmap_dump = self.get('osd_map')
-        pools = [str(i['pool_name']) for i in osdmap_dump.get('pools',[])]
+        ms = plan.initial
+        pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools',[])]
         if len(pools) == 0:
             self.log.info('no pools, nothing to do')
             return
@@ -75,21 +527,238 @@ class Module(MgrModule):
         random.shuffle(pools)
         self.log.info('pools %s' % pools)
 
-        osdmap = self.get_osdmap()
-        inc = osdmap.new_incremental()
+        inc = plan.inc
         total_did = 0
         left = max_iterations
         for pool in pools:
-            did = osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
+            did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
             total_did += did
             left -= did
             if left <= 0:
                 break
         self.log.info('prepared %d/%d changes' % (total_did, max_iterations))
 
-        incdump = inc.dump()
-        self.log.debug('resulting inc is %s' % incdump)
+    def do_crush_compat(self, plan):
+        self.log.info('do_crush_compat')
+        osdmap = self.get_osdmap()
+        crush = osdmap.get_crush()
+
+        # get current compat weight-set weights
+        weight_set = self.get_compat_weight_set_weights()
 
+        ms = plan.initial
+        pe = self.calc_eval(ms)
+
+        # get subtree weight maps, check for overlap
+        roots = crush.find_takes()
+        self.log.debug('roots %s', roots)
+        weight_maps = {}
+        visited = {}
+        overlap = {}
+        for root in roots:
+            weight_maps[root] = crush.get_take_weight_osd_map(root)
+            self.log.debug(' map for %d: %s' % (root, weight_maps[root]))
+            for osd in weight_maps[root].iterkeys():
+                if osd in visited:
+                    overlap[osd] = 1
+                visited[osd] = 1
+        if len(overlap) > 0:
+            self.log.error('error: some osds belong to multiple subtrees: %s' %
+                           overlap)
+            return
+
+        # select a cost mode: pg, pgsize, or utilization
+        # FIXME: when we add utilization support, we need to throttle back
+        # so that we don't run if *any* objects are misplaced.  with 'pgs' we
+        # can look at up mappings, which lets us look ahead a bit.
+        cost_mode = 'pg'
+
+        # go
+        random.shuffle(roots)
+        for root in roots:
+            pools = osdmap.get_pools_by_take(root)
+            self.log.info('Balancing root %s pools %s' % (root, pools))
+            wm = weight_maps[root]
+            util = self.get_util(cost_mode, pools, wm.keys())
+            self.log.info('wm %s util %s' % (wm, util))
+            if len(util) == 0:
+                self.log.info('no utilization information, stopping')
+                return
+            target = self.get_target(util)
+            self.log.info('target %s' % target)
+            if target == 0:
+                continue
+
+            queue = sorted(util, key=lambda osd: -abs(target - util[osd]))
+            self.log.info('queue %s' % queue)
+
+            for osd in queue:
+                deviation = float(util[osd]) - float(target)
+                if deviation == 0:
+                    break
+                self.log.debug('osd.%d deviation %f', osd, deviation)
+                weight = weight_set[osd]
+                calc_weight = (float(target) / float(util[osd])) * weight
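+                # damp the adjustment: blend 70% of the old weight with 30% of
+                # the computed weight to avoid over-correcting in one pass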
+                new_weight = weight * .7 + calc_weight * .3
+                self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
+                               new_weight)
+                self.compat_weight_set_reweight(osd, new_weight)
+
+    def compat_weight_set_reweight(self, osd, new_weight):
+        self.log.debug('ceph osd crush weight-set reweight-compat')
+        result = CommandResult('')
+        self.send_command(result, 'mon', '', json.dumps({
+            'prefix': 'osd crush weight-set reweight-compat',
+            'format': 'json',
+            'item': 'osd.%d' % osd,
+            'weight': [new_weight],
+        }), '')
+        r, outb, outs = result.wait()
+        if r != 0:
+            self.log.error('Error setting compat weight-set osd.%d to %f' %
+                           (osd, new_weight))
+            return
+
+    def get_compat_weight_set_weights(self):
+        # enable compat weight-set
+        self.log.debug('ceph osd crush weight-set create-compat')
+        result = CommandResult('')
+        self.send_command(result, 'mon', '', json.dumps({
+            'prefix': 'osd crush weight-set create-compat',
+            'format': 'json',
+        }), '')
+        r, outb, outs = result.wait()
+        if r != 0:
+            self.log.error('Error creating compat weight-set')
+            return
+
+        result = CommandResult('')
+        self.send_command(result, 'mon', '', json.dumps({
+            'prefix': 'osd crush dump',
+            'format': 'json',
+        }), '')
+        r, outb, outs = result.wait()
+        if r != 0:
+            self.log.error('Error dumping crush map')
+            return
+        try:
+            crushmap = json.loads(outb)
+        except ValueError:
+            raise RuntimeError('unable to parse crush map')
+
+        raw = crushmap.get('choose_args',{}).get('-1', [])
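+        # choose_args id '-1' is the compat weight-set; each entry carries one
+        # weight per bucket item, positionally matching the bucket's item list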
+        weight_set = {}
+        for b in raw:
+            bucket = None
+            for t in crushmap['buckets']:
+                if t['id'] == b['bucket_id']:
+                    bucket = t
+                    break
+            if not bucket:
+                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
+            self.log.debug('bucket items %s' % bucket['items'])
+            self.log.debug('weight set %s' % b['weight_set'][0])
+            if len(bucket['items']) != len(b['weight_set'][0]):
+                raise RuntimeError('weight-set size does not match bucket items')
+            for pos in range(len(bucket['items'])):
+                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]
+
+        self.log.debug('weight_set weights %s' % weight_set)
+        return weight_set
+
+    def get_util(self, cost_mode, pools, osds):
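+        # sum up per-OSD load (pg count, bytes, or objects) over the up
+        # mappings of the given pools, restricted to the given osds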
+        if cost_mode == 'pg' or \
+           cost_mode == 'pg_bytes' or \
+           cost_mode == 'pg_objects':
+            util_map = {}
+            for osd in osds:
+                util_map[osd] = 0
+            dump = self.get('pg_dump')
+            #self.log.info('dump %s' % dump)
+            self.log.info('osds %s' % osds)
+            for pg in dump['pg_stats']:
+                inpool = False
+                for pool in pools:
+                    if pg['pgid'].startswith(str(pool) + '.'):
+                        inpool = True
+                        break
+                if not inpool:
+                    self.log.info('skipping %s' % pg['pgid'])
+                    continue
+                self.log.info('pg %s osds %s' % (pg['pgid'], pg['up']))
+                for osd in [int(a) for a in pg['up']]:
+                    if osd in osds:
+                        if cost_mode == 'pg':
+                            util_map[osd] += 1
+                        elif cost_mode == 'pg_bytes':
+                            util_map[osd] += pg['stat_sum']['num_bytes']
+                        elif cost_mode == 'pg_objects':
+                            util_map[osd] += pg['stat_sum']['num_objects']
+            return util_map
+        else:
+            raise RuntimeError('unsupported cost mode %s' % cost_mode)
+
+    def get_target(self, util_map):
+        # the ideal per-OSD utilization is simply the average
+        total = 0
+        count = 0
+        for k, v in util_map.iteritems():
+            total += v
+            count += 1
+        return float(total) / float(count)
+
+    def do_crush(self):
+        self.log.info('do_crush (not yet implemented)')
+
+    def do_osd_weight(self):
+        self.log.info('do_osd_weight (not yet implemented)')
+
+    def execute(self, plan):
+        self.log.info('Executing plan %s' % plan.name)
+
+        commands = []
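+        # CommandResults for asynchronously issued mon commands; they are
+        # collected here and waited on together at the end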
+
+        # compat weight-set
+        if len(plan.compat_ws) and \
+           '-1' not in plan.initial.crush_dump.get('choose_args', {}):
+            self.log.debug('ceph osd crush weight-set create-compat')
+            result = CommandResult('')
+            self.send_command(result, 'mon', '', json.dumps({
+                'prefix': 'osd crush weight-set create-compat',
+                'format': 'json',
+            }), '')
+            r, outb, outs = result.wait()
+            if r != 0:
+                self.log.error('Error creating compat weight-set')
+                return
+
+        for osd, weight in plan.compat_ws.iteritems():
+            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
+                          osd, weight)
+            result = CommandResult('foo')
+            self.send_command(result, 'mon', '', json.dumps({
+                'prefix': 'osd crush weight-set reweight-compat',
+                'format': 'json',
+                'item': 'osd.%d' % osd,
+                'weight': [weight],
+            }), 'foo')
+            commands.append(result)
+
+        # new_weight
+        reweightn = {}
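+        # the osdmap stores reweights as fixed point (0x10000 == 1.0); convert
+        # to floats for the 'osd reweightn' mon command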
+        for osd, weight in plan.osd_weights.iteritems():
+            reweightn[int(osd)] = float(weight) / float(0x10000)
+        if len(reweightn):
+            self.log.info('ceph osd reweightn %s', reweightn)
+            result = CommandResult('foo')
+            self.send_command(result, 'mon', '', json.dumps({
+                'prefix': 'osd reweightn',
+                'format': 'json',
+                'weights': json.dumps(reweightn),
+            }), 'foo')
+            commands.append(result)
+
+        # upmap
+        incdump = plan.inc.dump()
         for pgid in incdump.get('old_pg_upmap_items', []):
             self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
             result = CommandResult('foo')
@@ -98,10 +767,7 @@ class Module(MgrModule):
                 'format': 'json',
                 'pgid': pgid,
             }), 'foo')
-            r, outb, outs = result.wait()
-            if r != 0:
-                self.log.error('Error removing pg-upmap on %s' % pgid)
-                break;
+            commands.append(result)
 
         for item in incdump.get('new_pg_upmap_items', []):
             self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
@@ -116,14 +782,11 @@ class Module(MgrModule):
                 'pgid': item['pgid'],
                 'id': osdlist,
             }), 'foo')
+            commands.append(result)
+
+        # wait for commands
+        for result in commands:
             r, outb, outs = result.wait()
             if r != 0:
-                self.log.error('Error setting pg-upmap on %s' % item['pgid'])
-                break;
-
-
-    def do_crush(self, compat):
-        self.log.info('do_crush compat=%b' % compat)
-
-    def do_osd_weight(self):
-        self.log.info('do_osd_weight')
+                self.log.error('Error on command')
+                return
index ee10520e3d2440a55ab2011a58dcb26892cf96e9..e49b129eea330472b4b676d02f6fc0bd788a37f2 100755 (executable)
@@ -520,7 +520,7 @@ $COSDMEMSTORE
 $COSDSHORT
 $extra_conf
 [mon]
-        mgr initial modules = restful status dashboard
+        mgr initial modules = restful status dashboard balancer
         mon pg warn min per osd = 3
         mon osd allow primary affinity = true
         mon reweight min pgs per osd = 4