]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
Fix formatting and add docstrings to ceph_manager.py
authorWarren Usui <warren.usui@inktank.com>
Sat, 15 Feb 2014 03:50:38 +0000 (19:50 -0800)
committerWarren Usui <warren.usui@inktank.com>
Thu, 20 Feb 2014 22:51:23 +0000 (14:51 -0800)
Fixes: 6531
Signed-off-by: Warren Usui <warren.usui@inktank.com>
teuthology/task/ceph_manager.py

index b52f137e160eab70761d248405986e31e3783efc..53bf5a00fa166c20e57305b43d7124fb37b8d6cf 100644 (file)
@@ -1,3 +1,6 @@
+"""
+ceph manager -- Thrasher and CephManager objects
+"""
 from cStringIO import StringIO
 import random
 import time
@@ -8,6 +11,9 @@ from teuthology import misc as teuthology
 from teuthology.task import ceph as ceph_task
 
 class Thrasher:
+    """
+    Object used to thrash Ceph
+    """
     def __init__(self, manager, config, logger=None):
         self.ceph_manager = manager
         self.ceph_manager.wait_for_clean()
@@ -31,7 +37,9 @@ class Thrasher:
             self.log = lambda x: self.logger.info(x)
         else:
             def tmp(x):
-                print x
+                """
+                Implement log behavior
+                """
             self.log = tmp
         if self.config is None:
             self.config = dict()
@@ -46,9 +54,14 @@ class Thrasher:
         self.thread = gevent.spawn(self.do_thrash)
 
     def kill_osd(self, osd=None, mark_down=False, mark_out=False):
+        """
+        :param osd: Osd to be killed.
+        :mark_down: Mark down if true.
+        :mark_out: Mark out if true.
+        """
         if osd is None:
             osd = random.choice(self.live_osds)
