rados_read_op_t rados_create_read_op()
void rados_release_read_op(rados_read_op_t read_op)
- int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
+ int rados_aio_create_completion2(void * cb_arg, rados_callback_t cb_complete, rados_completion_t * pc)
void rados_aio_release(rados_completion_t c)
int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
int rados_aio_get_return_value(rados_completion_t c)
int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
- int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
int rados_aio_wait_for_complete(rados_completion_t c)
- int rados_aio_wait_for_safe(rados_completion_t c)
int rados_aio_is_complete(rados_completion_t c)
- int rados_aio_is_safe(rados_completion_t c)
int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
const char * in_buf, size_t in_len, char * buf, size_t out_len)
:returns: True if the operation is safe
"""
- with nogil:
- ret = rados_aio_is_safe(self.rados_comp)
- return ret == 1
+ return self.is_complete()
def is_complete(self):
"""
"""
Wait for an asynchronous operation to be marked safe
- This does not imply that the safe callback has finished.
+ wait_for_safe() is an alias of wait_for_complete() since Luminous
"""
- with nogil:
- rados_aio_wait_for_safe(self.rados_comp)
+ self.wait_for_complete()
def wait_for_complete(self):
"""
Wait for an asynchronous operation to be marked safe and for
the safe callback to have returned
"""
- with nogil:
- rados_aio_wait_for_safe_and_cb(self.rados_comp)
+ return self.wait_for_complete_and_cb()
def wait_for_complete_and_cb(self):
"""
def _complete(self):
self.oncomplete(self)
- with self.ioctx.lock:
- if self.oncomplete:
- self.ioctx.complete_completions.remove(self)
-
- def _safe(self):
- self.onsafe(self)
- with self.ioctx.lock:
- if self.onsafe:
- self.ioctx.safe_completions.remove(self)
+ if self.onsafe:
+ self.onsafe(self)
+ self._cleanup()
def _cleanup(self):
with self.ioctx.lock:
"""read operation context manager"""
-cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
- """
- Callback to onsafe() for asynchronous operations
- """
- cdef object cb = <object>args
- cb._safe()
- return 0
-
-
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
"""
Callback to oncomplete() for asynchronous operations
cdef:
rados_callback_t complete_cb = NULL
- rados_callback_t safe_cb = NULL
rados_completion_t completion
PyObject* p_completion_obj= <PyObject*>completion_obj
if oncomplete:
complete_cb = <rados_callback_t>&__aio_complete_cb
- if onsafe:
- safe_cb = <rados_callback_t>&__aio_safe_cb
with nogil:
- ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
+ ret = rados_aio_create_completion2(p_completion_obj, complete_cb,
&completion)
if ret < 0:
raise make_ex(ret, "error getting a completion")
self.ioctx.set_omap(write_op, keys, values)
comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb)
comp.wait_for_complete()
- comp.wait_for_safe()
with lock:
while count[0] < 2:
lock.wait()
eq(ret, 0)
comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb)
comp.wait_for_complete()
- comp.wait_for_safe()
with lock:
while count[0] < 4:
lock.wait()
return 0
comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
comp.wait_for_complete()
- comp.wait_for_safe()
with lock:
while count[0] < 2:
lock.wait()
self.ioctx.aio_write("foo", b"barbaz", 0, cb, cb)
comp = self.ioctx.aio_write_full("foo", b"bar", cb, cb)
comp.wait_for_complete()
- comp.wait_for_safe()
with lock:
while count[0] < 2:
lock.wait()