LIBRADOS_OP_FLAG_FADVISE_WILLNEED = 0x10
LIBRADOS_OP_FLAG_FADVISE_DONTNEED = 0x20
LIBRADOS_OP_FLAG_FADVISE_NOCACHE = 0x40
+LIBRADOS_SNAP_HEAD = -2
# Are we running Python 2.x
"""
return self.locator_key
+ @requires(('snap_id', int))
+ def set_read(self, snap_id):
+ """
+ Set the snapshot for reading objects.
+
+ To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
+
+ :param snap_id: the snapshot Id
+ :type snap_id: int
+
+ :raises: :class:`TypeError`
+ """
+ self.require_ioctx_open()
+ run_in_thread(self.librados.rados_ioctx_snap_set_read,
+ (self.io, c_uint64(snap_id)))
@requires(('nspace', str_type))
def set_namespace(self, nspace):
from nose.tools import eq_ as eq, ok_ as ok, assert_raises
from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
ObjectNotFound, ObjectBusy, requires, opt,
- ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx)
+ ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx,
+ LIBRADOS_SNAP_HEAD)
import time
import threading
import json
eq(self.ioctx.read("insnap"), b"contents1")
self.ioctx.remove_snap("snap1")
self.ioctx.remove_object("insnap")
+
+ def test_snap_read(self):
+ self.ioctx.write("insnap", b"contents1")
+ self.ioctx.create_snap("snap1")
+ self.ioctx.remove_object("insnap")
+ snap = self.ioctx.lookup_snap("snap1")
+ self.ioctx.set_read(int(snap.snap_id.value))
+ eq(self.ioctx.read("insnap"), b"contents1")
+ self.ioctx.set_read(LIBRADOS_SNAP_HEAD)
+ self.ioctx.write("inhead", b"contents2")
+ eq(self.ioctx.read("inhead"), b"contents2")
+ self.ioctx.remove_snap("snap1")
+ self.ioctx.remove_object("inhead")
def test_set_omap(self):
keys = ("1", "2", "3", "4")