import threading
import time
+from collections import Callable
from datetime import datetime
from functools import partial, wraps
from itertools import chain
const char * in_buf, size_t in_len, char * buf, size_t out_len)
int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
+ int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
void rados_write_op_omap_clear(rados_write_op_t write_op)
+ void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
void rados_read_op_omap_get_vals(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
void rados_read_op_omap_get_keys(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
+ int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
+ void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
void rados_omap_get_end(rados_omap_iter_t iter)
with nogil:
rados_release_write_op(self.write_op)
+ @requires(('flags', int))
+ def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
+ """
+ Set flags for the last operation added to this write_op.
+ :para flags: flags to apply to the last operation
+ :type flags: int
+ """
+
+ cdef:
+ int _flags = flags
+
+ with nogil:
+ rados_write_op_set_flags(self.write_op, _flags)
+
+
class WriteOpCtx(WriteOp, OpCtx):
"""write operation context manager"""
with nogil:
rados_release_read_op(self.read_op)
+ @requires(('flags', int))
+ def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
+ """
+ Set flags for the last operation added to this read_op.
+ :para flags: flags to apply to the last operation
+ :type flags: int
+ """
+
+ cdef:
+ int _flags = flags
+
+ with nogil:
+ rados_read_op_set_flags(self.read_op, _flags)
+
class ReadOpCtx(ReadOp, OpCtx):
"""read operation context manager"""
if ret != 0:
raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
+ @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
+ def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
+ """
+ excute the real write operation asynchronously
+ :para write_op: write operation object
+ :type write_op: WriteOp
+ :para oid: object name
+ :type oid: str
+ :param oncomplete: what to do when the remove is safe and complete in memory
+ on all replicas
+ :type oncomplete: completion
+ :param onsafe: what to do when the remove is safe and complete on storage
+ on all replicas
+ :type onsafe: completion
+ :para mtime: the time to set the mtime to, 0 for the current time
+ :type mtime: int
+ :para flags: flags to apply to the entire operation
+ :type flags: int
+
+ :raises: :class:`Error`
+ :returns: completion object
+ """
+
+ oid = cstr(oid, 'oid')
+ cdef:
+ WriteOp _write_op = write_op
+ char *_oid = oid
+ Completion completion
+ time_t _mtime = mtime
+ int _flags = flags
+
+ completion = self.__get_completion(oncomplete, onsafe)
+ self.__track_completion(completion)
+
+ with nogil:
+ ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
+ &_mtime, _flags)
+ if ret != 0:
+ completion._cleanup()
+ raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
+ return completion
+
@requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
"""
if ret != 0:
raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
+ @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
+ def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
+ """
+ excute the real read operation
+ :para read_op: read operation object
+ :type read_op: ReadOp
+ :para oid: object name
+ :type oid: str
+ :param oncomplete: what to do when the remove is safe and complete in memory
+ on all replicas
+ :type oncomplete: completion
+ :param onsafe: what to do when the remove is safe and complete on storage
+ on all replicas
+ :type onsafe: completion
+ :para flag: flags to apply to the entire operation
+ :type flag: int
+ """
+ oid = cstr(oid, 'oid')
+ cdef:
+ ReadOp _read_op = read_op
+ char *_oid = oid
+ Completion completion
+ int _flag = flag
+
+ completion = self.__get_completion(oncomplete, onsafe)
+ self.__track_completion(completion)
+
+ with nogil:
+ ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
+ if ret != 0:
+ completion._cleanup()
+ raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
+ return completion
+
@requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
"""
from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
ObjectNotFound, ObjectBusy, requires, opt,
ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx,
- LIBRADOS_SNAP_HEAD, MonitorLog)
+ LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog)
import time
import threading
import json
values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
with WriteOpCtx(self.ioctx) as write_op:
self.ioctx.set_omap(write_op, keys, values)
+ write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS)
self.ioctx.operate_write_op(write_op, "hw")
with ReadOpCtx(self.ioctx) as read_op:
iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
with ReadOpCtx(self.ioctx) as read_op:
iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4)
eq(ret, 0)
+ read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS)
self.ioctx.operate_read_op(read_op, "hw")
eq(list(iter), [("2", b"bbb")])
+ def test_set_omap_aio(self):
+ lock = threading.Condition()
+ count = [0]
+ def cb(blah):
+ with lock:
+ count[0] += 1
+ lock.notify()
+ return 0
+
+ keys = ("1", "2", "3", "4")
+ values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
+ with WriteOpCtx(self.ioctx) as write_op:
+ self.ioctx.set_omap(write_op, keys, values)
+ comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb)
+ comp.wait_for_complete()
+ comp.wait_for_safe()
+ with lock:
+ while count[0] < 2:
+ lock.wait()
+ eq(comp.get_return_value(), 0)
+
+ with ReadOpCtx(self.ioctx) as read_op:
+ iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
+ eq(ret, 0)
+ comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb)
+ comp.wait_for_complete()
+ comp.wait_for_safe()
+ with lock:
+ while count[0] < 4:
+ lock.wait()
+ eq(comp.get_return_value(), 0)
+ next(iter)
+ eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
+
+
def test_get_omap_vals_by_keys(self):
keys = ("1", "2", "3", "4")
values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
eq(ret, 0)
assert len(out) > 0
- eq(u"pool '\u9ec5' created", out)
+ eq(u"pool '\u9ec5' created", out)
\ No newline at end of file