+"""
+ceph manager -- Thrasher and CephManager objects
+"""
from cStringIO import StringIO
import random
import time
from teuthology.task import ceph as ceph_task
class Thrasher:
+ """
+ Object used to thrash Ceph
+ """
def __init__(self, manager, config, logger=None):
self.ceph_manager = manager
self.ceph_manager.wait_for_clean()
self.log = lambda x: self.logger.info(x)
else:
def tmp(x):
+ """
+ Implement log behavior
+ """
print x
self.log = tmp
if self.config is None:
self.config = dict()
self.thread = gevent.spawn(self.do_thrash)
def kill_osd(self, osd=None, mark_down=False, mark_out=False):
+ """
+ :param osd: Osd to be killed.
+ :param mark_down: Mark down if true.
+ :param mark_out: Mark out if true.
+ """
if osd is None:
osd = random.choice(self.live_osds)
- self.log("Killing osd %s, live_osds are %s" % (str(osd),str(self.live_osds)))
+ self.log("Killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds)))
self.live_osds.remove(osd)
self.dead_osds.append(osd)
self.ceph_manager.kill_osd(osd)
self.out_osd(osd)
def blackhole_kill_osd(self, osd=None):
+ """
+ If all else fails, blackhole the osd's filestore and then kill it.
+ :param osd: Osd to be killed.
+ """
if osd is None:
osd = random.choice(self.live_osds)
- self.log("Blackholing and then killing osd %s, live_osds are %s" % (str(osd),str(self.live_osds)))
+ self.log("Blackholing and then killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds)))
self.live_osds.remove(osd)
self.dead_osds.append(osd)
self.ceph_manager.blackhole_kill_osd(osd)
def revive_osd(self, osd=None):
+ """
+ Revive the osd.
+ :param osd: Osd to be revived.
+ """
if osd is None:
osd = random.choice(self.dead_osds)
self.log("Reviving osd %s" % (str(osd),))
self.ceph_manager.revive_osd(osd, self.revive_timeout)
def out_osd(self, osd=None):
+ """
+ Mark the osd out.
+ :param osd: Osd to be marked.
+ """
if osd is None:
osd = random.choice(self.in_osds)
- self.log("Removing osd %s, in_osds are: %s" % (str(osd),str(self.in_osds)))
+ self.log("Removing osd %s, in_osds are: %s" % (str(osd), str(self.in_osds)))
self.ceph_manager.mark_out_osd(osd)
self.in_osds.remove(osd)
self.out_osds.append(osd)
def in_osd(self, osd=None):
+ """
+ Mark the osd in.
+ :param osd: Osd to be marked.
+ """
if osd is None:
osd = random.choice(self.out_osds)
if osd in self.dead_osds:
self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity', str(osd), str(pa))
def all_up(self):
+ """
+ Make sure all osds are up and not out.
+ """
while len(self.dead_osds) > 0:
self.log("reviving osd")
self.revive_osd()
self.in_osd()
def do_join(self):
+ """
+ Stop the thrashing thread and wait for it to exit.
+ """
self.stopping = True
self.thread.get()
def grow_pool(self):
+ """
+ Grow the pool by increasing its number of pgs.
+ """
pool = self.ceph_manager.get_pool()
self.log("Growing pool %s"%(pool,))
self.ceph_manager.expand_pool(pool, self.config.get('pool_grow_by', 10), self.max_pgs)
def fix_pgp_num(self):
+ """
+ Fix the pgp_num of a pool so it matches the pool's pg_num.
+ """
pool = self.ceph_manager.get_pool()
self.log("fixing pg num pool %s"%(pool,))
self.ceph_manager.set_pool_pgpnum(pool)
def test_pool_min_size(self):
+ """
+ Kill and revive all osds except one.
+ """
self.log("test_pool_min_size")
self.all_up()
self.ceph_manager.wait_for_recovery(
)
def inject_pause(self, conf_key, duration, check_after, should_be_down):
+ """
+ Pause injection testing. Check for osd being down when finished.
+ """
the_one = random.choice(self.live_osds)
self.log("inject_pause on {osd}".format(osd = the_one))
self.log(
self.revive_osd()
def choose_action(self):
+ """
+ Random action selector.
+ """
chance_down = self.config.get('chance_down', 0.4)
chance_test_min_size = self.config.get('chance_test_min_size', 0)
- chance_test_backfill_full= self.config.get('chance_test_backfill_full', 0)
+ chance_test_backfill_full = self.config.get('chance_test_backfill_full', 0)
if isinstance(chance_down, int):
chance_down = float(chance_down) / 100
minin = self.minin
self.config.get('chance_inject_pause_long', 0),)]:
actions.append(scenario)
- total = sum([y for (x,y) in actions])
+ total = sum([y for (x, y) in actions])
val = random.uniform(0, total)
for (action, prob) in actions:
if val < prob:
return None
def do_thrash(self):
+ """
+ Loop, selecting random actions with which to thrash the cluster.
+ """
cleanint = self.config.get("clean_interval", 60)
- maxdead = self.config.get("max_dead", 0);
+ maxdead = self.config.get("max_dead", 0)
delay = self.config.get("op_delay", 5)
self.log("starting do_thrash")
while not self.stopping:
self.log(" ".join([str(x) for x in ["in_osds: ", self.in_osds, " out_osds: ", self.out_osds,
"dead_osds: ", self.dead_osds, "live_osds: ",
self.live_osds]]))
- if random.uniform(0,1) < (float(delay) / cleanint):
+ if random.uniform(0, 1) < (float(delay) / cleanint):
while len(self.dead_osds) > maxdead:
self.revive_osd()
if random.uniform(0, 1) < float(
self.all_up()
class CephManager:
+ """
+ Ceph manager object.
+ Contains several local functions that form the bulk of this module.
+ """
def __init__(self, controller, ctx=None, config=None, logger=None):
self.lock = threading.RLock()
self.ctx = ctx
self.log = lambda x: logger.info(x)
else:
def tmp(x):
+ """
+ Implement log behavior.
+ """
print x
self.log = tmp
if self.config is None:
self.pools[pool] = self.get_pool_property(pool, 'pg_num')
def raw_cluster_cmd(self, *args):
+ """
+ Run a ceph command against the cluster and return its stdout output.
+ """
testdir = teuthology.get_testdir(self.ctx)
ceph_args = [
'adjust-ulimits',
return proc.stdout.getvalue()
def raw_cluster_cmd_result(self, *args):
+ """
+ Run a ceph command against the cluster and return its exit status.
+ """
testdir = teuthology.get_testdir(self.ctx)
ceph_args = [
'adjust-ulimits',
return proc.exitstatus
def do_rados(self, remote, cmd):
+ """
+ Execute a remote rados command.
+ """
testdir = teuthology.get_testdir(self.ctx)
pre = [
'adjust-ulimits',
'ceph-coverage',
'{tdir}/archive/coverage'.format(tdir=testdir),
'rados',
- ];
+ ]
pre.extend(cmd)
proc = remote.run(
args=pre,
def rados_write_objects(
self, pool, num_objects, size, timelimit, threads, cleanup=False):
+ """
+ Write rados objects
+ Threads not used yet.
+ """
args = [
'-p', pool,
'--num-objects', num_objects,
return self.do_rados(self.controller, map(str, args))
def do_put(self, pool, obj, fname):
+ """
+ Implement rados put operation
+ """
return self.do_rados(
self.controller,
[
)
def do_get(self, pool, obj, fname='/dev/null'):
+ """
+ Implement rados get operation
+ """
return self.do_rados(
self.controller,
[
)
def osd_admin_socket(self, osdnum, command, check_status=True):
+ """
+ Remotely run a ceph admin-socket command on the given osd.
+ """
testdir = teuthology.get_testdir(self.ctx)
remote = None
for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems():
if int(id_) == int(osdnum):
remote = _remote
assert remote is not None
- args=[
+ args = [
'sudo',
'adjust-ulimits',
'ceph-coverage',
)
def get_pgid(self, pool, pgnum):
+ """
+ :param pool: pool name
+ :param pgnum: pg number
+ :returns: a string representing this pg.
+ """
poolnum = self.get_pool_num(pool)
pg_str = "{poolnum}.{pgnum}".format(
poolnum=poolnum,
"""
get number for pool (e.g., data -> 2)
"""
- out = self.raw_cluster_cmd('osd','dump','--format=json')
+ out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
j = json.loads('\n'.join(out.split('\n')[1:]))
for i in j['pools']:
if i['pool_name'] == pool:
"""
list all pool names
"""
- out = self.raw_cluster_cmd('osd','dump','--format=json')
+ out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
j = json.loads('\n'.join(out.split('\n')[1:]))
self.log(j['pools'])
return [str(i['pool_name']) for i in j['pools']]
[self.remove_pool(i) for i in self.list_pools()]
def kick_recovery_wq(self, osdnum):
+ """
+ Run kick_recovery_wq on cluster.
+ """
return self.raw_cluster_cmd(
'tell', "osd.%d" % (int(osdnum),),
'debug',
'0')
def wait_run_admin_socket(self, osdnum, args=['version'], timeout=75):
+ """
+ If the osd_admin_socket call succeeds, return. Otherwise wait
+ five seconds and try again.
+ """
tries = 0
while True:
proc = self.osd_admin_socket(
time.sleep(5)
def set_config(self, osdnum, **argdict):
- for k,v in argdict.iteritems():
+ """
+ :param osdnum: osd number
+ :param argdict: dictionary containing values to set.
+ """
+ for k, v in argdict.iteritems():
self.wait_run_admin_socket(
osdnum,
['config', 'set', str(k), str(v)])
def raw_cluster_status(self):
+ """
+ Get status from cluster
+ """
status = self.raw_cluster_cmd('status', '--format=json-pretty')
return json.loads(status)
def raw_osd_status(self):
+ """
+ Get osd status from cluster
+ """
return self.raw_cluster_cmd('osd', 'dump')
def get_osd_status(self):
+ """
+ Get osd statuses sorted by states that the osds are in.
+ """
osd_lines = filter(
lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
self.raw_osd_status().split('\n'))
'raw' : osd_lines}
def get_num_pgs(self):
+ """
+ Check cluster status for the number of pgs
+ """
status = self.raw_cluster_status()
self.log(status)
return status['pgmap']['num_pgs']
def create_pool_with_unique_name(self, pg_num=1, ec_pool=False):
+ """
+ Create a pool named unique_pool_X where X is unique.
+ """
name = ""
with self.lock:
- name = "unique_pool_%s"%(str(self.next_pool_id),)
+ name = "unique_pool_%s" % (str(self.next_pool_id),)
self.next_pool_id += 1
self.create_pool(name, pg_num, ec_pool=ec_pool)
return name
def create_pool(self, pool_name, pg_num=1, ec_pool=False):
+ """
+ Create a pool named from the pool_name parameter.
+ :param pool_name: name of the pool being created.
+ :param pg_num: initial number of pgs.
+ """
with self.lock:
assert isinstance(pool_name, str)
assert isinstance(pg_num, int)
self.pools[pool_name] = pg_num
def remove_pool(self, pool_name):
+ """
+ Remove the indicated pool
+ :param pool_name: Pool to be removed
+ """
with self.lock:
assert isinstance(pool_name, str)
assert pool_name in self.pools
)
def get_pool(self):
+ """
+ Pick a random pool
+ """
with self.lock:
- return random.choice(self.pools.keys());
+ return random.choice(self.pools.keys())
def get_pool_pg_num(self, pool_name):
+ """
+ Return the number of pgs in the pool specified.
+ """
with self.lock:
assert isinstance(pool_name, str)
if pool_name in self.pools:
return self.pools[pool_name]
- return 0;
+ return 0
def get_pool_property(self, pool_name, prop):
+ """
+ :param pool_name: pool
+ :param prop: property to be checked.
+ :returns: property as an int value.
+ """
with self.lock:
assert isinstance(pool_name, str)
assert isinstance(prop, str)
return int(output.split()[1])
def set_pool_property(self, pool_name, prop, val):
+ """
+ :param pool_name: pool
+ :param prop: property to be set.
+ :param val: value to set.
+
+ This routine retries if set operation fails.
+ """
with self.lock:
assert isinstance(pool_name, str)
assert isinstance(prop, str)
time.sleep(2)
def expand_pool(self, pool_name, by, max_pgs):
+ """
+ Increase the number of pgs in a pool
+ """
with self.lock:
assert isinstance(pool_name, str)
assert isinstance(by, int)
self.pools[pool_name] = new_pg_num
def set_pool_pgpnum(self, pool_name):
+ """
+ Set the pgp_num property of the pool to match its pg_num.
+ """
with self.lock:
assert isinstance(pool_name, str)
assert pool_name in self.pools
self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
def list_pg_missing(self, pgid):
+ """
+ Return the list_missing output (missing objects) for the pg with the id specified.
+ """
r = None
offset = {}
while True:
- out = self.raw_cluster_cmd('--', 'pg',pgid,'list_missing',
+ out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
json.dumps(offset))
j = json.loads(out)
if r is None:
return r
def get_pg_stats(self):
- out = self.raw_cluster_cmd('pg','dump','--format=json')
+ """
+ Dump pg stats from the cluster.
+ """
+ out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
j = json.loads('\n'.join(out.split('\n')[1:]))
return j['pg_stats']
def compile_pg_status(self):
+ """
+ Return a histogram of pg state values
+ """
ret = {}
j = self.get_pg_stats()
for pg in j:
return ret
def pg_scrubbing(self, pool, pgnum):
+ """
+ Return true if the pg is being scrubbed.
+ """
pgstr = self.get_pgid(pool, pgnum)
stats = self.get_single_pg_stats(pgstr)
return 'scrub' in stats['state']
def pg_repairing(self, pool, pgnum):
+ """
+ Return true if the pg is being repaired.
+ """
pgstr = self.get_pgid(pool, pgnum)
stats = self.get_single_pg_stats(pgstr)
return 'repair' in stats['state']
def pg_inconsistent(self, pool, pgnum):
+ """
+ Return true if the pg is inconsistent.
+ """
pgstr = self.get_pgid(pool, pgnum)
stats = self.get_single_pg_stats(pgstr)
return 'inconsistent' in stats['state']
def get_last_scrub_stamp(self, pool, pgnum):
+ """
+ Get the timestamp of the last scrub.
+ """
stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
return stats["last_scrub_stamp"]
def do_pg_scrub(self, pool, pgnum, stype):
+ """
+ Scrub pg and wait for scrubbing to finish
+ """
init = self.get_last_scrub_stamp(pool, pgnum)
self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
while init == self.get_last_scrub_stamp(pool, pgnum):
time.sleep(10)
def get_single_pg_stats(self, pgid):
+ """
+ Return the pg stats entry for the pgid specified.
+ """
all_stats = self.get_pg_stats()
for pg in all_stats:
return None
def get_osd_dump(self):
- out = self.raw_cluster_cmd('osd','dump','--format=json')
+ """
+ Dump osds
+ :returns: all osds
+ """
+ out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
j = json.loads('\n'.join(out.split('\n')[1:]))
return j['osds']
def get_stuck_pgs(self, type_, threshold):
- out = self.raw_cluster_cmd('pg','dump_stuck', type_, str(threshold),
+ """
+ :returns: stuck pg information from the cluster
+ """
+ out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
'--format=json')
return json.loads(out)
def get_num_unfound_objects(self):
+ """
+ Check cluster status to get the number of unfound objects
+ """
status = self.raw_cluster_status()
self.log(status)
return status['pgmap'].get('unfound_objects', 0)
def get_num_creating(self):
+ """
+ Find the number of pgs in creating mode.
+ """
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
return num
def get_num_active_clean(self):
+ """
+ Find the number of active and clean pgs.
+ """
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
return num
def get_num_active_recovered(self):
+ """
+ Find the number of active and recovered pgs.
+ """
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
return num
def get_num_active(self):
+ """
+ Find the number of active pgs.
+ """
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
num += 1
return num
- def get_num_down(self):
+ def get_num_down(self):
+ """
+ Find the number of pgs that are down.
+ """
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
return num
def get_num_active_down(self):
+ """
+ Find the number of pgs that are either active or down.
+ """
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
return num
def is_clean(self):
+ """
+ True if all pgs are clean
+ """
return self.get_num_active_clean() == self.get_num_pgs()
def is_recovered(self):
+ """
+ True if all pgs have recovered
+ """
return self.get_num_active_recovered() == self.get_num_pgs()
def is_active_or_down(self):
+ """
+ True if all pgs are active or down
+ """
return self.get_num_active_down() == self.get_num_pgs()
def wait_for_clean(self, timeout=None):
+ """
+ Wait until all pgs are clean.
+ """
self.log("waiting for clean")
start = time.time()
num_active_clean = self.get_num_active_clean()
self.log("clean!")
def are_all_osds_up(self):
+ """
+ Returns true if all osds are up.
+ """
x = self.get_osd_dump()
return (len(x) == \
sum([(y['up'] > 0) for y in x]))
def wait_for_all_up(self, timeout=None):
+ """
+ When this exits, either the timeout has expired, or all
+ osds are up.
+ """
self.log("waiting for all up")
start = time.time()
while not self.are_all_osds_up():
self.log("all up!")
def wait_for_recovery(self, timeout=None):
+ """
+ Check peering. When this exits, we have recovered.
+ """
self.log("waiting for recovery to complete")
start = time.time()
num_active_recovered = self.get_num_active_recovered()
self.log("recovered!")
def wait_for_active(self, timeout=None):
+ """
+ Check peering. When this exits, we are definitely active.
+ """
self.log("waiting for peering to complete")
start = time.time()
num_active = self.get_num_active()
self.log("active!")
def wait_for_active_or_down(self, timeout=None):
+ """
+ Check peering. When this exits, we are definitely either
+ active or down.
+ """
self.log("waiting for peering to complete or become blocked")
start = time.time()
num_active_down = self.get_num_active_down()
self.log("active or down!")
def osd_is_up(self, osd):
+ """
+ Return true if the given osd is up.
+ """
osds = self.get_osd_dump()
return osds[osd]['up'] > 0
def wait_till_osd_is_up(self, osd, timeout=None):
- self.log('waiting for osd.%d to be up' % osd);
+ """
+ Loop waiting for the osd to come up.
+ """
+ self.log('waiting for osd.%d to be up' % osd)
start = time.time()
while not self.osd_is_up(osd):
if timeout is not None:
self.log('osd.%d is up' % osd)
def is_active(self):
+ """
+ Return true if all pgs are active.
+ """
return self.get_num_active() == self.get_num_pgs()
def wait_till_active(self, timeout=None):
+ """
+ Wait until all pgs are active.
+ """
self.log("waiting till active")
start = time.time()
while not self.is_active():
self.log("active!")
def mark_out_osd(self, osd):
+ """
+ Wrapper to mark osd out.
+ """
self.raw_cluster_cmd('osd', 'out', str(osd))
def kill_osd(self, osd):
+ """
+ Kill osds by either power cycling (if indicated by the config)
+ or by stopping.
+ """
if self.config.get('powercycle'):
(remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name))
self.ctx.daemons.get_daemon('osd', osd).stop()
def blackhole_kill_osd(self, osd):
+ """
+ If nothing else works, blackhole the osd's filestore and then stop it.
+ """
self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
'injectargs', '--filestore-blackhole')
time.sleep(2)
self.ctx.daemons.get_daemon('osd', osd).stop()
def revive_osd(self, osd, timeout=75):
+ """
+ Revive osds by either power cycling (if indicated by the config)
+ or by restarting.
+ """
if self.config.get('powercycle'):
(remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name))
timeout=timeout)
def mark_down_osd(self, osd):
+ """
+ Cluster command wrapper to mark the osd down.
+ """
self.raw_cluster_cmd('osd', 'down', str(osd))
def mark_in_osd(self, osd):
+ """
+ Cluster command wrapper to mark the osd in.
+ """
self.raw_cluster_cmd('osd', 'in', str(osd))
## monitors
def signal_mon(self, mon, sig):
+ """
+ Wrapper to local get_daemon call
+ """
self.ctx.daemons.get_daemon('mon', mon).signal(sig)
def kill_mon(self, mon):
+ """
+ Kill the monitor by either power cycling (if the config says so),
+ or by doing a stop.
+ """
if self.config.get('powercycle'):
(remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys()
self.log('kill_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name))
self.ctx.daemons.get_daemon('mon', mon).stop()
def revive_mon(self, mon):
+ """
+ Restart by either power cycling (if the config says so),
+ or by doing a normal restart.
+ """
if self.config.get('powercycle'):
(remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys()
self.log('revive_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name))
self.ctx.daemons.get_daemon('mon', mon).restart()
def get_mon_status(self, mon):
+ """
+ Extract all the monitor status information from the cluster
+ """
addr = self.ctx.ceph.conf['mon.%s' % mon]['mon addr']
out = self.raw_cluster_cmd('-m', addr, 'mon_status')
return json.loads(out)
def get_mon_quorum(self):
+ """
+ Extract monitor quorum information from the cluster
+ """
out = self.raw_cluster_cmd('quorum_status')
j = json.loads(out)
self.log('quorum_status is %s' % out)
return j['quorum']
def wait_for_mon_quorum_size(self, size, timeout=300):
+ """
+ Loop until quorum size is reached.
+ """
self.log('waiting for quorum size %d' % size)
start = time.time()
while not len(self.get_mon_quorum()) == size:
self.log("quorum is size %d" % size)
def get_mon_health(self, debug=False):
- out = self.raw_cluster_cmd('health', '--format=json')
- if debug:
- self.log('health:\n{h}'.format(h=out))
- return json.loads(out)
+ """
+ Extract all the monitor health information.
+ """
+ out = self.raw_cluster_cmd('health', '--format=json')
+ if debug:
+ self.log('health:\n{h}'.format(h=out))
+ return json.loads(out)
## metadata servers
def kill_mds(self, mds):
+ """
+ Powercycle if set in config, otherwise just stop.
+ """
if self.config.get('powercycle'):
(remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys()
self.log('kill_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name))
self.ctx.daemons.get_daemon('mds', mds).stop()
def kill_mds_by_rank(self, rank):
+ """
+ kill_mds wrapper to kill based on rank passed.
+ """
status = self.get_mds_status_by_rank(rank)
self.kill_mds(status['name'])
def revive_mds(self, mds, standby_for_rank=None):
+ """
+ Revive mds -- do an ipmi powercycle (if indicated by the config)
+ and then restart (using --hot-standby if specified).
+ """
if self.config.get('powercycle'):
(remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys()
self.log('revive_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name))
ceph_task.make_admin_daemon_dir(self.ctx, remote)
args = []
if standby_for_rank:
- args.extend(['--hot-standby', standby_for_rank])
+ args.extend(['--hot-standby', standby_for_rank])
self.ctx.daemons.get_daemon('mds', mds).restart(*args)
def revive_mds_by_rank(self, rank, standby_for_rank=None):
+ """
+ revive_mds wrapper to revive based on rank passed.
+ """
status = self.get_mds_status_by_rank(rank)
self.revive_mds(status['name'], standby_for_rank)
def get_mds_status(self, mds):
+ """
+ Run cluster commands for the mds in order to get mds information
+ """
out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
j = json.loads(' '.join(out.splitlines()[1:]))
# collate; for dup ids, larger gid wins.
for info in j['info'].itervalues():
- if info['name'] == mds:
- return info
+ if info['name'] == mds:
+ return info
return None
def get_mds_status_by_rank(self, rank):
+ """
+ Run cluster commands for the mds in order to get mds information
+ for the rank specified.
+ """
out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
j = json.loads(' '.join(out.splitlines()[1:]))
# collate; for dup ids, larger gid wins.
for info in j['info'].itervalues():
- if info['rank'] == rank:
- return info
+ if info['rank'] == rank:
+ return info
return None
def get_mds_status_all(self):
+ """
+ Run cluster command to extract all the mds status.
+ """
out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
j = json.loads(' '.join(out.splitlines()[1:]))
return j