]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test_rbd.py: refactor cloning tests
authorJosh Durgin <josh.durgin@inktank.com>
Wed, 25 Jul 2012 21:32:56 +0000 (14:32 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Sat, 4 Aug 2012 01:48:53 +0000 (18:48 -0700)
Move into a separate class that requires layering to be enabled,
so the common step of creating and deleting a clone doesn't
need to be repeated in each test.

Move flatten tests into a subclass so they can be run separately
more easily.

Move the checks for the layering feature into a generic decorator
that skips tests if the specified feature is not being used.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
src/test/pybind/test_rbd.py

index 4bd8de9c0bba21ad08c5b7b919cd950468e7bf66..2ae47dbf2f547f2783ba0cf09316747a4e401d5f 100644 (file)
@@ -1,8 +1,9 @@
+import functools
 import random
 import struct
 import os
 
-from nose import with_setup
+from nose import with_setup, SkipTest
 from nose.tools import eq_ as eq, assert_raises
 from rados import Rados
 from rbd import (RBD, Image, ImageNotFound, InvalidArgument, ImageExists,
@@ -12,6 +13,7 @@ from rbd import (RBD, Image, ImageNotFound, InvalidArgument, ImageExists,
 
 rados = None
 ioctx = None
+features = None
 IMG_NAME = 'foo'
 IMG_SIZE = 8 << 20 # 8 MiB
 IMG_ORDER = 22 # 4 MiB objects
@@ -23,6 +25,10 @@ def setUp():
     assert rados.pool_exists('rbd')
     global ioctx
     ioctx = rados.open_ioctx('rbd')
+    global features
+    features = os.getenv("RBD_FEATURES")
+    if features is not None:
+        features = int(features)
 
 def tearDown():
     global ioctx
@@ -31,10 +37,7 @@ def tearDown():
     rados.shutdown()
 
 def create_image():
-    global features
-    features = os.getenv("RBD_FEATURES")
     if features is not None:
-        features = int(features)
         RBD().create(ioctx, IMG_NAME, IMG_SIZE, IMG_ORDER, old_format=False,
                      features=int(features))
     else:
@@ -42,7 +45,20 @@ def create_image():
 
 def remove_image():
     RBD().remove(ioctx, IMG_NAME)
-    
+
+def require_features(required_features):
+    def wrapper(fn):
+        def _require_features(*args, **kwargs):
+            global features
+            if features is None:
+                raise SkipTest
+            for feature in required_features:
+                if feature & features != feature:
+                    raise SkipTest
+            return fn(*args, **kwargs)
+        return functools.wraps(fn)(_require_features)
+    return wrapper
+
 def test_version():
     RBD().version()
 
@@ -367,153 +383,120 @@ class TestImage(object):
         eq(read, data)
         self.image.remove_snap('snap1')
 
-    def test_clone(self):
-        if features is None or (features & RBD_FEATURE_LAYERING) == 0:
-            return 0
-        self.image.create_snap('snap1')
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
-        with Image(ioctx, 'clone') as clone:
-            image_info = self.image.stat()
-            clone_info = clone.stat()
-            eq(clone_info['size'], image_info['size'])
-            eq(clone_info['size'], clone.overlap())
-        RBD().remove(ioctx, 'clone')
-        self.image.remove_snap('snap1')
-
-    def test_clone_resize(self):
-        if features is None or (features & RBD_FEATURE_LAYERING) == 0:
-            return 0
-        self.image.create_snap('snap1')
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
-        with Image(ioctx, 'clone') as clone:
-            image_info = self.image.stat()
-            clone_info = clone.stat()
-            eq(clone_info['size'], image_info['size'])
-            eq(clone_info['size'], clone.overlap())
-
-            clone.resize(IMG_SIZE / 2)
-            image_info = self.image.stat()
-            clone_info = clone.stat()
-            eq(clone_info['size'], IMG_SIZE / 2)
-            eq(image_info['size'], IMG_SIZE)
-            eq(clone.overlap(), IMG_SIZE / 2)
-
-            clone.resize(IMG_SIZE * 2)
-            image_info = self.image.stat()
-            clone_info = clone.stat()
-            eq(clone_info['size'], IMG_SIZE * 2)
-            eq(image_info['size'], IMG_SIZE)
-            eq(clone.overlap(), IMG_SIZE / 2)
-
-        RBD().remove(ioctx, 'clone')
-        self.image.remove_snap('snap1')
+class TestClone(object):
 
-    def test_flatten_errors(self):
+    @require_features([RBD_FEATURE_LAYERING])
+    def setUp(self):
         global ioctx
-        if features is None or (features & RBD_FEATURE_LAYERING) == 0:
-            return 0
+        global features
+        self.rbd = RBD()
+        create_image()
+        self.image = Image(ioctx, IMG_NAME)
+        data = rand_data(256)
+        self.image.write(data, IMG_SIZE / 2)
         self.image.create_snap('snap1')
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
-
-        # test that we can't flatten a non-clone
-        assert_raises(InvalidArgument, self.image.flatten)
-
-        # test that we can't flatten a snapshot
-        with Image(ioctx, 'clone') as clone:
-            clone.create_snap('snap2')
-        with Image(ioctx, 'clone', 'snap2') as snap2:
-            assert_raises(ReadOnlyImage, snap2.flatten)
-            snap2.remove('snap2')
+        global features
+        self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
+        self.clone = Image(ioctx, 'clone')
 
-    def test_flatten_basic(self):
+    def tearDown(self):
         global ioctx
-        if features is None or (features & RBD_FEATURE_LAYERING) == 0:
-            return 0
-        self.image.create_snap('snap1')
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
+        self.image.remove_snap('snap1')
+        self.image.close()
+        remove_image()
+        self.clone.close()
+        self.rbd.remove(ioctx, 'clone')
 
-        with Image(ioctx, 'clone') as clone:
-            clone.flatten()
-            eq(0, clone.overlap())
+    def test_stat(self):
+        image_info = self.image.stat()
+        clone_info = self.clone.stat()
+        eq(clone_info['size'], image_info['size'])
+        eq(clone_info['size'], self.clone.overlap())
+
+    def test_resize_stat(self):
+        self.clone.resize(IMG_SIZE / 2)
+        image_info = self.image.stat()
+        clone_info = self.clone.stat()
+        eq(clone_info['size'], IMG_SIZE / 2)
+        eq(image_info['size'], IMG_SIZE)
+        eq(self.clone.overlap(), IMG_SIZE / 2)
+
+        self.clone.resize(IMG_SIZE * 2)
+        image_info = self.image.stat()
+        clone_info = self.clone.stat()
+        eq(clone_info['size'], IMG_SIZE * 2)
+        eq(image_info['size'], IMG_SIZE)
+        eq(self.clone.overlap(), IMG_SIZE / 2)
+
+    def test_resize_io(self):
+        self.clone.resize(0)
+        self.clone.resize(IMG_SIZE)
+        child_data = self.clone.read(IMG_SIZE / 2, 256)
+        eq(child_data, '\0' * 256)
 
-        RBD().remove(ioctx, 'clone')
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
+    def test_read(self):
+        parent_data = self.image.read(IMG_SIZE / 2, 256)
+        child_data = self.clone.read(IMG_SIZE / 2, 256)
+        eq(child_data, parent_data)
 
-        # resize down/up such that overlap is not the whole size
-        with Image(ioctx, 'clone') as clone:
-            clone.resize(IMG_SIZE / 2)
-            clone.resize(IMG_SIZE * 2)
-            eq(IMG_SIZE / 2, clone.overlap())
-            clone.flatten()
-            eq(0, clone.overlap())
+    def test_write(self):
+        parent_data = self.image.read(IMG_SIZE / 2, 256)
+        new_data = rand_data(256)
+        self.clone.write(new_data, IMG_SIZE / 2 + 256)
+        child_data = self.clone.read(IMG_SIZE / 2 + 256, 256)
+        eq(child_data, new_data)
+        child_data = self.clone.read(IMG_SIZE / 2, 256)
+        eq(child_data, parent_data)
+        parent_data = self.image.read(IMG_SIZE / 2 + 256, 256)
+        eq(parent_data, '\0' * 256)
+
+class TestFlatten(TestClone):
+
+    def test_errors(self):
+        # test that we can't flatten a non-clone
+        assert_raises(InvalidArgument, self.image.flatten)
 
-        RBD().remove(ioctx, 'clone')
-        self.image.remove_snap('snap1')
+        # test that we can't flatten a snapshot
+        self.clone.create_snap('snap2')
+        self.clone.set_snap('snap2')
+        assert_raises(ReadOnlyImage, self.clone.flatten)
+        self.clone.remove_snap('snap2')
 
-    def test_flatten_smaller_order(self):
+    def check_flatten_with_order(self, new_order):
         global ioctx
-        if features is None or (features & RBD_FEATURE_LAYERING) == 0:
-            return 0
-        self.image.create_snap('snap1')
-        new_order = IMG_ORDER - 2
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
-                    features, new_order)
-
-        with Image(ioctx, 'clone') as clone:
-            clone.flatten()
-            eq(0, clone.overlap())
-        RBD().remove(ioctx, 'clone')
+        global features
+        self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone2',
+                       features, new_order)
+        #with Image(ioctx, 'clone2') as clone:
+        clone2 = Image(ioctx, 'clone2')
+        clone2.flatten()
+        eq(clone2.overlap(), 0)
+        clone2.close()
+        self.rbd.remove(ioctx, 'clone2')
 
         # flatten after resizing to non-block size
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
-                    features, new_order)
-        with Image(ioctx, 'clone') as clone:
+        self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone2',
+                       features, new_order)
+        with Image(ioctx, 'clone2') as clone:
             clone.resize(IMG_SIZE / 2 - 1)
             clone.flatten()
             eq(0, clone.overlap())
