CephManager: add ability to test split
author    Samuel Just <sam.just@inktank.com>
          Tue, 11 Dec 2012 22:21:48 +0000 (14:21 -0800)
committer Samuel Just <sam.just@inktank.com>
          Tue, 11 Dec 2012 23:11:06 +0000 (15:11 -0800)
Signed-off-by: Samuel Just <sam.just@inktank.com>
teuthology/task/ceph_manager.py
teuthology/task/thrashosds.py

index eb7707e3af56ffa8a94cf1d45ea696edd44857b7..69aa07995df75fc353b7bfedbc63df1ad4463f3f 100644 (file)
@@ -4,6 +4,9 @@ import time
 import re
 import gevent
 import json
+import threading
+from teuthology import misc as teuthology
+from ..orchestra import run
 
 class Thrasher:
     def __init__(self, manager, config, logger=None):
@@ -17,6 +20,8 @@ class Thrasher:
         self.stopping = False
         self.logger = logger
         self.config = config
+        num_osds = self.in_osds + self.out_osds
+        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
         if self.logger is not None:
             self.log = lambda x: self.logger.info(x)
         else:
@@ -71,17 +76,30 @@ class Thrasher:
         self.out_osds.remove(osd)
         self.in_osds.append(osd)
         self.ceph_manager.mark_in_osd(osd)
+        self.log("Added osd %s"%(str(osd),))
 
     def all_up(self):
         while len(self.dead_osds) > 0:
+            self.log("reviving osd")
             self.revive_osd()
         while len(self.out_osds) > 0:
+            self.log("inning osd")
             self.in_osd()
 
     def do_join(self):
         self.stopping = True
         self.thread.get()
 
+    def grow_pool(self):
+        pool = self.ceph_manager.get_pool()
+        self.log("Growing pool %s"%(pool,))
+        self.ceph_manager.expand_pool(pool, self.config.get('pool_grow_by', 10), self.max_pgs)
+
+    def fix_pgp_num(self):
+        pool = self.ceph_manager.get_pool()
+        self.log("fixing pg num pool %s"%(pool,))
+        self.ceph_manager.set_pool_pgpnum(pool)
+
     def test_pool_min_size(self):
         self.log("test_pool_min_size")
         self.all_up()
@@ -107,8 +125,8 @@ class Thrasher:
             )
 
     def choose_action(self):
-        chance_down = self.config.get("chance_down", 0)
-        chance_test_min_size = self.config.get("chance_test_min_size", 0)
+        chance_down = self.config.get('chance_down', 0)
+        chance_test_min_size = self.config.get('chance_test_min_size', 0)
         if isinstance(chance_down, int):
             chance_down = float(chance_down) / 100
         minin = self.config.get("min_in", 2)
@@ -117,7 +135,7 @@ class Thrasher:
         mindead = self.config.get("min_dead", 0)
 
         self.log('choose_action: min_in %d min_out %d min_live %d min_dead %d' %
-                 (minin,minout,minlive,mindead))
+                 (minin, minout, minlive, mindead))
         actions = []
         if len(self.in_osds) > minin:
             actions.append((self.out_osd, 1.0,))
@@ -127,6 +145,8 @@ class Thrasher:
             actions.append((self.in_osd, 1.7,))
         if len(self.dead_osds) > mindead:
             actions.append((self.revive_osd, 1.0,))
+        actions.append((self.grow_pool, self.config.get('chance_pgnum_grow', 0),))
+        actions.append((self.fix_pgp_num, self.config.get('chance_pgpnum_fix', 0),))
         actions.append((self.test_pool_min_size, chance_test_min_size,))
 
         total = sum([y for (x,y) in actions])
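
The actions list pairs each thrash operation with a weight, and `total` sums those weights so one action can be drawn in proportion to its weight. The selection loop itself falls below the visible hunk; a minimal sketch of that weighted-pick pattern, with illustrative names rather than the exact teuthology code:

import random

def weighted_choice(actions):
    # actions: list of (callable, weight) pairs, shaped like the
    # list built in choose_action() above.
    total = sum(weight for _, weight in actions)
    if total == 0:
        return None
    val = random.uniform(0, total)
    for action, weight in actions:
        if val < weight:
            return action
        val -= weight
    return actions[-1][0]  # guard the val == total floating-point edge

Calling the returned action, e.g. weighted_choice(actions)(), then performs one thrash step; actions with weight 0 are never chosen.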
@@ -158,6 +178,7 @@ class Thrasher:
 
 class CephManager:
     def __init__(self, controller, ctx=None, logger=None):
+        self.lock = threading.RLock()
         self.ctx = ctx
         self.controller = controller
         if (logger):
@@ -166,6 +187,8 @@ class CephManager:
             def tmp(x):
                 print x
             self.log = tmp
+        self.pools = {}
+        self.pools['data'] = self.get_pool_property('data', 'pg_num')
 
     def raw_cluster_cmd(self, *args):
         ceph_args = [
@@ -254,6 +277,82 @@ class CephManager:
                 "\d* pgs:",
                 status).group(0).split()[0])
 
+    def create_pool(self, pool_name, pg_num=1):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert isinstance(pg_num, int)
+            assert pool_name not in self.pools
+            self.log("creating pool_name %s"%(pool_name,))
+            self.raw_cluster_cmd('osd', 'pool', 'create', pool_name, str(pg_num))
+            self.pools[pool_name] = pg_num
+
+    def remove_pool(self, pool_name):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert pool_name in self.pools
+            self.log("creating pool_name %s"%(pool_name,))
+            del self.pools[pool_name]
+            self.raw_cluster_cmd('osd', 'pool', 'delete', pool_name)
+
+    def get_pool(self):
+        with self.lock:
+            return random.choice(self.pools.keys())
+
+    def get_pool_pg_num(self, pool_name):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            if pool_name in self.pools:
+                return self.pools[pool_name]
+            return 0
+
+    def get_pool_property(self, pool_name, prop):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert isinstance(prop, str)
+            output = self.raw_cluster_cmd(
+                'osd',
+                'pool',
+                'get',
+                pool_name,
+                prop)
+            return int(output.split()[1])
+
+    def set_pool_property(self, pool_name, prop, val):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert isinstance(prop, str)
+            assert isinstance(val, int)
+            self.raw_cluster_cmd(
+                'osd',
+                'pool',
+                'set',
+                pool_name,
+                prop,
+                str(val),
+                '--allow-experimental-feature')
+
+    def expand_pool(self, pool_name, by, max_pgs):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert isinstance(by, int)
+            assert pool_name in self.pools
+            if self.get_num_creating() > 0:
+                return
+            if (self.pools[pool_name] + by) > max_pgs:
+                return
+            self.log("increase pool size by %d"%(by,))
+            new_pg_num = self.pools[pool_name] + by
+            self.set_pool_property(pool_name, "pg_num", new_pg_num)
+            self.pools[pool_name] = new_pg_num
+
+    def set_pool_pgpnum(self, pool_name):
+        with self.lock:
+            assert isinstance(pool_name, str)
+            assert pool_name in self.pools
+            if self.get_num_creating() > 0:
+                return
+            self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
+
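
Taken together, these helpers implement the split test: expand_pool raises pg_num in small increments, set_pool_pgpnum later catches pgp_num up to pg_num, and both back off while any PG is still in the creating state. A minimal driver showing how they compose, assuming a CephManager instance named manager (the pool name and loop count are illustrative, not from the commit):

# Grow a pool's pg_num in steps, then align pgp_num with it.
manager.create_pool('thrash-test', pg_num=8)
for _ in range(4):
    # Each call adds 10 PGs, but only if nothing is still creating
    # and the result stays at or under max_pgs.
    manager.expand_pool('thrash-test', by=10, max_pgs=1200)
# Splitting happens at pg_num; data only remaps into the new PGs
# once pgp_num is raised to match.
manager.set_pool_pgpnum('thrash-test')
manager.remove_pool('thrash-test')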
     def list_pg_missing(self, pgid):
         r = None
         offset = {}
@@ -309,6 +408,14 @@ class CephManager:
         else:
             return int(match.group(0).split('/')[0])
 
+    def get_num_creating(self):
+        pgs = self.get_pg_stats()
+        num = 0
+        for pg in pgs:
+            if 'creating' in pg['state']:
+                num += 1
+        return num
+
     def get_num_active_clean(self):
         pgs = self.get_pg_stats()
         num = 0
index 2548c0a2b8e34fc2a4df16a40810723064372dee..3325cef3cd49702c18a128bdcc43c2fb58017f0c 100644 (file)
@@ -57,6 +57,11 @@ def task(ctx, config):
        to become clean after each cluster change. If this doesn't
        happen within the timeout, an exception will be raised.
 
+    chance_pgnum_grow: (0) chance to increase a pool's pg_num
+    chance_pgpnum_fix: (0) chance to adjust a pool's pgp_num to match its pg_num
+    pool_grow_by: (10) amount to increase pg_num by
+    max_pgs_per_pool_osd: (1200) don't expand pools past this size per osd
+
     example:
 
     tasks:
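
To make the new knobs concrete: the Thrasher reads them with config.get and falls back to the defaults shown in parentheses above. A sketch of a config dict exercising them (the dict and its values are illustrative):

config = {
    'chance_pgnum_grow': 1,        # weight given to the grow_pool action
    'chance_pgpnum_fix': 1,        # weight given to the fix_pgp_num action
    'pool_grow_by': 10,            # pg_num increment per grow
    'max_pgs_per_pool_osd': 1200,  # multiplied by the OSD count to cap pg_num
}

With the zero-weight defaults, the new actions are never selected, so existing thrashing jobs behave exactly as before.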
@@ -80,6 +85,7 @@ def task(ctx, config):
         ctx=ctx,
         logger=log.getChild('ceph_manager'),
         )
+    ctx.manager = manager
     thrash_proc = ceph_manager.Thrasher(
         manager,
         config,