]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "add erasure code repair / scrub tests"
authorDavid Zafman <david.zafman@inktank.com>
Tue, 13 Jan 2015 19:12:25 +0000 (11:12 -0800)
committerDavid Zafman <david.zafman@inktank.com>
Tue, 13 Jan 2015 19:12:25 +0000 (11:12 -0800)
tasks/ceph_manager.py
tasks/repair_test.py

index f33079193a8ff9d62b49e1a6ca5b808428e2ca20..b1ffc80ec77cf150e94f6cf5a782cc6fac04de90 100644 (file)
@@ -2,11 +2,9 @@
 ceph manager -- Thrasher and CephManager objects
 """
 from cStringIO import StringIO
-import contextlib
 import random
 import time
 import gevent
-import base64
 import json
 import threading
 import os
@@ -31,8 +29,7 @@ def write_conf(ctx, conf_path=DEFAULT_CONF_PATH):
             'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
             'sudo', 'python',
             '-c',
-            ('import shutil, sys; '
-             'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
+            'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))',
             conf_path,
             run.Raw('&&'),
             'sudo', 'chmod', '0644', conf_path,
@@ -50,8 +47,12 @@ def make_admin_daemon_dir(ctx, remote):
     :param ctx: Context
     :param remote: Remote site
     """
-    remote.run(args=['sudo',
-                     'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
+    remote.run(
+            args=[
+                'sudo',
+                'install', '-d', '-m0777', '--', '/var/run/ceph',
+                ],
+            )
 
 
 def mount_osd_data(ctx, remote, osd):
@@ -63,17 +64,14 @@ def mount_osd_data(ctx, remote, osd):
     :param ods: Osd name
     """
     log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
-    if (remote in ctx.disk_config.remote_to_roles_to_dev and
-            osd in ctx.disk_config.remote_to_roles_to_dev[remote]):
+    if remote in ctx.disk_config.remote_to_roles_to_dev and osd in ctx.disk_config.remote_to_roles_to_dev[remote]:
         dev = ctx.disk_config.remote_to_roles_to_dev[remote][osd]
-        mount_options = ctx.disk_config.\
-            remote_to_roles_to_dev_mount_options[remote][osd]
+        mount_options = ctx.disk_config.remote_to_roles_to_dev_mount_options[remote][osd]
         fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][osd]
         mnt = os.path.join('/var/lib/ceph/osd', 'ceph-{id}'.format(id=osd))
 
-        log.info('Mounting osd.{o}: dev: {n}, '
-                 'mountpoint: {p}, type: {t}, options: {v}'.format(
-                     o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options))
+        log.info('Mounting osd.{o}: dev: {n}, mountpoint: {p}, type: {t}, options: {v}'.format(
+                 o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options))
 
         remote.run(
             args=[
@@ -87,6 +85,11 @@ def mount_osd_data(ctx, remote, osd):
             )
 
 
+def cmd_exists(cmd):
+    return subprocess.call("type " + cmd, shell=True,
+                           stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0
+
+
 class Thrasher:
     """
     Object used to thrash Ceph
@@ -107,6 +110,10 @@ class Thrasher:
             self.revive_timeout += 120
         self.clean_wait = self.config.get('clean_wait', 0)
         self.minin = self.config.get("min_in", 3)
+        if cmd_exists("ceph-objectstore-tool"):
+            self.ceph_objectstore_tool = self.config.get('ceph_objectstore_tool', False)
+        else:
+            self.ceph_objectstore_tool = False
         self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
 
         num_osds = self.in_osds + self.out_osds
@@ -131,22 +138,6 @@ class Thrasher:
             manager.raw_cluster_cmd('--', 'mon', 'tell', '*', 'injectargs',
                                     '--mon-osd-down-out-interval 0')
         self.thread = gevent.spawn(self.do_thrash)
-        if self.cmd_exists_on_osds("ceph-objectstore-tool"):
-            self.ceph_objectstore_tool = self.config.get('ceph_objectstore_tool', True)
-            self.test_rm_past_intervals = self.config.get('test_rm_past_intervals', True)
-        else:
-            self.ceph_objectstore_tool = False
-            self.test_rm_past_intervals = False
-            self.log("Unable to test ceph_objectstore_tool, not available on all OSD nodes")
-
-    def cmd_exists_on_osds(self, cmd):
-        allremotes = self.ceph_manager.ctx.cluster.only(teuthology.is_type('osd')).remotes.keys()
-        allremotes = list(set(allremotes))
-        for remote in allremotes:
-            proc = remote.run(args=['type', cmd], wait=True, check_status=False, stdout=StringIO(), stderr=StringIO())
-            if proc.exitstatus != 0:
-                return False;
-        return True;
 
     def kill_osd(self, osd=None, mark_down=False, mark_out=False):
         """
@@ -156,8 +147,7 @@ class Thrasher:
         """
         if osd is None:
             osd = random.choice(self.live_osds)
-        self.log("Killing osd %s, live_osds are %s" % (str(osd),
-                                                       str(self.live_osds)))
+        self.log("Killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds)))
         self.live_osds.remove(osd)
         self.dead_osds.append(osd)
         self.ceph_manager.kill_osd(osd)
@@ -167,83 +157,52 @@ class Thrasher:
             self.out_osd(osd)
         if self.ceph_objectstore_tool:
             self.log("Testing ceph-objectstore-tool on down osd")
-            (remote,) = self.ceph_manager.ctx.\
-                cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
+            (remote,) = self.ceph_manager.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
             FSPATH = self.ceph_manager.get_filepath()
             JPATH = os.path.join(FSPATH, "journal")
             exp_osd = imp_osd = osd
             exp_remote = imp_remote = remote
             # If an older osd is available we'll move a pg from there
-            if (len(self.dead_osds) > 1 and
-                    random.random() < self.chance_move_pg):
+            if len(self.dead_osds) > 1 and random.random() < self.chance_move_pg:
                 exp_osd = random.choice(self.dead_osds[:-1])
-                (exp_remote,) = self.ceph_manager.ctx.\
-                    cluster.only('osd.{o}'.format(o=exp_osd)).\
-                    remotes.iterkeys()
-            if ('keyvaluestore_backend' in
-                    self.ceph_manager.ctx.ceph.conf['osd']):
-                prefix = ("sudo ceph-objectstore-tool "
-                          "--data-path {fpath} --journal-path {jpath} "
-                          "--type keyvaluestore "
-                          "--log-file="
-                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
-                          format(fpath=FSPATH, jpath=JPATH))
+                (exp_remote,) = self.ceph_manager.ctx.cluster.only('osd.{o}'.format(o=exp_osd)).remotes.iterkeys()
+            if 'keyvaluestore_backend' in self.ceph_manager.ctx.ceph.conf['osd']:
+                prefix = "sudo ceph-objectstore-tool --data-path {fpath} --journal-path {jpath} --type keyvaluestore-dev --log-file=/var/log/ceph/objectstore_tool.\\$pid.log ".format(fpath=FSPATH, jpath=JPATH)
             else:
-                prefix = ("sudo ceph-objectstore-tool "
-                          "--data-path {fpath} --journal-path {jpath} "
-                          "--log-file="
-                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
-                          format(fpath=FSPATH, jpath=JPATH))
+                prefix = "sudo ceph-objectstore-tool --data-path {fpath} --journal-path {jpath} --log-file=/var/log/ceph/objectstore_tool.\\$pid.log ".format(fpath=FSPATH, jpath=JPATH)
             cmd = (prefix + "--op list-pgs").format(id=exp_osd)
-            proc = exp_remote.run(args=cmd, wait=True,
-                                  check_status=False, stdout=StringIO())
+            proc = exp_remote.run(args=cmd, wait=True, check_status=True, stdout=StringIO())
             if proc.exitstatus:
-                raise Exception("ceph-objectstore-tool: "
-                                "exp list-pgs failure with status {ret}".
-                                format(ret=proc.exitstatus))
+                raise Exception("ceph-objectstore-tool: exp list-pgs failure with status {ret}".format(ret=proc.exitstatus))
             pgs = proc.stdout.getvalue().split('\n')[:-1]
             if len(pgs) == 0:
                 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
                 return
             pg = random.choice(pgs)
-            exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
-            exp_path = os.path.join(exp_path, "data")
-            exp_path = os.path.join(exp_path,
-                                    "exp.{pg}.{id}".format(pg=pg, id=exp_osd))
+            exp_path = os.path.join(os.path.join(teuthology.get_testdir(self.ceph_manager.ctx), "data"), "exp.{pg}.{id}".format(pg=pg, id=exp_osd))
             # export
-            cmd = prefix + "--op export --pgid {pg} --file {file}"
-            cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
+            cmd = (prefix + "--op export --pgid {pg} --file {file}").format(id=exp_osd, pg=pg, file=exp_path)
             proc = exp_remote.run(args=cmd)
             if proc.exitstatus:
-                raise Exception("ceph-objectstore-tool: "
-                                "export failure with status {ret}".
-                                format(ret=proc.exitstatus))
+                raise Exception("ceph-objectstore-tool: export failure with status {ret}".format(ret=proc.exitstatus))
             # remove
-            cmd = prefix + "--op remove --pgid {pg}"
-            cmd = cmd.format(id=exp_osd, pg=pg)
+            cmd = (prefix + "--op remove --pgid {pg}").format(id=exp_osd, pg=pg)
             proc = exp_remote.run(args=cmd)
             if proc.exitstatus:
-                raise Exception("ceph-objectstore-tool: "
-                                "remove failure with status {ret}".
-                                format(ret=proc.exitstatus))
+                raise Exception("ceph-objectstore-tool: remove failure with status {ret}".format(ret=proc.exitstatus))
             # If there are at least 2 dead osds we might move the pg
             if exp_osd != imp_osd:
                 # If pg isn't already on this osd, then we will move it there
                 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
-                proc = imp_remote.run(args=cmd, wait=True,
-                                      check_status=False, stdout=StringIO())
+                proc = imp_remote.run(args=cmd, wait=True, check_status=True, stdout=StringIO())
                 if proc.exitstatus:
-                    raise Exception("ceph-objectstore-tool: "
-                                    "imp list-pgs failure with status {ret}".
-                                    format(ret=proc.exitstatus))
+                    raise Exception("ceph-objectstore-tool: imp list-pgs failure with status {ret}".format(ret=proc.exitstatus))
                 pgs = proc.stdout.getvalue().split('\n')[:-1]
                 if pg not in pgs:
-                    self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
-                             format(pg=pg, fosd=exp_osd, tosd=imp_osd))
+                    self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".format(pg=pg, fosd=exp_osd, tosd=imp_osd))
                     if imp_remote != exp_remote:
                         # Copy export file to the other machine
-                        self.log("Transfer export file from {srem} to {trem}".
-                                 format(srem=exp_remote, trem=imp_remote))
+                        self.log("Transfer export file from {srem} to {trem}".format(srem=exp_remote, trem=imp_remote))
                         tmpexport = Remote.get_file(exp_remote, exp_path)
                         Remote.put_file(imp_remote, tmpexport, exp_path)
                         os.remove(tmpexport)
@@ -252,49 +211,15 @@ class Thrasher:
                     imp_osd = exp_osd
                     imp_remote = exp_remote
             # import
-            cmd = (prefix + "--op import --file {file}")
-            cmd = cmd.format(id=imp_osd, file=exp_path)
+            cmd = (prefix + "--op import --file {file}").format(id=imp_osd, file=exp_path)
             imp_remote.run(args=cmd)
-            if proc.exitstatus == 10:
-                self.log("Pool went away before processing "
-                         "an import...ignored");
-            elif proc.exitstatus:
-                raise Exception("ceph-objectstore-tool: "
-                                "import failure with status {ret}".
-                                format(ret=proc.exitstatus))
+            if proc.exitstatus:
+                raise Exception("ceph-objectstore-tool: import failure with status {ret}".format(ret=proc.exitstatus))
             cmd = "rm -f {file}".format(file=exp_path)
             exp_remote.run(args=cmd)
             if imp_remote != exp_remote:
                 imp_remote.run(args=cmd)
 
-    def rm_past_intervals(self, osd=None):
-        """
-        :param osd: Osd to find pg to remove past intervals
-        """
-        if self.test_rm_past_intervals:
-            if osd is None:
-                osd = random.choice(self.dead_osds)
-            self.log("Use ceph_objectstore_tool to remove past intervals")
-            (remote,) = self.ceph_manager.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
-            FSPATH = self.ceph_manager.get_filepath()
-            JPATH = os.path.join(FSPATH, "journal")
-            if 'keyvaluestore_backend' in self.ceph_manager.ctx.ceph.conf['osd']:
-                prefix = "sudo ceph-objectstore-tool --data-path {fpath} --journal-path {jpath} --type keyvaluestore ".format(fpath=FSPATH, jpath=JPATH)
-            else:
-                prefix = "sudo ceph-objectstore-tool --data-path {fpath} --journal-path {jpath} ".format(fpath=FSPATH, jpath=JPATH)
-            cmd = (prefix + "--op list-pgs").format(id=osd)
-            proc = remote.run(args=cmd, wait=True, check_status=False, stdout=StringIO())
-            if proc.exitstatus:
-                raise Exception("ceph_objectstore_tool: exp list-pgs failure with status {ret}".format(ret=proc.exitstatus))
-            pgs = proc.stdout.getvalue().split('\n')[:-1]
-            if len(pgs) == 0:
-                self.log("No PGs found for osd.{osd}".format(osd=osd))
-                return
-            pg = random.choice(pgs)
-            cmd = (prefix + "--op rm-past-intervals --pgid {pg}").format(id=osd, pg=pg)
-            proc = remote.run(args=cmd)
-            if proc.exitstatus:
-                raise Exception("ceph_objectstore_tool: rm-past-intervals failure with status {ret}".format(ret=proc.exitstatus))
 
     def blackhole_kill_osd(self, osd=None):
         """
@@ -303,8 +228,7 @@ class Thrasher:
         """
         if osd is None:
             osd = random.choice(self.live_osds)
-        self.log("Blackholing and then killing osd %s, live_osds are %s" %
-                 (str(osd), str(self.live_osds)))
+        self.log("Blackholing and then killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds)))
         self.live_osds.remove(osd)
         self.dead_osds.append(osd)
         self.ceph_manager.blackhole_kill_osd(osd)
@@ -328,8 +252,7 @@ class Thrasher:
         """
         if osd is None:
             osd = random.choice(self.in_osds)
-        self.log("Removing osd %s, in_osds are: %s" %
-                 (str(osd), str(self.in_osds)))
+        self.log("Removing osd %s, in_osds are: %s" % (str(osd), str(self.in_osds)))
         self.ceph_manager.mark_out_osd(osd)
         self.in_osds.remove(osd)
         self.out_osds.append(osd)
@@ -347,7 +270,7 @@ class Thrasher:
         self.out_osds.remove(osd)
         self.in_osds.append(osd)
         self.ceph_manager.mark_in_osd(osd)
-        self.log("Added osd %s" % (str(osd),))
+        self.log("Added osd %s"%(str(osd),))
 
     def reweight_osd(self, osd=None):
         """
@@ -358,8 +281,7 @@ class Thrasher:
             osd = random.choice(self.in_osds)
         val = random.uniform(.1, 1.0)
         self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
-        self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
-                                          str(osd), str(val))
+        self.ceph_manager.raw_cluster_cmd('osd', 'reweight', str(osd), str(val))
 
     def primary_affinity(self, osd=None):
         if osd is None:
@@ -371,8 +293,7 @@ class Thrasher:
         else:
             pa = 0
         self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
-        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
-                                          str(osd), str(pa))
+        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity', str(osd), str(pa))
 
     def all_up(self):
         """
@@ -397,17 +318,15 @@ class Thrasher:
         Increase the size of the pool
         """
         pool = self.ceph_manager.get_pool()
-        self.log("Growing pool %s" % (pool,))
-        self.ceph_manager.expand_pool(pool,
-                                      self.config.get('pool_grow_by', 10),
-                                      self.max_pgs)
+        self.log("Growing pool %s"%(pool,))
+        self.ceph_manager.expand_pool(pool, self.config.get('pool_grow_by', 10), self.max_pgs)
 
     def fix_pgp_num(self):
         """
         Fix number of pgs in pool.
         """
         pool = self.ceph_manager.get_pool()
-        self.log("fixing pg num pool %s" % (pool,))
+        self.log("fixing pg num pool %s"%(pool,))
         self.ceph_manager.set_pool_pgpnum(pool)
 
     def test_pool_min_size(self):
@@ -442,18 +361,18 @@ class Thrasher:
         Pause injection testing. Check for osd being down when finished.
         """
         the_one = random.choice(self.live_osds)
-        self.log("inject_pause on {osd}".format(osd=the_one))
+        self.log("inject_pause on {osd}".format(osd = the_one))
         self.log(
             "Testing {key} pause injection for duration {duration}".format(
-                key=conf_key,
-                duration=duration
+                key = conf_key,
+                duration = duration
                 ))
         self.log(
             "Checking after {after}, should_be_down={shouldbedown}".format(
-                after=check_after,
-                shouldbedown=should_be_down
+                after = check_after,
+                shouldbedown = should_be_down
                 ))
-        self.ceph_manager.set_config(the_one, **{conf_key: duration})
+        self.ceph_manager.set_config(the_one, **{conf_key:duration})
         if not should_be_down:
             return
         time.sleep(check_after)
@@ -480,9 +399,9 @@ class Thrasher:
         for i in self.live_osds:
             self.ceph_manager.set_config(
                 i,
-                osd_debug_skip_full_check_in_backfill_reservation=
-                random.choice(['false', 'true']),
-                osd_backfill_full_ratio=0)
+                osd_debug_skip_full_check_in_backfill_reservation = random.choice(
+                    ['false', 'true']),
+                osd_backfill_full_ratio = 0)
         for i in range(30):
             status = self.ceph_manager.compile_pg_status()
             if 'backfill' not in status.keys():
@@ -495,8 +414,9 @@ class Thrasher:
         for i in self.live_osds:
             self.ceph_manager.set_config(
                 i,
-                osd_debug_skip_full_check_in_backfill_reservation='false',
-                osd_backfill_full_ratio=0.85)
+                osd_debug_skip_full_check_in_backfill_reservation = \
+                    'false',
+                osd_backfill_full_ratio = 0.85)
 
     def test_map_discontinuity(self):
         """
@@ -539,8 +459,7 @@ class Thrasher:
         """
         chance_down = self.config.get('chance_down', 0.4)
         chance_test_min_size = self.config.get('chance_test_min_size', 0)
-        chance_test_backfill_full = \
-            self.config.get('chance_test_backfill_full', 0)
+        chance_test_backfill_full = self.config.get('chance_test_backfill_full', 0)
         if isinstance(chance_down, int):
             chance_down = float(chance_down) / 100
         minin = self.minin
@@ -548,45 +467,35 @@ class Thrasher:
         minlive = self.config.get("min_live", 2)
         mindead = self.config.get("min_dead", 0)
 
-        self.log('choose_action: min_in %d min_out '
-                 '%d min_live %d min_dead %d' %
+        self.log('choose_action: min_in %d min_out %d min_live %d min_dead %d' %
                  (minin, minout, minlive, mindead))
         actions = []
         if len(self.in_osds) > minin:
             actions.append((self.out_osd, 1.0,))
         if len(self.live_osds) > minlive and chance_down > 0:
             actions.append((self.kill_osd, chance_down,))
-        if len(self.dead_osds) > 1:
-            actions.append((self.rm_past_intervals, 1.0,))
         if len(self.out_osds) > minout:
             actions.append((self.in_osd, 1.7,))
         if len(self.dead_osds) > mindead:
             actions.append((self.revive_osd, 1.0,))
         if self.config.get('thrash_primary_affinity', True):
             actions.append((self.primary_affinity, 1.0,))
-        actions.append((self.reweight_osd,
-                        self.config.get('reweight_osd', .5),))
-        actions.append((self.grow_pool,
-                        self.config.get('chance_pgnum_grow', 0),))
-        actions.append((self.fix_pgp_num,
-                        self.config.get('chance_pgpnum_fix', 0),))
-        actions.append((self.test_pool_min_size,
-                        chance_test_min_size,))
-        actions.append((self.test_backfill_full,
-                        chance_test_backfill_full,))
+        actions.append((self.reweight_osd, self.config.get('reweight_osd',.5),))
+        actions.append((self.grow_pool, self.config.get('chance_pgnum_grow', 0),))
+        actions.append((self.fix_pgp_num, self.config.get('chance_pgpnum_fix', 0),))
+        actions.append((self.test_pool_min_size, chance_test_min_size,))
+        actions.append((self.test_backfill_full, chance_test_backfill_full,))
         for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
             for scenario in [
-                (lambda:
-                 self.inject_pause(key,
-                                   self.config.get('pause_short', 3),
-                                   0,
-                                   False),
+                (lambda: self.inject_pause(key,
+                                           self.config.get('pause_short', 3),
+                                           0,
+                                           False),
                  self.config.get('chance_inject_pause_short', 1),),
-                (lambda:
-                 self.inject_pause(key,
-                                   self.config.get('pause_long', 80),
-                                   self.config.get('pause_check_after', 70),
-                                   True),
+                (lambda: self.inject_pause(key,
+                                           self.config.get('pause_long', 80),
+                                           self.config.get('pause_check_after', 70),
+                                           True),
                  self.config.get('chance_inject_pause_long', 0),)]:
                 actions.append(scenario)
 
@@ -608,11 +517,9 @@ class Thrasher:
         delay = self.config.get("op_delay", 5)
         self.log("starting do_thrash")
         while not self.stopping:
-            to_log = [str(x) for x in ["in_osds: ", self.in_osds,
-                                       "out_osds: ", self.out_osds,
-                                       "dead_osds: ", self.dead_osds,
-                                       "live_osds: ", self.live_osds]]
-            self.log(" ".join(to_log))
+            self.log(" ".join([str(x) for x in ["in_osds: ", self.in_osds, " out_osds: ", self.out_osds,
+                                                "dead_osds: ", self.dead_osds, "live_osds: ",
+                                                self.live_osds]]))
             if random.uniform(0, 1) < (float(delay) / cleanint):
                 while len(self.dead_osds) > maxdead:
                     self.revive_osd()
@@ -620,7 +527,7 @@ class Thrasher:
                     self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                                       str(osd), str(1))
                 if random.uniform(0, 1) < float(
-                        self.config.get('chance_test_map_discontinuity', 0)):
+                    self.config.get('chance_test_map_discontinuity', 0)):
                     self.test_map_discontinuity()
                 else:
                     self.ceph_manager.wait_for_recovery(
@@ -635,69 +542,6 @@ class Thrasher:
             time.sleep(delay)
         self.all_up()
 
-
-class ObjectStoreTool:
-
-    def __init__(self, manager, pool, **kwargs):
-        self.manager = manager
-        self.pool = pool
-        self.osd = kwargs.get('osd', None)
-        self.object_name = kwargs.get('object_name', None)
-        if self.osd and self.pool and self.object_name:
-            if self.osd == "primary":
-                self.osd = self.manager.get_object_primary(self.pool,
-                                                           self.object_name)
-        assert self.osd
-        if self.object_name:
-            self.pgid = self.manager.get_object_pg_with_shard(self.pool,
-                                                              self.object_name,
-                                                              self.osd)
-        self.remote = self.manager.ctx.\
-            cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
-        path = self.manager.get_filepath().format(id=self.osd)
-        self.paths = ("--data-path {path} --journal-path {path}/journal".
-                      format(path=path))
-
-    def build_cmd(self, options, args, stdin):
-        lines = []
-        if self.object_name:
-            lines.append("object=$(sudo ceph-objectstore-tool "
-                         "{paths} --pgid {pgid} --op list |"
-                         "grep '\"oid\":\"{name}\"')".
-                         format(paths=self.paths,
-                                pgid=self.pgid,
-                                name=self.object_name))
-            args = '"$object" ' + args
-            options += " --pgid {pgid}".format(pgid=self.pgid)
-        cmd = ("sudo ceph-objectstore-tool {paths} {options} {args}".
-               format(paths=self.paths,
-                      args=args,
-                      options=options))
-        if stdin:
-            cmd = ("echo {payload} | base64 --decode | {cmd}".
-                   format(payload=base64.encode(kwargs['stdin']),
-                          cmd=cmd))
-        lines.append(cmd)
-        return "\n".join(lines)
-
-    def run(self, options, args, stdin=None):
-        self.manager.kill_osd(self.osd)
-        cmd = self.build_cmd(options, args, stdin)
-        self.manager.log(cmd)
-        try:
-            proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
-                                   check_status=False,
-                                   stdout=StringIO(),
-                                   stderr=StringIO())
-            proc.wait()
-            if proc.exitstatus != 0:
-                self.manager.log("failed with " + str(proc.exitstatus))
-                error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
-                raise Exception(error)
-        finally:
-            self.manager.revive_osd(self.osd)
-
-
 class CephManager:
     """
     Ceph manager object.
@@ -735,11 +579,11 @@ class CephManager:
         """
         testdir = teuthology.get_testdir(self.ctx)
         ceph_args = [
-            'adjust-ulimits',
-            'ceph-coverage',
-            '{tdir}/archive/coverage'.format(tdir=testdir),
-            'ceph',
-        ]
+                'adjust-ulimits',
+                'ceph-coverage',
+                '{tdir}/archive/coverage'.format(tdir=testdir),
+                'ceph',
+                ]
         ceph_args.extend(args)
         proc = self.controller.run(
             args=ceph_args,
@@ -753,11 +597,11 @@ class CephManager:
         """
         testdir = teuthology.get_testdir(self.ctx)
         ceph_args = [
-            'adjust-ulimits',
-            'ceph-coverage',
-            '{tdir}/archive/coverage'.format(tdir=testdir),
-            'ceph',
-        ]
+                'adjust-ulimits',
+                'ceph-coverage',
+                '{tdir}/archive/coverage'.format(tdir=testdir),
+                'ceph',
+                ]
         ceph_args.extend(args)
         proc = self.controller.run(
             args=ceph_args,
@@ -783,8 +627,8 @@ class CephManager:
             )
         return proc
 
