From f2dbe5edd7114f14b41af59ede41454ff4c47a54 Mon Sep 17 00:00:00 2001
From: Samuel Just
Date: Tue, 11 Dec 2012 14:21:48 -0800
Subject: [PATCH] CephManager: add ability to test split

Signed-off-by: Samuel Just
---
 teuthology/task/ceph_manager.py | 113 +++++++++++++++++++++++++++++++-
 teuthology/task/thrashosds.py   |   6 ++
 2 files changed, 116 insertions(+), 3 deletions(-)

diff --git a/teuthology/task/ceph_manager.py b/teuthology/task/ceph_manager.py
index eb7707e3af56f..69aa07995df75 100644
--- a/teuthology/task/ceph_manager.py
+++ b/teuthology/task/ceph_manager.py
@@ -4,6 +4,9 @@ import time
 import re
 import gevent
 import json
+import threading
+from teuthology import misc as teuthology
+from ..orchestra import run
 
 class Thrasher:
     def __init__(self, manager, config, logger=None):
@@ -17,6 +20,8 @@ class Thrasher:
         self.stopping = False
         self.logger = logger
         self.config = config
+        num_osds = len(self.in_osds) + len(self.out_osds)
+        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
         if self.logger is not None:
             self.log = lambda x: self.logger.info(x)
         else:
@@ -71,17 +76,30 @@ class Thrasher:
         self.out_osds.remove(osd)
         self.in_osds.append(osd)
         self.ceph_manager.mark_in_osd(osd)
+        self.log("Added osd %s"%(str(osd),))
 
     def all_up(self):
         while len(self.dead_osds) > 0:
+            self.log("reviving osd")
            self.revive_osd()
         while len(self.out_osds) > 0:
+            self.log("inning osd")
            self.in_osd()
 
     def do_join(self):
         self.stopping = True
         self.thread.get()
 
+    def grow_pool(self):
+        pool = self.ceph_manager.get_pool()
+        self.log("Growing pool %s"%(pool,))
+        self.ceph_manager.expand_pool(pool, self.config.get('pool_grow_by', 10), self.max_pgs)
+
+    def fix_pgp_num(self):
+        pool = self.ceph_manager.get_pool()
+        self.log("fixing pgp_num for pool %s"%(pool,))
+        self.ceph_manager.set_pool_pgpnum(pool)
+
     def test_pool_min_size(self):
         self.log("test_pool_min_size")
         self.all_up()
@@ -107,8 +125,8 @@ class Thrasher:
             )
 
     def choose_action(self):
-        chance_down = self.config.get("chance_down", 0)
-        chance_test_min_size = self.config.get("chance_test_min_size", 0)
+        chance_down = self.config.get('chance_down', 0)
+        chance_test_min_size = self.config.get('chance_test_min_size', 0)
         if isinstance(chance_down, int):
             chance_down = float(chance_down) / 100
         minin = self.config.get("min_in", 2)
@@ -117,7 +135,7 @@ class Thrasher:
         mindead = self.config.get("min_dead", 0)
         self.log('choose_action: min_in %d min_out %d min_live %d min_dead %d' %
-                 (minin,minout,minlive,mindead))
+                 (minin, minout, minlive, mindead))
         actions = []
         if len(self.in_osds) > minin:
             actions.append((self.out_osd, 1.0,))
@@ -127,6 +145,8 @@ class Thrasher:
             actions.append((self.in_osd, 1.7,))
         if len(self.dead_osds) > mindead:
             actions.append((self.revive_osd, 1.0,))
+        actions.append((self.grow_pool, self.config.get('chance_pgnum_grow', 0),))
+        actions.append((self.fix_pgp_num, self.config.get('chance_pgpnum_fix', 0),))
         actions.append((self.test_pool_min_size, chance_test_min_size,))
 
         total = sum([y for (x,y) in actions])
@@ -158,6 +178,7 @@
 
 class CephManager:
     def __init__(self, controller, ctx=None, logger=None):
+        self.lock = threading.RLock()
         self.ctx = ctx
         self.controller = controller
         if (logger):
@@ -166,6 +187,8 @@
             def tmp(x):
                 print x
             self.log = tmp
+        self.pools = {}
+        self.pools['data'] = self.get_pool_property('data', 'pg_num')
 
     def raw_cluster_cmd(self, *args):
         ceph_args = [
@@ -254,6 +277,82 @@
                 "\d* pgs:",
                 status).group(0).split()[0])
 
+    def create_pool(self, pool_name, pg_num=1):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert isinstance(pg_num, int)
+            assert pool_name not in self.pools
+            self.log("creating pool_name %s"%(pool_name,))
+            self.raw_cluster_cmd('osd', 'pool', 'create', pool_name, str(pg_num))
+            self.pools[pool_name] = pg_num
+
+    def remove_pool(self, pool_name):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert pool_name in self.pools
+            self.log("removing pool_name %s"%(pool_name,))
+            del self.pools[pool_name]
+            self.raw_cluster_cmd('osd', 'pool', 'delete', pool_name)
+
+    def get_pool(self):
+        with self.lock:
+            return random.choice(self.pools.keys())
+
+    def get_pool_pg_num(self, pool_name):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            if pool_name in self.pools:
+                return self.pools[pool_name]
+            return 0
+
+    def get_pool_property(self, pool_name, prop):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert isinstance(prop, str)
+            output = self.raw_cluster_cmd(
+                'osd',
+                'pool',
+                'get',
+                pool_name,
+                prop)
+            return int(output.split()[1])
+
+    def set_pool_property(self, pool_name, prop, val):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert isinstance(prop, str)
+            assert isinstance(val, int)
+            self.raw_cluster_cmd(
+                'osd',
+                'pool',
+                'set',
+                pool_name,
+                prop,
+                str(val),
+                '--allow-experimental-feature')
+
+    def expand_pool(self, pool_name, by, max_pgs):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert isinstance(by, int)
+            assert pool_name in self.pools
+            if self.get_num_creating() > 0:
+                return
+            if (self.pools[pool_name] + by) > max_pgs:
+                return
+            self.log("increasing pool %s pg_num by %d"%(pool_name, by))
+            new_pg_num = self.pools[pool_name] + by
+            self.set_pool_property(pool_name, "pg_num", new_pg_num)
+            self.pools[pool_name] = new_pg_num
+
+    def set_pool_pgpnum(self, pool_name):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert pool_name in self.pools
+            if self.get_num_creating() > 0:
+                return
+            self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
+
     def list_pg_missing(self, pgid):
         r = None
         offset = {}
@@ -309,6 +408,14 @@
         else:
             return int(match.group(0).split('/')[0])
 
+    def get_num_creating(self):
+        pgs = self.get_pg_stats()
+        num = 0
+        for pg in pgs:
+            if 'creating' in pg['state']:
+                num += 1
+        return num
+
     def get_num_active_clean(self):
         pgs = self.get_pg_stats()
         num = 0
diff --git a/teuthology/task/thrashosds.py b/teuthology/task/thrashosds.py
index 2548c0a2b8e34..3325cef3cd497 100644
--- a/teuthology/task/thrashosds.py
+++ b/teuthology/task/thrashosds.py
@@ -57,6 +57,11 @@ def task(ctx, config):
     to become clean after each cluster change.  If this doesn't
     happen within the timeout, an exception will be raised.
 
+    chance_pgnum_grow: (0) chance to increase a pool's pg_num
+    chance_pgpnum_fix: (0) chance to set a pool's pgp_num to match its pg_num
+    pool_grow_by: (10) amount to increase pg_num by when growing a pool
+    max_pgs_per_pool_osd: (1200) don't expand pools past this many pgs per osd
+
     example:
 
     tasks:
@@ -80,6 +85,7 @@
         ctx=ctx,
         logger=log.getChild('ceph_manager'),
         )
+    ctx.manager = manager
     thrash_proc = ceph_manager.Thrasher(
         manager,
         config,
-- 
2.39.5
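
Illustrative notes, not part of the commit:

A thrashosds configuration exercising the new split-testing knobs might look
like the following (the option names are the ones documented in the
thrashosds.py docstring above; the values here are examples only):

    tasks:
    - ceph:
    - thrashosds:
        chance_pgnum_grow: 1
        chance_pgpnum_fix: 1
        pool_grow_by: 10
        max_pgs_per_pool_osd: 1200

With a nonzero chance_pgnum_grow, the thrasher occasionally raises a pool's
pg_num (splitting PGs); chance_pgpnum_fix then lets pgp_num catch up so the
newly split PGs actually get remapped. The new CephManager pool API can also
be driven directly; a rough sketch, assuming the ctx.manager handle this patch
installs and a pool name chosen purely for illustration:

    # Hypothetical driver for the new pool-management methods.
    manager = ctx.manager
    manager.create_pool('splitme', pg_num=8)   # recorded in manager.pools
    # Raise pg_num by 8, but never past max_pgs; a no-op while any PG is
    # still in the 'creating' state.
    manager.expand_pool('splitme', 8, 1200)
    manager.set_pool_pgpnum('splitme')         # bring pgp_num up to pg_num
    manager.remove_pool('splitme')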