int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
void rados_aio_release(rados_completion_t c)
+ int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
completion_obj.rados_comp = completion
return completion_obj
+ def aio_stat(self, object_name, oncomplete):
+ """
+ Asynchronously get object stats (size/mtime)
+
+ oncomplete will be called with the returned size and mtime
+ as well as the completion:
+
+ oncomplete(completion, size, mtime)
+
+ :param object_name: the name of the object to get stats from
+ :type object_name: str
+ :param oncomplete: what to do when the stat is complete
+ :type oncomplete: completion
+
+ :raises: :class:`Error`
+ :returns: completion object
+ """
+
+ object_name = cstr(object_name, 'object_name')
+
+ cdef:
+ Completion completion
+ char *_object_name = object_name
+ uint64_t psize
+ time_t pmtime
+
+ def oncomplete_(completion_v):
+ cdef Completion _completion_v = completion_v
+ return_value = _completion_v.get_return_value()
+ if return_value >= 0:
+ return oncomplete(_completion_v, psize, time.localtime(pmtime))
+ else:
+ return oncomplete(_completion_v, None, None)
+
+ completion = self.__get_completion(oncomplete_, None)
+ self.__track_completion(completion)
+ with nogil:
+ ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
+ &psize, &pmtime)
+
+ if ret < 0:
+ completion._cleanup()
+ raise make_ex(ret, "error stating %s" % object_name)
+ return completion
+
def aio_write(self, object_name, to_write, offset=0,
oncomplete=None, onsafe=None):
"""
eq(contents, b"bar")
[i.remove() for i in self.ioctx.list_objects()]
+ def test_aio_stat(self):
+ lock = threading.Condition()
+ count = [0]
+ def cb(_, size, mtime):
+ with lock:
+ count[0] += 1
+ lock.notify()
+
+ comp = self.ioctx.aio_stat("foo", cb)
+ comp.wait_for_complete()
+ with lock:
+ while count[0] < 1:
+ lock.wait()
+ eq(comp.get_return_value(), -2)
+
+ self.ioctx.write("foo", b"bar")
+
+ comp = self.ioctx.aio_stat("foo", cb)
+ comp.wait_for_complete()
+ with lock:
+ while count[0] < 2:
+ lock.wait()
+ eq(comp.get_return_value(), 0)
+
+ [i.remove() for i in self.ioctx.list_objects()]
+
def _take_down_acting_set(self, pool, objectname):
# find acting_set for pool:objectname and take it down; used to
# verify that async reads don't complete while acting set is missing