-
-        RBD().remove(ioctx, 'clone')
+        self.rbd.remove(ioctx, 'clone2')
 
         # flatten after resizing to non-block size
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
-                    features, new_order)
-        with Image(ioctx, 'clone') as clone:
+        self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone2',
+                       features, new_order)
+        with Image(ioctx, 'clone2') as clone:
             clone.resize(IMG_SIZE / 2 + 1)
             clone.flatten()
-            eq(0, clone.overlap())
+            eq(clone.overlap(), 0)
+        self.rbd.remove(ioctx, 'clone2')
 
-        RBD().remove(ioctx, 'clone')
-        self.image.remove_snap('snap1')
+    def test_basic(self):
+        self.check_flatten_with_order(IMG_ORDER)
 
-    def test_flatten_larger_order(self):
-        global ioctx
-        if features is None or (features & RBD_FEATURE_LAYERING) == 0:
-            return 0
-        self.image.create_snap('snap1')
-        new_order = IMG_ORDER + 2
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
-                    features, new_order)
-
-        with Image(ioctx, 'clone') as clone:
-            clone.flatten()
-
-        # flatten after resizing to non-block size
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
-                    features, new_order)
-        with Image(ioctx, 'clone') as clone:
-            clone.resize(IMG_SIZE / 2 - 1)
-            clone.flatten()
-            eq(0, clone.overlap())
+    def test_smaller_order(self):
+        self.check_flatten_with_order(IMG_ORDER - 2)
 
-        RBD().remove(ioctx, 'clone')
-
-        # flatten after resizing to non-block size
-        RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
-                    features, new_order)
-        with Image(ioctx, 'clone') as clone:
-            clone.resize(IMG_SIZE / 2 + 1)
-            clone.flatten()
-            eq(0, clone.overlap())
-
-        RBD().remove(ioctx, 'clone')
-        self.image.remove_snap('snap1')
+    def test_larger_order(self):
+        self.check_flatten_with_order(IMG_ORDER + 2)