self.rbd.remove(ioctx, clone_name)
def test_group_snap_rollback(self):
- eq([], list(self.group.list_images()))
- self.group.add_image(ioctx, image_name)
- with Image(ioctx, image_name) as image:
- image.write(b'\0' * 256, 0)
+ for _ in range(1, 3):
+ create_image()
+ self.image_names.append(image_name)
+
+ with Image(ioctx, self.image_names[0]) as image:
+ image.write(b'1' * 256, 0)
+ with Image(ioctx, self.image_names[1]) as image:
+ image.write(b'2' * 256, 0)
+ with Image(ioctx, self.image_names[2]) as image:
+ image.write(b'3' * 256, 0)
+ self.group.add_image(ioctx, self.image_names[0])
+ snap_name1 = get_temp_snap_name()
+ self.group.create_snap(snap_name1)
+
+ with Image(ioctx, self.image_names[0]) as image:
+ image.write(b'4' * 256, 0)
+ with Image(ioctx, self.image_names[1]) as image:
+ image.write(b'5' * 256, 0)
+ with Image(ioctx, self.image_names[2]) as image:
+ image.write(b'6' * 256, 0)
+ self.group.add_image(ioctx, self.image_names[1])
+ snap_name2 = get_temp_snap_name()
+ self.group.create_snap(snap_name2)
+
+ with Image(ioctx, self.image_names[0]) as image:
+ image.write(b'7' * 256, 0)
+ with Image(ioctx, self.image_names[1]) as image:
+ image.write(b'8' * 256, 0)
+ with Image(ioctx, self.image_names[2]) as image:
+ image.write(b'9' * 256, 0)
+ self.group.add_image(ioctx, self.image_names[2])
+ snap_name3 = get_temp_snap_name()
+ self.group.create_snap(snap_name3)
+
+ with Image(ioctx, self.image_names[0]) as image:
+ image.write(b'a' * 256, 0)
+ with Image(ioctx, self.image_names[1]) as image:
+ image.write(b'b' * 256, 0)
+ with Image(ioctx, self.image_names[2]) as image:
+ image.write(b'c' * 256, 0)
+
+ for i in range(0, 3):
+ self.group.remove_image(ioctx, self.image_names[i])
+ with Image(ioctx, self.image_names[0]) as image:
+ image_snaps = list(image.list_snaps())
+ assert [s['namespace'] for s in image_snaps] == [RBD_SNAP_NAMESPACE_TYPE_GROUP,
+ RBD_SNAP_NAMESPACE_TYPE_GROUP,
+ RBD_SNAP_NAMESPACE_TYPE_GROUP]
+ with Image(ioctx, self.image_names[1]) as image:
+ image_snaps = list(image.list_snaps())
+ assert [s['namespace'] for s in image_snaps] == [RBD_SNAP_NAMESPACE_TYPE_GROUP,
+ RBD_SNAP_NAMESPACE_TYPE_GROUP]
+ with Image(ioctx, self.image_names[2]) as image:
+ image_snaps = list(image.list_snaps())
+ assert [s['namespace'] for s in image_snaps] == [RBD_SNAP_NAMESPACE_TYPE_GROUP]
+
+ # group = []
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name1)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name2)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name3)
+
+ with Image(ioctx, self.image_names[0]) as image:
+ read = image.read(0, 256)
+ assert read == b'a' * 256
+ with Image(ioctx, self.image_names[1]) as image:
read = image.read(0, 256)
- eq(read, b'\0' * 256)
+ assert read == b'b' * 256
+ with Image(ioctx, self.image_names[2]) as image:
+ read = image.read(0, 256)
+ assert read == b'c' * 256
- global snap_name
- eq([], list(self.group.list_snaps()))
- self.group.create_snap(snap_name)
- eq([snap_name], [snap['name'] for snap in self.group.list_snaps()])
+ # group = [img0]
+ self.group.add_image(ioctx, self.image_names[0])
+ self.group.rollback_to_snap(snap_name1)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name2)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name3)
- with Image(ioctx, image_name) as image:
- data = rand_data(256)
- image.write(data, 0)
+ with Image(ioctx, self.image_names[0]) as image:
+ read = image.read(0, 256)
+ assert read == b'1' * 256
+ with Image(ioctx, self.image_names[1]) as image:
+ read = image.read(0, 256)
+ assert read == b'b' * 256
+ with Image(ioctx, self.image_names[2]) as image:
+ read = image.read(0, 256)
+ assert read == b'c' * 256
+
+ # group = [img1]
+ self.group.remove_image(ioctx, self.image_names[0])
+ self.group.add_image(ioctx, self.image_names[1])
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name1)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name2)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name3)
+
+ # group = [img2]
+ self.group.remove_image(ioctx, self.image_names[1])
+ self.group.add_image(ioctx, self.image_names[2])
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name1)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name2)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name3)
+
+ # group = [img0 img1]
+ self.group.remove_image(ioctx, self.image_names[2])
+ # re-add in reverse order to test that order doesn't matter
+ self.group.add_image(ioctx, self.image_names[1])
+ self.group.add_image(ioctx, self.image_names[0])
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name1)
+ self.group.rollback_to_snap(snap_name2)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name3)
+
+ with Image(ioctx, self.image_names[0]) as image:
read = image.read(0, 256)
- eq(read, data)
+ assert read == b'4' * 256
+ with Image(ioctx, self.image_names[1]) as image:
+ read = image.read(0, 256)
+ assert read == b'5' * 256
+ with Image(ioctx, self.image_names[2]) as image:
+ read = image.read(0, 256)
+ assert read == b'c' * 256
+
+ # group = [img0 img2]
+ self.group.remove_image(ioctx, self.image_names[1])
+ self.group.add_image(ioctx, self.image_names[2])
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name1)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name2)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name3)
+
+ # group = [img1 img2]
+ self.group.remove_image(ioctx, self.image_names[0])
+ self.group.add_image(ioctx, self.image_names[1])
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name1)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name2)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name3)
+
+ # group = [img0 img1 img2]
+ self.group.add_image(ioctx, self.image_names[0])
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name1)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name2)
+ self.group.rollback_to_snap(snap_name3)
+
+ with Image(ioctx, self.image_names[0]) as image:
+ read = image.read(0, 256)
+ assert read == b'7' * 256
+ with Image(ioctx, self.image_names[1]) as image:
+ read = image.read(0, 256)
+ assert read == b'8' * 256
+ with Image(ioctx, self.image_names[2]) as image:
+ read = image.read(0, 256)
+ assert read == b'9' * 256
- self.group.rollback_to_snap(snap_name)
- with Image(ioctx, image_name) as image:
+ # group = [img0 img1]
+ self.group.remove_image(ioctx, self.image_names[2])
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name1)
+ self.group.rollback_to_snap(snap_name2)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name3)
+
+ with Image(ioctx, self.image_names[0]) as image:
+ read = image.read(0, 256)
+ assert read == b'4' * 256
+ with Image(ioctx, self.image_names[1]) as image:
+ read = image.read(0, 256)
+ assert read == b'5' * 256
+ with Image(ioctx, self.image_names[2]) as image:
read = image.read(0, 256)
- eq(read, b'\0' * 256)
+ assert read == b'9' * 256
- self.group.remove_image(ioctx, image_name)
- eq([], list(self.group.list_images()))
- self.group.remove_snap(snap_name)
- eq([], list(self.group.list_snaps()))
+ # group = [img0]
+ self.group.remove_image(ioctx, self.image_names[1])
+ self.group.rollback_to_snap(snap_name1)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name2)
+ assert_raises(InvalidArgument, self.group.rollback_to_snap, snap_name3)
+
+ with Image(ioctx, self.image_names[0]) as image:
+ read = image.read(0, 256)
+ assert read == b'1' * 256
+ with Image(ioctx, self.image_names[1]) as image:
+ read = image.read(0, 256)
+ assert read == b'5' * 256
+ with Image(ioctx, self.image_names[2]) as image:
+ read = image.read(0, 256)
+ assert read == b'9' * 256
class TestMigration(object):