time_t tv_sec
long tv_nsec
-cdef extern from "limits.h":
- cdef uint64_t INT64_MAX
-
cdef extern from "rados/librados.h":
enum:
_LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
int rbd_snap_remove(rbd_image_t image, const char *snapname)
int rbd_snap_remove2(rbd_image_t image, const char *snapname, uint32_t flags,
librbd_progress_fn_t cb, void *cbdata)
+ int rbd_snap_remove_by_id(rbd_image_t image, uint64_t snap_id)
int rbd_snap_rollback(rbd_image_t image, const char *snapname)
int rbd_snap_rename(rbd_image_t image, const char *snapname,
const char* dstsnapsname)
size_t snap_group_namespace_size)
void rbd_snap_group_namespace_cleanup(rbd_snap_group_namespace_t *group_spec,
size_t snap_group_namespace_size)
+ int rbd_snap_get_trash_namespace(rbd_image_t image, uint64_t snap_id,
+ char *original_name, size_t max_length)
int rbd_flatten(rbd_image_t image)
if ret != 0:
raise make_ex(ret, 'error removing snapshot %s from %s with flags %llx' % (name, self.name, flags))
+ def remove_snap_by_id(self, snap_id):
+ """
+ Delete a snapshot of the image by its id.
+
+ :param id: the id of the snapshot
+ :type name: int
+ :raises: :class:`IOError`, :class:`ImageBusy`
+ """
+ cdef:
+ uint64_t _snap_id = snap_id
+ with nogil:
+ ret = rbd_snap_remove_by_id(self.image, _snap_id)
+ if ret != 0:
+ raise make_ex(ret, 'error removing snapshot %s from %s' % (snap_id, self.name))
+
def rollback_to_snap(self, name):
"""
Revert the image to its contents at a snapshot. This is a
"""
Get the snapshot namespace type.
:param snap_id: the snapshot id of a snap shot
+ :type key: int
"""
cdef:
rbd_snap_namespace_type_t namespace_type
"""
get the group namespace details.
:param snap_id: the snapshot id of the group snapshot
+ :type key: int
+ :returns: dict - contains the following keys:
+
+ * ``pool`` (int) - pool id
+
+ * ``name`` (str) - group name
+
+ * ``snap_name`` (str) - group snap name
"""
cdef:
rbd_snap_group_namespace_t group_namespace
sizeof(rbd_snap_group_namespace_t))
return info
+ def snap_get_trash_namespace(self, snap_id):
+ """
+ get the trash namespace details.
+ :param snap_id: the snapshot id of the trash snapshot
+ :type key: int
+ :returns: dict - contains the following keys:
+
+ * ``original_name`` (str) - original snap name
+ """
+ cdef:
+ uint64_t _snap_id = snap_id
+ size_t _size = 512
+ char *_name = NULL
+ try:
+ while True:
+ _name = <char*>realloc_chk(_name, _size);
+ with nogil:
+ ret = rbd_snap_get_trash_namespace(self.image, _snap_id,
+ _name, _size)
+ if ret >= 0:
+ break
+ elif ret != -errno.ERANGE:
+ raise make_ex(ret, 'error getting snapshot trash '
+ 'namespace image: %s, snap_id: %d' % (self.name, snap_id))
+ return {
+ 'original_name' : decode_cstr(_name)
+ }
+ finally:
+ free(_name)
+
+
cdef class LockOwnerIterator(object):
"""
Iterator over managed lock owners for an image
* ``size`` (int) - size of the image at the time of snapshot (in bytes)
* ``name`` (str) - name of the snapshot
+
+ * ``namespace`` (int) - enum for snap namespace
+
+ * ``group`` (dict) - optional for group namespace snapshots
+
+ * ``trash`` (dict) - optional for trash namespace snapshots
"""
cdef rbd_snap_info_t *snaps
def __iter__(self):
for i in range(self.num_snaps):
- yield {
+ s = {
'id' : self.snaps[i].id,
'size' : self.snaps[i].size,
'name' : decode_cstr(self.snaps[i].name),
+ 'namespace' : self.image.snap_get_namespace_type(self.snaps[i].id)
}
+ if s['namespace'] == RBD_SNAP_NAMESPACE_TYPE_GROUP:
+ try:
+ group = self.image.snap_get_group_namespace(self.snaps[i].id)
+ except:
+ group = None
+ s['group'] = group
+ elif s['namespace'] == RBD_SNAP_NAMESPACE_TYPE_TRASH:
+ try:
+ trash = self.image.snap_get_trash_namespace(self.snaps[i].id)
+ except:
+ trash = None
+ s['trash'] = trash
+ yield s
def __dealloc__(self):
if self.snaps:
RBD_MIRROR_MODE_DISABLED, RBD_MIRROR_MODE_IMAGE,
RBD_MIRROR_MODE_POOL, RBD_MIRROR_IMAGE_ENABLED,
RBD_MIRROR_IMAGE_DISABLED, MIRROR_IMAGE_STATUS_STATE_UNKNOWN,
- RBD_LOCK_MODE_EXCLUSIVE, RBD_OPERATION_FEATURE_GROUP)
+ RBD_LOCK_MODE_EXCLUSIVE, RBD_OPERATION_FEATURE_GROUP,
+ RBD_SNAP_NAMESPACE_TYPE_TRASH)
rados = None
ioctx = None
self.clone.unprotect_snap('snap2')
self.clone.remove_snap('snap2')
+ def test_trash_snapshot(self):
+ self.image.create_snap('snap2')
+ global features
+ clone_name = get_temp_image_name()
+ rados.conf_set("rbd_default_clone_format", "2")
+ self.rbd.clone(ioctx, image_name, 'snap2', ioctx, clone_name, features)
+ rados.conf_set("rbd_default_clone_format", "auto")
+
+ self.image.remove_snap('snap2')
+ self.rbd.remove(ioctx, clone_name)
+
+ snaps = [s for s in self.image.list_snaps() if s['name'] != 'snap1']
+ eq([RBD_SNAP_NAMESPACE_TYPE_TRASH], [s['namespace'] for s in snaps])
+ eq([{'original_name' : 'snap2'}], [s['trash'] for s in snaps])
+
class TestExclusiveLock(object):
@require_features([RBD_FEATURE_EXCLUSIVE_LOCK])
eq([snap_name], [snap['name'] for snap in self.group.list_snaps()])
for snap in self.image.list_snaps():
- eq(rbd.RBD_SNAP_NAMESPACE_TYPE_GROUP,
- self.image.snap_get_namespace_type(snap['id']))
-
- info = self.image.snap_get_group_namespace(snap['id'])
+ eq(rbd.RBD_SNAP_NAMESPACE_TYPE_GROUP, snap['namespace'])
+ info = snap['group']
eq(group_name, info['group_name'])
eq(snap_name, info['group_snap_name'])