import re
import gevent
import json
+import threading
+from teuthology import misc as teuthology
+from ..orchestra import run
class Thrasher:
def __init__(self, manager, config, logger=None):
self.stopping = False
self.logger = logger
self.config = config
+ num_osds = self.in_osds + self.out_osds
+ self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
if self.logger is not None:
self.log = lambda x: self.logger.info(x)
else:
self.out_osds.remove(osd)
self.in_osds.append(osd)
self.ceph_manager.mark_in_osd(osd)
+ self.log("Added osd %s"%(str(osd),))
def all_up(self):
while len(self.dead_osds) > 0:
+ self.log("reviving osd")
self.revive_osd()
while len(self.out_osds) > 0:
+ self.log("inning osd")
self.in_osd()
def do_join(self):
self.stopping = True
self.thread.get()
+ def grow_pool(self):
+ pool = self.ceph_manager.get_pool()
+ self.log("Growing pool %s"%(pool,))
+ self.ceph_manager.expand_pool(pool, self.config.get('pool_grow_by', 10), self.max_pgs)
+
+ def fix_pgp_num(self):
+ pool = self.ceph_manager.get_pool()
+ self.log("fixing pg num pool %s"%(pool,))
+ self.ceph_manager.set_pool_pgpnum(pool)
+
def test_pool_min_size(self):
self.log("test_pool_min_size")
self.all_up()
)
def choose_action(self):
- chance_down = self.config.get("chance_down", 0)
- chance_test_min_size = self.config.get("chance_test_min_size", 0)
+ chance_down = self.config.get('chance_down', 0)
+ chance_test_min_size = self.config.get('chance_test_min_size', 0)
if isinstance(chance_down, int):
chance_down = float(chance_down) / 100
minin = self.config.get("min_in", 2)
mindead = self.config.get("min_dead", 0)
self.log('choose_action: min_in %d min_out %d min_live %d min_dead %d' %
- (minin,minout,minlive,mindead))
+ (minin, minout, minlive, mindead))
actions = []
if len(self.in_osds) > minin:
actions.append((self.out_osd, 1.0,))
actions.append((self.in_osd, 1.7,))
if len(self.dead_osds) > mindead:
actions.append((self.revive_osd, 1.0,))
+ actions.append((self.grow_pool, self.config.get('chance_pgnum_grow', 0),))
+ actions.append((self.fix_pgp_num, self.config.get('chance_pgpnum_fix', 0),))
actions.append((self.test_pool_min_size, chance_test_min_size,))
total = sum([y for (x,y) in actions])
class CephManager:
def __init__(self, controller, ctx=None, logger=None):
+ self.lock = threading.RLock()
self.ctx = ctx
self.controller = controller
if (logger):
def tmp(x):
print x
self.log = tmp
+ self.pools = {}
+ self.pools['data'] = self.get_pool_property('data', 'pg_num')
def raw_cluster_cmd(self, *args):
ceph_args = [
"\d* pgs:",
status).group(0).split()[0])
+ def create_pool(self, pool_name, pg_num=1):
+ with self.lock:
+ assert isinstance(pool_name, str)
+ assert isinstance(pg_num, int)
+ assert pool_name not in self.pools
+ self.log("creating pool_name %s"%(pool_name,))
+ self.raw_cluster_cmd('osd', 'pool', 'create', pool_name, str(pg_num))
+ self.pools[pool_name] = pg_num
+
+ def remove_pool(self, pool_name):
+ with self.lock:
+ assert isinstance(pool_name, str)
+ assert pool_name in self.pools
+ self.log("creating pool_name %s"%(pool_name,))
+ del self.pools[pool_name]
+ self.raw_cluster_cmd('osd', 'pool', 'delete', pool_name)
+
+ def get_pool(self):
+ with self.lock:
+ return random.choice(self.pools.keys());
+
+ def get_pool_pg_num(self, pool_name):
+ with self.lock:
+ assert isinstance(pool_name, str)
+ if pool_name in self.pools:
+ return self.pools[pool_name]
+ return 0;
+
+ def get_pool_property(self, pool_name, prop):
+ with self.lock:
+ assert isinstance(pool_name, str)
+ assert isinstance(prop, str)
+ output = self.raw_cluster_cmd(
+ 'osd',
+ 'pool',
+ 'get',
+ pool_name,
+ prop)
+ return int(output.split()[1])
+
+ def set_pool_property(self, pool_name, prop, val):
+ with self.lock:
+ assert isinstance(pool_name, str)
+ assert isinstance(prop, str)
+ assert isinstance(val, int)
+ self.raw_cluster_cmd(
+ 'osd',
+ 'pool',
+ 'set',
+ pool_name,
+ prop,
+ str(val),
+ '--allow-experimental-feature')
+
+ def expand_pool(self, pool_name, by, max_pgs):
+ with self.lock:
+ assert isinstance(pool_name, str)
+ assert isinstance(by, int)
+ assert pool_name in self.pools
+ if self.get_num_creating() > 0:
+ return
+ if (self.pools[pool_name] + by) > max_pgs:
+ return
+ self.log("increase pool size by %d"%(by,))
+ new_pg_num = self.pools[pool_name] + by
+ self.set_pool_property(pool_name, "pg_num", new_pg_num)
+ self.pools[pool_name] = new_pg_num
+
+ def set_pool_pgpnum(self, pool_name):
+ with self.lock:
+ assert isinstance(pool_name, str)
+ assert pool_name in self.pools
+ if self.get_num_creating() > 0:
+ return
+ self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
+
def list_pg_missing(self, pgid):
r = None
offset = {}
else:
return int(match.group(0).split('/')[0])
+ def get_num_creating(self):
+ pgs = self.get_pg_stats()
+ num = 0
+ for pg in pgs:
+ if 'creating' in pg['state']:
+ num += 1
+ return num
+
def get_num_active_clean(self):
pgs = self.get_pg_stats()
num = 0