import functools
import random
+import socket
import struct
import os
rados = None
ioctx = None
features = None
-IMG_NAME = 'foo'
+image_idx = 0
+image_name = None
+pool_idx = 0
+pool_name = None
IMG_SIZE = 8 << 20 # 8 MiB
IMG_ORDER = 22 # 4 MiB objects
global rados
rados = Rados(conffile='')
rados.connect()
- assert rados.pool_exists('rbd')
+ global pool_name
+ pool_name = get_temp_pool_name()
+ rados.create_pool(pool_name)
global ioctx
- ioctx = rados.open_ioctx('rbd')
+ ioctx = rados.open_ioctx(pool_name)
global features
features = os.getenv("RBD_FEATURES")
if features is not None:
global ioctx
ioctx.__del__()
global rados
+ rados.delete_pool(pool_name)
rados.shutdown()
+def get_temp_pool_name():
+ global pool_idx
+ pool_idx += 1
+ return "test-rbd-api-" + socket.gethostname() + '-' + str(os.getpid()) + \
+ '-' + str(pool_idx)
+
+def get_temp_image_name():
+ global image_idx
+ image_idx += 1
+ return "image" + str(image_idx)
+
def create_image():
+ global image_name
+ image_name = get_temp_image_name()
if features is not None:
- RBD().create(ioctx, IMG_NAME, IMG_SIZE, IMG_ORDER, old_format=False,
+ RBD().create(ioctx, image_name, IMG_SIZE, IMG_ORDER, old_format=False,
features=int(features))
else:
- RBD().create(ioctx, IMG_NAME, IMG_SIZE, IMG_ORDER, old_format=True)
+ RBD().create(ioctx, image_name, IMG_SIZE, IMG_ORDER, old_format=True)
def remove_image():
- RBD().remove(ioctx, IMG_NAME)
+ if image_name is not None:
+ RBD().remove(ioctx, image_name)
def require_features(required_features):
def wrapper(fn):
rados.conf_set('rbd_default_stripe_count', str(stripe_count or 0))
if stripe_unit is not None:
rados.conf_set('rbd_default_stripe_unit', str(stripe_unit or 0))
+ image_name = get_temp_image_name()
if exception is None:
- RBD().create(ioctx, IMG_NAME, IMG_SIZE)
+ RBD().create(ioctx, image_name, IMG_SIZE)
try:
- with Image(ioctx, IMG_NAME) as image:
+ with Image(ioctx, image_name) as image:
eq(format == 1, image.old_format())
expected_order = order
expected_stripe_unit = 1 << actual_order
eq(expected_stripe_unit, image.stripe_unit())
finally:
- RBD().remove(ioctx, IMG_NAME)
+ RBD().remove(ioctx, image_name)
else:
- assert_raises(exception, RBD().create, ioctx, IMG_NAME, IMG_SIZE)
+ assert_raises(exception, RBD().create, ioctx, image_name, IMG_SIZE)
finally:
for k, v in orig_vals.iteritems():
rados.conf_set(k, v)
def test_context_manager():
with Rados(conffile='') as cluster:
- with cluster.open_ioctx('rbd') as ioctx:
- RBD().create(ioctx, IMG_NAME, IMG_SIZE)
- with Image(ioctx, IMG_NAME) as image:
+ with cluster.open_ioctx(pool_name) as ioctx:
+ image_name = get_temp_image_name()
+ RBD().create(ioctx, image_name, IMG_SIZE)
+ with Image(ioctx, image_name) as image:
data = rand_data(256)
image.write(data, 0)
read = image.read(0, 256)
- RBD().remove(ioctx, IMG_NAME)
+ RBD().remove(ioctx, image_name)
eq(data, read)
def test_open_read_only():
with Rados(conffile='') as cluster:
- with cluster.open_ioctx('rbd') as ioctx:
- RBD().create(ioctx, IMG_NAME, IMG_SIZE)
+ with cluster.open_ioctx(pool_name) as ioctx:
+ image_name = get_temp_image_name()
+ RBD().create(ioctx, image_name, IMG_SIZE)
data = rand_data(256)
- with Image(ioctx, IMG_NAME) as image:
+ with Image(ioctx, image_name) as image:
image.write(data, 0)
image.create_snap('snap')
- with Image(ioctx, IMG_NAME, read_only=True) as image:
+ with Image(ioctx, image_name, read_only=True) as image:
read = image.read(0, 256)
eq(data, read)
assert_raises(ReadOnlyImage, image.write, data, 0)
assert_raises(ReadOnlyImage, image.unprotect_snap, 'snap')
assert_raises(ReadOnlyImage, image.unprotect_snap, 'snap')
assert_raises(ReadOnlyImage, image.flatten)
- with Image(ioctx, IMG_NAME) as image:
+ with Image(ioctx, image_name) as image:
image.remove_snap('snap')
- RBD().remove(ioctx, IMG_NAME)
+ RBD().remove(ioctx, image_name)
eq(data, read)
def test_open_dne():
for i in xrange(100):
- assert_raises(ImageNotFound, Image, ioctx, IMG_NAME + 'dne')
- assert_raises(ImageNotFound, Image, ioctx, IMG_NAME, 'snap')
+ image_name = get_temp_image_name()
+ assert_raises(ImageNotFound, Image, ioctx, image_name + 'dne')
+ assert_raises(ImageNotFound, Image, ioctx, image_name, 'snap')
def test_open_readonly_dne():
for i in xrange(100):
- assert_raises(ImageNotFound, Image, ioctx, IMG_NAME + 'dne', read_only=True)
- assert_raises(ImageNotFound, Image, ioctx, IMG_NAME, 'snap', read_only=True)
+ image_name = get_temp_image_name()
+ assert_raises(ImageNotFound, Image, ioctx, image_name + 'dne',
+ read_only=True)
+ assert_raises(ImageNotFound, Image, ioctx, image_name, 'snap',
+ read_only=True)
def test_remove_dne():
assert_raises(ImageNotFound, remove_image)
@with_setup(create_image, remove_image)
def test_list():
- eq([IMG_NAME], RBD().list(ioctx))
+ eq([image_name], RBD().list(ioctx))
@with_setup(create_image, remove_image)
def test_rename():
rbd = RBD()
- rbd.rename(ioctx, IMG_NAME, IMG_NAME + '2')
- eq([IMG_NAME + '2'], rbd.list(ioctx))
- rbd.rename(ioctx, IMG_NAME + '2', IMG_NAME)
- eq([IMG_NAME], rbd.list(ioctx))
+ image_name2 = get_temp_image_name()
+ rbd.rename(ioctx, image_name, image_name2)
+ eq([image_name2], rbd.list(ioctx))
+ rbd.rename(ioctx, image_name2, image_name)
+ eq([image_name], rbd.list(ioctx))
def rand_data(size):
l = [random.Random().getrandbits(64) for _ in xrange(size/8)]
def setUp(self):
self.rbd = RBD()
create_image()
- self.image = Image(ioctx, IMG_NAME)
+ self.image = Image(ioctx, image_name)
def tearDown(self):
self.image.close()
global ioctx
data = rand_data(256)
self.image.write(data, 256)
- self.image.copy(ioctx, IMG_NAME + '2')
- assert_raises(ImageExists, self.image.copy, ioctx, IMG_NAME + '2')
- copy = Image(ioctx, IMG_NAME + '2')
+ image_name = get_temp_image_name()
+ self.image.copy(ioctx, image_name)
+ assert_raises(ImageExists, self.image.copy, ioctx, image_name)
+ copy = Image(ioctx, image_name)
copy_data = copy.read(256, 256)
copy.close()
- self.rbd.remove(ioctx, IMG_NAME + '2')
+ self.rbd.remove(ioctx, image_name)
eq(data, copy_data)
def test_create_snap(self):
self.image.write(data, 0)
read = self.image.read(0, 256)
eq(read, data)
- at_snapshot = Image(ioctx, IMG_NAME, 'snap1')
+ at_snapshot = Image(ioctx, image_name, 'snap1')
snap_data = at_snapshot.read(0, 256)
at_snapshot.close()
eq(snap_data, '\0' * 256)
read = self.image.read(0, 256)
eq(read, data)
self.image.remove_snap('snap1')
-
+
def test_lock_unlock(self):
assert_raises(ImageNotFound, self.image.unlock, '')
self.image.lock_exclusive('')
self.image.discard(0, 1 << IMG_ORDER)
self.image.create_snap('snap2')
self.image.set_snap('snap2')
- check_diff(self.image, 0, IMG_SIZE,
- 'snap1', [(0, 512, False)])
+ check_diff(self.image, 0, IMG_SIZE, 'snap1', [(0, 512, False)])
self.image.remove_snap('snap1')
self.image.remove_snap('snap2')
global features
self.rbd = RBD()
create_image()
- self.image = Image(ioctx, IMG_NAME)
+ self.image = Image(ioctx, image_name)
data = rand_data(256)
self.image.write(data, IMG_SIZE / 2)
self.image.create_snap('snap1')
global features
self.image.protect_snap('snap1')
- self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
- self.clone = Image(ioctx, 'clone')
+ self.clone_name = get_temp_image_name()
+ self.rbd.clone(ioctx, image_name, 'snap1', ioctx, self.clone_name,
+ features)
+ self.clone = Image(ioctx, self.clone_name)
def tearDown(self):
global ioctx
self.clone.close()
- self.rbd.remove(ioctx, 'clone')
+ self.rbd.remove(ioctx, self.clone_name)
self.image.unprotect_snap('snap1')
self.image.remove_snap('snap1')
self.image.close()
def test_unprotected(self):
self.image.create_snap('snap2')
global features
- assert_raises(InvalidArgument, self.rbd.clone, ioctx, IMG_NAME, 'snap2', ioctx, 'clone2', features)
+ clone_name2 = get_temp_image_name()
+ assert_raises(InvalidArgument, self.rbd.clone, ioctx, image_name,
+ 'snap2', ioctx, clone_name2, features)
self.image.remove_snap('snap2')
def test_unprotect_with_children(self):
# validate parent info of clone created by TestClone.setUp
(pool, image, snap) = self.clone.parent_info()
- eq(pool, 'rbd')
- eq(image, IMG_NAME)
+ eq(pool, pool_name)
+ eq(image, image_name)
eq(snap, 'snap1')
# create a new pool...
- rados.create_pool('rbd2')
- other_ioctx = rados.open_ioctx('rbd2')
+ pool_name2 = get_temp_pool_name()
+ rados.create_pool(pool_name2)
+ other_ioctx = rados.open_ioctx(pool_name2)
# ...with a clone of the same parent
- self.rbd.clone(ioctx, IMG_NAME, 'snap1', other_ioctx, 'other_clone', features)
- self.other_clone = Image(other_ioctx, 'other_clone')
+ other_clone_name = get_temp_image_name()
+ self.rbd.clone(ioctx, image_name, 'snap1', other_ioctx,
+ other_clone_name, features)
+ self.other_clone = Image(other_ioctx, other_clone_name)
# validate its parent info
(pool, image, snap) = self.other_clone.parent_info()
- eq(pool, 'rbd')
- eq(image, IMG_NAME)
+ eq(pool, pool_name)
+ eq(image, image_name)
eq(snap, 'snap1')
# can't unprotect snap with children
# close and remove other pool's clone
self.other_clone.close()
- self.rbd.remove(other_ioctx, 'other_clone')
+ self.rbd.remove(other_ioctx, other_clone_name)
# check that we cannot yet remove the parent snap
assert_raises(ImageBusy, self.image.remove_snap, 'snap1')
other_ioctx.close()
- rados.delete_pool('rbd2')
+ rados.delete_pool(pool_name2)
# unprotect, remove parent snap happen in cleanup, and should succeed
actual = self.image.list_children()
# dedup for cache pools until
# http://tracker.ceph.com/issues/8187 is fixed
- deduped = set([('rbd', image[1]) for image in actual])
+ deduped = set([(pool_name, image[1]) for image in actual])
eq(deduped, set(expected))
def test_list_children(self):
global ioctx
global features
self.image.set_snap('snap1')
- self.check_children([('rbd', 'clone')])
+ self.check_children([(pool_name, self.clone_name)])
self.clone.close()
- self.rbd.remove(ioctx, 'clone')
+ self.rbd.remove(ioctx, self.clone_name)
eq(self.image.list_children(), [])
+ clone_name = get_temp_image_name() + '_'
expected_children = []
for i in xrange(10):
- self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone%d' % i, features)
- expected_children.append(('rbd', 'clone%d' % i))
+ self.rbd.clone(ioctx, image_name, 'snap1', ioctx,
+ clone_name + str(i), features)
+ expected_children.append((pool_name, clone_name + str(i)))
self.check_children(expected_children)
for i in xrange(10):
- self.rbd.remove(ioctx, 'clone%d' % i)
+ self.rbd.remove(ioctx, clone_name + str(i))
expected_children.pop(0)
self.check_children(expected_children)
eq(self.image.list_children(), [])
- self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
- self.check_children([('rbd', 'clone')])
- self.clone = Image(ioctx, 'clone')
+ self.rbd.clone(ioctx, image_name, 'snap1', ioctx, self.clone_name,
+ features)
+ self.check_children([(pool_name, self.clone_name)])
+ self.clone = Image(ioctx, self.clone_name)
def test_flatten_errors(self):
# test that we can't flatten a non-clone
def check_flatten_with_order(self, new_order):
global ioctx
global features
- self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone2',
+ clone_name2 = get_temp_image_name()
+ self.rbd.clone(ioctx, image_name, 'snap1', ioctx, clone_name2,
features, new_order)
#with Image(ioctx, 'clone2') as clone:
- clone2 = Image(ioctx, 'clone2')
+ clone2 = Image(ioctx, clone_name2)
clone2.flatten()
eq(clone2.overlap(), 0)
clone2.close()
- self.rbd.remove(ioctx, 'clone2')
+ self.rbd.remove(ioctx, clone_name2)
# flatten after resizing to non-block size
- self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone2',
+ self.rbd.clone(ioctx, image_name, 'snap1', ioctx, clone_name2,
features, new_order)
- with Image(ioctx, 'clone2') as clone:
+ with Image(ioctx, clone_name2) as clone:
clone.resize(IMG_SIZE / 2 - 1)
clone.flatten()
eq(0, clone.overlap())
- self.rbd.remove(ioctx, 'clone2')
+ self.rbd.remove(ioctx, clone_name2)
# flatten after resizing to non-block size
- self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone2',
+ self.rbd.clone(ioctx, image_name, 'snap1', ioctx, clone_name2,
features, new_order)
- with Image(ioctx, 'clone2') as clone:
+ with Image(ioctx, clone_name2) as clone:
clone.resize(IMG_SIZE / 2 + 1)
clone.flatten()
eq(clone.overlap(), 0)
- self.rbd.remove(ioctx, 'clone2')
+ self.rbd.remove(ioctx, clone_name2)
def test_flatten_basic(self):
self.check_flatten_with_order(IMG_ORDER)
def test_flatten_drops_cache(self):
global ioctx
global features
- self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone2',
+ clone_name2 = get_temp_image_name()
+ self.rbd.clone(ioctx, image_name, 'snap1', ioctx, clone_name2,
features, IMG_ORDER)
- with Image(ioctx, 'clone2') as clone:
- with Image(ioctx, 'clone2') as clone2:
+ with Image(ioctx, clone_name2) as clone:
+ with Image(ioctx, clone_name2) as clone2:
# cache object non-existence
data = clone.read(IMG_SIZE / 2, 256)
clone2_data = clone2.read(IMG_SIZE / 2, 256)
eq(data, after_flatten)
after_flatten = clone2.read(IMG_SIZE / 2, 256)
eq(data, after_flatten)
- self.rbd.remove(ioctx, 'clone2')
+ self.rbd.remove(ioctx, clone_name2)
def test_flatten_multi_level(self):
self.clone.create_snap('snap2')
self.clone.protect_snap('snap2')
- self.rbd.clone(ioctx, 'clone', 'snap2', ioctx, 'clone3', features)
+ clone_name3 = get_temp_image_name()
+ self.rbd.clone(ioctx, self.clone_name, 'snap2', ioctx, clone_name3,
+ features)
self.clone.flatten()
- with Image(ioctx, 'clone3') as clone3:
+ with Image(ioctx, clone_name3) as clone3:
clone3.flatten()
self.clone.unprotect_snap('snap2')
self.clone.remove_snap('snap2')
- self.rbd.remove(ioctx, 'clone3')
+ self.rbd.remove(ioctx, clone_name3)
def test_resize_flatten_multi_level(self):
self.clone.create_snap('snap2')
self.clone.protect_snap('snap2')
- self.rbd.clone(ioctx, 'clone', 'snap2', ioctx, 'clone3', features)
+ clone_name3 = get_temp_image_name()
+ self.rbd.clone(ioctx, self.clone_name, 'snap2', ioctx, clone_name3,
+ features)
self.clone.resize(1)
orig_data = self.image.read(0, 256)
- with Image(ioctx, 'clone3') as clone3:
+ with Image(ioctx, clone_name3) as clone3:
clone3_data = clone3.read(0, 256)
eq(orig_data, clone3_data)
self.clone.flatten()
- with Image(ioctx, 'clone3') as clone3:
+ with Image(ioctx, clone_name3) as clone3:
clone3_data = clone3.read(0, 256)
eq(orig_data, clone3_data)
- self.rbd.remove(ioctx, 'clone3')
+ self.rbd.remove(ioctx, clone_name3)
self.clone.unprotect_snap('snap2')
self.clone.remove_snap('snap2')