-    def rados_write_objects(self, pool, num_objects, size,
-                            timelimit, threads, cleanup=False):
+    def rados_write_objects(
+        self, pool, num_objects, size, timelimit, threads, cleanup=False):
         """
         Write rados objects
         Threads not used yet.
@@ -796,8 +640,7 @@ class CephManager:
             'bench', timelimit,
             'write'
             ]
-        if not cleanup:
-            args.append('--no-cleanup')
+        if not cleanup: args.append('--no-cleanup')
         return self.do_rados(self.controller, map(str, args))
 
     def do_put(self, pool, obj, fname):
@@ -838,25 +681,20 @@ class CephManager:
         Get the Remote for the host where a particular service runs.
 
         :param service_type: 'mds', 'osd', 'client'
-        :param service_id: The second part of a role, e.g. '0' for
-                           the role 'client.0'
-        :return: a Remote instance for the host where the
-                 requested role is placed
+        :param service_id: The second part of a role, e.g. '0' for the role 'client.0'
+        :return: a Remote instance for the host where the requested role is placed
         """
         for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems():
             for id_ in teuthology.roles_of_type(roles_for_host, service_type):
                 if id_ == str(service_id):
                     return _remote
 
-        raise KeyError("Service {0}.{1} not found".format(service_type,
-                                                          service_id))
+        raise KeyError("Service {0}.{1} not found".format(service_type, service_id))
 
