void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
+ uint64_t rados_ioctx_get_id(rados_ioctx_t io)
+ int rados_ioctx_get_pool_name(rados_ioctx_t io, char *buf, unsigned maxlen)
int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
rados_snap_t *snapid)
self.require_ioctx_open()
return SnapIterator(self)
+ def get_pool_id(self):
+ """
+ Get pool id
+
+ :returns: int - pool id
+ """
+ with nogil:
+ ret = rados_ioctx_get_id(self.io)
+ return ret;
+
+ def get_pool_name(self):
+ """
+ Get pool name
+
+ :returns: str - pool name
+ """
+ cdef:
+ int name_len = 10
+ char *name = NULL
+
+ try:
+ while True:
+ name = <char *>realloc_chk(name, name_len)
+ with nogil:
+ ret = rados_ioctx_get_pool_name(self.io, name, name_len)
+ if ret > 0:
+ break
+ elif ret != -errno.ERANGE:
+ raise make_ex(ret, "get pool name error")
+ else:
+ name_len = name_len * 2
+
+ return decode_cstr(name)
+ finally:
+ free(name)
+
+
@requires(('snap_name', str_type))
def create_snap(self, snap_name):
"""
stored_xattrs[key] = value
eq(stored_xattrs, xattrs)
+ def test_get_pool_id(self):
+ eq(self.ioctx.get_pool_id(), self.rados.pool_lookup('test_pool'))
+
+ def test_get_pool_name(self):
+ eq(self.ioctx.get_pool_name(), 'test_pool')
+
def test_create_snap(self):
assert_raises(ObjectNotFound, self.ioctx.remove_snap, 'foo')
self.ioctx.create_snap('foo')