ceph manager -- Thrasher and CephManager objects
"""
from cStringIO import StringIO
-import contextlib
import random
import time
import gevent
-import base64
import json
import threading
import os
+import subprocess
'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
'sudo', 'python',
'-c',
- ('import shutil, sys; '
- 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
+ 'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))',
conf_path,
run.Raw('&&'),
'sudo', 'chmod', '0644', conf_path,
:param ctx: Context
:param remote: Remote site
"""
- remote.run(args=['sudo',
- 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
+ remote.run(
+ args=[
+ 'sudo',
+ 'install', '-d', '-m0777', '--', '/var/run/ceph',
+ ],
+ )
def mount_osd_data(ctx, remote, osd):
:param osd: Osd name
"""
log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
- if (remote in ctx.disk_config.remote_to_roles_to_dev and
- osd in ctx.disk_config.remote_to_roles_to_dev[remote]):
+ if remote in ctx.disk_config.remote_to_roles_to_dev and osd in ctx.disk_config.remote_to_roles_to_dev[remote]:
dev = ctx.disk_config.remote_to_roles_to_dev[remote][osd]
- mount_options = ctx.disk_config.\
- remote_to_roles_to_dev_mount_options[remote][osd]
+ mount_options = ctx.disk_config.remote_to_roles_to_dev_mount_options[remote][osd]
fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][osd]
mnt = os.path.join('/var/lib/ceph/osd', 'ceph-{id}'.format(id=osd))
- log.info('Mounting osd.{o}: dev: {n}, '
- 'mountpoint: {p}, type: {t}, options: {v}'.format(
- o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options))
+ log.info('Mounting osd.{o}: dev: {n}, mountpoint: {p}, type: {t}, options: {v}'.format(
+ o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options))
remote.run(
args=[
)
+def cmd_exists(cmd):
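+ # 'type' exits 0 only if the shell can resolve cmd on the host running this code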
+ return subprocess.call("type " + cmd, shell=True,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0
+
+
class Thrasher:
"""
Object used to thrash Ceph
self.revive_timeout += 120
self.clean_wait = self.config.get('clean_wait', 0)
self.minin = self.config.get("min_in", 3)
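+ # only exercise ceph-objectstore-tool when the binary is available; otherwise skip those thrash operations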
+ if cmd_exists("ceph-objectstore-tool"):
+ self.ceph_objectstore_tool = self.config.get('ceph_objectstore_tool', False)
+ else:
+ self.ceph_objectstore_tool = False
self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
num_osds = self.in_osds + self.out_osds
manager.raw_cluster_cmd('--', 'mon', 'tell', '*', 'injectargs',
'--mon-osd-down-out-interval 0')
self.thread = gevent.spawn(self.do_thrash)
- if self.cmd_exists_on_osds("ceph-objectstore-tool"):
- self.ceph_objectstore_tool = self.config.get('ceph_objectstore_tool', True)
- self.test_rm_past_intervals = self.config.get('test_rm_past_intervals', True)
- else:
- self.ceph_objectstore_tool = False
- self.test_rm_past_intervals = False
- self.log("Unable to test ceph_objectstore_tool, not available on all OSD nodes")
-
- def cmd_exists_on_osds(self, cmd):
- allremotes = self.ceph_manager.ctx.cluster.only(teuthology.is_type('osd')).remotes.keys()
- allremotes = list(set(allremotes))
- for remote in allremotes:
- proc = remote.run(args=['type', cmd], wait=True, check_status=False, stdout=StringIO(), stderr=StringIO())
- if proc.exitstatus != 0:
- return False;
- return True;
def kill_osd(self, osd=None, mark_down=False, mark_out=False):
"""
"""
if osd is None:
osd = random.choice(self.live_osds)
- self.log("Killing osd %s, live_osds are %s" % (str(osd),
- str(self.live_osds)))
+ self.log("Killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds)))
self.live_osds.remove(osd)
self.dead_osds.append(osd)
self.ceph_manager.kill_osd(osd)
self.out_osd(osd)
if self.ceph_objectstore_tool:
self.log("Testing ceph-objectstore-tool on down osd")
- (remote,) = self.ceph_manager.ctx.\
- cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
+ (remote,) = self.ceph_manager.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
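+ # export a randomly chosen PG from the killed osd, remove it, then re-import it (possibly onto another dead osd)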
FSPATH = self.ceph_manager.get_filepath()
JPATH = os.path.join(FSPATH, "journal")
exp_osd = imp_osd = osd
exp_remote = imp_remote = remote
# If an older osd is available we'll move a pg from there
- if (len(self.dead_osds) > 1 and
- random.random() < self.chance_move_pg):
+ if len(self.dead_osds) > 1 and random.random() < self.chance_move_pg:
exp_osd = random.choice(self.dead_osds[:-1])
- (exp_remote,) = self.ceph_manager.ctx.\
- cluster.only('osd.{o}'.format(o=exp_osd)).\
- remotes.iterkeys()
- if ('keyvaluestore_backend' in
- self.ceph_manager.ctx.ceph.conf['osd']):
- prefix = ("sudo ceph-objectstore-tool "
- "--data-path {fpath} --journal-path {jpath} "
- "--type keyvaluestore "
- "--log-file="
- "/var/log/ceph/objectstore_tool.\\$pid.log ".
- format(fpath=FSPATH, jpath=JPATH))
+ (exp_remote,) = self.ceph_manager.ctx.cluster.only('osd.{o}'.format(o=exp_osd)).remotes.iterkeys()
+ if 'keyvaluestore_backend' in self.ceph_manager.ctx.ceph.conf['osd']:
+ prefix = "sudo ceph-objectstore-tool --data-path {fpath} --journal-path {jpath} --type keyvaluestore-dev --log-file=/var/log/ceph/objectstore_tool.\\$pid.log ".format(fpath=FSPATH, jpath=JPATH)
else:
- prefix = ("sudo ceph-objectstore-tool "
- "--data-path {fpath} --journal-path {jpath} "
- "--log-file="
- "/var/log/ceph/objectstore_tool.\\$pid.log ".
- format(fpath=FSPATH, jpath=JPATH))
+ prefix = "sudo ceph-objectstore-tool --data-path {fpath} --journal-path {jpath} --log-file=/var/log/ceph/objectstore_tool.\\$pid.log ".format(fpath=FSPATH, jpath=JPATH)
cmd = (prefix + "--op list-pgs").format(id=exp_osd)
- proc = exp_remote.run(args=cmd, wait=True,
- check_status=False, stdout=StringIO())
+ proc = exp_remote.run(args=cmd, wait=True, check_status=False, stdout=StringIO())
if proc.exitstatus:
- raise Exception("ceph-objectstore-tool: "
- "exp list-pgs failure with status {ret}".
- format(ret=proc.exitstatus))
+ raise Exception("ceph-objectstore-tool: exp list-pgs failure with status {ret}".format(ret=proc.exitstatus))
pgs = proc.stdout.getvalue().split('\n')[:-1]
if len(pgs) == 0:
self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
return
pg = random.choice(pgs)
- exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
- exp_path = os.path.join(exp_path, "data")
- exp_path = os.path.join(exp_path,
- "exp.{pg}.{id}".format(pg=pg, id=exp_osd))
+ exp_path = os.path.join(teuthology.get_testdir(self.ceph_manager.ctx), "data", "exp.{pg}.{id}".format(pg=pg, id=exp_osd))
# export
- cmd = prefix + "--op export --pgid {pg} --file {file}"
- cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
+ cmd = (prefix + "--op export --pgid {pg} --file {file}").format(id=exp_osd, pg=pg, file=exp_path)
proc = exp_remote.run(args=cmd)
if proc.exitstatus:
- raise Exception("ceph-objectstore-tool: "
- "export failure with status {ret}".
- format(ret=proc.exitstatus))
+ raise Exception("ceph-objectstore-tool: export failure with status {ret}".format(ret=proc.exitstatus))
# remove
- cmd = prefix + "--op remove --pgid {pg}"
- cmd = cmd.format(id=exp_osd, pg=pg)
+ cmd = (prefix + "--op remove --pgid {pg}").format(id=exp_osd, pg=pg)
proc = exp_remote.run(args=cmd)
if proc.exitstatus:
- raise Exception("ceph-objectstore-tool: "
- "remove failure with status {ret}".
- format(ret=proc.exitstatus))
+ raise Exception("ceph-objectstore-tool: remove failure with status {ret}".format(ret=proc.exitstatus))
# If there are at least 2 dead osds we might move the pg
if exp_osd != imp_osd:
# If pg isn't already on this osd, then we will move it there
cmd = (prefix + "--op list-pgs").format(id=imp_osd)
- proc = imp_remote.run(args=cmd, wait=True,
- check_status=False, stdout=StringIO())
+ proc = imp_remote.run(args=cmd, wait=True, check_status=False, stdout=StringIO())
if proc.exitstatus:
- raise Exception("ceph-objectstore-tool: "
- "imp list-pgs failure with status {ret}".
- format(ret=proc.exitstatus))
+ raise Exception("ceph-objectstore-tool: imp list-pgs failure with status {ret}".format(ret=proc.exitstatus))
pgs = proc.stdout.getvalue().split('\n')[:-1]
if pg not in pgs:
- self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
- format(pg=pg, fosd=exp_osd, tosd=imp_osd))
+ self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".format(pg=pg, fosd=exp_osd, tosd=imp_osd))
if imp_remote != exp_remote:
# Copy export file to the other machine
- self.log("Transfer export file from {srem} to {trem}".
- format(srem=exp_remote, trem=imp_remote))
+ self.log("Transfer export file from {srem} to {trem}".format(srem=exp_remote, trem=imp_remote))
tmpexport = Remote.get_file(exp_remote, exp_path)
Remote.put_file(imp_remote, tmpexport, exp_path)
os.remove(tmpexport)
imp_osd = exp_osd
imp_remote = exp_remote
# import
- cmd = (prefix + "--op import --file {file}")
- cmd = cmd.format(id=imp_osd, file=exp_path)
+ cmd = (prefix + "--op import --file {file}").format(id=imp_osd, file=exp_path)
proc = imp_remote.run(args=cmd, wait=True, check_status=False)
- if proc.exitstatus == 10:
- self.log("Pool went away before processing "
- "an import...ignored");
- elif proc.exitstatus:
- raise Exception("ceph-objectstore-tool: "
- "import failure with status {ret}".
- format(ret=proc.exitstatus))
+ if proc.exitstatus:
+ raise Exception("ceph-objectstore-tool: import failure with status {ret}".format(ret=proc.exitstatus))
cmd = "rm -f {file}".format(file=exp_path)
exp_remote.run(args=cmd)
if imp_remote != exp_remote:
imp_remote.run(args=cmd)
- def rm_past_intervals(self, osd=None):
- """
- :param osd: Osd to find pg to remove past intervals
- """
- if self.test_rm_past_intervals:
- if osd is None:
- osd = random.choice(self.dead_osds)
- self.log("Use ceph_objectstore_tool to remove past intervals")
- (remote,) = self.ceph_manager.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
- FSPATH = self.ceph_manager.get_filepath()
- JPATH = os.path.join(FSPATH, "journal")
- if 'keyvaluestore_backend' in self.ceph_manager.ctx.ceph.conf['osd']:
- prefix = "sudo ceph-objectstore-tool --data-path {fpath} --journal-path {jpath} --type keyvaluestore ".format(fpath=FSPATH, jpath=JPATH)
- else:
- prefix = "sudo ceph-objectstore-tool --data-path {fpath} --journal-path {jpath} ".format(fpath=FSPATH, jpath=JPATH)
- cmd = (prefix + "--op list-pgs").format(id=osd)
- proc = remote.run(args=cmd, wait=True, check_status=False, stdout=StringIO())
- if proc.exitstatus:
- raise Exception("ceph_objectstore_tool: exp list-pgs failure with status {ret}".format(ret=proc.exitstatus))
- pgs = proc.stdout.getvalue().split('\n')[:-1]
- if len(pgs) == 0:
- self.log("No PGs found for osd.{osd}".format(osd=osd))
- return
- pg = random.choice(pgs)
- cmd = (prefix + "--op rm-past-intervals --pgid {pg}").format(id=osd, pg=pg)
- proc = remote.run(args=cmd)
- if proc.exitstatus:
- raise Exception("ceph_objectstore_tool: rm-past-intervals failure with status {ret}".format(ret=proc.exitstatus))
def blackhole_kill_osd(self, osd=None):
"""
"""
if osd is None:
osd = random.choice(self.live_osds)
- self.log("Blackholing and then killing osd %s, live_osds are %s" %
- (str(osd), str(self.live_osds)))
+ self.log("Blackholing and then killing osd %s, live_osds are %s" % (str(osd), str(self.live_osds)))
self.live_osds.remove(osd)
self.dead_osds.append(osd)
self.ceph_manager.blackhole_kill_osd(osd)
"""
if osd is None:
osd = random.choice(self.in_osds)
- self.log("Removing osd %s, in_osds are: %s" %
- (str(osd), str(self.in_osds)))
+ self.log("Removing osd %s, in_osds are: %s" % (str(osd), str(self.in_osds)))
self.ceph_manager.mark_out_osd(osd)
self.in_osds.remove(osd)
self.out_osds.append(osd)
self.out_osds.remove(osd)
self.in_osds.append(osd)
self.ceph_manager.mark_in_osd(osd)
- self.log("Added osd %s" % (str(osd),))
+ self.log("Added osd %s"%(str(osd),))
def reweight_osd(self, osd=None):
"""
osd = random.choice(self.in_osds)
val = random.uniform(.1, 1.0)
self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
- self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
- str(osd), str(val))
+ self.ceph_manager.raw_cluster_cmd('osd', 'reweight', str(osd), str(val))
def primary_affinity(self, osd=None):
if osd is None:
else:
pa = 0
self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
- self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
- str(osd), str(pa))
+ self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity', str(osd), str(pa))
def all_up(self):
"""
Increase the size of the pool
"""
pool = self.ceph_manager.get_pool()
- self.log("Growing pool %s" % (pool,))
- self.ceph_manager.expand_pool(pool,
- self.config.get('pool_grow_by', 10),
- self.max_pgs)
+ self.log("Growing pool %s"%(pool,))
+ self.ceph_manager.expand_pool(pool, self.config.get('pool_grow_by', 10), self.max_pgs)
def fix_pgp_num(self):
"""
Fix number of pgs in pool.
"""
pool = self.ceph_manager.get_pool()
- self.log("fixing pg num pool %s" % (pool,))
+ self.log("fixing pg num pool %s"%(pool,))
self.ceph_manager.set_pool_pgpnum(pool)
def test_pool_min_size(self):
Pause injection testing. Check for osd being down when finished.
"""
the_one = random.choice(self.live_osds)
- self.log("inject_pause on {osd}".format(osd=the_one))
+ self.log("inject_pause on {osd}".format(osd = the_one))
self.log(
"Testing {key} pause injection for duration {duration}".format(
- key=conf_key,
- duration=duration
+ key = conf_key,
+ duration = duration
))
self.log(
"Checking after {after}, should_be_down={shouldbedown}".format(
- after=check_after,
- shouldbedown=should_be_down
+ after = check_after,
+ shouldbedown = should_be_down
))
- self.ceph_manager.set_config(the_one, **{conf_key: duration})
+ self.ceph_manager.set_config(the_one, **{conf_key:duration})
if not should_be_down:
return
time.sleep(check_after)
for i in self.live_osds:
self.ceph_manager.set_config(
i,
- osd_debug_skip_full_check_in_backfill_reservation=
- random.choice(['false', 'true']),
- osd_backfill_full_ratio=0)
+ osd_debug_skip_full_check_in_backfill_reservation = random.choice(
+ ['false', 'true']),
+ osd_backfill_full_ratio = 0)
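+ # osd_backfill_full_ratio=0 makes every backfill reservation look full; the skip flag randomly bypasses that check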
for i in range(30):
status = self.ceph_manager.compile_pg_status()
if 'backfill' not in status.keys():
for i in self.live_osds:
self.ceph_manager.set_config(
i,
- osd_debug_skip_full_check_in_backfill_reservation='false',
- osd_backfill_full_ratio=0.85)
+ osd_debug_skip_full_check_in_backfill_reservation = \
+ 'false',
+ osd_backfill_full_ratio = 0.85)
def test_map_discontinuity(self):
"""
"""
chance_down = self.config.get('chance_down', 0.4)
chance_test_min_size = self.config.get('chance_test_min_size', 0)
- chance_test_backfill_full = \
- self.config.get('chance_test_backfill_full', 0)
+ chance_test_backfill_full = self.config.get('chance_test_backfill_full', 0)
if isinstance(chance_down, int):
chance_down = float(chance_down) / 100
minin = self.minin
minlive = self.config.get("min_live", 2)
mindead = self.config.get("min_dead", 0)
- self.log('choose_action: min_in %d min_out '
- '%d min_live %d min_dead %d' %
+ self.log('choose_action: min_in %d min_out %d min_live %d min_dead %d' %
(minin, minout, minlive, mindead))
actions = []
if len(self.in_osds) > minin:
actions.append((self.out_osd, 1.0,))
if len(self.live_osds) > minlive and chance_down > 0:
actions.append((self.kill_osd, chance_down,))
- if len(self.dead_osds) > 1:
- actions.append((self.rm_past_intervals, 1.0,))
if len(self.out_osds) > minout:
actions.append((self.in_osd, 1.7,))
if len(self.dead_osds) > mindead:
actions.append((self.revive_osd, 1.0,))
if self.config.get('thrash_primary_affinity', True):
actions.append((self.primary_affinity, 1.0,))
- actions.append((self.reweight_osd,
- self.config.get('reweight_osd', .5),))
- actions.append((self.grow_pool,
- self.config.get('chance_pgnum_grow', 0),))
- actions.append((self.fix_pgp_num,
- self.config.get('chance_pgpnum_fix', 0),))
- actions.append((self.test_pool_min_size,
- chance_test_min_size,))
- actions.append((self.test_backfill_full,
- chance_test_backfill_full,))
+ actions.append((self.reweight_osd, self.config.get('reweight_osd',.5),))
+ actions.append((self.grow_pool, self.config.get('chance_pgnum_grow', 0),))
+ actions.append((self.fix_pgp_num, self.config.get('chance_pgpnum_fix', 0),))
+ actions.append((self.test_pool_min_size, chance_test_min_size,))
+ actions.append((self.test_backfill_full, chance_test_backfill_full,))
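+ # each entry is a (callable, weight) pair; the weight biases the random choice of the next thrash action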
for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
for scenario in [
- (lambda:
- self.inject_pause(key,
- self.config.get('pause_short', 3),
- 0,
- False),
+ (lambda: self.inject_pause(key,
+ self.config.get('pause_short', 3),
+ 0,
+ False),
self.config.get('chance_inject_pause_short', 1),),
- (lambda:
- self.inject_pause(key,
- self.config.get('pause_long', 80),
- self.config.get('pause_check_after', 70),
- True),
+ (lambda: self.inject_pause(key,
+ self.config.get('pause_long', 80),
+ self.config.get('pause_check_after', 70),
+ True),
self.config.get('chance_inject_pause_long', 0),)]:
actions.append(scenario)
delay = self.config.get("op_delay", 5)
self.log("starting do_thrash")
while not self.stopping:
- to_log = [str(x) for x in ["in_osds: ", self.in_osds,
- "out_osds: ", self.out_osds,
- "dead_osds: ", self.dead_osds,
- "live_osds: ", self.live_osds]]
- self.log(" ".join(to_log))
+ self.log(" ".join([str(x) for x in ["in_osds: ", self.in_osds, " out_osds: ", self.out_osds,
+ "dead_osds: ", self.dead_osds, "live_osds: ",
+ self.live_osds]]))
if random.uniform(0, 1) < (float(delay) / cleanint):
while len(self.dead_osds) > maxdead:
self.revive_osd()
self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
str(osd), str(1))
if random.uniform(0, 1) < float(
- self.config.get('chance_test_map_discontinuity', 0)):
+ self.config.get('chance_test_map_discontinuity', 0)):
self.test_map_discontinuity()
else:
self.ceph_manager.wait_for_recovery(
time.sleep(delay)
self.all_up()
-
-class ObjectStoreTool:
-
- def __init__(self, manager, pool, **kwargs):
- self.manager = manager
- self.pool = pool
- self.osd = kwargs.get('osd', None)
- self.object_name = kwargs.get('object_name', None)
- if self.osd and self.pool and self.object_name:
- if self.osd == "primary":
- self.osd = self.manager.get_object_primary(self.pool,
- self.object_name)
- assert self.osd
- if self.object_name:
- self.pgid = self.manager.get_object_pg_with_shard(self.pool,
- self.object_name,
- self.osd)
- self.remote = self.manager.ctx.\
- cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
- path = self.manager.get_filepath().format(id=self.osd)
- self.paths = ("--data-path {path} --journal-path {path}/journal".
- format(path=path))
-
- def build_cmd(self, options, args, stdin):
- lines = []
- if self.object_name:
- lines.append("object=$(sudo ceph-objectstore-tool "
- "{paths} --pgid {pgid} --op list |"
- "grep '\"oid\":\"{name}\"')".
- format(paths=self.paths,
- pgid=self.pgid,
- name=self.object_name))
- args = '"$object" ' + args
- options += " --pgid {pgid}".format(pgid=self.pgid)
- cmd = ("sudo ceph-objectstore-tool {paths} {options} {args}".
- format(paths=self.paths,
- args=args,
- options=options))
- if stdin:
- cmd = ("echo {payload} | base64 --decode | {cmd}".
- format(payload=base64.encode(kwargs['stdin']),
- cmd=cmd))
- lines.append(cmd)
- return "\n".join(lines)
-
- def run(self, options, args, stdin=None):
- self.manager.kill_osd(self.osd)
- cmd = self.build_cmd(options, args, stdin)
- self.manager.log(cmd)
- try:
- proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
- check_status=False,
- stdout=StringIO(),
- stderr=StringIO())
- proc.wait()
- if proc.exitstatus != 0:
- self.manager.log("failed with " + str(proc.exitstatus))
- error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
- raise Exception(error)
- finally:
- self.manager.revive_osd(self.osd)
-
-
class CephManager:
"""
Ceph manager object.
"""
testdir = teuthology.get_testdir(self.ctx)
ceph_args = [
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'ceph',
- ]
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ '{tdir}/archive/coverage'.format(tdir=testdir),
+ 'ceph',
+ ]
ceph_args.extend(args)
proc = self.controller.run(
args=ceph_args,
"""
testdir = teuthology.get_testdir(self.ctx)
ceph_args = [
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'ceph',
- ]
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ '{tdir}/archive/coverage'.format(tdir=testdir),
+ 'ceph',
+ ]
ceph_args.extend(args)
proc = self.controller.run(
args=ceph_args,
)
return proc
- def rados_write_objects(self, pool, num_objects, size,
- timelimit, threads, cleanup=False):
+ def rados_write_objects(
+ self, pool, num_objects, size, timelimit, threads, cleanup=False):
"""
Write rados objects
Threads not used yet.
'bench', timelimit,
'write'
]
- if not cleanup:
- args.append('--no-cleanup')
+ if not cleanup: args.append('--no-cleanup')
return self.do_rados(self.controller, map(str, args))
def do_put(self, pool, obj, fname):
Get the Remote for the host where a particular service runs.
:param service_type: 'mds', 'osd', 'client'
- :param service_id: The second part of a role, e.g. '0' for
- the role 'client.0'
- :return: a Remote instance for the host where the
- requested role is placed
+ :param service_id: The second part of a role, e.g. '0' for the role 'client.0'
+ :return: a Remote instance for the host where the requested role is placed
"""
for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems():
for id_ in teuthology.roles_of_type(roles_for_host, service_type):
if id_ == str(service_id):
return _remote
- raise KeyError("Service {0}.{1} not found".format(service_type,
- service_id))
+ raise KeyError("Service {0}.{1} not found".format(service_type, service_id))
- def admin_socket(self, service_type, service_id,
- command, check_status=True):
+ def admin_socket(self, service_type, service_id, command, check_status=True):
"""
Remotely start up ceph specifying the admin socket
- :param command: a list of words to use as the command
- to the admin socket
+ :param command: a list of words to use as the command to the admin socket
"""
testdir = teuthology.get_testdir(self.ctx)
remote = self.find_remote(service_type, service_id)
check_status=check_status
)
- def objectstore_tool(self, pool, options, args, **kwargs):
- return ObjectStoreTool(self, pool, **kwargs).run(options, args)
-
def get_pgid(self, pool, pgnum):
"""
:param pool: pool name
'kick_recovery_wq',
'0')
- def wait_run_admin_socket(self, service_type,
- service_id, args=['version'], timeout=75):
+ def wait_run_admin_socket(self, service_type, service_id, args=['version'], timeout=75):
"""
If osd_admin_socket call succeeds, return. Otherwise wait
five seconds and try again.
"""
tries = 0
while True:
- proc = self.admin_socket(service_type, service_id,
- args, check_status=False)
+ proc = self.admin_socket(service_type, service_id, args, check_status=False)
if proc.exitstatus == 0:
break
else:
tries += 1
if (tries * 5) > timeout:
- raise Exception('timed out waiting for admin_socket '
- 'to appear after {type}.{id} restart'.
- format(type=service_type,
- id=service_id))
- self.log("waiting on admin_socket for {type}-{id}, "
- "{command}".format(type=service_type,
- id=service_id,
- command=args))
+ raise Exception('timed out waiting for admin_socket to appear after {type}.{id} restart'.format(
+ type=service_type,
+ id=service_id))
+ self.log(
+ "waiting on admin_socket for {type}-{id}, {command}".format(
+ type=service_type,
+ id=service_id,
+ command=args))
time.sleep(5)
def get_pool_dump(self, pool):
"""
- get the osd dump part of a pool
+ get the osd dump part of a pool
"""
osd_dump = self.get_osd_dump_json()
for i in osd_dump['pools']:
lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
self.raw_osd_status().split('\n'))
self.log(osd_lines)
- in_osds = [int(i[4:].split()[0])
- for i in filter(lambda x: " in " in x, osd_lines)]
- out_osds = [int(i[4:].split()[0])
- for i in filter(lambda x: " out " in x, osd_lines)]
- up_osds = [int(i[4:].split()[0])
- for i in filter(lambda x: " up " in x, osd_lines)]
- down_osds = [int(i[4:].split()[0])
- for i in filter(lambda x: " down " in x, osd_lines)]
- dead_osds = [int(x.id_)
- for x in filter(lambda x:
- not x.running(),
- self.ctx.daemons.
- iter_daemons_of_role('osd'))]
+ in_osds = [int(i[4:].split()[0]) for i in filter(
+ lambda x: " in " in x,
+ osd_lines)]
+ out_osds = [int(i[4:].split()[0]) for i in filter(
+ lambda x: " out " in x,
+ osd_lines)]
+ up_osds = [int(i[4:].split()[0]) for i in filter(
+ lambda x: " up " in x,
+ osd_lines)]
+ down_osds = [int(i[4:].split()[0]) for i in filter(
+ lambda x: " down " in x,
+ osd_lines)]
+ dead_osds = [int(x.id_) for x in
+ filter(lambda x: not x.running(), self.ctx.daemons.iter_daemons_of_role('osd'))]
live_osds = [int(x.id_) for x in
- filter(lambda x:
- x.running(),
- self.ctx.daemons.iter_daemons_of_role('osd'))]
- return {'in': in_osds, 'out': out_osds, 'up': up_osds,
- 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
- 'raw': osd_lines}
+ filter(lambda x: x.running(), self.ctx.daemons.iter_daemons_of_role('osd'))]
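+ # 'dead'/'live' come from daemon process state; 'in'/'out'/'up'/'down' are parsed from the osd status lines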
+ return { 'in' : in_osds, 'out' : out_osds, 'up' : up_osds,
+ 'down' : down_osds, 'dead' : dead_osds, 'live' : live_osds,
+ 'raw' : osd_lines}
def get_num_pgs(self):
"""
args = cmd_erasure_code_profile(profile_name, profile)
self.raw_cluster_cmd(*args)
- def create_pool_with_unique_name(self, pg_num=16,
- erasure_code_profile_name=None):
+ def create_pool_with_unique_name(self, pg_num=16, erasure_code_profile_name=None):
"""
Create a pool named unique_pool_X where X is unique.
"""
erasure_code_profile_name=erasure_code_profile_name)
return name
- @contextlib.contextmanager
- def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
- self.create_pool(pool_name, pg_num, erasure_code_profile_name)
- yield
- self.remove_pool(pool_name)
-
- def create_pool(self, pool_name, pg_num=16,
- erasure_code_profile_name=None):
+ def create_pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
"""
Create a pool named from the pool_name parameter.
:param pool_name: name of the pool being created.
:param pg_num: initial number of pgs.
- :param erasure_code_profile_name: if set and !None create an
- erasure coded pool using the profile
+ :param erasure_code_profile_name: if set and !None create an erasure coded pool using the profile
"""
with self.lock:
assert isinstance(pool_name, str)
assert isinstance(pg_num, int)
assert pool_name not in self.pools
- self.log("creating pool_name %s" % (pool_name,))
+ self.log("creating pool_name %s"%(pool_name,))
if erasure_code_profile_name:
- self.raw_cluster_cmd('osd', 'pool', 'create',
- pool_name, str(pg_num), str(pg_num),
- 'erasure', erasure_code_profile_name)
+ self.raw_cluster_cmd('osd', 'pool', 'create', pool_name, str(pg_num), str(pg_num), 'erasure', erasure_code_profile_name)
else:
- self.raw_cluster_cmd('osd', 'pool', 'create',
- pool_name, str(pg_num))
+ self.raw_cluster_cmd('osd', 'pool', 'create', pool_name, str(pg_num))
self.pools[pool_name] = pg_num
def remove_pool(self, pool_name):
assert pool_name in self.pools
self.log("removing pool_name %s" % (pool_name,))
del self.pools[pool_name]
- self.do_rados(self.controller,
- ['rmpool', pool_name, pool_name,
- "--yes-i-really-really-mean-it"])
+ self.do_rados(
+ self.controller,
+ ['rmpool', pool_name, pool_name, "--yes-i-really-really-mean-it"]
+ )
def get_pool(self):
"""
pool_name,
prop,
str(val))
- if r != 11: # EAGAIN
+ if r != 11: # EAGAIN
break
tries += 1
if tries > 50:
- raise Exception('timed out getting EAGAIN '
- 'when setting pool property %s %s = %s' %
- (pool_name, prop, val))
- self.log('got EAGAIN setting pool property, '
- 'waiting a few seconds...')
+ raise Exception('timed out getting EAGAIN when setting pool property %s %s = %s' % (pool_name, prop, val))
+ self.log('got EAGAIN setting pool property, waiting a few seconds...')
time.sleep(2)
def expand_pool(self, pool_name, by, max_pgs):
return
if (self.pools[pool_name] + by) > max_pgs:
return
- self.log("increase pool size by %d" % (by,))
+ self.log("increase pool size by %d"%(by,))
new_pg_num = self.pools[pool_name] + by
self.set_pool_property(pool_name, "pg_num", new_pg_num)
self.pools[pool_name] = new_pg_num
init = self.get_last_scrub_stamp(pool, pgnum)
self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
while init == self.get_last_scrub_stamp(pool, pgnum):
- self.log("waiting for scrub type %s" % (stype,))
+ self.log("waiting for scrub type %s"%(stype,))
time.sleep(10)
def get_single_pg_stats(self, pgid):
return None
- def get_object_pg_with_shard(self, pool, name, osdid):
- """
- """
- pool_dump = self.get_pool_dump(pool)
- object_map = self.get_object_map(pool, name)
- if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
- shard = object_map['acting'].index(osdid)
- return "{pgid}s{shard}".format(pgid=object_map['pgid'],
- shard=shard)
- else:
- return object_map['pgid']
-
- def get_object_primary(self, pool, name):
- """
- """
- object_map = self.get_object_map(pool, name)
- return object_map['acting_primary']
-
- def get_object_map(self, pool, name):
- """
- osd map --format=json converted to a python object
- :returns: the python object
- """
- out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
- return json.loads('\n'.join(out.split('\n')[1:]))
-
def get_osd_dump_json(self):
"""
osd dump --format=json converted to a python object
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
- if (pg['state'].count('active') and
- pg['state'].count('clean') and
- not pg['state'].count('stale')):
+ if pg['state'].count('active') and pg['state'].count('clean') and not pg['state'].count('stale'):
num += 1
return num
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
- if (pg['state'].count('active') and
- not pg['state'].count('recover') and
- not pg['state'].count('backfill') and
- not pg['state'].count('stale')):
+ if pg['state'].count('active') and not pg['state'].count('recover') and not pg['state'].count('backfill') and not pg['state'].count('stale'):
num += 1
return num
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
- if ((pg['state'].count('down') and not
- pg['state'].count('stale')) or
- (pg['state'].count('incomplete') and not
- pg['state'].count('stale'))):
+ if (pg['state'].count('down') and not pg['state'].count('stale')) or \
+ (pg['state'].count('incomplete') and not pg['state'].count('stale')):
num += 1
return num
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
- if ((pg['state'].count('active') and not
- pg['state'].count('stale')) or
- (pg['state'].count('down') and not
- pg['state'].count('stale')) or
- (pg['state'].count('incomplete') and not
- pg['state'].count('stale'))):
+ if (pg['state'].count('active') and not pg['state'].count('stale')) or \
+ (pg['state'].count('down') and not pg['state'].count('stale')) or \
+ (pg['state'].count('incomplete') and not pg['state'].count('stale')):
num += 1
return num
def wait_for_clean(self, timeout=None):
"""
- Returns true when all pgs are clean.
Returns true when all pgs are clean.
"""
self.log("waiting for clean")
start = time.time()
Returns true if all osds are up.
"""
x = self.get_osd_dump()
- return (len(x) == sum([(y['up'] > 0) for y in x]))
+ return (len(x) == \
+ sum([(y['up'] > 0) for y in x]))
def wait_for_all_up(self, timeout=None):
"""
def is_active(self):
"""
- Wrapper to check if all pgs are active
Wrapper to check if all pgs are active
"""
return self.get_num_active() == self.get_num_pgs()
def wait_till_active(self, timeout=None):
"""
- Wait until all pgs are active.
Wait until all pgs are active.
"""
self.log("waiting till active")
start = time.time()
or by stopping.
"""
if self.config.get('powercycle'):
- (remote,) = (self.ctx.cluster.only('osd.{o}'.format(o=osd)).
- remotes.iterkeys())
- self.log('kill_osd on osd.{o} '
- 'doing powercycle of {s}'.format(o=osd, s=remote.name))
- assert remote.console is not None, ("powercycling requested "
- "but RemoteConsole is not "
- "initialized. "
- "Check ipmi config.")
+ (remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
+ self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name))
+ assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized. Check ipmi config."
remote.console.power_off()
else:
self.ctx.daemons.get_daemon('osd', osd).stop()
or by restarting.
"""
if self.config.get('powercycle'):
- (remote,) = (self.ctx.cluster.only('osd.{o}'.format(o=osd)).
- remotes.iterkeys())
- self.log('kill_osd on osd.{o} doing powercycle of {s}'.
- format(o=osd, s=remote.name))
- assert remote.console is not None, ("powercycling requested "
- "but RemoteConsole is not "
- "initialized. "
- "Check ipmi config.")
+ (remote,) = self.ctx.cluster.only('osd.{o}'.format(o=osd)).remotes.iterkeys()
+ self.log('kill_osd on osd.{o} doing powercycle of {s}'.format(o=osd, s=remote.name))
+ assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized. Check ipmi config."
remote.console.power_on()
if not remote.console.check_status(300):
- raise Exception('Failed to revive osd.{o} via ipmi'.
- format(o=osd))
+ raise Exception('Failed to revive osd.{o} via ipmi'.format(o=osd))
teuthology.reconnect(self.ctx, 60, [remote])
mount_osd_data(self.ctx, remote, str(osd))
make_admin_daemon_dir(self.ctx, remote)
"""
self.raw_cluster_cmd('osd', 'in', str(osd))
+
## monitors
+
def signal_mon(self, mon, sig):
"""
Wrapper to local get_daemon call
or by doing a stop.
"""
if self.config.get('powercycle'):
- (remote,) = (self.ctx.cluster.only('mon.{m}'.format(m=mon)).
- remotes.iterkeys())
- self.log('kill_mon on mon.{m} doing powercycle of {s}'.
- format(m=mon, s=remote.name))
- assert remote.console is not None, ("powercycling requested "
- "but RemoteConsole is not "
- "initialized. "
- "Check ipmi config.")
-
+ (remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys()
+ self.log('kill_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name))
+ assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized. Check ipmi config."
remote.console.power_off()
else:
self.ctx.daemons.get_daemon('mon', mon).stop()
or by doing a normal restart.
"""
if self.config.get('powercycle'):
- (remote,) = (self.ctx.cluster.only('mon.{m}'.format(m=mon)).
- remotes.iterkeys())
- self.log('revive_mon on mon.{m} doing powercycle of {s}'.
- format(m=mon, s=remote.name))
- assert remote.console is not None, ("powercycling requested "
- "but RemoteConsole is not "
- "initialized. "
- "Check ipmi config.")
-
+ (remote,) = self.ctx.cluster.only('mon.{m}'.format(m=mon)).remotes.iterkeys()
+ self.log('revive_mon on mon.{m} doing powercycle of {s}'.format(m=mon, s=remote.name))
+ assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized. Check ipmi config."
remote.console.power_on()
make_admin_daemon_dir(self.ctx, remote)
self.ctx.daemons.get_daemon('mon', mon).restart()
while not len(self.get_mon_quorum()) == size:
if timeout is not None:
assert time.time() - start < timeout, \
- ('failed to reach quorum size %d '
- 'before timeout expired' % size)
+ 'failed to reach quorum size %d before timeout expired' % size
time.sleep(3)
self.log("quorum is size %d" % size)
Powercycle if set in config, otherwise just stop.
"""
if self.config.get('powercycle'):
- (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
- remotes.iterkeys())
- self.log('kill_mds on mds.{m} doing powercycle of {s}'.
- format(m=mds, s=remote.name))
- assert remote.console is not None, ("powercycling requested "
- "but RemoteConsole is not "
- "initialized. "
- "Check ipmi config.")
+ (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys()
+ self.log('kill_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name))
+ assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized. Check ipmi config."
remote.console.power_off()
else:
self.ctx.daemons.get_daemon('mds', mds).stop()
and then restart (using --hot-standby if specified).
"""
if self.config.get('powercycle'):
- (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
- remotes.iterkeys())
- self.log('revive_mds on mds.{m} doing powercycle of {s}'.
- format(m=mds, s=remote.name))
- assert remote.console is not None, ("powercycling requested "
- "but RemoteConsole is not "
- "initialized. "
- "Check ipmi config.")
+ (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.iterkeys()
+ self.log('revive_mds on mds.{m} doing powercycle of {s}'.format(m=mds, s=remote.name))
+ assert remote.console is not None, "powercycling requested but RemoteConsole is not initialized. Check ipmi config."
remote.console.power_on()
make_admin_daemon_dir(self.ctx, remote)
args = []
"""
import logging
import time
-import contextlib
import ceph_manager
from teuthology import misc as teuthology
log = logging.getLogger(__name__)
-
-def choose_primary(ctx, pool, num):
+def setup(ctx, config):
"""
- Return primary to test on.
+ Create the repair test pool.
"""
- log.info("Choosing primary")
- return ctx.manager.get_pg_primary(pool, num)
-
+ ctx.manager.wait_for_clean()
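+ # a single-PG pool (pg_num=1), so every test can scrub and repair pg 0 directly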
+ ctx.manager.create_pool("repair_test_pool", 1)
+ return "repair_test_pool"
-def choose_replica(ctx, pool, num):
+def teardown(ctx, config, pool):
"""
- Return replica to test on.
+ Remove the repair test pool.
"""
- log.info("Choosing replica")
- return ctx.manager.get_pg_replica(pool, num)
-
+ ctx.manager.remove_pool(pool)
+ ctx.manager.wait_for_clean()
-def trunc(ctx, osd, pool, obj):
+def run_test(ctx, config, test):
"""
- truncate an object
+ Setup a test pool, run the test, and clean up afterwards.
+
+ :param test: function passed in, called to run the test.
"""
- log.info("truncating object")
- return ctx.manager.osd_admin_socket(
- osd,
- ['truncobj', pool, obj, '1'])
+ s = setup(ctx, config)
+ test(ctx, config, s)
+ teardown(ctx, config, s)
-
-def dataerr(ctx, osd, pool, obj):
+def choose_primary(ctx):
"""
- cause an error in the data
+ Select a primary for the next test. This routine is typically passed in
+ as a 'chooser function'.
"""
- log.info("injecting data err on object")
- return ctx.manager.osd_admin_socket(
- osd,
- ['injectdataerr', pool, obj])
-
-
-def mdataerr(ctx, osd, pool, obj):
+ def ret(pool, num):
+ """
+ Return primary to test on.
+ """
+ log.info("Choosing primary")
+ return ctx.manager.get_pg_primary(pool, num)
+ return ret
+
+def choose_replica(ctx):
"""
- cause an error in the mdata
+ Select a replica for the next test. This routine is typically passed in
+ as a 'chooser function'.
"""
- log.info("injecting mdata err on object")
- return ctx.manager.osd_admin_socket(
- osd,
- ['injectmdataerr', pool, obj])
-
-
-def omaperr(ctx, osd, pool, obj):
+ def ret(pool, num):
+ """
+ Return replica to test on.
+ """
+ log.info("Choosing replica")
+ return ctx.manager.get_pg_replica(pool, num)
+ return ret
+
+def trunc(ctx):
"""
- Cause an omap error.
+ Truncate an object in the pool. This function is typically passed as a
+ 'corrupter function'
"""
- log.info("injecting omap err on object")
- return ctx.manager.osd_admin_socket(osd, ['setomapval', pool, obj,
- 'badkey', 'badval'])
-
-
-def repair_test_1(ctx, corrupter, chooser, scrub_type):
+ def ret(osd, pool, obj):
+ """
+ truncate an object
+ """
+ log.info("truncating object")
+ return ctx.manager.osd_admin_socket(
+ osd,
+ ['truncobj', pool, obj, '1'])
+ return ret
+
+def dataerr(ctx):
+ """
+ Generate an error on an object in the pool. This function is typically
+ passed as a 'corrupter function'
+ """
+ def ret(osd, pool, obj):
+ """
+ cause an error in the data
+ """
+ log.info("injecting data err on object")
+ return ctx.manager.osd_admin_socket(
+ osd,
+ ['injectdataerr', pool, obj])
+ return ret
+
+def mdataerr(ctx):
+ """
+ Generate an mdata error on an object in the pool. This function is
+ typically passed as a 'corrupter function'
+ """
+ def ret(osd, pool, obj):
+ """
+ cause an error in the mdata
+ """
+ log.info("injecting mdata err on object")
+ return ctx.manager.osd_admin_socket(
+ osd,
+ ['injectmdataerr', pool, obj])
+ return ret
+
+def omaperr(ctx):
+ """
+ Cause data corruption by injecting omap errors into a pool.
"""
- Creates an object in the pool, corrupts it,
+ def ret(osd, pool, obj):
+ """
+ Cause an omap error.
+ """
+ log.info("injecting omap err on object")
+ return ctx.manager.osd_admin_socket(osd, ['setomapval', pool, obj, 'badkey', 'badval'])
+ return ret
+
+def gen_repair_test_1(corrupter, chooser, scrub_type):
+ """
+ Repair test. Wrapper for the internal ret function.
+
+ The internal ret function creates an object in the pool, corrupts it,
scrubs it, and verifies that the pool is inconsistent. It then repairs
the pool, rescrubs it, and verifies that the pool is consistent
:param chooser: osd type chooser (primary or replica)
:param scrub_type: regular scrub or deep-scrub
"""
- pool = "repair_pool_1"
- ctx.manager.wait_for_clean()
- with ctx.manager.pool(pool, 1):
-
+ def ret(ctx, config, pool):
+ """
+ :param pool: repair test pool
+ """
log.info("starting repair test type 1")
- victim_osd = chooser(ctx, pool, 0)
+ victim_osd = chooser(pool, 0)
# create object
log.info("doing put")
# corrupt object
log.info("corrupting object")
- corrupter(ctx, victim_osd, pool, 'repair_test_obj')
+ corrupter(victim_osd, pool, 'repair_test_obj')
# verify inconsistent
log.info("scrubbing")
# verify consistent
assert not ctx.manager.pg_inconsistent(pool, 0)
log.info("done")
+ return ret
-
-def repair_test_2(ctx, config, chooser):
+def gen_repair_test_2(chooser):
"""
- First creates a set of objects and
+ Repair test. Wrapper for the internal ret function.
+
+ The internal ret function first creates a set of objects and
sets the omap value. It then corrupts an object, does both a scrub
and a deep-scrub, and then corrupts more objects. After that, it
repairs the pool and makes sure that the pool is consistent some
- time after a deep-scrub.
+ time after a deep-scrub.
:param chooser: primary or replica selection routine.
"""
- pool = "repair_pool_2"
- ctx.manager.wait_for_clean()
- with ctx.manager.pool(pool, 1):
+ def ret(ctx, config, pool):
+ """
+ :param pool: repair test pool.
+ """
log.info("starting repair test type 2")
- victim_osd = chooser(ctx, pool, 0)
+ victim_osd = chooser(pool, 0)
first_mon = teuthology.get_first_mon(ctx, config)
(mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
# create object
log.info("doing put and setomapval")
ctx.manager.do_put(pool, 'file1', '/etc/hosts')
- ctx.manager.do_rados(mon, ['-p', pool, 'setomapval', 'file1',
- 'key', 'val'])
+ ctx.manager.do_rados(mon, ['-p', pool, 'setomapval', 'file1', 'key', 'val'])
ctx.manager.do_put(pool, 'file2', '/etc/hosts')
ctx.manager.do_put(pool, 'file3', '/etc/hosts')
ctx.manager.do_put(pool, 'file4', '/etc/hosts')
ctx.manager.do_put(pool, 'file5', '/etc/hosts')
- ctx.manager.do_rados(mon, ['-p', pool, 'setomapval', 'file5',
- 'key', 'val'])
+ ctx.manager.do_rados(mon, ['-p', pool, 'setomapval', 'file5', 'key', 'val'])
ctx.manager.do_put(pool, 'file6', '/etc/hosts')
# corrupt object
log.info("corrupting object")
- omaperr(ctx, victim_osd, pool, 'file1')
+ omaperr(ctx)(victim_osd, pool, 'file1')
# verify inconsistent
log.info("scrubbing")
# Additional corruptions including 2 types for file1
log.info("corrupting more objects")
- dataerr(ctx, victim_osd, pool, 'file1')
- mdataerr(ctx, victim_osd, pool, 'file2')
- trunc(ctx, victim_osd, pool, 'file3')
- omaperr(ctx, victim_osd, pool, 'file6')
+ dataerr(ctx)(victim_osd, pool, 'file1')
+ mdataerr(ctx)(victim_osd, pool, 'file2')
+ trunc(ctx)(victim_osd, pool, 'file3')
+ omaperr(ctx)(victim_osd, pool, 'file6')
# see still inconsistent
log.info("scrubbing")
assert not ctx.manager.pg_inconsistent(pool, 0)
log.info("done")
-
-
-def hinfoerr(ctx, victim, pool, obj):
- """
- cause an error in the hinfo_key
- """
- log.info("remove the hinfo_key")
- ctx.manager.objectstore_tool(pool,
- options='',
- args='rm-attr hinfo_key',
- object_name=obj,
- osd=victim)
-
-
-def repair_test_erasure_code(ctx, corrupter, victim, scrub_type):
- """
- Creates an object in the pool, corrupts it,
- scrubs it, and verifies that the pool is inconsistent. It then repairs
- the pool, rescrubs it, and verifies that the pool is consistent
-
- :param corrupter: error generating function.
- :param chooser: osd type chooser (primary or replica)
- :param scrub_type: regular scrub or deep-scrub
- """
- pool = "repair_pool_3"
- ctx.manager.wait_for_clean()
- with ctx.manager.pool(pool_name=pool, pg_num=1,
- erasure_code_profile_name='default'):
-
- log.info("starting repair test for erasure code")
-
- # create object
- log.info("doing put")
- ctx.manager.do_put(pool, 'repair_test_obj', '/etc/hosts')
-
- # corrupt object
- log.info("corrupting object")
- corrupter(ctx, victim, pool, 'repair_test_obj')
-
- # verify inconsistent
- log.info("scrubbing")
- ctx.manager.do_pg_scrub(pool, 0, scrub_type)
-
- assert ctx.manager.pg_inconsistent(pool, 0)
-
- # repair
- log.info("repairing")
- ctx.manager.do_pg_scrub(pool, 0, "repair")
-
- log.info("re-scrubbing")
- ctx.manager.do_pg_scrub(pool, 0, scrub_type)
-
- # verify consistent
- assert not ctx.manager.pg_inconsistent(pool, 0)
- log.info("done")
-
+ return ret
def task(ctx, config):
"""
- chef:
- install:
- ceph:
- log-whitelist:
- - 'candidate had a read error'
- - 'deep-scrub 0 missing, 1 inconsistent objects'
- - 'deep-scrub 0 missing, 4 inconsistent objects'
- - 'deep-scrub 1 errors'
- - 'deep-scrub 4 errors'
- - '!= known omap_digest'
- - 'repair 0 missing, 1 inconsistent objects'
- - 'repair 0 missing, 4 inconsistent objects'
- - 'repair 1 errors, 1 fixed'
- - 'repair 4 errors, 4 fixed'
- - 'scrub 0 missing, 1 inconsistent'
- - 'scrub 1 errors'
- - 'size 1 != known size'
+ log-whitelist: ['candidate had a read error', 'deep-scrub 0 missing, 1 inconsistent objects', 'deep-scrub 0 missing, 4 inconsistent objects', 'deep-scrub 1 errors', 'deep-scrub 4 errors', '!= known omap_digest', 'repair 0 missing, 1 inconsistent objects', 'repair 0 missing, 4 inconsistent objects', 'repair 1 errors, 1 fixed', 'repair 4 errors, 4 fixed', 'scrub 0 missing, 1 inconsistent', 'scrub 1 errors', 'size 1 != known size']
conf:
osd:
filestore debug inject read err: true
logger=log.getChild('ceph_manager')
)
- ctx.manager.wait_for_all_up()
+ num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
+ log.info('num_osds is %s' % num_osds)
- repair_test_1(ctx, mdataerr, choose_primary, "scrub")
- repair_test_1(ctx, mdataerr, choose_replica, "scrub")
- repair_test_1(ctx, dataerr, choose_primary, "deep-scrub")
- repair_test_1(ctx, dataerr, choose_replica, "deep-scrub")
- repair_test_1(ctx, trunc, choose_primary, "scrub")
- repair_test_1(ctx, trunc, choose_replica, "scrub")
- repair_test_2(ctx, config, choose_primary)
- repair_test_2(ctx, config, choose_replica)
+ while len(ctx.manager.get_osd_status()['up']) < num_osds:
+ time.sleep(10)
- repair_test_erasure_code(ctx, hinfoerr, 'primary', "deep-scrub")
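+ # type-1 scenarios vary the corrupter, chooser (primary/replica) and scrub type; type-2 scenarios vary only the chooser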
+ tests = [
+ gen_repair_test_1(mdataerr(ctx), choose_primary(ctx), "scrub"),
+ gen_repair_test_1(mdataerr(ctx), choose_replica(ctx), "scrub"),
+ gen_repair_test_1(dataerr(ctx), choose_primary(ctx), "deep-scrub"),
+ gen_repair_test_1(dataerr(ctx), choose_replica(ctx), "deep-scrub"),
+ gen_repair_test_1(trunc(ctx), choose_primary(ctx), "scrub"),
+ gen_repair_test_1(trunc(ctx), choose_replica(ctx), "scrub"),
+ gen_repair_test_2(choose_primary(ctx)),
+ gen_repair_test_2(choose_replica(ctx))
+ ]
+
+ for test in tests:
+ run_test(ctx, config, test)