-    def admin_socket(self, service_type, service_id,
-                     command, check_status=True):
+    def admin_socket(self, service_type, service_id, command, check_status=True):
         """
         Remotely start up ceph specifying the admin socket
-        :param command: a list of words to use as the command
-                        to the admin socket
+        :param command a list of words to use as the command to the admin socket
         """
         testdir = teuthology.get_testdir(self.ctx)
         remote = self.find_remote(service_type, service_id)
@@ -879,9 +717,6 @@ class CephManager:
             check_status=check_status
             )
 
-    def objectstore_tool(self, pool, options, args, **kwargs):
-        return ObjectStoreTool(self, pool, **kwargs).run(options, args)
-
     def get_pgid(self, pool, pgnum):
         """
         :param pool: pool name
@@ -948,34 +783,32 @@ class CephManager:
             'kick_recovery_wq',
             '0')
 
-    def wait_run_admin_socket(self, service_type,
-                              service_id, args=['version'], timeout=75):
+    def wait_run_admin_socket(self, service_type, service_id, args=['version'], timeout=75):
         """
         If osd_admin_socket call suceeds, return.  Otherwise wait
         five seconds and try again.
         """
         tries = 0
         while True:
-            proc = self.admin_socket(service_type, service_id,
-                                     args, check_status=False)
+            proc = self.admin_socket(service_type, service_id, args, check_status=False)
             if proc.exitstatus is 0:
                 break
             else:
                 tries += 1
                 if (tries * 5) > timeout:
-                    raise Exception('timed out waiting for admin_socket '
-                                    'to appear after {type}.{id} restart'.
-                                    format(type=service_type,
-                                           id=service_id))
-                self.log("waiting on admin_socket for {type}-{id}, "
-                         "{command}".format(type=service_type,
-                                            id=service_id,
-                                            command=args))
+                    raise Exception('timed out waiting for admin_socket to appear after {type}.{id} restart'.format(
+                        type=service_type,
+                        id=service_id))
+                self.log(
+                    "waiting on admin_socket for {type}-{id}, {command}".format(
+                        type=service_type,
+                        id=service_id,
+                        command=args))
                 time.sleep(5)
 
     def get_pool_dump(self, pool):
         """
