+import functools
import random
import struct
import os
-from nose import with_setup
+from nose import with_setup, SkipTest
from nose.tools import eq_ as eq, assert_raises
from rados import Rados
from rbd import (RBD, Image, ImageNotFound, InvalidArgument, ImageExists,
rados = None
ioctx = None
+features = None
IMG_NAME = 'foo'
IMG_SIZE = 8 << 20 # 8 MiB
IMG_ORDER = 22 # 4 MiB objects
assert rados.pool_exists('rbd')
global ioctx
ioctx = rados.open_ioctx('rbd')
+ global features
+ features = os.getenv("RBD_FEATURES")
+ if features is not None:
+ features = int(features)
def tearDown():
global ioctx
rados.shutdown()
def create_image():
- global features
- features = os.getenv("RBD_FEATURES")
if features is not None:
- features = int(features)
RBD().create(ioctx, IMG_NAME, IMG_SIZE, IMG_ORDER, old_format=False,
features=int(features))
else:
def remove_image():
RBD().remove(ioctx, IMG_NAME)
-
+
+def require_features(required_features):
+ def wrapper(fn):
+ def _require_features(*args, **kwargs):
+ global features
+ if features is None:
+ raise SkipTest
+ for feature in required_features:
+ if feature & features != feature:
+ raise SkipTest
+ return fn(*args, **kwargs)
+ return functools.wraps(fn)(_require_features)
+ return wrapper
+
def test_version():
RBD().version()
eq(read, data)
self.image.remove_snap('snap1')
- def test_clone(self):
- if features is None or (features & RBD_FEATURE_LAYERING) == 0:
- return 0
- self.image.create_snap('snap1')
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
- with Image(ioctx, 'clone') as clone:
- image_info = self.image.stat()
- clone_info = clone.stat()
- eq(clone_info['size'], image_info['size'])
- eq(clone_info['size'], clone.overlap())
- RBD().remove(ioctx, 'clone')
- self.image.remove_snap('snap1')
-
- def test_clone_resize(self):
- if features is None or (features & RBD_FEATURE_LAYERING) == 0:
- return 0
- self.image.create_snap('snap1')
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
- with Image(ioctx, 'clone') as clone:
- image_info = self.image.stat()
- clone_info = clone.stat()
- eq(clone_info['size'], image_info['size'])
- eq(clone_info['size'], clone.overlap())
-
- clone.resize(IMG_SIZE / 2)
- image_info = self.image.stat()
- clone_info = clone.stat()
- eq(clone_info['size'], IMG_SIZE / 2)
- eq(image_info['size'], IMG_SIZE)
- eq(clone.overlap(), IMG_SIZE / 2)
-
- clone.resize(IMG_SIZE * 2)
- image_info = self.image.stat()
- clone_info = clone.stat()
- eq(clone_info['size'], IMG_SIZE * 2)
- eq(image_info['size'], IMG_SIZE)
- eq(clone.overlap(), IMG_SIZE / 2)
-
- RBD().remove(ioctx, 'clone')
- self.image.remove_snap('snap1')
+class TestClone(object):
- def test_flatten_errors(self):
+ @require_features([RBD_FEATURE_LAYERING])
+ def setUp(self):
global ioctx
- if features is None or (features & RBD_FEATURE_LAYERING) == 0:
- return 0
+ global features
+ self.rbd = RBD()
+ create_image()
+ self.image = Image(ioctx, IMG_NAME)
+ data = rand_data(256)
+ self.image.write(data, IMG_SIZE / 2)
self.image.create_snap('snap1')
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
-
- # test that we can't flatten a non-clone
- assert_raises(InvalidArgument, self.image.flatten)
-
- # test that we can't flatten a snapshot
- with Image(ioctx, 'clone') as clone:
- clone.create_snap('snap2')
- with Image(ioctx, 'clone', 'snap2') as snap2:
- assert_raises(ReadOnlyImage, snap2.flatten)
- snap2.remove('snap2')
+ global features
+ self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
+ self.clone = Image(ioctx, 'clone')
- def test_flatten_basic(self):
+ def tearDown(self):
global ioctx
- if features is None or (features & RBD_FEATURE_LAYERING) == 0:
- return 0
- self.image.create_snap('snap1')
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
+ self.image.remove_snap('snap1')
+ self.image.close()
+ remove_image()
+ self.clone.close()
+ self.rbd.remove(ioctx, 'clone')
- with Image(ioctx, 'clone') as clone:
- clone.flatten()
- eq(0, clone.overlap())
+ def test_stat(self):
+ image_info = self.image.stat()
+ clone_info = self.clone.stat()
+ eq(clone_info['size'], image_info['size'])
+ eq(clone_info['size'], self.clone.overlap())
+
+ def test_resize_stat(self):
+ self.clone.resize(IMG_SIZE / 2)
+ image_info = self.image.stat()
+ clone_info = self.clone.stat()
+ eq(clone_info['size'], IMG_SIZE / 2)
+ eq(image_info['size'], IMG_SIZE)
+ eq(self.clone.overlap(), IMG_SIZE / 2)
+
+ self.clone.resize(IMG_SIZE * 2)
+ image_info = self.image.stat()
+ clone_info = self.clone.stat()
+ eq(clone_info['size'], IMG_SIZE * 2)
+ eq(image_info['size'], IMG_SIZE)
+ eq(self.clone.overlap(), IMG_SIZE / 2)
+
+ def test_resize_io(self):
+ self.clone.resize(0)
+ self.clone.resize(IMG_SIZE)
+ child_data = self.clone.read(IMG_SIZE / 2, 256)
+ eq(child_data, '\0' * 256)
- RBD().remove(ioctx, 'clone')
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
+ def test_read(self):
+ parent_data = self.image.read(IMG_SIZE / 2, 256)
+ child_data = self.clone.read(IMG_SIZE / 2, 256)
+ eq(child_data, parent_data)
- # resize down/up such that overlap is not the whole size
- with Image(ioctx, 'clone') as clone:
- clone.resize(IMG_SIZE / 2)
- clone.resize(IMG_SIZE * 2)
- eq(IMG_SIZE / 2, clone.overlap())
- clone.flatten()
- eq(0, clone.overlap())
+ def test_write(self):
+ parent_data = self.image.read(IMG_SIZE / 2, 256)
+ new_data = rand_data(256)
+ self.clone.write(new_data, IMG_SIZE / 2 + 256)
+ child_data = self.clone.read(IMG_SIZE / 2 + 256, 256)
+ eq(child_data, new_data)
+ child_data = self.clone.read(IMG_SIZE / 2, 256)
+ eq(child_data, parent_data)
+ parent_data = self.image.read(IMG_SIZE / 2 + 256, 256)
+ eq(parent_data, '\0' * 256)
+
+class TestFlatten(TestClone):
+
+ def test_errors(self):
+ # test that we can't flatten a non-clone
+ assert_raises(InvalidArgument, self.image.flatten)
- RBD().remove(ioctx, 'clone')
- self.image.remove_snap('snap1')
+ # test that we can't flatten a snapshot
+ self.clone.create_snap('snap2')
+ self.clone.set_snap('snap2')
+ assert_raises(ReadOnlyImage, self.clone.flatten)
+ self.clone.remove_snap('snap2')
- def test_flatten_smaller_order(self):
+ def check_flatten_with_order(self, new_order):
global ioctx
- if features is None or (features & RBD_FEATURE_LAYERING) == 0:
- return 0
- self.image.create_snap('snap1')
- new_order = IMG_ORDER - 2
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
- features, new_order)
-
- with Image(ioctx, 'clone') as clone:
- clone.flatten()
- eq(0, clone.overlap())
- RBD().remove(ioctx, 'clone')
+ global features
+ self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone2',
+ features, new_order)
+ #with Image(ioctx, 'clone2') as clone:
+ clone2 = Image(ioctx, 'clone2')
+ clone2.flatten()
+ eq(clone2.overlap(), 0)
+ clone2.close()
+ self.rbd.remove(ioctx, 'clone2')
# flatten after resizing to non-block size
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
- features, new_order)
- with Image(ioctx, 'clone') as clone:
+ self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone2',
+ features, new_order)
+ with Image(ioctx, 'clone2') as clone:
clone.resize(IMG_SIZE / 2 - 1)
clone.flatten()
eq(0, clone.overlap())
-
- RBD().remove(ioctx, 'clone')
+ self.rbd.remove(ioctx, 'clone2')
# flatten after resizing to non-block size
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
- features, new_order)
- with Image(ioctx, 'clone') as clone:
+ self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone2',
+ features, new_order)
+ with Image(ioctx, 'clone2') as clone:
clone.resize(IMG_SIZE / 2 + 1)
clone.flatten()
- eq(0, clone.overlap())
+ eq(clone.overlap(), 0)
+ self.rbd.remove(ioctx, 'clone2')
- RBD().remove(ioctx, 'clone')
- self.image.remove_snap('snap1')
+ def test_basic(self):
+ self.check_flatten_with_order(IMG_ORDER)
- def test_flatten_larger_order(self):
- global ioctx
- if features is None or (features & RBD_FEATURE_LAYERING) == 0:
- return 0
- self.image.create_snap('snap1')
- new_order = IMG_ORDER + 2
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
- features, new_order)
-
- with Image(ioctx, 'clone') as clone:
- clone.flatten()
-
- # flatten after resizing to non-block size
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
- features, new_order)
- with Image(ioctx, 'clone') as clone:
- clone.resize(IMG_SIZE / 2 - 1)
- clone.flatten()
- eq(0, clone.overlap())
+ def test_smaller_order(self):
+ self.check_flatten_with_order(IMG_ORDER - 2)
- RBD().remove(ioctx, 'clone')
-
- # flatten after resizing to non-block size
- RBD().clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone',
- features, new_order)
- with Image(ioctx, 'clone') as clone:
- clone.resize(IMG_SIZE / 2 + 1)
- clone.flatten()
- eq(0, clone.overlap())
-
- RBD().remove(ioctx, 'clone')
- self.image.remove_snap('snap1')
+ def test_larger_order(self):
+ self.check_flatten_with_order(IMG_ORDER + 2)