-        self.log("Killing osd %s, live_osds are %s" % (str(osd),str(self.live_osds)))
+        self.log("Killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds)))
         self.live_osds.remove(osd)
         self.dead_osds.append(osd)
         self.ceph_manager.kill_osd(osd)
@@ -58,14 +71,22 @@ class Thrasher:
             self.out_osd(osd)
 
     def blackhole_kill_osd(self, osd=None):
+        """
+        If all else fails, kill the osd.
+        :param osd: Osd to be killed.
+        """
         if osd is None:
             osd = random.choice(self.live_osds)
-        self.log("Blackholing and then killing osd %s, live_osds are %s" % (str(osd),str(self.live_osds)))
+        self.log("Blackholing and then killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds)))
         self.live_osds.remove(osd)
         self.dead_osds.append(osd)
         self.ceph_manager.blackhole_kill_osd(osd)
 
     def revive_osd(self, osd=None):
+        """
+        Revive the osd.
+        :param osd: Osd to be revived.
+        """
         if osd is None:
             osd = random.choice(self.dead_osds)
         self.log("Reviving osd %s" % (str(osd),))
@@ -74,14 +95,22 @@ class Thrasher:
         self.ceph_manager.revive_osd(osd, self.revive_timeout)
 
     def out_osd(self, osd=None):
+        """
+        Mark the osd out
+        :param osd: Osd to be marked.
+        """
         if osd is None:
             osd = random.choice(self.in_osds)
-        self.log("Removing osd %s, in_osds are: %s" % (str(osd),str(self.in_osds)))
+        self.log("Removing osd %s, in_osds are: %s" % (str(osd), str(self.in_osds)))
         self.ceph_manager.mark_out_osd(osd)
         self.in_osds.remove(osd)
         self.out_osds.append(osd)
 
     def in_osd(self, osd=None):
+        """
+        Mark the osd out
+        :param osd: Osd to be marked.
+        """
         if osd is None:
             osd = random.choice(self.out_osds)
         if osd in self.dead_osds:
@@ -105,6 +134,9 @@ class Thrasher:
         self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity', str(osd), str(pa))
 
     def all_up(self):
+        """
+        Make sure all osds are up and not out.
+        """
         while len(self.dead_osds) > 0:
             self.log("reviving osd")
             self.revive_osd()
@@ -113,20 +145,32 @@ class Thrasher:
             self.in_osd()
 
     def do_join(self):
+        """
+        Break out of this Ceph loop
+        """
         self.stopping = True
         self.thread.get()
 
     def grow_pool(self):
+        """
+        Increase the size of the pool
+        """
         pool = self.ceph_manager.get_pool()
         self.log("Growing pool %s"%(pool,))
         self.ceph_manager.expand_pool(pool, self.config.get('pool_grow_by', 10), self.max_pgs)
 
     def fix_pgp_num(self):
+        """
+        Fix number of pgs in pool.
+        """
         pool = self.ceph_manager.get_pool()
         self.log("fixing pg num pool %s"%(pool,))
         self.ceph_manager.set_pool_pgpnum(pool)
 
     def test_pool_min_size(self):
+        """
+        Kill and revive all osds except one.
+        """
         self.log("test_pool_min_size")
         self.all_up()
         self.ceph_manager.wait_for_recovery(
@@ -151,6 +195,9 @@ class Thrasher:
             )
 
     def inject_pause(self, conf_key, duration, check_after, should_be_down):
+        """
+        Pause injection testing. Check for osd being down when finished.
+        """
         the_one = random.choice(self.live_osds)
         self.log("inject_pause on {osd}".format(osd = the_one))
         self.log(
@@ -245,9 +292,12 @@ class Thrasher:
         self.revive_osd()
 
     def choose_action(self):
+        """
+        Random action selector.
+        """
         chance_down = self.config.get('chance_down', 0.4)
         chance_test_min_size = self.config.get('chance_test_min_size', 0)
-        chance_test_backfill_full= self.config.get('chance_test_backfill_full', 0)
+        chance_test_backfill_full = self.config.get('chance_test_backfill_full', 0)
         if isinstance(chance_down, int):
             chance_down = float(chance_down) / 100
         minin = self.minin
@@ -286,7 +336,7 @@ class Thrasher:
                  self.config.get('chance_inject_pause_long', 0),)]:
                 actions.append(scenario)
 
-        total = sum([y for (x,y) in actions])
+        total = sum([y for (x, y) in actions])
         val = random.uniform(0, total)
         for (action, prob) in actions:
             if val < prob:
@@ -295,15 +345,18 @@ class Thrasher:
         return None
 
     def do_thrash(self):
+        """
+        Loop to select random actions to thrash ceph manager with.
+        """
         cleanint = self.config.get("clean_interval", 60)
-        maxdead = self.config.get("max_dead", 0);
+        maxdead = self.config.get("max_dead", 0)
         delay = self.config.get("op_delay", 5)
         self.log("starting do_thrash")
         while not self.stopping:
             self.log(" ".join([str(x) for x in ["in_osds: ", self.in_osds, " out_osds: ", self.out_osds,
                                                 "dead_osds: ", self.dead_osds, "live_osds: ",
                                                 self.live_osds]]))
-            if random.uniform(0,1) < (float(delay) / cleanint):
+            if random.uniform(0, 1) < (float(delay) / cleanint):
                 while len(self.dead_osds) > maxdead:
                     self.revive_osd()
                 if random.uniform(0, 1) < float(
@@ -319,6 +372,10 @@ class Thrasher:
         self.all_up()
 
 class CephManager:
+    """
+    Ceph manager object. 
+    Contains several local functions that form a bulk of this module.
+    """
     def __init__(self, controller, ctx=None, config=None, logger=None):
         self.lock = threading.RLock()
         self.ctx = ctx
@@ -330,6 +387,9 @@ class CephManager:
             self.log = lambda x: logger.info(x)
         else:
             def tmp(x):
+                """
+                implement log behavior.
+                """
                 print x
             self.log = tmp
         if self.config is None:
@@ -340,6 +400,9 @@ class CephManager:
             self.pools[pool] = self.get_pool_property(pool, 'pg_num')
 
     def raw_cluster_cmd(self, *args):
+        """
+        Start ceph on a raw cluster.  Return count
+        """
         testdir = teuthology.get_testdir(self.ctx)
         ceph_args = [
                 'adjust-ulimits',
@@ -355,6 +418,9 @@ class CephManager:
         return proc.stdout.getvalue()
 
     def raw_cluster_cmd_result(self, *args):
+        """
+        Start ceph on a cluster.  Return success or failure information.
+        """
         testdir = teuthology.get_testdir(self.ctx)
         ceph_args = [
                 'adjust-ulimits',
@@ -370,13 +436,16 @@ class CephManager:
         return proc.exitstatus
 
     def do_rados(self, remote, cmd):
+        """
+        Execute a remote rados command.
+        """
         testdir = teuthology.get_testdir(self.ctx)
         pre = [
             'adjust-ulimits',
             'ceph-coverage',
             '{tdir}/archive/coverage'.format(tdir=testdir),
             'rados',
-            ];
+            ]
         pre.extend(cmd)
         proc = remote.run(
             args=pre,
@@ -386,6 +455,10 @@ class CephManager:
 
     def rados_write_objects(
         self, pool, num_objects, size, timelimit, threads, cleanup=False):
+        """
+        Write rados objects
+        Threads not used yet.
+        """
         args = [
             '-p', pool,
             '--num-objects', num_objects,
@@ -397,6 +470,9 @@ class CephManager:
         return self.do_rados(self.controller, map(str, args))
 
     def do_put(self, pool, obj, fname):
+        """
+        Implement rados put operation
+        """
         return self.do_rados(
             self.controller,
             [
@@ -409,6 +485,9 @@ class CephManager:
             )
 
     def do_get(self, pool, obj, fname='/dev/null'):
+        """
+        Implement rados get operation
+        """
         return self.do_rados(
             self.controller,
             [
@@ -421,6 +500,9 @@ class CephManager:
             )
 
     def osd_admin_socket(self, osdnum, command, check_status=True):
+        """
+        Remotely start up ceph specifying the admin socket
+        """
         testdir = teuthology.get_testdir(self.ctx)
         remote = None
         for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems():
@@ -428,7 +510,7 @@ class CephManager:
                 if int(id_) == int(osdnum):
                     remote = _remote
         assert remote is not None
-        args=[
+        args = [
             'sudo',
             'adjust-ulimits',
             'ceph-coverage',
@@ -446,6 +528,11 @@ class CephManager:
             )
 
     def get_pgid(self, pool, pgnum):
+        """
+        :param pool: pool number
+        :param pgnum: pg number
+        :returns: a string representing this pg.
+        """
         poolnum = self.get_pool_num(pool)
         pg_str = "{poolnum}.{pgnum}".format(
             poolnum=poolnum,
@@ -480,7 +567,7 @@ class CephManager:
         """
         get number for pool (e.g., data -> 2)
         """
-        out = self.raw_cluster_cmd('osd','dump','--format=json')
+        out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
         j = json.loads('\n'.join(out.split('\n')[1:]))
         for i in j['pools']:
             if i['pool_name'] == pool:
@@ -491,7 +578,7 @@ class CephManager:
         """
         list all pool names
         """
-        out = self.raw_cluster_cmd('osd','dump','--format=json')
+        out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
         j = json.loads('\n'.join(out.split('\n')[1:]))
         self.log(j['pools'])
         return [str(i['pool_name']) for i in j['pools']]
@@ -503,6 +590,9 @@ class CephManager:
         [self.remove_pool(i) for i in self.list_pools()]
 
     def kick_recovery_wq(self, osdnum):
+        """
+        Run kick_recovery_wq on cluster.
+        """
         return self.raw_cluster_cmd(
             'tell', "osd.%d" % (int(osdnum),),
             'debug',
@@ -510,6 +600,10 @@ class CephManager:
             '0')
 
     def wait_run_admin_socket(self, osdnum, args=['version'], timeout=75):
+        """
+        If osd_admin_socket call suceeds, return.  Otherwise wait 
+        five seconds and try again.
+        """
         tries = 0
         while True:
             proc = self.osd_admin_socket(
@@ -528,19 +622,32 @@ class CephManager:
                 time.sleep(5)
 
     def set_config(self, osdnum, **argdict):
-        for k,v in argdict.iteritems():
+        """
+        :param osdnum: osd number
+        :param argdict: dictionary containing values to set.
+        """
+        for k, v in argdict.iteritems():
             self.wait_run_admin_socket(
                 osdnum,
                 ['config', 'set', str(k), str(v)])
 
     def raw_cluster_status(self):
+        """
+        Get status from cluster
+        """
         status = self.raw_cluster_cmd('status', '--format=json-pretty')
         return json.loads(status)
 
     def raw_osd_status(self):
+        """
+        Get osd status from cluster
+        """
         return self.raw_cluster_cmd('osd', 'dump')
 
     def get_osd_status(self):
+        """
+        Get osd statuses sorted by states that the osds are in.
+        """
         osd_lines = filter(
             lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
             self.raw_osd_status().split('\n'))
@@ -566,19 +673,30 @@ class CephManager:
                  'raw' : osd_lines}
 
     def get_num_pgs(self):
+        """
+        Check cluster status for the number of pgs
+        """
         status = self.raw_cluster_status()
         self.log(status)
         return status['pgmap']['num_pgs']
 
     def create_pool_with_unique_name(self, pg_num=1, ec_pool=False):
+        """
+        Create a pool named unique_pool_X where X is unique.
+        """ 
         name = ""
         with self.lock:
-            name = "unique_pool_%s"%(str(self.next_pool_id),)
+            name = "unique_pool_%s" % (str(self.next_pool_id),)
             self.next_pool_id += 1
             self.create_pool(name, pg_num, ec_pool=ec_pool)
         return name
 
     def create_pool(self, pool_name, pg_num=1, ec_pool=False):
+        """
+        Create a pool named from the pool_name parameter.
+        :param pool_name: name of the pool being created.
+        :param pg_num: initial number of pgs.
+        """
         with self.lock:
             assert isinstance(pool_name, str)
             assert isinstance(pg_num, int)
@@ -595,6 +713,10 @@ class CephManager:
             self.pools[pool_name] = pg_num
 
     def remove_pool(self, pool_name):
+        """
+        Remove the indicated pool
+        :param pool_name: Pool to be removed
+        """
         with self.lock:
             assert isinstance(pool_name, str)
             assert pool_name in self.pools
@@ -606,17 +728,28 @@ class CephManager:
                 )
 
     def get_pool(self):
+        """
+        Pick a random pool
+        """
         with self.lock:
-            return random.choice(self.pools.keys());
+            return random.choice(self.pools.keys())
 
     def get_pool_pg_num(self, pool_name):
+        """
+        Return the number of pgs in the pool specified.
+        """
         with self.lock:
             assert isinstance(pool_name, str)
             if pool_name in self.pools:
                 return self.pools[pool_name]
-            return 0;
+            return 0
 
     def get_pool_property(self, pool_name, prop):
+        """
+        :param pool_name: pool
+        :param prop: property to be checked.
+        :returns: property as an int value.
+        """
         with self.lock:
             assert isinstance(pool_name, str)
             assert isinstance(prop, str)
@@ -629,6 +762,13 @@ class CephManager:
             return int(output.split()[1])
 
     def set_pool_property(self, pool_name, prop, val):
+        """
+        :param pool_name: pool
+        :param prop: property to be set.
+        :param val: value to set.
+
+        This routine retries if set operation fails.
+        """
         with self.lock:
             assert isinstance(pool_name, str)
             assert isinstance(prop, str)
@@ -651,6 +791,9 @@ class CephManager:
                 time.sleep(2)
 
     def expand_pool(self, pool_name, by, max_pgs):
+        """
+        Increase the number of pgs in a pool
+        """
         with self.lock:
             assert isinstance(pool_name, str)
             assert isinstance(by, int)
@@ -665,6 +808,9 @@ class CephManager:
             self.pools[pool_name] = new_pg_num
 
     def set_pool_pgpnum(self, pool_name):
+        """
+        Set pgpnum property of pool_name pool.
+        """
         with self.lock:
             assert isinstance(pool_name, str)
             assert pool_name in self.pools
@@ -673,10 +819,13 @@ class CephManager:
             self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
 
     def list_pg_missing(self, pgid):
+        """
+        return list of missing pgs with the id specified
+        """
         r = None
         offset = {}
         while True:
-            out = self.raw_cluster_cmd('--', 'pg',pgid,'list_missing',
+            out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
                                        json.dumps(offset))
             j = json.loads(out)
             if r is None:
@@ -693,11 +842,17 @@ class CephManager:
         return r
 
     def get_pg_stats(self):
-        out = self.raw_cluster_cmd('pg','dump','--format=json')
+        """
+        Dump the cluster and get pg stats
+        """
+        out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
         j = json.loads('\n'.join(out.split('\n')[1:]))
         return j['pg_stats']
 
     def compile_pg_status(self):
+        """
+        Return a histogram of pg state values
+        """
         ret = {}
         j = self.get_pg_stats()
         for pg in j:
@@ -708,25 +863,40 @@ class CephManager:
         return ret
 
     def pg_scrubbing(self, pool, pgnum):
+        """
+        pg scrubbing wrapper
+        """
         pgstr = self.get_pgid(pool, pgnum)
         stats = self.get_single_pg_stats(pgstr)
         return 'scrub' in stats['state']
 
     def pg_repairing(self, pool, pgnum):
+        """
+        pg repairing wrapper
+        """
         pgstr = self.get_pgid(pool, pgnum)
         stats = self.get_single_pg_stats(pgstr)
         return 'repair' in stats['state']
 
     def pg_inconsistent(self, pool, pgnum):
+        """
+        pg inconsistent wrapper
+        """
         pgstr = self.get_pgid(pool, pgnum)
         stats = self.get_single_pg_stats(pgstr)
         return 'inconsistent' in stats['state']
 
     def get_last_scrub_stamp(self, pool, pgnum):
+        """
+        Get the timestamp of the last scrub.
+        """
         stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
         return stats["last_scrub_stamp"]
 
     def do_pg_scrub(self, pool, pgnum, stype):
+        """
+        Scrub pg and wait for scrubbing to finish
+        """
         init = self.get_last_scrub_stamp(pool, pgnum)
         self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
         while init == self.get_last_scrub_stamp(pool, pgnum):
@@ -734,6 +904,9 @@ class CephManager:
             time.sleep(10)
 
     def get_single_pg_stats(self, pgid):
+        """
+        Return pg for the pgid specified.
+        """
         all_stats = self.get_pg_stats()
 
         for pg in all_stats:
@@ -743,21 +916,34 @@ class CephManager:
         return None
 
     def get_osd_dump(self):
-        out = self.raw_cluster_cmd('osd','dump','--format=json')
+        """
+        Dump osds
+        :returns: all osds 
+        """ 
+        out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
         j = json.loads('\n'.join(out.split('\n')[1:]))
         return j['osds']
 
     def get_stuck_pgs(self, type_, threshold):
-        out = self.raw_cluster_cmd('pg','dump_stuck', type_, str(threshold),
+        """
+        :returns: stuck pg information from the cluster
+        """ 
+        out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
                                    '--format=json')
         return json.loads(out)
 
     def get_num_unfound_objects(self):
+        """
+        Check cluster status to get the number of unfound objects
+        """ 
         status = self.raw_cluster_status()
         self.log(status)
         return status['pgmap'].get('unfound_objects', 0)
 
     def get_num_creating(self):
+        """
+        Find the number of pgs in creating mode.
+        """
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
@@ -766,6 +952,9 @@ class CephManager:
         return num
 
     def get_num_active_clean(self):
+        """
+        Find the number of active and clean pgs.
+        """
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
@@ -774,6 +963,9 @@ class CephManager:
         return num
 
     def get_num_active_recovered(self):
+        """
+        Find the number of active and recovered pgs.
+        """
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
@@ -782,6 +974,9 @@ class CephManager:
         return num
 
     def get_num_active(self):
+        """
+        Find the number of active pgs.
+        """
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
@@ -789,7 +984,10 @@ class CephManager:
                 num += 1
         return num
 
-    def get_num_down(self):
+    def get_num_down(self): 
+        """
+        Find the number of pgs that are down.
+        """
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
@@ -799,6 +997,9 @@ class CephManager:
         return num
 
     def get_num_active_down(self):
+        """
+        Find the number of pgs that are either active or down.
+        """
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
@@ -809,15 +1010,27 @@ class CephManager:
         return num
 
     def is_clean(self):
+        """
+        True if all pgs are clean
+        """
         return self.get_num_active_clean() == self.get_num_pgs()
 
     def is_recovered(self):
+        """
+        True if all pgs have recovered
+        """
         return self.get_num_active_recovered() == self.get_num_pgs()
 
     def is_active_or_down(self):
+        """
+        True if all pgs are active or down
+        """
         return self.get_num_active_down() == self.get_num_pgs()
 
     def wait_for_clean(self, timeout=None):
+        """
+        Returns trues when all pgs are clean.
+        """
         self.log("waiting for clean")
         start = time.time()
         num_active_clean = self.get_num_active_clean()
@@ -833,11 +1046,18 @@ class CephManager:
         self.log("clean!")
 
     def are_all_osds_up(self):
+        """
+        Returns true if all osds are up.
+        """
         x = self.get_osd_dump()
         return (len(x) == \
                     sum([(y['up'] > 0) for y in x]))
 
     def wait_for_all_up(self, timeout=None):
+        """
+        When this exits, either the timeout has expired, or all
+        osds are up.
+        """
         self.log("waiting for all up")
         start = time.time()
         while not self.are_all_osds_up():
@@ -848,6 +1068,9 @@ class CephManager:
         self.log("all up!")
 
     def wait_for_recovery(self, timeout=None):
+        """
+        Check peering. When this exists, we have recovered.
+        """
         self.log("waiting for recovery to complete")
         start = time.time()
         num_active_recovered = self.get_num_active_recovered()
@@ -863,6 +1086,9 @@ class CephManager:
         self.log("recovered!")
 
     def wait_for_active(self, timeout=None):
+        """
+        Check peering. When this exists, we are definitely active
+        """
         self.log("waiting for peering to complete")
         start = time.time()
         num_active = self.get_num_active()
@@ -878,6 +1104,10 @@ class CephManager:
         self.log("active!")
 
     def wait_for_active_or_down(self, timeout=None):
+        """
+        Check peering. When this exists, we are definitely either
+        active or down
+        """
         self.log("waiting for peering to complete or become blocked")
         start = time.time()
         num_active_down = self.get_num_active_down()
@@ -893,11 +1123,17 @@ class CephManager:
         self.log("active or down!")
 
     def osd_is_up(self, osd):
+        """
+        Wrapper for osd check
+        """
         osds = self.get_osd_dump()
         return osds[osd]['up'] > 0
 
     def wait_till_osd_is_up(self, osd, timeout=None):
-        self.log('waiting for osd.%d to be up' % osd);
+        """
+        Loop waiting for osd.
+        """
+        self.log('waiting for osd.%d to be up' % osd)
         start = time.time()
         while not self.osd_is_up(osd):
             if timeout is not None:
@@ -907,9 +1143,15 @@ class CephManager:
         self.log('osd.%d is up' % osd)
 
     def is_active(self):
+        """
+        Wrapper to check if active
+        """
         return self.get_num_active() == self.get_num_pgs()
 
     def wait_till_active(self, timeout=None):
+        """
+        Wait until osds are active.
+        """
         self.log("waiting till active")
         start = time.time()
         while not self.is_active():
@@ -920,9 +1162,16 @@ class CephManager:
         self.log("active!")
 
     def mark_out_osd(self, osd):
+        """
+        Wrapper to mark osd out.
+        """
         self.raw_cluster_cmd('osd', 'out', str(osd))
 
     def kill_osd(self, osd):
+        """
+        Kill osds by either power cycling (if indicated by the config)
+        or by stopping.
+        """
         if self.config.get('powercycle'):
             (remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
             self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name))
@@ -932,12 +1181,19 @@ class CephManager:
             self.ctx.daemons.get_daemon('osd', osd).stop()
 
     def blackhole_kill_osd(self, osd):
+        """
+        Stop osd if nothing else works.  
+        """
         self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
                              'injectargs', '--filestore-blackhole')
         time.sleep(2)
         self.ctx.daemons.get_daemon('osd', osd).stop()
 
     def revive_osd(self, osd, timeout=75):
+        """
+        Revive osds by either power cycling (if indicated by the config)
+        or by restarting.
+        """
         if self.config.get('powercycle'):
             (remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
             self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name))
@@ -959,18 +1215,31 @@ class CephManager:
                                    timeout=timeout)
 
     def mark_down_osd(self, osd):
+        """
+        Cluster command wrapper
+        """
         self.raw_cluster_cmd('osd', 'down', str(osd))
 
     def mark_in_osd(self, osd):
+        """
+        Cluster command wrapper
+        """
         self.raw_cluster_cmd('osd', 'in', str(osd))
 
 
     ## monitors
 
     def signal_mon(self, mon, sig):
+        """
+        Wrapper to local get_deamon call
+        """
         self.ctx.daemons.get_daemon('mon', mon).signal(sig)
 
     def kill_mon(self, mon):
+        """
+        Kill the monitor by either power cycling (if the config says so),
+        or by doing a stop.
+        """
         if self.config.get('powercycle'):
             (remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys()
             self.log('kill_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name))
@@ -980,6 +1249,10 @@ class CephManager:
             self.ctx.daemons.get_daemon('mon', mon).stop()
 
     def revive_mon(self, mon):
+        """
+        Restart by either power cycling (if the config says so),
+        or by doing a normal restart.
+        """
         if self.config.get('powercycle'):
             (remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys()
             self.log('revive_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name))
@@ -989,17 +1262,26 @@ class CephManager:
         self.ctx.daemons.get_daemon('mon', mon).restart()
 
     def get_mon_status(self, mon):
+        """
+        Extract all the monitor status information from the cluster
+        """
         addr = self.ctx.ceph.conf['mon.%s' % mon]['mon addr']
         out = self.raw_cluster_cmd('-m', addr, 'mon_status')
         return json.loads(out)
 
     def get_mon_quorum(self):
+        """
+        Extract monitor quorum information from the cluster
+        """
         out = self.raw_cluster_cmd('quorum_status')
         j = json.loads(out)
         self.log('quorum_status is %s' % out)
         return j['quorum']
 
     def wait_for_mon_quorum_size(self, size, timeout=300):
+        """
+        Loop until quorum size is reached.
+        """
         self.log('waiting for quorum size %d' % size)
         start = time.time()
         while not len(self.get_mon_quorum()) == size:
@@ -1010,14 +1292,20 @@ class CephManager:
         self.log("quorum is size %d" % size)
 
     def get_mon_health(self, debug=False):
-      out = self.raw_cluster_cmd('health', '--format=json')
-      if debug:
-        self.log('health:\n{h}'.format(h=out))
-      return json.loads(out)
+        """
+        Extract all the monitor health information.
+        """
+        out = self.raw_cluster_cmd('health', '--format=json')
+        if debug:
+            self.log('health:\n{h}'.format(h=out))
+        return json.loads(out)
 
     ## metadata servers
 
     def kill_mds(self, mds):
+        """
+        Powercyle if set in config, otherwise just stop.
+        """
         if self.config.get('powercycle'):
             (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys()
             self.log('kill_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name))
@@ -1027,10 +1315,17 @@ class CephManager:
             self.ctx.daemons.get_daemon('mds', mds).stop()
 
     def kill_mds_by_rank(self, rank):
+        """
+        kill_mds wrapper to kill based on rank passed.
+        """
         status = self.get_mds_status_by_rank(rank)
         self.kill_mds(status['name'])
 
     def revive_mds(self, mds, standby_for_rank=None):
+        """
+        Revive mds -- do an ipmpi powercycle (if indicated by the config)
+        and then restart (using --hot-standby if specified.
+        """
         if self.config.get('powercycle'):
             (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys()
             self.log('revive_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name))
@@ -1039,32 +1334,45 @@ class CephManager:
             ceph_task.make_admin_daemon_dir(self.ctx, remote)
         args = []
         if standby_for_rank:
-          args.extend(['--hot-standby', standby_for_rank])
+            args.extend(['--hot-standby', standby_for_rank])
         self.ctx.daemons.get_daemon('mds', mds).restart(*args)
 
     def revive_mds_by_rank(self, rank, standby_for_rank=None):
+        """
+        revive_mds wrapper to revive based on rank passed.
+        """
         status = self.get_mds_status_by_rank(rank)
         self.revive_mds(status['name'], standby_for_rank)
 
     def get_mds_status(self, mds):
+        """
+        Run cluster commands for the mds in order to get mds information
+        """
         out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
         j = json.loads(' '.join(out.splitlines()[1:]))
         # collate; for dup ids, larger gid wins.
         for info in j['info'].itervalues():
-          if info['name'] == mds:
-              return info
+            if info['name'] == mds:
+                return info
         return None
 
     def get_mds_status_by_rank(self, rank):
+        """
+        Run cluster commands for the mds in order to get mds information
+        check rank.
+        """
         out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
         j = json.loads(' '.join(out.splitlines()[1:]))
         # collate; for dup ids, larger gid wins.
         for info in j['info'].itervalues():
-          if info['rank'] == rank:
-              return info
+            if info['rank'] == rank:
+                return info
         return None
 
     def get_mds_status_all(self):
+        """
+        Run cluster command to extract all the mds status.
+        """
         out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
         j = json.loads(' '.join(out.splitlines()[1:]))
         return j