-        get the osd dump part of a pool
+        get the osd dump part of a pool 
         """
         osd_dump = self.get_osd_dump_json()
         for i in osd_dump['pools']:
@@ -1014,26 +847,25 @@ class CephManager:
             lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
             self.raw_osd_status().split('\n'))
         self.log(osd_lines)
-        in_osds = [int(i[4:].split()[0])
-                   for i in filter(lambda x: " in " in x, osd_lines)]
-        out_osds = [int(i[4:].split()[0])
-                    for i in filter(lambda x: " out " in x, osd_lines)]
-        up_osds = [int(i[4:].split()[0])
-                   for i in filter(lambda x: " up " in x, osd_lines)]
-        down_osds = [int(i[4:].split()[0])
-                     for i in filter(lambda x: " down " in x, osd_lines)]
-        dead_osds = [int(x.id_)
-                     for x in filter(lambda x:
-                                     not x.running(),
-                                     self.ctx.daemons.
-                                     iter_daemons_of_role('osd'))]
+        in_osds = [int(i[4:].split()[0]) for i in filter(
+                lambda x: " in " in x,
+                osd_lines)]
+        out_osds = [int(i[4:].split()[0]) for i in filter(
+                lambda x: " out " in x,
+                osd_lines)]
+        up_osds = [int(i[4:].split()[0]) for i in filter(
+                lambda x: " up " in x,
+                osd_lines)]
+        down_osds = [int(i[4:].split()[0]) for i in filter(
+                lambda x: " down " in x,
+                osd_lines)]
+        dead_osds = [int(x.id_) for x in
+                     filter(lambda x: not x.running(), self.ctx.daemons.iter_daemons_of_role('osd'))]
         live_osds = [int(x.id_) for x in
-                     filter(lambda x:
-                            x.running(),
-                            self.ctx.daemons.iter_daemons_of_role('osd'))]
-        return {'in': in_osds, 'out': out_osds, 'up': up_osds,
-                'down': down_osds, 'dead': dead_osds, 'live': live_osds,
-                'raw': osd_lines}
+                     filter(lambda x: x.running(), self.ctx.daemons.iter_daemons_of_role('osd'))]
+        return { 'in' : in_osds, 'out' : out_osds, 'up' : up_osds,
+                 'down' : down_osds, 'dead' : dead_osds, 'live' : live_osds,
+                 'raw' : osd_lines}
 
     def get_num_pgs(self):
         """
