git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ceph_manager: rework indentation to make it pep8 clean
author Loic Dachary <ldachary@redhat.com>
Fri, 21 Nov 2014 14:58:27 +0000 (15:58 +0100)
committer Loic Dachary <ldachary@redhat.com>
Tue, 13 Jan 2015 19:22:33 +0000 (20:22 +0100)
Signed-off-by: Loic Dachary <ldachary@redhat.com>
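
Note: the hunks below mostly rewrap long lines so tasks/ceph_manager.py stays under PEP 8's 79-column limit; a checker such as pep8 (now pycodestyle) or flake8 can be run on the file to confirm it is clean. The short standalone sketch below is illustrative only and is not taken from the patch (the names remote_to_dev, remote, osd, ret and dev_again are made up); it shows the three wrapping techniques the diff relies on: implicit continuation inside parentheses, adjacent string literals, and a trailing backslash to split a long attribute chain.

    # Illustrative sketch only -- not part of the patch.  Sample names
    # (remote_to_dev, remote, osd, ret) are made up for demonstration.
    remote_to_dev = {'mira001': {'0': '/dev/sdb'}}
    remote, osd, ret = 'mira001', '0', 1

    # 1. Implicit continuation inside parentheses, with the wrapped
    #    condition indented past the opening "if (":
    if (remote in remote_to_dev and
            osd in remote_to_dev[remote]):
        dev = remote_to_dev[remote][osd]

    # 2. Adjacent string literals, so a long message splits across lines
    #    without runtime "+" concatenation:
    msg = ("ceph-objectstore-tool: "
           "export failure with status {ret}".format(ret=ret))

    # 3. A trailing backslash after the dot to break a long attribute or
    #    method chain, as done for remote_to_roles_to_dev_mount_options:
    dev_again = remote_to_dev.\
        get(remote, {}).\
        get(osd)
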
tasks/ceph_manager.py

index 0d0ae1ffaa0334c3cd6ac13575be385ac2381f3e..4159eddeae651fc3ef57fe96818a31b7c1572ffe 100644 (file)
@@ -48,12 +48,8 @@ def make_admin_daemon_dir(ctx, remote):
     :param ctx: Context
     :param remote: Remote site
     """
-    remote.run(
-            args=[
-                'sudo',
-                'install', '-d', '-m0777', '--', '/var/run/ceph',
-                ],
-            )
+    remote.run(args=['sudo',
+                     'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
 
 
 def mount_osd_data(ctx, remote, osd):
@@ -65,14 +61,17 @@ def mount_osd_data(ctx, remote, osd):
     :param osd: Osd name
     """
     log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
-    if remote in ctx.disk_config.remote_to_roles_to_dev and osd in ctx.disk_config.remote_to_roles_to_dev[remote]:
+    if (remote in ctx.disk_config.remote_to_roles_to_dev and
+            osd in ctx.disk_config.remote_to_roles_to_dev[remote]):
         dev = ctx.disk_config.remote_to_roles_to_dev[remote][osd]
-        mount_options = ctx.disk_config.remote_to_roles_to_dev_mount_options[remote][osd]
+        mount_options = ctx.disk_config.\
+            remote_to_roles_to_dev_mount_options[remote][osd]
         fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][osd]
         mnt = os.path.join('/var/lib/ceph/osd', 'ceph-{id}'.format(id=osd))
 
-        log.info('Mounting osd.{o}: dev: {n}, mountpoint: {p}, type: {t}, options: {v}'.format(
-                 o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options))
+        log.info('Mounting osd.{o}: dev: {n}, '
+                 'mountpoint: {p}, type: {t}, options: {v}'.format(
+                     o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options))
 
         remote.run(
             args=[
@@ -148,7 +147,8 @@ class Thrasher:
         """
         if osd is None:
             osd = random.choice(self.live_osds)
-        self.log("Killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds)))
+        self.log("Killing osd %s, live_osds are %s" % (str(osd),
+                                                       str(self.live_osds)))
         self.live_osds.remove(osd)
         self.dead_osds.append(osd)
         self.ceph_manager.kill_osd(osd)
@@ -158,52 +158,83 @@ class Thrasher:
             self.out_osd(osd)
         if self.ceph_objectstore_tool:
             self.log("Testing ceph-objectstore-tool on down osd")
-            (remote,) = self.ceph_manager.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
+            (remote,) = self.ceph_manager.ctx.\
+                cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
             FSPATH = self.ceph_manager.get_filepath()
             JPATH = os.path.join(FSPATH, "journal")
             exp_osd = imp_osd = osd
             exp_remote = imp_remote = remote
             # If an older osd is available we'll move a pg from there
-            if len(self.dead_osds) > 1 and random.random() < self.chance_move_pg:
+            if (len(self.dead_osds) > 1 and
+                    random.random() < self.chance_move_pg):
                 exp_osd = random.choice(self.dead_osds[:-1])
-                (exp_remote,) = self.ceph_manager.ctx.cluster.only('osd.{o}'.format(o=exp_osd)).remotes.iterkeys()
-            if 'keyvaluestore_backend' in self.ceph_manager.ctx.ceph.conf['osd']:
-                prefix = "sudo ceph-objectstore-tool --data-path {fpath} --journal-path {jpath} --type keyvaluestore-dev --log-file=/var/log/ceph/objectstore_tool.\\$pid.log ".format(fpath=FSPATH, jpath=JPATH)
+                (exp_remote,) = self.ceph_manager.ctx.\
+                    cluster.only('osd.{o}'.format(o=exp_osd)).\
+                    remotes.iterkeys()
+            if ('keyvaluestore_backend' in
+                    self.ceph_manager.ctx.ceph.conf['osd']):
+                prefix = ("sudo ceph-objectstore-tool "
+                          "--data-path {fpath} --journal-path {jpath} "
+                          "--type keyvaluestore "
+                          "--log-file="
+                          "/var/log/ceph/objectstore_tool.\\$pid.log".
+                          format(fpath=FSPATH, jpath=JPATH))
             else:
-                prefix = "sudo ceph-objectstore-tool --data-path {fpath} --journal-path {jpath} --log-file=/var/log/ceph/objectstore_tool.\\$pid.log ".format(fpath=FSPATH, jpath=JPATH)
+                prefix = ("sudo ceph-objectstore-tool "
+                          "--data-path {fpath} --journal-path {jpath} "
+                          "--log-file="
+                          "/var/log/ceph/objectstore_tool.\\$pid.log".
+                          format(fpath=FSPATH, jpath=JPATH))
             cmd = (prefix + "--op list-pgs").format(id=exp_osd)
-            proc = exp_remote.run(args=cmd, wait=True, check_status=True, stdout=StringIO())
+            proc = exp_remote.run(args=cmd, wait=True,
+                                  check_status=True, stdout=StringIO())
             if proc.exitstatus:
-                raise Exception("ceph-objectstore-tool: exp list-pgs failure with status {ret}".format(ret=proc.exitstatus))
+                raise Exception("ceph-objectstore-tool: "
+                                "exp list-pgs failure with status {ret}".
+                                format(ret=proc.exitstatus))
             pgs = proc.stdout.getvalue().split('\n')[:-1]
             if len(pgs) == 0:
                 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
                 return
             pg = random.choice(pgs)
-            exp_path = os.path.join(os.path.join(teuthology.get_testdir(self.ceph_manager.ctx), "data"), "exp.{pg}.{id}".format(pg=pg, id=exp_osd))
+            exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
+            exp_path = os.path.join(exp_path, "data")
+            exp_path = os.path.join(exp_path,
+                                    "exp.{pg}.{id}".format(pg=pg, id=exp_osd))
             # export
-            cmd = (prefix + "--op export --pgid {pg} --file {file}").format(id=exp_osd, pg=pg, file=exp_path)
+            cmd = prefix + "--op export --pgid {pg} --file {file}"
+            cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
             proc = exp_remote.run(args=cmd)
             if proc.exitstatus:
-                raise Exception("ceph-objectstore-tool: export failure with status {ret}".format(ret=proc.exitstatus))
+                raise Exception("ceph-objectstore-tool: "
+                                "export failure with status {ret}".
+                                format(ret=proc.exitstatus))
             # remove
-            cmd = (prefix + "--op remove --pgid {pg}").format(id=exp_osd, pg=pg)
+            cmd = prefix + "--op remove --pgid {pg}"
+            cmd = cmd.format(id=exp_osd, pg=pg)
             proc = exp_remote.run(args=cmd)
             if proc.exitstatus:
-                raise Exception("ceph-objectstore-tool: remove failure with status {ret}".format(ret=proc.exitstatus))
+                raise Exception("ceph-objectstore-tool: "
+                                "remove failure with status {ret}".
+                                format(ret=proc.exitstatus))
             # If there are at least 2 dead osds we might move the pg
             if exp_osd != imp_osd:
                 # If pg isn't already on this osd, then we will move it there
                 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
-                proc = imp_remote.run(args=cmd, wait=True, check_status=True, stdout=StringIO())
+                proc = imp_remote.run(args=cmd, wait=True,
+                                      check_status=True, stdout=StringIO())
                 if proc.exitstatus:
-                    raise Exception("ceph-objectstore-tool: imp list-pgs failure with status {ret}".format(ret=proc.exitstatus))
+                    raise Exception("ceph-objectstore-tool: "
+                                    "imp list-pgs failure with status {ret}".
+                                    format(ret=proc.exitstatus))
                 pgs = proc.stdout.getvalue().split('\n')[:-1]
                 if pg not in pgs:
-                    self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".format(pg=pg, fosd=exp_osd, tosd=imp_osd))
+                    self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
+                             format(pg=pg, fosd=exp_osd, tosd=imp_osd))
                     if imp_remote != exp_remote:
                         # Copy export file to the other machine
-                        self.log("Transfer export file from {srem} to {trem}".format(srem=exp_remote, trem=imp_remote))
+                        self.log("Transfer export file from {srem} to {trem}".
+                                 format(srem=exp_remote, trem=imp_remote))
                         tmpexport = Remote.get_file(exp_remote, exp_path)
                         Remote.put_file(imp_remote, tmpexport, exp_path)
                         os.remove(tmpexport)
@@ -212,10 +243,16 @@ class Thrasher:
                     imp_osd = exp_osd
                     imp_remote = exp_remote
             # import
-            cmd = (prefix + "--op import --file {file}").format(id=imp_osd, file=exp_path)
+            cmd = (prefix + "--op import --file {file}")
+            cmd = cmd.format(id=imp_osd, file=exp_path)
             imp_remote.run(args=cmd)
-            if proc.exitstatus:
-                raise Exception("ceph-objectstore-tool: import failure with status {ret}".format(ret=proc.exitstatus))
+            if proc.exitstatus == 10:
+                self.log("Pool went away before processing "
+                         "an import...ignored")
+            elif proc.exitstatus:
+                raise Exception("ceph-objectstore-tool: "
+                                "import failure with status {ret}".
+                                format(ret=proc.exitstatus))
             cmd = "rm -f {file}".format(file=exp_path)
             exp_remote.run(args=cmd)
             if imp_remote != exp_remote:
@@ -229,7 +266,8 @@ class Thrasher:
         """
         if osd is None:
             osd = random.choice(self.live_osds)
-        self.log("Blackholing and then killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds)))
+        self.log("Blackholing and then killing osd %s, live_osds are %s" %
+                 (str(osd), str(self.live_osds)))
         self.live_osds.remove(osd)
         self.dead_osds.append(osd)
         self.ceph_manager.blackhole_kill_osd(osd)
@@ -253,7 +291,8 @@ class Thrasher:
         """
         if osd is None:
             osd = random.choice(self.in_osds)
-        self.log("Removing osd %s, in_osds are: %s" % (str(osd), str(self.in_osds)))
+        self.log("Removing osd %s, in_osds are: %s" %
+                 (str(osd), str(self.in_osds)))
         self.ceph_manager.mark_out_osd(osd)
         self.in_osds.remove(osd)
         self.out_osds.append(osd)
@@ -271,7 +310,7 @@ class Thrasher:
         self.out_osds.remove(osd)
         self.in_osds.append(osd)
         self.ceph_manager.mark_in_osd(osd)
-        self.log("Added osd %s"%(str(osd),))
+        self.log("Added osd %s" % (str(osd),))
 
     def reweight_osd(self, osd=None):
         """
@@ -282,7 +321,8 @@ class Thrasher:
             osd = random.choice(self.in_osds)
         val = random.uniform(.1, 1.0)
         self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
-        self.ceph_manager.raw_cluster_cmd('osd', 'reweight', str(osd), str(val))
+        self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
+                                          str(osd), str(val))
 
     def primary_affinity(self, osd=None):
         if osd is None:
@@ -294,7 +334,8 @@ class Thrasher:
         else:
             pa = 0
         self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
-        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity', str(osd), str(pa))
+        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
+                                          str(osd), str(pa))
 
     def all_up(self):
         """
@@ -319,15 +360,17 @@ class Thrasher:
         Increase the size of the pool
         """
         pool = self.ceph_manager.get_pool()
-        self.log("Growing pool %s"%(pool,))
-        self.ceph_manager.expand_pool(pool, self.config.get('pool_grow_by', 10), self.max_pgs)
+        self.log("Growing pool %s" % (pool,))
+        self.ceph_manager.expand_pool(pool,
+                                      self.config.get('pool_grow_by', 10),
+                                      self.max_pgs)
 
     def fix_pgp_num(self):
         """
         Fix number of pgs in pool.
         """
         pool = self.ceph_manager.get_pool()
-        self.log("fixing pg num pool %s"%(pool,))
+        self.log("fixing pg num pool %s" % (pool,))
         self.ceph_manager.set_pool_pgpnum(pool)
 
     def test_pool_min_size(self):
@@ -362,18 +405,18 @@ class Thrasher:
         Pause injection testing. Check for osd being down when finished.
         """
         the_one = random.choice(self.live_osds)
-        self.log("inject_pause on {osd}".format(osd = the_one))
+        self.log("inject_pause on {osd}".format(osd=the_one))
         self.log(
             "Testing {key} pause injection for duration {duration}".format(
-                key = conf_key,
-                duration = duration
+                key=conf_key,
+                duration=duration
                 ))
         self.log(
             "Checking after {after}, should_be_down={shouldbedown}".format(
-                after = check_after,
-                shouldbedown = should_be_down
+                after=check_after,
+                shouldbedown=should_be_down
                 ))
-        self.ceph_manager.set_config(the_one, **{conf_key:duration})
+        self.ceph_manager.set_config(the_one, **{conf_key: duration})
         if not should_be_down:
             return
         time.sleep(check_after)
@@ -400,9 +443,9 @@ class Thrasher:
         for i in self.live_osds:
             self.ceph_manager.set_config(
                 i,
-                osd_debug_skip_full_check_in_backfill_reservation = random.choice(
-                    ['false', 'true']),
-                osd_backfill_full_ratio = 0)
+                osd_debug_skip_full_check_in_backfill_reservation=
+                random.choice(['false', 'true']),
+                osd_backfill_full_ratio=0)
         for i in range(30):
             status = self.ceph_manager.compile_pg_status()
             if 'backfill' not in status.keys():
@@ -415,9 +458,8 @@ class Thrasher:
         for i in self.live_osds:
             self.ceph_manager.set_config(
                 i,
-                osd_debug_skip_full_check_in_backfill_reservation = \
-                    'false',
-                osd_backfill_full_ratio = 0.85)
+                osd_debug_skip_full_check_in_backfill_reservation='false',
+                osd_backfill_full_ratio=0.85)
 
     def test_map_discontinuity(self):
         """
@@ -460,7 +502,8 @@ class Thrasher:
         """
         chance_down = self.config.get('chance_down', 0.4)
         chance_test_min_size = self.config.get('chance_test_min_size', 0)
-        chance_test_backfill_full = self.config.get('chance_test_backfill_full', 0)
+        chance_test_backfill_full = \
+            self.config.get('chance_test_backfill_full', 0)
         if isinstance(chance_down, int):
             chance_down = float(chance_down) / 100
         minin = self.minin
@@ -468,7 +511,8 @@ class Thrasher:
         minlive = self.config.get("min_live", 2)
         mindead = self.config.get("min_dead", 0)
 
-        self.log('choose_action: min_in %d min_out %d min_live %d min_dead %d' %
+        self.log('choose_action: min_in %d min_out '
+                 '%d min_live %d min_dead %d' %
                  (minin, minout, minlive, mindead))
         actions = []
         if len(self.in_osds) > minin:
@@ -481,22 +525,29 @@ class Thrasher:
             actions.append((self.revive_osd, 1.0,))
         if self.config.get('thrash_primary_affinity', True):
             actions.append((self.primary_affinity, 1.0,))
-        actions.append((self.reweight_osd, self.config.get('reweight_osd',.5),))
-        actions.append((self.grow_pool, self.config.get('chance_pgnum_grow', 0),))
-        actions.append((self.fix_pgp_num, self.config.get('chance_pgpnum_fix', 0),))
-        actions.append((self.test_pool_min_size, chance_test_min_size,))
-        actions.append((self.test_backfill_full, chance_test_backfill_full,))
+        actions.append((self.reweight_osd,
+                        self.config.get('reweight_osd', .5),))
+        actions.append((self.grow_pool,
+                        self.config.get('chance_pgnum_grow', 0),))
+        actions.append((self.fix_pgp_num,
+                        self.config.get('chance_pgpnum_fix', 0),))
+        actions.append((self.test_pool_min_size,
+                        chance_test_min_size,))
+        actions.append((self.test_backfill_full,
+                        chance_test_backfill_full,))
         for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
             for scenario in [
-                (lambda: self.inject_pause(key,
-                                           self.config.get('pause_short', 3),
-                                           0,
-                                           False),
+                (lambda:
+                 self.inject_pause(key,
+                                   self.config.get('pause_short', 3),
+                                   0,
+                                   False),
                  self.config.get('chance_inject_pause_short', 1),),
-                (lambda: self.inject_pause(key,
-                                           self.config.get('pause_long', 80),
-                                           self.config.get('pause_check_after', 70),
-                                           True),
+                (lambda:
+                 self.inject_pause(key,
+                                   self.config.get('pause_long', 80),
+                                   self.config.get('pause_check_after', 70),
+                                   True),
                  self.config.get('chance_inject_pause_long', 0),)]:
                 actions.append(scenario)
 
@@ -518,9 +569,11 @@ class Thrasher:
         delay = self.config.get("op_delay", 5)
         self.log("starting do_thrash")
         while not self.stopping:
-            self.log(" ".join([str(x) for x in ["in_osds: ", self.in_osds, " out_osds: ", self.out_osds,
-                                                "dead_osds: ", self.dead_osds, "live_osds: ",
-                                                self.live_osds]]))
+            to_log = [str(x) for x in ["in_osds: ", self.in_osds,
+                                       "out_osds: ", self.out_osds,
+                                       "dead_osds: ", self.dead_osds,
+                                       "live_osds: ", self.live_osds]]
+            self.log(" ".join(to_log))
             if random.uniform(0, 1) < (float(delay) / cleanint):
                 while len(self.dead_osds) > maxdead:
                     self.revive_osd()
@@ -528,7 +581,7 @@ class Thrasher:
                     self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                                       str(osd), str(1))
                 if random.uniform(0, 1) < float(
-                    self.config.get('chance_test_map_discontinuity', 0)):
+                        self.config.get('chance_test_map_discontinuity', 0)):
                     self.test_map_discontinuity()
                 else:
                     self.ceph_manager.wait_for_recovery(
@@ -580,11 +633,11 @@ class CephManager:
         """
         testdir = teuthology.get_testdir(self.ctx)
         ceph_args = [
-                'adjust-ulimits',
-                'ceph-coverage',
-                '{tdir}/archive/coverage'.format(tdir=testdir),
-                'ceph',
-                ]
+            'adjust-ulimits',
+            'ceph-coverage',
+            '{tdir}/archive/coverage'.format(tdir=testdir),
+            'ceph',
+        ]
         ceph_args.extend(args)
         proc = self.controller.run(
             args=ceph_args,
@@ -598,11 +651,11 @@ class CephManager:
         """
         testdir = teuthology.get_testdir(self.ctx)
         ceph_args = [
-                'adjust-ulimits',
-                'ceph-coverage',
-                '{tdir}/archive/coverage'.format(tdir=testdir),
-                'ceph',
-                ]
+            'adjust-ulimits',
+            'ceph-coverage',
+            '{tdir}/archive/coverage'.format(tdir=testdir),
+            'ceph',
+        ]
         ceph_args.extend(args)
         proc = self.controller.run(
             args=ceph_args,
@@ -628,8 +681,8 @@ class CephManager:
             )
         return proc
 
-    def rados_write_objects(
-        self, pool, num_objects, size, timelimit, threads, cleanup=False):
+    def rados_write_objects(self, pool, num_objects, size,
+                            timelimit, threads, cleanup=False):
         """
         Write rados objects
         Threads not used yet.
@@ -641,7 +694,8 @@ class CephManager:
             'bench', timelimit,
             'write'
             ]
-        if not cleanup: args.append('--no-cleanup')
+        if not cleanup:
+            args.append('--no-cleanup')
         return self.do_rados(self.controller, map(str, args))
 
     def do_put(self, pool, obj, fname):
@@ -682,20 +736,25 @@ class CephManager:
         Get the Remote for the host where a particular service runs.
 
         :param service_type: 'mds', 'osd', 'client'
-        :param service_id: The second part of a role, e.g. '0' for the role 'client.0'
-        :return: a Remote instance for the host where the requested role is placed
+        :param service_id: The second part of a role, e.g. '0' for
+                           the role 'client.0'
+        :return: a Remote instance for the host where the
+                 requested role is placed
         """
         for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems():
             for id_ in teuthology.roles_of_type(roles_for_host, service_type):
                 if id_ == str(service_id):
                     return _remote
 
-        raise KeyError("Service {0}.{1} not found".format(service_type, service_id))
+        raise KeyError("Service {0}.{1} not found".format(service_type,
+                                                          service_id))
 
-    def admin_socket(self, service_type, service_id, command, check_status=True):
+    def admin_socket(self, service_type, service_id,
+                     command, check_status=True):
         """
         Remotely start up ceph specifying the admin socket
-        :param command a list of words to use as the command to the admin socket
+        :param command: a list of words to use as the command
+                        to the admin socket
         """
         testdir = teuthology.get_testdir(self.ctx)
         remote = self.find_remote(service_type, service_id)
@@ -784,32 +843,34 @@ class CephManager:
             'kick_recovery_wq',
             '0')
 
-    def wait_run_admin_socket(self, service_type, service_id, args=['version'], timeout=75):
+    def wait_run_admin_socket(self, service_type,
+                              service_id, args=['version'], timeout=75):
         """
         If osd_admin_socket call succeeds, return.  Otherwise wait
         five seconds and try again.
         """
         tries = 0
         while True:
-            proc = self.admin_socket(service_type, service_id, args, check_status=False)
+            proc = self.admin_socket(service_type, service_id,
+                                     args, check_status=False)
             if proc.exitstatus is 0:
                 break
             else:
                 tries += 1
                 if (tries * 5) > timeout:
-                    raise Exception('timed out waiting for admin_socket to appear after {type}.{id} restart'.format(
-                        type=service_type,
-                        id=service_id))
-                self.log(
-                    "waiting on admin_socket for {type}-{id}, {command}".format(
-                        type=service_type,
-                        id=service_id,
-                        command=args))
+                    raise Exception('timed out waiting for admin_socket '
+                                    'to appear after {type}.{id} restart'.
+                                    format(type=service_type,
+                                           id=service_id))
+                self.log("waiting on admin_socket for {type}-{id}, "
+                         "{command}".format(type=service_type,
+                                            id=service_id,
+                                            command=args))
                 time.sleep(5)
 
     def get_pool_dump(self, pool):
         """
-        get the osd dump part of a pool 
+        get the osd dump part of a pool
         """
         osd_dump = self.get_osd_dump_json()
         for i in osd_dump['pools']:
@@ -848,25 +909,26 @@ class CephManager:
             lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
             self.raw_osd_status().split('\n'))
         self.log(osd_lines)
-        in_osds = [int(i[4:].split()[0]) for i in filter(
-                lambda x: " in " in x,
-                osd_lines)]
-        out_osds = [int(i[4:].split()[0]) for i in filter(
-                lambda x: " out " in x,
-                osd_lines)]
-        up_osds = [int(i[4:].split()[0]) for i in filter(
-                lambda x: " up " in x,
-                osd_lines)]
-        down_osds = [int(i[4:].split()[0]) for i in filter(
-                lambda x: " down " in x,
-                osd_lines)]
-        dead_osds = [int(x.id_) for x in
-                     filter(lambda x: not x.running(), self.ctx.daemons.iter_daemons_of_role('osd'))]
+        in_osds = [int(i[4:].split()[0])
+                   for i in filter(lambda x: " in " in x, osd_lines)]
+        out_osds = [int(i[4:].split()[0])
+                    for i in filter(lambda x: " out " in x, osd_lines)]
+        up_osds = [int(i[4:].split()[0])
+                   for i in filter(lambda x: " up " in x, osd_lines)]
+        down_osds = [int(i[4:].split()[0])
+                     for i in filter(lambda x: " down " in x, osd_lines)]
+        dead_osds = [int(x.id_)
+                     for x in filter(lambda x:
+                                     not x.running(),
+                                     self.ctx.daemons.
+                                     iter_daemons_of_role('osd'))]
         live_osds = [int(x.id_) for x in
-                     filter(lambda x: x.running(), self.ctx.daemons.iter_daemons_of_role('osd'))]
-        return { 'in' : in_osds, 'out' : out_osds, 'up' : up_osds,
-                 'down' : down_osds, 'dead' : dead_osds, 'live' : live_osds,
-                 'raw' : osd_lines}
+                     filter(lambda x:
+                            x.running(),
+                            self.ctx.daemons.iter_daemons_of_role('osd'))]
+        return {'in': in_osds, 'out': out_osds, 'up': up_osds,
+                'down': down_osds, 'dead': dead_osds, 'live': live_osds,
+                'raw': osd_lines}
 
     def get_num_pgs(self):
         """
@@ -885,7 +947,8 @@ class CephManager:
             args = cmd_erasure_code_profile(profile_name, profile)
             self.raw_cluster_cmd(*args)
 
-    def create_pool_with_unique_name(self, pg_num=16, erasure_code_profile_name=None):
+    def create_pool_with_unique_name(self, pg_num=16,
+                                     erasure_code_profile_name=None):
         """
         Create a pool named unique_pool_X where X is unique.
         """
@@ -905,22 +968,27 @@ class CephManager:
         yield
         self.remove_pool(pool_name)
 
-    def create_pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
+    def create_pool(self, pool_name, pg_num=16,
+                    erasure_code_profile_name=None):
         """
         Create a pool named from the pool_name parameter.
         :param pool_name: name of the pool being created.
         :param pg_num: initial number of pgs.
-        :param erasure_code_profile_name: if set and !None create an erasure coded pool using the profile 
+        :param erasure_code_profile_name: if set and !None create an
+                                          erasure coded pool using the profile
         """
         with self.lock:
             assert isinstance(pool_name, str)
             assert isinstance(pg_num, int)
             assert pool_name not in self.pools
-            self.log("creating pool_name %s"%(pool_name,))
+            self.log("creating pool_name %s" % (pool_name,))
             if erasure_code_profile_name:
-                self.raw_cluster_cmd('osd', 'pool', 'create', pool_name, str(pg_num), str(pg_num), 'erasure', erasure_code_profile_name)
+                self.raw_cluster_cmd('osd', 'pool', 'create',
+                                     pool_name, str(pg_num), str(pg_num),
+                                     'erasure', erasure_code_profile_name)
             else:
-                self.raw_cluster_cmd('osd', 'pool', 'create', pool_name, str(pg_num))
+                self.raw_cluster_cmd('osd', 'pool', 'create',
+                                     pool_name, str(pg_num))
             self.pools[pool_name] = pg_num
 
     def remove_pool(self, pool_name):
@@ -933,10 +1001,9 @@ class CephManager:
             assert pool_name in self.pools
             self.log("removing pool_name %s" % (pool_name,))
             del self.pools[pool_name]
-            self.do_rados(
-                self.controller,
-                ['rmpool', pool_name, pool_name, "--yes-i-really-really-mean-it"]
-                )
+            self.do_rados(self.controller,
+                          ['rmpool', pool_name, pool_name,
+                           "--yes-i-really-really-mean-it"])
 
     def get_pool(self):
         """
@@ -993,12 +1060,15 @@ class CephManager:
                     pool_name,
                     prop,
                     str(val))
-                if r != 11: # EAGAIN
+                if r != 11:  # EAGAIN
                     break
                 tries += 1
                 if tries > 50:
-                    raise Exception('timed out getting EAGAIN when setting pool property %s %s = %s' % (pool_name, prop, val))
-                self.log('got EAGAIN setting pool property, waiting a few seconds...')
+                    raise Exception('timed out getting EAGAIN '
+                                    'when setting pool property %s %s = %s' %
+                                    (pool_name, prop, val))
+                self.log('got EAGAIN setting pool property, '
+                         'waiting a few seconds...')
                 time.sleep(2)
 
     def expand_pool(self, pool_name, by, max_pgs):
@@ -1013,7 +1083,7 @@ class CephManager:
                 return
             if (self.pools[pool_name] + by) > max_pgs:
                 return
-            self.log("increase pool size by %d"%(by,))
+            self.log("increase pool size by %d" % (by,))
             new_pg_num = self.pools[pool_name] + by
             self.set_pool_property(pool_name, "pg_num", new_pg_num)
             self.pools[pool_name] = new_pg_num
@@ -1111,7 +1181,7 @@ class CephManager:
         init = self.get_last_scrub_stamp(pool, pgnum)
         self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
         while init == self.get_last_scrub_stamp(pool, pgnum):
-            self.log("waiting for scrub type %s"%(stype,))
+            self.log("waiting for scrub type %s" % (stype,))
             time.sleep(10)
 
     def get_single_pg_stats(self, pgid):
@@ -1175,7 +1245,9 @@ class CephManager:
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
-            if pg['state'].count('active') and pg['state'].count('clean') and not pg['state'].count('stale'):
+            if (pg['state'].count('active') and
+                    pg['state'].count('clean') and
+                    not pg['state'].count('stale')):
                 num += 1
         return num
 
@@ -1186,7 +1258,10 @@ class CephManager:
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
-            if pg['state'].count('active') and not pg['state'].count('recover') and not pg['state'].count('backfill') and not pg['state'].count('stale'):
+            if (pg['state'].count('active') and
+                    not pg['state'].count('recover') and
+                    not pg['state'].count('backfill') and
+                    not pg['state'].count('stale')):
                 num += 1
         return num
 
@@ -1219,8 +1294,10 @@ class CephManager:
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
-            if (pg['state'].count('down') and not pg['state'].count('stale')) or \
-                    (pg['state'].count('incomplete') and not pg['state'].count('stale')):
+            if ((pg['state'].count('down') and not
+                    pg['state'].count('stale')) or
+                (pg['state'].count('incomplete') and not
+                    pg['state'].count('stale'))):
                 num += 1
         return num
 
@@ -1231,9 +1308,12 @@ class CephManager:
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
-            if (pg['state'].count('active') and not pg['state'].count('stale')) or \
-                    (pg['state'].count('down') and not pg['state'].count('stale')) or \
-                    (pg['state'].count('incomplete') and not pg['state'].count('stale')):
+            if ((pg['state'].count('active') and not
+                    pg['state'].count('stale')) or
+                (pg['state'].count('down') and not
+                    pg['state'].count('stale')) or
+                (pg['state'].count('incomplete') and not
+                    pg['state'].count('stale'))):
                 num += 1
         return num
 
@@ -1283,8 +1363,7 @@ class CephManager:
         Returns true if all osds are up.
         """
         x = self.get_osd_dump()
-        return (len(x) == \
-                    sum([(y['up'] > 0) for y in x]))
+        return (len(x) == sum([(y['up'] > 0) for y in x]))
 
     def wait_for_all_up(self, timeout=None):
         """
@@ -1411,9 +1490,14 @@ class CephManager:
         or by stopping.
         """
         if self.config.get('powercycle'):
-            (remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
-            self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name))
-            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
+            (remote,) = (self.ctx.cluster.only('osd.{o}'.format(o=osd)).
+                         remotes.iterkeys())
+            self.log('kill_osd on osd.{o} '
+                     'doing powercycle of {s}'.format(o=osd, s=remote.name))
+            assert remote.console is not None, ("powercycling requested "
+                                                "but RemoteConsole is not "
+                                                "initialized.  "
+                                                "Check ipmi config.")
             remote.console.power_off()
         else:
             self.ctx.daemons.get_daemon('osd', osd).stop()
@@ -1433,12 +1517,18 @@ class CephManager:
         or by restarting.
         """
         if self.config.get('powercycle'):
-            (remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
-            self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name))
-            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
+            (remote,) = (self.ctx.cluster.only('osd.{o}'.format(o=osd)).
+                         remotes.iterkeys())
+            self.log('kill_osd on osd.{o} doing powercycle of {s}'.
+                     format(o=osd, s=remote.name))
+            assert remote.console is not None, ("powercycling requested "
+                                                "but RemoteConsole is not "
+                                                "initialized.  "
+                                                "Check ipmi config.")
             remote.console.power_on()
             if not remote.console.check_status(300):
-                raise Exception('Failed to revive osd.{o} via ipmi'.format(o=osd))
+                raise Exception('Failed to revive osd.{o} via ipmi'.
+                                format(o=osd))
             teuthology.reconnect(self.ctx, 60, [remote])
             mount_osd_data(self.ctx, remote, str(osd))
             make_admin_daemon_dir(self.ctx, remote)
@@ -1464,9 +1554,7 @@ class CephManager:
         """
         self.raw_cluster_cmd('osd', 'in', str(osd))
 
-
     ## monitors
-
     def signal_mon(self, mon, sig):
         """
         Wrapper to local get_daemon call
@@ -1479,9 +1567,15 @@ class CephManager:
         or by doing a stop.
         """
         if self.config.get('powercycle'):
-            (remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys()
-            self.log('kill_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name))
-            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
+            (remote,) = (self.ctx.cluster.only('mon.{m}'.format(m=mon)).
+                         remotes.iterkeys())
+            self.log('kill_mon on mon.{m} doing powercycle of {s}'.
+                     format(m=mon, s=remote.name))
+            assert remote.console is not None, ("powercycling requested "
+                                                "but RemoteConsole is not "
+                                                "initialized.  "
+                                                "Check ipmi config.")
+
             remote.console.power_off()
         else:
             self.ctx.daemons.get_daemon('mon', mon).stop()
@@ -1492,9 +1586,15 @@ class CephManager:
         or by doing a normal restart.
         """
         if self.config.get('powercycle'):
-            (remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys()
-            self.log('revive_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name))
-            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
+            (remote,) = (self.ctx.cluster.only('mon.{m}'.format(m=mon)).
+                         remotes.iterkeys())
+            self.log('revive_mon on mon.{m} doing powercycle of {s}'.
+                     format(m=mon, s=remote.name))
+            assert remote.console is not None, ("powercycling requested "
+                                                "but RemoteConsole is not "
+                                                "initialized.  "
+                                                "Check ipmi config.")
+
             remote.console.power_on()
             make_admin_daemon_dir(self.ctx, remote)
         self.ctx.daemons.get_daemon('mon', mon).restart()
@@ -1525,7 +1625,8 @@ class CephManager:
         while not len(self.get_mon_quorum()) == size:
             if timeout is not None:
                 assert time.time() - start < timeout, \
-                    'failed to reach quorum size %d before timeout expired' % size
+                    ('failed to reach quorum size %d '
+                     'before timeout expired' % size)
             time.sleep(3)
         self.log("quorum is size %d" % size)
 
@@ -1545,9 +1646,14 @@ class CephManager:
         Powercycle if set in config, otherwise just stop.
         """
         if self.config.get('powercycle'):
-            (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys()
-            self.log('kill_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name))
-            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
+            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
+                         remotes.iterkeys())
+            self.log('kill_mds on mds.{m} doing powercycle of {s}'.
+                     format(m=mds, s=remote.name))
+            assert remote.console is not None, ("powercycling requested "
+                                                "but RemoteConsole is not "
+                                                "initialized.  "
+                                                "Check ipmi config.")
             remote.console.power_off()
         else:
             self.ctx.daemons.get_daemon('mds', mds).stop()
@@ -1565,9 +1671,14 @@ class CephManager:
         and then restart (using --hot-standby if specified).
         """
         if self.config.get('powercycle'):
-            (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys()
-            self.log('revive_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name))
-            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
+            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
+                         remotes.iterkeys())
+            self.log('revive_mds on mds.{m} doing powercycle of {s}'.
+                     format(m=mds, s=remote.name))
+            assert remote.console is not None, ("powercycling requested "
+                                                "but RemoteConsole is not "
+                                                "initialized.  "
+                                                "Check ipmi config.")
             remote.console.power_on()
             make_admin_daemon_dir(self.ctx, remote)
         args = []