From: Warren Usui Date: Sat, 15 Feb 2014 03:50:38 +0000 (-0800) Subject: Fix formatting and add docstrings to ceph_manager.py X-Git-Tag: 1.1.0~1648^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=30f8938ea3f9c9919082d6401638c7ca3bf6d9b4;p=teuthology.git Fix formatting and add docstrings to ceph_manager.py Fixes: 6531 Signed-off-by: Warren Usui --- diff --git a/teuthology/task/ceph_manager.py b/teuthology/task/ceph_manager.py index b52f137e..53bf5a00 100644 --- a/teuthology/task/ceph_manager.py +++ b/teuthology/task/ceph_manager.py @@ -1,3 +1,6 @@ +""" +ceph manager -- Thrasher and CephManager objects +""" from cStringIO import StringIO import random import time @@ -8,6 +11,9 @@ from teuthology import misc as teuthology from teuthology.task import ceph as ceph_task class Thrasher: + """ + Object used to thrash Ceph + """ def __init__(self, manager, config, logger=None): self.ceph_manager = manager self.ceph_manager.wait_for_clean() @@ -31,7 +37,9 @@ class Thrasher: self.log = lambda x: self.logger.info(x) else: def tmp(x): - print x + """ + Implement log behavior + """ self.log = tmp if self.config is None: self.config = dict() @@ -46,9 +54,14 @@ class Thrasher: self.thread = gevent.spawn(self.do_thrash) def kill_osd(self, osd=None, mark_down=False, mark_out=False): + """ + :param osd: Osd to be killed. + :mark_down: Mark down if true. + :mark_out: Mark out if true. + """ if osd is None: osd = random.choice(self.live_osds) - self.log("Killing osd %s, live_osds are %s" % (str(osd),str(self.live_osds))) + self.log("Killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds))) self.live_osds.remove(osd) self.dead_osds.append(osd) self.ceph_manager.kill_osd(osd) @@ -58,14 +71,22 @@ class Thrasher: self.out_osd(osd) def blackhole_kill_osd(self, osd=None): + """ + If all else fails, kill the osd. + :param osd: Osd to be killed. + """ if osd is None: osd = random.choice(self.live_osds) - self.log("Blackholing and then killing osd %s, live_osds are %s" % (str(osd),str(self.live_osds))) + self.log("Blackholing and then killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds))) self.live_osds.remove(osd) self.dead_osds.append(osd) self.ceph_manager.blackhole_kill_osd(osd) def revive_osd(self, osd=None): + """ + Revive the osd. + :param osd: Osd to be revived. + """ if osd is None: osd = random.choice(self.dead_osds) self.log("Reviving osd %s" % (str(osd),)) @@ -74,14 +95,22 @@ class Thrasher: self.ceph_manager.revive_osd(osd, self.revive_timeout) def out_osd(self, osd=None): + """ + Mark the osd out + :param osd: Osd to be marked. + """ if osd is None: osd = random.choice(self.in_osds) - self.log("Removing osd %s, in_osds are: %s" % (str(osd),str(self.in_osds))) + self.log("Removing osd %s, in_osds are: %s" % (str(osd), str(self.in_osds))) self.ceph_manager.mark_out_osd(osd) self.in_osds.remove(osd) self.out_osds.append(osd) def in_osd(self, osd=None): + """ + Mark the osd out + :param osd: Osd to be marked. + """ if osd is None: osd = random.choice(self.out_osds) if osd in self.dead_osds: @@ -105,6 +134,9 @@ class Thrasher: self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity', str(osd), str(pa)) def all_up(self): + """ + Make sure all osds are up and not out. 
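The Thrasher methods documented above are mostly list bookkeeping: kill_osd/revive_osd move ids between live_osds and dead_osds, out_osd/in_osd move them between in_osds and out_osds, and all_up drains both "down" lists. The following stand-alone sketch shows only that bookkeeping, with the CephManager calls replaced by plain logging; it is illustrative, not the teuthology Thrasher itself.

import random

class FakeThrasher(object):
    """Bookkeeping-only stand-in for the Thrasher (no real cluster calls)."""
    def __init__(self, num_osds):
        self.live_osds = list(range(num_osds))
        self.dead_osds = []
        self.in_osds = list(range(num_osds))
        self.out_osds = []

    def log(self, msg):
        print(msg)

    def kill_osd(self, osd=None):
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Killing osd %s, live_osds are %s" % (osd, self.live_osds))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)

    def revive_osd(self, osd=None):
        if osd is None:
            osd = random.choice(self.dead_osds)
        self.log("Reviving osd %s" % osd)
        self.dead_osds.remove(osd)
        self.live_osds.append(osd)

    def out_osd(self, osd=None):
        if osd is None:
            osd = random.choice(self.in_osds)
        self.log("Removing osd %s, in_osds are: %s" % (osd, self.in_osds))
        self.in_osds.remove(osd)
        self.out_osds.append(osd)

    def in_osd(self, osd=None):
        if osd is None:
            osd = random.choice(self.out_osds)
        self.out_osds.remove(osd)
        self.in_osds.append(osd)

    def all_up(self):
        # Revive everything that is down, then mark everything back in.
        while self.dead_osds:
            self.revive_osd()
        while self.out_osds:
            self.in_osd()

if __name__ == '__main__':
    t = FakeThrasher(4)
    t.kill_osd()
    t.out_osd()
    t.all_up()
    print("live=%s in=%s" % (sorted(t.live_osds), sorted(t.in_osds)))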
+ """ while len(self.dead_osds) > 0: self.log("reviving osd") self.revive_osd() @@ -113,20 +145,32 @@ class Thrasher: self.in_osd() def do_join(self): + """ + Break out of this Ceph loop + """ self.stopping = True self.thread.get() def grow_pool(self): + """ + Increase the size of the pool + """ pool = self.ceph_manager.get_pool() self.log("Growing pool %s"%(pool,)) self.ceph_manager.expand_pool(pool, self.config.get('pool_grow_by', 10), self.max_pgs) def fix_pgp_num(self): + """ + Fix number of pgs in pool. + """ pool = self.ceph_manager.get_pool() self.log("fixing pg num pool %s"%(pool,)) self.ceph_manager.set_pool_pgpnum(pool) def test_pool_min_size(self): + """ + Kill and revive all osds except one. + """ self.log("test_pool_min_size") self.all_up() self.ceph_manager.wait_for_recovery( @@ -151,6 +195,9 @@ class Thrasher: ) def inject_pause(self, conf_key, duration, check_after, should_be_down): + """ + Pause injection testing. Check for osd being down when finished. + """ the_one = random.choice(self.live_osds) self.log("inject_pause on {osd}".format(osd = the_one)) self.log( @@ -245,9 +292,12 @@ class Thrasher: self.revive_osd() def choose_action(self): + """ + Random action selector. + """ chance_down = self.config.get('chance_down', 0.4) chance_test_min_size = self.config.get('chance_test_min_size', 0) - chance_test_backfill_full= self.config.get('chance_test_backfill_full', 0) + chance_test_backfill_full = self.config.get('chance_test_backfill_full', 0) if isinstance(chance_down, int): chance_down = float(chance_down) / 100 minin = self.minin @@ -286,7 +336,7 @@ class Thrasher: self.config.get('chance_inject_pause_long', 0),)]: actions.append(scenario) - total = sum([y for (x,y) in actions]) + total = sum([y for (x, y) in actions]) val = random.uniform(0, total) for (action, prob) in actions: if val < prob: @@ -295,15 +345,18 @@ class Thrasher: return None def do_thrash(self): + """ + Loop to select random actions to thrash ceph manager with. + """ cleanint = self.config.get("clean_interval", 60) - maxdead = self.config.get("max_dead", 0); + maxdead = self.config.get("max_dead", 0) delay = self.config.get("op_delay", 5) self.log("starting do_thrash") while not self.stopping: self.log(" ".join([str(x) for x in ["in_osds: ", self.in_osds, " out_osds: ", self.out_osds, "dead_osds: ", self.dead_osds, "live_osds: ", self.live_osds]])) - if random.uniform(0,1) < (float(delay) / cleanint): + if random.uniform(0, 1) < (float(delay) / cleanint): while len(self.dead_osds) > maxdead: self.revive_osd() if random.uniform(0, 1) < float( @@ -319,6 +372,10 @@ class Thrasher: self.all_up() class CephManager: + """ + Ceph manager object. + Contains several local functions that form a bulk of this module. + """ def __init__(self, controller, ctx=None, config=None, logger=None): self.lock = threading.RLock() self.ctx = ctx @@ -330,6 +387,9 @@ class CephManager: self.log = lambda x: logger.info(x) else: def tmp(x): + """ + implement log behavior. + """ print x self.log = tmp if self.config is None: @@ -340,6 +400,9 @@ class CephManager: self.pools[pool] = self.get_pool_property(pool, 'pg_num') def raw_cluster_cmd(self, *args): + """ + Start ceph on a raw cluster. Return count + """ testdir = teuthology.get_testdir(self.ctx) ceph_args = [ 'adjust-ulimits', @@ -355,6 +418,9 @@ class CephManager: return proc.stdout.getvalue() def raw_cluster_cmd_result(self, *args): + """ + Start ceph on a cluster. Return success or failure information. 
+ """ testdir = teuthology.get_testdir(self.ctx) ceph_args = [ 'adjust-ulimits', @@ -370,13 +436,16 @@ class CephManager: return proc.exitstatus def do_rados(self, remote, cmd): + """ + Execute a remote rados command. + """ testdir = teuthology.get_testdir(self.ctx) pre = [ 'adjust-ulimits', 'ceph-coverage', '{tdir}/archive/coverage'.format(tdir=testdir), 'rados', - ]; + ] pre.extend(cmd) proc = remote.run( args=pre, @@ -386,6 +455,10 @@ class CephManager: def rados_write_objects( self, pool, num_objects, size, timelimit, threads, cleanup=False): + """ + Write rados objects + Threads not used yet. + """ args = [ '-p', pool, '--num-objects', num_objects, @@ -397,6 +470,9 @@ class CephManager: return self.do_rados(self.controller, map(str, args)) def do_put(self, pool, obj, fname): + """ + Implement rados put operation + """ return self.do_rados( self.controller, [ @@ -409,6 +485,9 @@ class CephManager: ) def do_get(self, pool, obj, fname='/dev/null'): + """ + Implement rados get operation + """ return self.do_rados( self.controller, [ @@ -421,6 +500,9 @@ class CephManager: ) def osd_admin_socket(self, osdnum, command, check_status=True): + """ + Remotely start up ceph specifying the admin socket + """ testdir = teuthology.get_testdir(self.ctx) remote = None for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems(): @@ -428,7 +510,7 @@ class CephManager: if int(id_) == int(osdnum): remote = _remote assert remote is not None - args=[ + args = [ 'sudo', 'adjust-ulimits', 'ceph-coverage', @@ -446,6 +528,11 @@ class CephManager: ) def get_pgid(self, pool, pgnum): + """ + :param pool: pool number + :param pgnum: pg number + :returns: a string representing this pg. + """ poolnum = self.get_pool_num(pool) pg_str = "{poolnum}.{pgnum}".format( poolnum=poolnum, @@ -480,7 +567,7 @@ class CephManager: """ get number for pool (e.g., data -> 2) """ - out = self.raw_cluster_cmd('osd','dump','--format=json') + out = self.raw_cluster_cmd('osd', 'dump', '--format=json') j = json.loads('\n'.join(out.split('\n')[1:])) for i in j['pools']: if i['pool_name'] == pool: @@ -491,7 +578,7 @@ class CephManager: """ list all pool names """ - out = self.raw_cluster_cmd('osd','dump','--format=json') + out = self.raw_cluster_cmd('osd', 'dump', '--format=json') j = json.loads('\n'.join(out.split('\n')[1:])) self.log(j['pools']) return [str(i['pool_name']) for i in j['pools']] @@ -503,6 +590,9 @@ class CephManager: [self.remove_pool(i) for i in self.list_pools()] def kick_recovery_wq(self, osdnum): + """ + Run kick_recovery_wq on cluster. + """ return self.raw_cluster_cmd( 'tell', "osd.%d" % (int(osdnum),), 'debug', @@ -510,6 +600,10 @@ class CephManager: '0') def wait_run_admin_socket(self, osdnum, args=['version'], timeout=75): + """ + If osd_admin_socket call suceeds, return. Otherwise wait + five seconds and try again. + """ tries = 0 while True: proc = self.osd_admin_socket( @@ -528,19 +622,32 @@ class CephManager: time.sleep(5) def set_config(self, osdnum, **argdict): - for k,v in argdict.iteritems(): + """ + :param osdnum: osd number + :param argdict: dictionary containing values to set. 
+ """ + for k, v in argdict.iteritems(): self.wait_run_admin_socket( osdnum, ['config', 'set', str(k), str(v)]) def raw_cluster_status(self): + """ + Get status from cluster + """ status = self.raw_cluster_cmd('status', '--format=json-pretty') return json.loads(status) def raw_osd_status(self): + """ + Get osd status from cluster + """ return self.raw_cluster_cmd('osd', 'dump') def get_osd_status(self): + """ + Get osd statuses sorted by states that the osds are in. + """ osd_lines = filter( lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)), self.raw_osd_status().split('\n')) @@ -566,19 +673,30 @@ class CephManager: 'raw' : osd_lines} def get_num_pgs(self): + """ + Check cluster status for the number of pgs + """ status = self.raw_cluster_status() self.log(status) return status['pgmap']['num_pgs'] def create_pool_with_unique_name(self, pg_num=1, ec_pool=False): + """ + Create a pool named unique_pool_X where X is unique. + """ name = "" with self.lock: - name = "unique_pool_%s"%(str(self.next_pool_id),) + name = "unique_pool_%s" % (str(self.next_pool_id),) self.next_pool_id += 1 self.create_pool(name, pg_num, ec_pool=ec_pool) return name def create_pool(self, pool_name, pg_num=1, ec_pool=False): + """ + Create a pool named from the pool_name parameter. + :param pool_name: name of the pool being created. + :param pg_num: initial number of pgs. + """ with self.lock: assert isinstance(pool_name, str) assert isinstance(pg_num, int) @@ -595,6 +713,10 @@ class CephManager: self.pools[pool_name] = pg_num def remove_pool(self, pool_name): + """ + Remove the indicated pool + :param pool_name: Pool to be removed + """ with self.lock: assert isinstance(pool_name, str) assert pool_name in self.pools @@ -606,17 +728,28 @@ class CephManager: ) def get_pool(self): + """ + Pick a random pool + """ with self.lock: - return random.choice(self.pools.keys()); + return random.choice(self.pools.keys()) def get_pool_pg_num(self, pool_name): + """ + Return the number of pgs in the pool specified. + """ with self.lock: assert isinstance(pool_name, str) if pool_name in self.pools: return self.pools[pool_name] - return 0; + return 0 def get_pool_property(self, pool_name, prop): + """ + :param pool_name: pool + :param prop: property to be checked. + :returns: property as an int value. + """ with self.lock: assert isinstance(pool_name, str) assert isinstance(prop, str) @@ -629,6 +762,13 @@ class CephManager: return int(output.split()[1]) def set_pool_property(self, pool_name, prop, val): + """ + :param pool_name: pool + :param prop: property to be set. + :param val: value to set. + + This routine retries if set operation fails. + """ with self.lock: assert isinstance(pool_name, str) assert isinstance(prop, str) @@ -651,6 +791,9 @@ class CephManager: time.sleep(2) def expand_pool(self, pool_name, by, max_pgs): + """ + Increase the number of pgs in a pool + """ with self.lock: assert isinstance(pool_name, str) assert isinstance(by, int) @@ -665,6 +808,9 @@ class CephManager: self.pools[pool_name] = new_pg_num def set_pool_pgpnum(self, pool_name): + """ + Set pgpnum property of pool_name pool. 
+ """ with self.lock: assert isinstance(pool_name, str) assert pool_name in self.pools @@ -673,10 +819,13 @@ class CephManager: self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name]) def list_pg_missing(self, pgid): + """ + return list of missing pgs with the id specified + """ r = None offset = {} while True: - out = self.raw_cluster_cmd('--', 'pg',pgid,'list_missing', + out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing', json.dumps(offset)) j = json.loads(out) if r is None: @@ -693,11 +842,17 @@ class CephManager: return r def get_pg_stats(self): - out = self.raw_cluster_cmd('pg','dump','--format=json') + """ + Dump the cluster and get pg stats + """ + out = self.raw_cluster_cmd('pg', 'dump', '--format=json') j = json.loads('\n'.join(out.split('\n')[1:])) return j['pg_stats'] def compile_pg_status(self): + """ + Return a histogram of pg state values + """ ret = {} j = self.get_pg_stats() for pg in j: @@ -708,25 +863,40 @@ class CephManager: return ret def pg_scrubbing(self, pool, pgnum): + """ + pg scrubbing wrapper + """ pgstr = self.get_pgid(pool, pgnum) stats = self.get_single_pg_stats(pgstr) return 'scrub' in stats['state'] def pg_repairing(self, pool, pgnum): + """ + pg repairing wrapper + """ pgstr = self.get_pgid(pool, pgnum) stats = self.get_single_pg_stats(pgstr) return 'repair' in stats['state'] def pg_inconsistent(self, pool, pgnum): + """ + pg inconsistent wrapper + """ pgstr = self.get_pgid(pool, pgnum) stats = self.get_single_pg_stats(pgstr) return 'inconsistent' in stats['state'] def get_last_scrub_stamp(self, pool, pgnum): + """ + Get the timestamp of the last scrub. + """ stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum)) return stats["last_scrub_stamp"] def do_pg_scrub(self, pool, pgnum, stype): + """ + Scrub pg and wait for scrubbing to finish + """ init = self.get_last_scrub_stamp(pool, pgnum) self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum)) while init == self.get_last_scrub_stamp(pool, pgnum): @@ -734,6 +904,9 @@ class CephManager: time.sleep(10) def get_single_pg_stats(self, pgid): + """ + Return pg for the pgid specified. + """ all_stats = self.get_pg_stats() for pg in all_stats: @@ -743,21 +916,34 @@ class CephManager: return None def get_osd_dump(self): - out = self.raw_cluster_cmd('osd','dump','--format=json') + """ + Dump osds + :returns: all osds + """ + out = self.raw_cluster_cmd('osd', 'dump', '--format=json') j = json.loads('\n'.join(out.split('\n')[1:])) return j['osds'] def get_stuck_pgs(self, type_, threshold): - out = self.raw_cluster_cmd('pg','dump_stuck', type_, str(threshold), + """ + :returns: stuck pg information from the cluster + """ + out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold), '--format=json') return json.loads(out) def get_num_unfound_objects(self): + """ + Check cluster status to get the number of unfound objects + """ status = self.raw_cluster_status() self.log(status) return status['pgmap'].get('unfound_objects', 0) def get_num_creating(self): + """ + Find the number of pgs in creating mode. + """ pgs = self.get_pg_stats() num = 0 for pg in pgs: @@ -766,6 +952,9 @@ class CephManager: return num def get_num_active_clean(self): + """ + Find the number of active and clean pgs. + """ pgs = self.get_pg_stats() num = 0 for pg in pgs: @@ -774,6 +963,9 @@ class CephManager: return num def get_num_active_recovered(self): + """ + Find the number of active and recovered pgs. 
+ """ pgs = self.get_pg_stats() num = 0 for pg in pgs: @@ -782,6 +974,9 @@ class CephManager: return num def get_num_active(self): + """ + Find the number of active pgs. + """ pgs = self.get_pg_stats() num = 0 for pg in pgs: @@ -789,7 +984,10 @@ class CephManager: num += 1 return num - def get_num_down(self): + def get_num_down(self): + """ + Find the number of pgs that are down. + """ pgs = self.get_pg_stats() num = 0 for pg in pgs: @@ -799,6 +997,9 @@ class CephManager: return num def get_num_active_down(self): + """ + Find the number of pgs that are either active or down. + """ pgs = self.get_pg_stats() num = 0 for pg in pgs: @@ -809,15 +1010,27 @@ class CephManager: return num def is_clean(self): + """ + True if all pgs are clean + """ return self.get_num_active_clean() == self.get_num_pgs() def is_recovered(self): + """ + True if all pgs have recovered + """ return self.get_num_active_recovered() == self.get_num_pgs() def is_active_or_down(self): + """ + True if all pgs are active or down + """ return self.get_num_active_down() == self.get_num_pgs() def wait_for_clean(self, timeout=None): + """ + Returns trues when all pgs are clean. + """ self.log("waiting for clean") start = time.time() num_active_clean = self.get_num_active_clean() @@ -833,11 +1046,18 @@ class CephManager: self.log("clean!") def are_all_osds_up(self): + """ + Returns true if all osds are up. + """ x = self.get_osd_dump() return (len(x) == \ sum([(y['up'] > 0) for y in x])) def wait_for_all_up(self, timeout=None): + """ + When this exits, either the timeout has expired, or all + osds are up. + """ self.log("waiting for all up") start = time.time() while not self.are_all_osds_up(): @@ -848,6 +1068,9 @@ class CephManager: self.log("all up!") def wait_for_recovery(self, timeout=None): + """ + Check peering. When this exists, we have recovered. + """ self.log("waiting for recovery to complete") start = time.time() num_active_recovered = self.get_num_active_recovered() @@ -863,6 +1086,9 @@ class CephManager: self.log("recovered!") def wait_for_active(self, timeout=None): + """ + Check peering. When this exists, we are definitely active + """ self.log("waiting for peering to complete") start = time.time() num_active = self.get_num_active() @@ -878,6 +1104,10 @@ class CephManager: self.log("active!") def wait_for_active_or_down(self, timeout=None): + """ + Check peering. When this exists, we are definitely either + active or down + """ self.log("waiting for peering to complete or become blocked") start = time.time() num_active_down = self.get_num_active_down() @@ -893,11 +1123,17 @@ class CephManager: self.log("active or down!") def osd_is_up(self, osd): + """ + Wrapper for osd check + """ osds = self.get_osd_dump() return osds[osd]['up'] > 0 def wait_till_osd_is_up(self, osd, timeout=None): - self.log('waiting for osd.%d to be up' % osd); + """ + Loop waiting for osd. + """ + self.log('waiting for osd.%d to be up' % osd) start = time.time() while not self.osd_is_up(osd): if timeout is not None: @@ -907,9 +1143,15 @@ class CephManager: self.log('osd.%d is up' % osd) def is_active(self): + """ + Wrapper to check if active + """ return self.get_num_active() == self.get_num_pgs() def wait_till_active(self, timeout=None): + """ + Wait until osds are active. + """ self.log("waiting till active") start = time.time() while not self.is_active(): @@ -920,9 +1162,16 @@ class CephManager: self.log("active!") def mark_out_osd(self, osd): + """ + Wrapper to mark osd out. 
+ """ self.raw_cluster_cmd('osd', 'out', str(osd)) def kill_osd(self, osd): + """ + Kill osds by either power cycling (if indicated by the config) + or by stopping. + """ if self.config.get('powercycle'): (remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys() self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name)) @@ -932,12 +1181,19 @@ class CephManager: self.ctx.daemons.get_daemon('osd', osd).stop() def blackhole_kill_osd(self, osd): + """ + Stop osd if nothing else works. + """ self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd, 'injectargs', '--filestore-blackhole') time.sleep(2) self.ctx.daemons.get_daemon('osd', osd).stop() def revive_osd(self, osd, timeout=75): + """ + Revive osds by either power cycling (if indicated by the config) + or by restarting. + """ if self.config.get('powercycle'): (remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys() self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name)) @@ -959,18 +1215,31 @@ class CephManager: timeout=timeout) def mark_down_osd(self, osd): + """ + Cluster command wrapper + """ self.raw_cluster_cmd('osd', 'down', str(osd)) def mark_in_osd(self, osd): + """ + Cluster command wrapper + """ self.raw_cluster_cmd('osd', 'in', str(osd)) ## monitors def signal_mon(self, mon, sig): + """ + Wrapper to local get_deamon call + """ self.ctx.daemons.get_daemon('mon', mon).signal(sig) def kill_mon(self, mon): + """ + Kill the monitor by either power cycling (if the config says so), + or by doing a stop. + """ if self.config.get('powercycle'): (remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys() self.log('kill_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name)) @@ -980,6 +1249,10 @@ class CephManager: self.ctx.daemons.get_daemon('mon', mon).stop() def revive_mon(self, mon): + """ + Restart by either power cycling (if the config says so), + or by doing a normal restart. + """ if self.config.get('powercycle'): (remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys() self.log('revive_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name)) @@ -989,17 +1262,26 @@ class CephManager: self.ctx.daemons.get_daemon('mon', mon).restart() def get_mon_status(self, mon): + """ + Extract all the monitor status information from the cluster + """ addr = self.ctx.ceph.conf['mon.%s' % mon]['mon addr'] out = self.raw_cluster_cmd('-m', addr, 'mon_status') return json.loads(out) def get_mon_quorum(self): + """ + Extract monitor quorum information from the cluster + """ out = self.raw_cluster_cmd('quorum_status') j = json.loads(out) self.log('quorum_status is %s' % out) return j['quorum'] def wait_for_mon_quorum_size(self, size, timeout=300): + """ + Loop until quorum size is reached. + """ self.log('waiting for quorum size %d' % size) start = time.time() while not len(self.get_mon_quorum()) == size: @@ -1010,14 +1292,20 @@ class CephManager: self.log("quorum is size %d" % size) def get_mon_health(self, debug=False): - out = self.raw_cluster_cmd('health', '--format=json') - if debug: - self.log('health:\n{h}'.format(h=out)) - return json.loads(out) + """ + Extract all the monitor health information. + """ + out = self.raw_cluster_cmd('health', '--format=json') + if debug: + self.log('health:\n{h}'.format(h=out)) + return json.loads(out) ## metadata servers def kill_mds(self, mds): + """ + Powercyle if set in config, otherwise just stop. 
+ """ if self.config.get('powercycle'): (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys() self.log('kill_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name)) @@ -1027,10 +1315,17 @@ class CephManager: self.ctx.daemons.get_daemon('mds', mds).stop() def kill_mds_by_rank(self, rank): + """ + kill_mds wrapper to kill based on rank passed. + """ status = self.get_mds_status_by_rank(rank) self.kill_mds(status['name']) def revive_mds(self, mds, standby_for_rank=None): + """ + Revive mds -- do an ipmpi powercycle (if indicated by the config) + and then restart (using --hot-standby if specified. + """ if self.config.get('powercycle'): (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys() self.log('revive_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name)) @@ -1039,32 +1334,45 @@ class CephManager: ceph_task.make_admin_daemon_dir(self.ctx, remote) args = [] if standby_for_rank: - args.extend(['--hot-standby', standby_for_rank]) + args.extend(['--hot-standby', standby_for_rank]) self.ctx.daemons.get_daemon('mds', mds).restart(*args) def revive_mds_by_rank(self, rank, standby_for_rank=None): + """ + revive_mds wrapper to revive based on rank passed. + """ status = self.get_mds_status_by_rank(rank) self.revive_mds(status['name'], standby_for_rank) def get_mds_status(self, mds): + """ + Run cluster commands for the mds in order to get mds information + """ out = self.raw_cluster_cmd('mds', 'dump', '--format=json') j = json.loads(' '.join(out.splitlines()[1:])) # collate; for dup ids, larger gid wins. for info in j['info'].itervalues(): - if info['name'] == mds: - return info + if info['name'] == mds: + return info return None def get_mds_status_by_rank(self, rank): + """ + Run cluster commands for the mds in order to get mds information + check rank. + """ out = self.raw_cluster_cmd('mds', 'dump', '--format=json') j = json.loads(' '.join(out.splitlines()[1:])) # collate; for dup ids, larger gid wins. for info in j['info'].itervalues(): - if info['rank'] == rank: - return info + if info['rank'] == rank: + return info return None def get_mds_status_all(self): + """ + Run cluster command to extract all the mds status. + """ out = self.raw_cluster_cmd('mds', 'dump', '--format=json') j = json.loads(' '.join(out.splitlines()[1:])) return j