added thrashosds
author    Samuel Just <samuel.just@dreamhost.com>
          Mon, 13 Jun 2011 23:36:21 +0000 (16:36 -0700)
committer Samuel Just <samuel.just@dreamhost.com>
          Tue, 14 Jun 2011 00:01:02 +0000 (17:01 -0700)
Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
teuthology/task/ceph_manager.py [new file with mode: 0644]
teuthology/task/thrashosds.py [new file with mode: 0644]

diff --git a/teuthology/task/ceph_manager.py b/teuthology/task/ceph_manager.py
new file mode 100644 (file)
index 0000000..f07cd11
--- /dev/null
@@ -0,0 +1,161 @@
+import random
+import time
+import re
+import gevent
+from orchestra import run
+
+class Thrasher(gevent.Greenlet):
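+    """Randomly marks OSDs out of and back into the cluster to exercise
+    recovery.  Waits for the cluster to be clean, then runs as a greenlet;
+    call do_join() to stop the thrashing loop.
+    """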
+    def __init__(self, manager, logger=None):
+        self.ceph_manager = manager
+        self.ceph_manager.wait_till_clean()
+        osd_status = self.ceph_manager.get_osd_status()
+        self.in_osds = osd_status['in']
+        self.out_osds = osd_status['out']
+        self.stopping = False
+        self.logger = logger
+        if self.logger is not None:
+            self.log = lambda x: self.logger.info(x)
+        else:
+            def tmp(x):
+                print x
+            self.log = tmp
+        gevent.Greenlet.__init__(self, self.do_thrash)
+        self.start()
+
+    def wait_till_clean(self):
+        self.log("Waiting until clean")
+        while not self.ceph_manager.is_clean():
+            time.sleep(3)
+            self.log("...")
+        self.log("Clean!")
+
+    def remove_osd(self):
+        osd = random.choice(self.in_osds)
+        self.log("Removing osd %s"%(str(osd),))
+        self.in_osds.remove(osd)
+        self.out_osds.append(osd)
+        self.ceph_manager.mark_out_osd(osd)
+
+    def add_osd(self):
+        osd = random.choice(self.out_osds)
+        self.log("Adding osd %s"%(str(osd),))
+        self.out_osds.remove(osd)
+        self.in_osds.append(osd)
+        self.ceph_manager.mark_in_osd(osd)
+
+    def all_up(self):
+        while len(self.out_osds) > 0:
+            self.add_osd()
+
+    def do_join(self):
+        self.stopping = True
+        self.get()
+
+    def do_thrash(self):
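+        # Every DELAY seconds, mark a random OSD in or out.  With
+        # probability DELAY/CLEANINT per iteration, first wait for the
+        # cluster to go clean -- on average about once per CLEANINT
+        # seconds of thrashing.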
+        CLEANINT = 60
+        DELAY = 5
+        self.log("starting do_thrash")
+        while not self.stopping:
+            self.log("in_osds: %s out_osds: %s" % (self.in_osds, self.out_osds))
+            if random.uniform(0, 1) < (float(DELAY) / CLEANINT):
+                self.wait_till_clean()
+            if len(self.out_osds) == 0:
+                self.remove_osd()
+            elif len(self.in_osds) <= 2:
+                self.add_osd()
+            else:
+                random.choice([self.remove_osd, self.add_osd])()
+            time.sleep(DELAY)
+
+class CephManager:
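+    """Runs ceph cluster commands on a remote controller and parses the
+    output (osd/pg status, cleanliness checks) for the Thrasher.
+    """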
+    def __init__(self, controller, logger=None):
+        self.controller = controller
+        if logger:
+            self.log = lambda x: logger.info(x)
+        else:
+            def tmp(x):
+                print x
+            self.log = tmp
+
+    def raw_cluster_cmd(self, suffix):
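+        """Run 'ceph <suffix>' against the test cluster and return its stdout."""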
+        proc = self.controller.run(
+            args=[
+                "/bin/sh", "-c",
+                " ".join([
+                        "LD_LIBRARY_PRELOAD=/tmp/cephtest/binary/usr/local/lib",
+                        "/tmp/cephtest/binary/usr/local/bin/ceph -k /tmp/cephtest/ceph.keyring -c "+\
+                            "/tmp/cephtest/ceph.conf " + suffix
+                        ])
+                ],
+            stdout=run.PIPE,
+            wait=False
+            )
+
+        out = proc.stdout.read()
+        return out
+
+    def raw_cluster_status(self):
+        return self.raw_cluster_cmd("-s")
+        
+    def raw_osd_status(self):
+        return self.raw_cluster_cmd("osd dump -o -")
+
+    def raw_pg_status(self):
+        return self.raw_cluster_cmd("pg dump -o -")
+
+    def get_osd_status(self):
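+        # Parse the 'osd dump' output into lists of osd ids, bucketed by
+        # in/out and up/down state.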
+        osd_lines = filter(
+            lambda x: x[:3] == 'osd' and (("up" in x) or ("down" in x)),
+            self.raw_osd_status().split('\n'))
+        self.log(osd_lines)
+        in_osds = [int(i[3:].split()[0]) for i in filter(
+                lambda x: " in " in x,
+                osd_lines)]
+        out_osds = [int(i[3:].split()[0]) for i in filter(
+                lambda x: " out " in x,
+                osd_lines)]
+        up_osds = [int(i[3:].split()[0]) for i in filter(
+                lambda x: " up " in x,
+                osd_lines)]
+        down_osds = [int(i[3:].split()[0]) for i in filter(
+                lambda x: " down " in x,
+                osd_lines)]
+        return { 'in' : in_osds, 'out' : out_osds, 'up' : up_osds, 
+                 'down' : down_osds, 'raw' : osd_lines }
+
+    def get_num_pgs(self):
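+        """Extract the total pg count from 'ceph -s' output."""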
+        status = self.raw_cluster_status()
+        return int(re.search(
+                r"\d+ pgs:",
+                status).group(0).split()[0])
+
+    def get_num_active_clean(self):
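+        """Count the pgs reported active+clean in 'ceph -s' output."""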
+        status = self.raw_cluster_status()
+        self.log(status)
+        match = re.search(
+            r"\d+ active\+clean",
+            status)
+        if match is None:
+            return 0
+        else:
+            return int(match.group(0).split()[0])
+
+    def is_clean(self):
+        return self.get_num_active_clean() == self.get_num_pgs()
+
+    def wait_till_clean(self):
+        self.log("waiting till clean")
+        while not self.is_clean():
+            time.sleep(3)
+        self.log("clean!")
+
+    def mark_out_osd(self, osd):
+        self.raw_cluster_cmd("osd out %s"%(str(osd,)))
+
+    def mark_in_osd(self, osd):
+        self.raw_cluster_cmd("osd in %s"%(str(osd,)))
diff --git a/teuthology/task/thrashosds.py b/teuthology/task/thrashosds.py
new file mode 100644 (file)
index 0000000..0c87446
--- /dev/null
@@ -0,0 +1,36 @@
+import contextlib
+import logging
+import ceph_manager
+
+log = logging.getLogger(__name__)
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Run thrashosds
+
+    There is no configuration, all commands are run on mon0 and it stops when
+    __exit__ is called.
+
+    example:
+
+    tasks:
+    - ceph:
+    - thrashosds: 
+    - interactive:
+    """
+    log.info('Beginning thrashosds...')
+    (mon,) = ctx.cluster.only('mon.0').remotes.iterkeys()
+    manager = ceph_manager.CephManager(
+        mon,
+        logger=log.getChild('ceph_manager'),
+        )
+    thrash_proc = ceph_manager.Thrasher(
+        manager,
+        logger=log.getChild('thrasher'),
+        )
+    try:
+        yield
+    finally:
+        log.info('joining thrashosds')
+        thrash_proc.do_join()