@@ -1052,8 +884,7 @@ class CephManager:
             args = cmd_erasure_code_profile(profile_name, profile)
             self.raw_cluster_cmd(*args)
 
-    def create_pool_with_unique_name(self, pg_num=16,
-                                     erasure_code_profile_name=None):
+    def create_pool_with_unique_name(self, pg_num=16, erasure_code_profile_name=None):
         """
         Create a pool named unique_pool_X where X is unique.
         """
@@ -1067,33 +898,22 @@ class CephManager:
                 erasure_code_profile_name=erasure_code_profile_name)
         return name
 
-    @contextlib.contextmanager
-    def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
-        self.create_pool(pool_name, pg_num, erasure_code_profile_name)
-        yield
-        self.remove_pool(pool_name)
-
-    def create_pool(self, pool_name, pg_num=16,
-                    erasure_code_profile_name=None):
+    def create_pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
         """
         Create a pool named from the pool_name parameter.
         :param pool_name: name of the pool being created.
         :param pg_num: initial number of pgs.
-        :param erasure_code_profile_name: if set and !None create an
-                                          erasure coded pool using the profile
+        :param erasure_code_profile_name: if set and !None create an erasure coded pool using the profile 
         """
         with self.lock:
             assert isinstance(pool_name, str)
             assert isinstance(pg_num, int)
             assert pool_name not in self.pools
-            self.log("creating pool_name %s" % (pool_name,))
+            self.log("creating pool_name %s"%(pool_name,))
             if erasure_code_profile_name:
-                self.raw_cluster_cmd('osd', 'pool', 'create',
-                                     pool_name, str(pg_num), str(pg_num),
-                                     'erasure', erasure_code_profile_name)
+                self.raw_cluster_cmd('osd', 'pool', 'create', pool_name, str(pg_num), str(pg_num), 'erasure', erasure_code_profile_name)
             else:
-                self.raw_cluster_cmd('osd', 'pool', 'create',
-                                     pool_name, str(pg_num))
+                self.raw_cluster_cmd('osd', 'pool', 'create', pool_name, str(pg_num))
             self.pools[pool_name] = pg_num
 
     def remove_pool(self, pool_name):
@@ -1106,9 +926,10 @@ class CephManager:
             assert pool_name in self.pools
             self.log("removing pool_name %s" % (pool_name,))
             del self.pools[pool_name]
-            self.do_rados(self.controller,
-                          ['rmpool', pool_name, pool_name,
-                           "--yes-i-really-really-mean-it"])
+            self.do_rados(
+                self.controller,
+                ['rmpool', pool_name, pool_name, "--yes-i-really-really-mean-it"]
+                )
 
     def get_pool(self):
         """
@@ -1165,15 +986,12 @@ class CephManager:
                     pool_name,
                     prop,
                     str(val))
-                if r != 11:  # EAGAIN
+                if r != 11: # EAGAIN
                     break
                 tries += 1
                 if tries > 50:
-                    raise Exception('timed out getting EAGAIN '
-                                    'when setting pool property %s %s = %s' %
-                                    (pool_name, prop, val))
-                self.log('got EAGAIN setting pool property, '
-                         'waiting a few seconds...')
+                    raise Exception('timed out getting EAGAIN when setting pool property %s %s = %s' % (pool_name, prop, val))
+                self.log('got EAGAIN setting pool property, waiting a few seconds...')
                 time.sleep(2)
 
     def expand_pool(self, pool_name, by, max_pgs):
@@ -1188,7 +1006,7 @@ class CephManager:
                 return
             if (self.pools[pool_name] + by) > max_pgs:
                 return
-            self.log("increase pool size by %d" % (by,))
+            self.log("increase pool size by %d"%(by,))
             new_pg_num = self.pools[pool_name] + by
             self.set_pool_property(pool_name, "pg_num", new_pg_num)
             self.pools[pool_name] = new_pg_num
@@ -1286,7 +1104,7 @@ class CephManager:
         init = self.get_last_scrub_stamp(pool, pgnum)
         self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
         while init == self.get_last_scrub_stamp(pool, pgnum):
-            self.log("waiting for scrub type %s" % (stype,))
+            self.log("waiting for scrub type %s"%(stype,))
             time.sleep(10)
 
     def get_single_pg_stats(self, pgid):
@@ -1301,32 +1119,6 @@ class CephManager:
 
         return None
 
-    def get_object_pg_with_shard(self, pool, name, osdid):
-        """
-        """
-        pool_dump = self.get_pool_dump(pool)
-        object_map = self.get_object_map(pool, name)
-        if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
-            shard = object_map['acting'].index(osdid)
-            return "{pgid}s{shard}".format(pgid=object_map['pgid'],
-                                           shard=shard)
-        else:
-            return object_map['pgid']
-
-    def get_object_primary(self, pool, name):
-        """
-        """
-        object_map = self.get_object_map(pool, name)
-        return object_map['acting_primary']
-
-    def get_object_map(self, pool, name):
-        """
-        osd map --format=json converted to a python object
-        :returns: the python object
-        """
-        out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
-        return json.loads('\n'.join(out.split('\n')[1:]))
-
     def get_osd_dump_json(self):
         """
         osd dump --format=json converted to a python object
