]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/rados: add Ioctx::aio_writesame() and test Ioctx::aio_writesame()
authorzhangjiao <zhangjiao@cmss.chinamobile.com>
Fri, 8 Nov 2019 15:32:54 +0000 (23:32 +0800)
committerzhangjiao <zhangjiao@cmss.chinamobile.com>
Tue, 3 Dec 2019 07:25:09 +0000 (15:25 +0800)
Signed-off-by: Zhang Jiao <zhangjiao@cmss.chinamobile.com>
src/pybind/rados/rados.pyx
src/test/pybind/test_rados.py

index 3093bb265f0edf2ca0daa0e741c9e9b065951de8..64611b61ca665a63fd17498a722241d57965f4eb 100644 (file)
@@ -275,6 +275,7 @@ cdef extern from "rados/librados.h" nogil:
     int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
     int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
     int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
+    int rados_aio_writesame(rados_ioctx_t io, const char *oid, rados_completion_t completion, const char *buf, size_t data_len, size_t write_len, uint64_t off)
     int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
     int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
     int rados_aio_flush(rados_ioctx_t io)
@@ -2409,6 +2410,49 @@ cdef class Ioctx(object):
             raise make_ex(ret, "error writing object %s" % object_name)
         return completion
 
+    @requires(('object_name', str_type), ('to_write', bytes), ('write_len', int),
+              ('offset', int), ('oncomplete', opt(Callable)))
+    def aio_writesame(self, object_name, to_write, write_len, offset=0,
+                      oncomplete=None):
+        """    
+        Asynchronously write the same buffer multiple times
+
+        :param object_name: name of the object
+        :type object_name: str
+        :param to_write: data to write
+        :type to_write: bytes
+        :param write_len: total number of bytes to write
+        :type write_len: int
+        :param offset: byte offset in the object to begin writing at
+        :type offset: int
+        :param oncomplete: what to do when the writesame is safe and 
+            complete in memory on all replicas
+        :type oncomplete: completion
+        :raises: :class:`Error`
+        :returns: completion object
+        """
+
+        object_name = cstr(object_name, 'object_name')
+
+        cdef:
+            Completion completion
+            char* _object_name = object_name
+            char* _to_write = to_write
+            size_t _data_len = len(to_write)
+            size_t _write_len = write_len
+            uint64_t _offset = offset
+
+        completion = self.__get_completion(oncomplete, None)
+        self.__track_completion(completion)
+        with nogil:
+            ret = rados_aio_writesame(self.io, _object_name, completion.rados_comp, 
+                                       _to_write, _data_len, _write_len, _offset)
+
+        if ret < 0:
+            completion._cleanup()
+            raise make_ex(ret, "error writing object %s" % object_name)
+        return completion
+
     @requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
               ('onsafe', opt(Callable)))
     def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
index 8542aea87708573524e0843a6d05917bd92d02a6..705629ce19ab48cab7354e1a1d2d396429bd4a14 100644 (file)
@@ -699,6 +699,23 @@ class TestIoctx(object):
         eq(contents, b"bar")
         [i.remove() for i in self.ioctx.list_objects()]
 
+    def test_aio_writesame(self):
+        lock = threading.Condition()
+        count = [0]
+        def cb(blah):
+            with lock:
+                count[0] += 1
+                lock.notify()
+            return 0
+        comp = self.ioctx.aio_writesame("abc", b"rzx", 9, 0, cb)
+        comp.wait_for_complete()
+        with lock:
+            while count[0] < 1:
+                lock.wait()
+        eq(comp.get_return_value(), 0)
+        eq(self.ioctx.read("abc"), b"rzxrzxrzx")
+        [i.remove() for i in self.ioctx.list_objects()]
+
     def test_aio_stat(self):
         lock = threading.Condition()
         count = [0]