if self.ceph_manager.set_pool_pgpnum(pool, force):
self.pools_to_fix_pgp_num.discard(pool)
+ def get_rand_pg_acting_set(self, pool_id=None):
+ """
+ Return the acting set of a random PG. Optionally restrict
+ the choice to the pool whose id is pool_id.
+ """
+ pgs = self.ceph_manager.get_pg_stats()
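+ # one stats entry per PG; each entry includes its current acting set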
+ if not pgs:
+ self.log('No pgs; doing nothing')
+ return
+ if pool_id is not None:
+ # pgid has the form "<pool_id>.<seq>", so match on the pool prefix
+ pgs_in_pool = [pg for pg in pgs if int(pg['pgid'].split('.')[0]) == pool_id]
+ if not pgs_in_pool:
+ self.log('No pgs in pool {pool}; doing nothing'.format(pool=pool_id))
+ return
+ pg = random.choice(pgs_in_pool)
+ else:
+ pg = random.choice(pgs)
+ self.log('Choosing PG {id} with acting set {act}'.format(id=pg['pgid'], act=pg['acting']))
+ return pg['acting']
+
+ def get_k_m_ec_pool(self, pool, pool_json):
+ """
+ Return the (k, m) values of the erasure code profile used by the
+ given pool, or (None, None) if the profile cannot be read.
+ """
+ try:
+ ec_profile = pool_json['erasure_code_profile']
+ # reading the profile can race with its removal, which raises
+ # CommandFailedError and is handled below
+ ec_profile_json = self.ceph_manager.raw_cluster_cmd(
+ 'osd',
+ 'erasure-code-profile',
+ 'get',
+ ec_profile,
+ '--format=json')
+ ec_json = json.loads(ec_profile_json)
+ k = int(ec_json['k'])
+ m = int(ec_json['m'])
+ self.log("pool {pool} k={k} m={m}".format(pool=pool, k=k, m=m))
+ except CommandFailedError:
+ self.log("failed to read erasure_code_profile. %s was likely removed", pool)
+ return None, None
+
+ return k, m
+
def test_pool_min_size(self):
"""
- Loop to selectively push PGs below their min_size and test that recovery
- still occurs.
+ Loop to selectively push PGs to their min_size and test that recovery
+ still occurs. We achieve this by randomly picking a PG and failing
+ OSDs from its acting set.
"""
self.log("test_pool_min_size")
self.all_up()
self.ceph_manager.wait_for_recovery(
timeout=self.config.get('timeout')
)
- minout = int(self.config.get("min_out", 1))
- minlive = int(self.config.get("min_live", 2))
- mindead = int(self.config.get("min_dead", 1))
self.log("doing min_size thrashing")
self.ceph_manager.wait_for_clean(timeout=180)
assert self.ceph_manager.is_clean(), \
'not clean before minsize thrashing starts'
- while not self.stopping:
+ start = time.time()
+ while time.time() - start < self.config.get("test_min_size_duration", 1800):
# look up k and m from all the pools on each loop, in case it
# changes as the cluster runs
- k = 0
- m = 99
- has_pools = False
pools_json = self.ceph_manager.get_osd_dump_json()['pools']
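+ # osd dump lists every pool with its type, min_size and erasure_code_profile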
-
+ if len(pools_json) == 0:
+ self.log("No pools yet, waiting")
+ time.sleep(5)
+ continue
for pool_json in pools_json:
pool = pool_json['pool_name']
- has_pools = True
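+ # remember the numeric pool id so we can later pick a PG from this pool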
+ pool_id = pool_json['pool']
pool_type = pool_json['type'] # 1 for rep, 3 for ec
min_size = pool_json['min_size']
self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
- try:
- ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
- if pool_type != PoolType.ERASURE_CODED:
- continue
- ec_profile = pool_json['erasure_code_profile']
- ec_profile_json = self.ceph_manager.raw_cluster_cmd(
- 'osd',
- 'erasure-code-profile',
- 'get',
- ec_profile,
- '--format=json')
- ec_json = json.loads(ec_profile_json)
- local_k = int(ec_json['k'])
- local_m = int(ec_json['m'])
- self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
- k=local_k, m=local_m))
- if local_k > k:
- self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
- k = local_k
- if local_m < m:
- self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
- m = local_m
- except CommandFailedError:
- self.log("failed to read erasure_code_profile. %s was likely removed", pool)
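+ # only erasure-coded pools are thrashed below; replicated pools are skipped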
+ if pool_type != PoolType.ERASURE_CODED:
continue
-
- if has_pools :
- self.log("using k={k}, m={m}".format(k=k,m=m))
- else:
- self.log("No pools yet, waiting")
- time.sleep(5)
- continue
-
- if minout > len(self.out_osds): # kill OSDs and mark out
- self.log("forced to out an osd")
- self.kill_osd(mark_out=True)
- continue
- elif mindead > len(self.dead_osds): # kill OSDs but force timeout
- self.log("forced to kill an osd")
- self.kill_osd()
- continue
- else: # make mostly-random choice to kill or revive OSDs
- minup = max(minlive, k)
- rand_val = random.uniform(0, 1)
- self.log("choosing based on number of live OSDs and rand val {rand}".\
- format(rand=rand_val))
- if len(self.live_osds) > minup+1 and rand_val < 0.5:
- # chose to knock out as many OSDs as we can w/out downing PGs
-
- most_killable = min(len(self.live_osds) - minup, m)
- self.log("chose to kill {n} OSDs".format(n=most_killable))
- for i in range(1, most_killable):
- self.kill_osd(mark_out=True)
- time.sleep(10)
- # try a few times since there might be a concurrent pool
- # creation or deletion
- with safe_while(
- sleep=25, tries=5,
- action='check for active or peered') as proceed:
- while proceed():
- if self.ceph_manager.all_active_or_peered():
- break
- self.log('not all PGs are active or peered')
- else: # chose to revive OSDs, bring up a random fraction of the dead ones
- self.log("chose to revive osds")
- for i in range(1, int(rand_val * len(self.dead_osds))):
- self.revive_osd(i)
-
- # let PGs repair themselves or our next knockout might kill one
- self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
-
- # / while not self.stopping
- self.all_up_in()
-
- self.ceph_manager.wait_for_recovery(
- timeout=self.config.get('timeout')
- )
+ else:
+ k, m = self.get_k_m_ec_pool(pool, pool_json)
+ if k is None and m is None:
+ continue
+ self.log("using k={k}, m={m}".format(k=k, m=m))
+
+ self.log("dead_osds={d}, live_osds={ld}".format(d=self.dead_osds, ld=self.live_osds))
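+ # never reduce the number of live OSDs below min_size (or below k for EC pools)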
+ minup = max(min_size, k)
+ # Choose a random PG and kill OSDs until only min_size remain
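+ # an EC PG can lose at most m shards without losing data, so cap kills at m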
+ most_killable = min(len(self.live_osds) - minup, m)
+ self.log("chose to kill {n} OSDs".format(n=most_killable))
+ acting_set = self.get_rand_pg_acting_set(pool_id)
+ if not acting_set:
+ # no PG stats for this pool yet; try again on the next pass
+ continue
+ assert most_killable < len(acting_set)
+ for i in range(0, most_killable):
+ self.kill_osd(osd=acting_set[i], mark_out=True)
+ self.log("dead_osds={d}, live_osds={ld}".format(d=self.dead_osds, ld=self.live_osds))
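+ # with OSDs failed down to min_size, PGs should be active, or at least peered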
+ self.log("check for active or peered")
+ with safe_while(
+ sleep=25, tries=5,
+ action='check for active or peered') as proceed:
+ while proceed():
+ if self.ceph_manager.all_active_or_peered():
+ break
+ self.log('not all PGs are active or peered')
+ self.all_up_in() # revive all OSDs
+ # let PGs repair themselves or our next knockout might kill one
+ # wait_for_recovery since some workloads won't be able to go clean
+ self.ceph_manager.wait_for_recovery(
+ timeout=self.config.get('timeout')
+ )
+ # end of the timed min_size thrashing loop
+ self.all_up_in() # revive all OSDs
+
+ # Wait until all PGs are active+clean after we have revived all the OSDs
+ self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
def inject_pause(self, conf_key, duration, check_after, should_be_down):
"""