@@ -1376,9 +1168,7 @@ class CephManager:
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
-            if (pg['state'].count('active') and
-                    pg['state'].count('clean') and
-                    not pg['state'].count('stale')):
+            if pg['state'].count('active') and pg['state'].count('clean') and not pg['state'].count('stale'):
                 num += 1
         return num
 
@@ -1389,10 +1179,7 @@ class CephManager:
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
-            if (pg['state'].count('active') and
-                    not pg['state'].count('recover') and
-                    not pg['state'].count('backfill') and
-                    not pg['state'].count('stale')):
+            if pg['state'].count('active') and not pg['state'].count('recover') and not pg['state'].count('backfill') and not pg['state'].count('stale'):
                 num += 1
         return num
 
@@ -1425,10 +1212,8 @@ class CephManager:
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
-            if ((pg['state'].count('down') and not
-                    pg['state'].count('stale')) or
-                (pg['state'].count('incomplete') and not
-                    pg['state'].count('stale'))):
+            if (pg['state'].count('down') and not pg['state'].count('stale')) or \
+                    (pg['state'].count('incomplete') and not pg['state'].count('stale')):
                 num += 1
         return num
 
@@ -1439,12 +1224,9 @@ class CephManager:
         pgs = self.get_pg_stats()
         num = 0
         for pg in pgs:
-            if ((pg['state'].count('active') and not
-                    pg['state'].count('stale')) or
-                (pg['state'].count('down') and not
-                    pg['state'].count('stale')) or
-                (pg['state'].count('incomplete') and not
-                    pg['state'].count('stale'))):
+            if (pg['state'].count('active') and not pg['state'].count('stale')) or \
+                    (pg['state'].count('down') and not pg['state'].count('stale')) or \
+                    (pg['state'].count('incomplete') and not pg['state'].count('stale')):
                 num += 1
         return num
 
@@ -1468,7 +1250,7 @@ class CephManager:
 
     def wait_for_clean(self, timeout=None):
         """
-        Returns true when all pgs are clean.
+        Returns trues when all pgs are clean.
         """
         self.log("waiting for clean")
         start = time.time()
@@ -1494,7 +1276,8 @@ class CephManager:
         Returns true if all osds are up.
         """
         x = self.get_osd_dump()
-        return (len(x) == sum([(y['up'] > 0) for y in x]))
+        return (len(x) == \
+                    sum([(y['up'] > 0) for y in x]))
 
     def wait_for_all_up(self, timeout=None):
         """
@@ -1592,13 +1375,13 @@ class CephManager:
 
     def is_active(self):
         """
-        Wrapper to check if all pgs are active
+        Wrapper to check if active
         """
         return self.get_num_active() == self.get_num_pgs()
 
     def wait_till_active(self, timeout=None):
         """
-        Wait until all pgs are active.
+        Wait until osds are active.
         """
         self.log("waiting till active")
         start = time.time()
@@ -1621,14 +1404,9 @@ class CephManager:
         or by stopping.
         """
         if self.config.get('powercycle'):
-            (remote,) = (self.ctx.cluster.only('osd.{o}'.format(o=osd)).
-                         remotes.iterkeys())
-            self.log('kill_osd on osd.{o} '
-                     'doing powercycle of {s}'.format(o=osd, s=remote.name))
-            assert remote.console is not None, ("powercycling requested "
-                                                "but RemoteConsole is not "
-                                                "initialized.  "
-                                                "Check ipmi config.")
+            (remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
+            self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name))
+            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
             remote.console.power_off()
         else:
             self.ctx.daemons.get_daemon('osd', osd).stop()
@@ -1648,18 +1426,12 @@ class CephManager:
         or by restarting.
         """
         if self.config.get('powercycle'):
-            (remote,) = (self.ctx.cluster.only('osd.{o}'.format(o=osd)).
-                         remotes.iterkeys())
-            self.log('kill_osd on osd.{o} doing powercycle of {s}'.
-                     format(o=osd, s=remote.name))
-            assert remote.console is not None, ("powercycling requested "
-                                                "but RemoteConsole is not "
-                                                "initialized.  "
-                                                "Check ipmi config.")
+            (remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
+            self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name))
+            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
             remote.console.power_on()
             if not remote.console.check_status(300):
-                raise Exception('Failed to revive osd.{o} via ipmi'.
-                                format(o=osd))
+                raise Exception('Failed to revive osd.{o} via ipmi'.format(o=osd))
             teuthology.reconnect(self.ctx, 60, [remote])
             mount_osd_data(self.ctx, remote, str(osd))
             make_admin_daemon_dir(self.ctx, remote)
@@ -1685,7 +1457,9 @@ class CephManager:
         """
         self.raw_cluster_cmd('osd', 'in', str(osd))
 
+
     ## monitors
+
     def signal_mon(self, mon, sig):
         """
         Wrapper to local get_deamon call
@@ -1698,15 +1472,9 @@ class CephManager:
         or by doing a stop.
         """
         if self.config.get('powercycle'):
-            (remote,) = (self.ctx.cluster.only('mon.{m}'.format(m=mon)).
-                         remotes.iterkeys())
-            self.log('kill_mon on mon.{m} doing powercycle of {s}'.
-                     format(m=mon, s=remote.name))
-            assert remote.console is not None, ("powercycling requested "
-                                                "but RemoteConsole is not "
-                                                "initialized.  "
-                                                "Check ipmi config.")
-
+            (remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys()
+            self.log('kill_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name))
+            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
             remote.console.power_off()
         else:
             self.ctx.daemons.get_daemon('mon', mon).stop()
@@ -1717,15 +1485,9 @@ class CephManager:
         or by doing a normal restart.
         """
         if self.config.get('powercycle'):
-            (remote,) = (self.ctx.cluster.only('mon.{m}'.format(m=mon)).
-                         remotes.iterkeys())
-            self.log('revive_mon on mon.{m} doing powercycle of {s}'.
-                     format(m=mon, s=remote.name))
-            assert remote.console is not None, ("powercycling requested "
-                                                "but RemoteConsole is not "
-                                                "initialized.  "
-                                                "Check ipmi config.")
-
+            (remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys()
+            self.log('revive_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name))
+            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
             remote.console.power_on()
             make_admin_daemon_dir(self.ctx, remote)
         self.ctx.daemons.get_daemon('mon', mon).restart()
@@ -1756,8 +1518,7 @@ class CephManager:
         while not len(self.get_mon_quorum()) == size:
             if timeout is not None:
                 assert time.time() - start < timeout, \
-                    ('failed to reach quorum size %d '
-                     'before timeout expired' % size)
+                    'failed to reach quorum size %d before timeout expired' % size
             time.sleep(3)
         self.log("quorum is size %d" % size)
 
@@ -1777,14 +1538,9 @@ class CephManager:
         Powercyle if set in config, otherwise just stop.
         """
         if self.config.get('powercycle'):
-            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
-                         remotes.iterkeys())
-            self.log('kill_mds on mds.{m} doing powercycle of {s}'.
-                     format(m=mds, s=remote.name))
-            assert remote.console is not None, ("powercycling requested "
-                                                "but RemoteConsole is not "
-                                                "initialized.  "
-                                                "Check ipmi config.")
+            (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys()
+            self.log('kill_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name))
+            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
             remote.console.power_off()
         else:
             self.ctx.daemons.get_daemon('mds', mds).stop()
@@ -1802,14 +1558,9 @@ class CephManager:
         and then restart (using --hot-standby if specified.
         """
         if self.config.get('powercycle'):
-            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
-                         remotes.iterkeys())
-            self.log('revive_mds on mds.{m} doing powercycle of {s}'.
-                     format(m=mds, s=remote.name))
-            assert remote.console is not None, ("powercycling requested "
-                                                "but RemoteConsole is not "
-                                                "initialized.  "
-                                                "Check ipmi config.")
+            (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys()
+            self.log('revive_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name))
+            assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized.  Check ipmi config."
             remote.console.power_on()
             make_admin_daemon_dir(self.ctx, remote)
         args = []
index 8899fcd9d4c629c4fcf05d400282d95a1e842b78..c99f6edae7d183c3104ac21e2bd628ae68cd7044 100644 (file)
@@ -3,72 +3,125 @@ Test pool repairing after objects are damaged.
 """
 import logging
 import time
-import contextlib
 
 import ceph_manager
 from teuthology import misc as teuthology
 
 log = logging.getLogger(__name__)
 
-
-def choose_primary(ctx, pool, num):
+def setup(ctx, config):
     """
-    Return primary to test on.
+    Create the repair test pool.
     """
-    log.info("Choosing primary")
-    return ctx.manager.get_pg_primary(pool, num)
-
+    ctx.manager.wait_for_clean()
+    ctx.manager.create_pool("repair_test_pool", 1)
+    return "repair_test_pool"
 
-def choose_replica(ctx, pool, num):
+def teardown(ctx, config, pool):
     """
-    Return replica to test on.
+    Remove the repair test pool.
     """
-    log.info("Choosing replica")
-    return ctx.manager.get_pg_replica(pool, num)
-
+    ctx.manager.remove_pool(pool)
+    ctx.manager.wait_for_clean()
 
-def trunc(ctx, osd, pool, obj):
+def run_test(ctx, config, test):
     """
-    truncate an object
+    Setup a test pool, run the test, and clean up afterwards.
+   
+    :param test: function passed in, called to run the test.
     """
-    log.info("truncating object")
-    return ctx.manager.osd_admin_socket(
-        osd,
-        ['truncobj', pool, obj, '1'])
+    s = setup(ctx, config)
+    test(ctx, config, s)
+    teardown(ctx, config, s)
 
-
-def dataerr(ctx, osd, pool, obj):
+def choose_primary(ctx):
     """
-    cause an error in the data
+    Select a primary for the next test.  This routine is typically passed to
+    as a 'chooser function'
     """
-    log.info("injecting data err on object")
-    return ctx.manager.osd_admin_socket(
-        osd,
-        ['injectdataerr', pool, obj])
-
-
-def mdataerr(ctx, osd, pool, obj):
+    def ret(pool, num):
+        """
+        Return primary to test on.
+        """
+        log.info("Choosing primary")
+        return ctx.manager.get_pg_primary(pool, num)
+    return ret
+
+def choose_replica(ctx):
     """
-    cause an error in the mdata
+    Select a replica for the next test.  This routine is typically passed to
+    as a 'chooser function'
     """
-    log.info("injecting mdata err on object")
-    return ctx.manager.osd_admin_socket(
-        osd,
-        ['injectmdataerr', pool, obj])
-
-
-def omaperr(ctx, osd, pool, obj):
+    def ret(pool, num):
+        """
+        Return replica to test on.
+        """
+        log.info("Choosing replica")
+        return ctx.manager.get_pg_replica(pool, num)
+    return ret
+
+def trunc(ctx):
     """
-    Cause an omap error.
+    Truncate an object in the pool. This function is typically passed as a
+    'corrupter function'
     """
-    log.info("injecting omap err on object")
-    return ctx.manager.osd_admin_socket(osd, ['setomapval', pool, obj,
-                                              'badkey', 'badval'])
-
-
-def repair_test_1(ctx, corrupter, chooser, scrub_type):
+    def ret(osd, pool, obj):
+        """
+        truncate an object
+        """
+        log.info("truncating object")
+        return ctx.manager.osd_admin_socket(
+            osd,
+            ['truncobj', pool, obj, '1'])
+    return ret
+
+def dataerr(ctx):
+    """
+    Generate an error on an object in the pool. This function is typically
+    passed as a 'corrupter function'
+    """
+    def ret(osd, pool, obj):
+        """
+        cause an error in the data 
+        """
+        log.info("injecting data err on object")
+        return ctx.manager.osd_admin_socket(
+            osd,
+            ['injectdataerr', pool, obj])
+    return ret
+
+def mdataerr(ctx):
+    """
+    Generate an mdata error on an object in the pool. This function is
+    typically passed as a 'corrupter function'
+    """
+    def ret(osd, pool, obj):
+        """
+        cause an error in the mdata
+        """
+        log.info("injecting mdata err on object")
+        return ctx.manager.osd_admin_socket(
+            osd,
+            ['injectmdataerr', pool, obj])
+    return ret
+
+def omaperr(ctx):
+    """
+    Cause data corruption by injecting omap errors into a pool.
     """
-    Creates an object in the pool, corrupts it,
+    def ret(osd, pool, obj):
+        """
+        Cause an omap error.
+        """
+        log.info("injecting omap err on object")
+        return ctx.manager.osd_admin_socket(osd, ['setomapval', pool, obj, 'badkey', 'badval']);
+    return ret
+
+def gen_repair_test_1(corrupter, chooser, scrub_type):
+    """
+    Repair test.  Wrapper for the internal ret function.
+
+    The internal ret function creates an object in the pool, corrupts it,
     scrubs it, and verifies that the pool is inconsistent.  It then repairs
     the pool, rescrubs it, and verifies that the pool is consistent
 
@@ -77,12 +130,12 @@ def repair_test_1(ctx, corrupter, chooser, scrub_type):
     :param chooser: osd type chooser (primary or replica)
     :param scrub_type: regular scrub or deep-scrub
     """
-    pool = "repair_pool_1"
-    ctx.manager.wait_for_clean()
-    with ctx.manager.pool(pool, 1):
-
+    def ret(ctx, config, pool):
+        """
+        :param pool: repair test pool
+        """
         log.info("starting repair test type 1")
-        victim_osd = chooser(ctx, pool, 0)
+        victim_osd = chooser(pool, 0)
 
         # create object
         log.info("doing put")
@@ -90,7 +143,7 @@ def repair_test_1(ctx, corrupter, chooser, scrub_type):
 
         # corrupt object
         log.info("corrupting object")
-        corrupter(ctx, victim_osd, pool, 'repair_test_obj')
+        corrupter(victim_osd, pool, 'repair_test_obj')
 
         # verify inconsistent
         log.info("scrubbing")
@@ -108,42 +161,43 @@ def repair_test_1(ctx, corrupter, chooser, scrub_type):
         # verify consistent
         assert not ctx.manager.pg_inconsistent(pool, 0)
         log.info("done")
+    return ret
 
-
-def repair_test_2(ctx, config, chooser):
+def gen_repair_test_2(chooser):
     """
-    First creates a set of objects and
+    Repair test.  Wrapper for the internal ret function.
+
+    The internal ret function first creates a set of objects and
     sets the omap value.  It then corrupts an object, does both a scrub
     and a deep-scrub, and then corrupts more objects.  After that, it
     repairs the pool and makes sure that the pool is consistent some
-    time after a deep-scrub.
+    time after a deep-scrub. 
 
     :param chooser: primary or replica selection routine.
     """
-    pool = "repair_pool_2"
-    ctx.manager.wait_for_clean()
-    with ctx.manager.pool(pool, 1):
+    def ret(ctx, config, pool):
+        """
+        :param pool: repair test pool. 
+        """
         log.info("starting repair test type 2")
-        victim_osd = chooser(ctx, pool, 0)
+        victim_osd = chooser(pool, 0)
         first_mon = teuthology.get_first_mon(ctx, config)
         (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
 
         # create object
         log.info("doing put and setomapval")
         ctx.manager.do_put(pool, 'file1', '/etc/hosts')
-        ctx.manager.do_rados(mon, ['-p', pool, 'setomapval', 'file1',
-                                   'key', 'val'])
+        ctx.manager.do_rados(mon, ['-p', pool, 'setomapval', 'file1', 'key', 'val'])
         ctx.manager.do_put(pool, 'file2', '/etc/hosts')
         ctx.manager.do_put(pool, 'file3', '/etc/hosts')
         ctx.manager.do_put(pool, 'file4', '/etc/hosts')
         ctx.manager.do_put(pool, 'file5', '/etc/hosts')
-        ctx.manager.do_rados(mon, ['-p', pool, 'setomapval', 'file5',
-                                   'key', 'val'])
+        ctx.manager.do_rados(mon, ['-p', pool, 'setomapval', 'file5', 'key', 'val'])
         ctx.manager.do_put(pool, 'file6', '/etc/hosts')
 
         # corrupt object
         log.info("corrupting object")
-        omaperr(ctxvictim_osd, pool, 'file1')
+        omaperr(ctx)(victim_osd, pool, 'file1')
 
         # verify inconsistent
         log.info("scrubbing")
@@ -159,10 +213,10 @@ def repair_test_2(ctx, config, chooser):
 
         # Additional corruptions including 2 types for file1
         log.info("corrupting more objects")
-        dataerr(ctxvictim_osd, pool, 'file1')
-        mdataerr(ctxvictim_osd, pool, 'file2')
-        trunc(ctxvictim_osd, pool, 'file3')
-        omaperr(ctxvictim_osd, pool, 'file6')
+        dataerr(ctx)(victim_osd, pool, 'file1')
+        mdataerr(ctx)(victim_osd, pool, 'file2')
+        trunc(ctx)(victim_osd, pool, 'file3')
+        omaperr(ctx)(victim_osd, pool, 'file6')
 
         # see still inconsistent
         log.info("scrubbing")
@@ -189,62 +243,7 @@ def repair_test_2(ctx, config, chooser):
         assert not ctx.manager.pg_inconsistent(pool, 0)
 
         log.info("done")
-
-
-def hinfoerr(ctx, victim, pool, obj):
-    """
-    cause an error in the hinfo_key
-    """
-    log.info("remove the hinfo_key")
-    ctx.manager.objectstore_tool(pool,
-                                 options='',
-                                 args='rm-attr hinfo_key',
-                                 object_name=obj,
-                                 osd=victim)
-
-
-def repair_test_erasure_code(ctx, corrupter, victim, scrub_type):
-    """
-    Creates an object in the pool, corrupts it,
-    scrubs it, and verifies that the pool is inconsistent.  It then repairs
-    the pool, rescrubs it, and verifies that the pool is consistent
-
-    :param corrupter: error generating function.
-    :param chooser: osd type chooser (primary or replica)
-    :param scrub_type: regular scrub or deep-scrub
-    """
-    pool = "repair_pool_3"
-    ctx.manager.wait_for_clean()
-    with ctx.manager.pool(pool_name=pool, pg_num=1,
-                          erasure_code_profile_name='default'):
-
-        log.info("starting repair test for erasure code")
-
-        # create object
-        log.info("doing put")
-        ctx.manager.do_put(pool, 'repair_test_obj', '/etc/hosts')
-
-        # corrupt object
-        log.info("corrupting object")
-        corrupter(ctx, victim, pool, 'repair_test_obj')
-
-        # verify inconsistent
-        log.info("scrubbing")
-        ctx.manager.do_pg_scrub(pool, 0, scrub_type)
-
-        assert ctx.manager.pg_inconsistent(pool, 0)
-
-        # repair
-        log.info("repairing")
-        ctx.manager.do_pg_scrub(pool, 0, "repair")
-
-        log.info("re-scrubbing")
-        ctx.manager.do_pg_scrub(pool, 0, scrub_type)
-
-        # verify consistent
-        assert not ctx.manager.pg_inconsistent(pool, 0)
-        log.info("done")
-
+    return ret
 
 def task(ctx, config):
     """
@@ -262,20 +261,7 @@ def task(ctx, config):
     - chef:
     - install:
     - ceph:
-        log-whitelist:
-          - 'candidate had a read error'
-          - 'deep-scrub 0 missing, 1 inconsistent objects'
-          - 'deep-scrub 0 missing, 4 inconsistent objects'
-          - 'deep-scrub 1 errors'
-          - 'deep-scrub 4 errors'
-          - '!= known omap_digest'
-          - 'repair 0 missing, 1 inconsistent objects'
-          - 'repair 0 missing, 4 inconsistent objects'
-          - 'repair 1 errors, 1 fixed'
-          - 'repair 4 errors, 4 fixed'
-          - 'scrub 0 missing, 1 inconsistent'
-          - 'scrub 1 errors'
-          - 'size 1 != known size'
+        log-whitelist: ['candidate had a read error', 'deep-scrub 0 missing, 1 inconsistent objects', 'deep-scrub 0 missing, 4 inconsistent objects', 'deep-scrub 1 errors', 'deep-scrub 4 errors', '!= known omap_digest', 'repair 0 missing, 1 inconsistent objects', 'repair 0 missing, 4 inconsistent objects', 'repair 1 errors, 1 fixed', 'repair 4 errors, 4 fixed', 'scrub 0 missing, 1 inconsistent', 'scrub 1 errors', 'size 1 != known size']
         conf:
           osd:
             filestore debug inject read err: true
@@ -296,15 +282,22 @@ def task(ctx, config):
             logger=log.getChild('ceph_manager')
             )
 
-    ctx.manager.wait_for_all_up()
+    num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
+    log.info('num_osds is %s' % num_osds)
 
-    repair_test_1(ctx, mdataerr, choose_primary, "scrub")
-    repair_test_1(ctx, mdataerr, choose_replica, "scrub")
-    repair_test_1(ctx, dataerr, choose_primary, "deep-scrub")
-    repair_test_1(ctx, dataerr, choose_replica, "deep-scrub")
-    repair_test_1(ctx, trunc, choose_primary, "scrub")
-    repair_test_1(ctx, trunc, choose_replica, "scrub")
-    repair_test_2(ctx, config, choose_primary)
-    repair_test_2(ctx, config, choose_replica)
+    while len(ctx.manager.get_osd_status()['up']) < num_osds:
+        time.sleep(10)
 
-    repair_test_erasure_code(ctx, hinfoerr, 'primary', "deep-scrub")
+    tests = [
+        gen_repair_test_1(mdataerr(ctx), choose_primary(ctx), "scrub"),
+        gen_repair_test_1(mdataerr(ctx), choose_replica(ctx), "scrub"),
+        gen_repair_test_1(dataerr(ctx), choose_primary(ctx), "deep-scrub"),
+        gen_repair_test_1(dataerr(ctx), choose_replica(ctx), "deep-scrub"),
+        gen_repair_test_1(trunc(ctx), choose_primary(ctx), "scrub"),
+        gen_repair_test_1(trunc(ctx), choose_replica(ctx), "scrub"),
+        gen_repair_test_2(choose_primary(ctx)),
+        gen_repair_test_2(choose_replica(ctx))
+        ]
+
+    for test in tests:
+        run_test(ctx, config, test)