]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/rados: add asynchronous write,append,read,write_full operations
authorSamuel Just <samuel.just@dreamhost.com>
Sat, 10 Dec 2011 01:55:57 +0000 (17:55 -0800)
committerSamuel Just <samuel.just@dreamhost.com>
Tue, 13 Dec 2011 00:00:06 +0000 (16:00 -0800)
Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/pybind/rados.py
src/test/pybind/test_rados.py

index ef747df141dcbec78b5a8933c2aac437a542ac4e..6ed6dd28b2673f7452853d7417a58d7bbfaa9493 100755 (executable)
@@ -2,7 +2,9 @@
 Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
 """
 from ctypes import CDLL, c_char_p, c_size_t, c_void_p, c_int, c_long, \
-    create_string_buffer, byref, Structure, c_uint64, c_ubyte, pointer
+    create_string_buffer, byref, Structure, c_uint64, c_ubyte, pointer, \
+    CFUNCTYPE
+import threading
 import ctypes
 import errno
 import time
@@ -363,6 +365,33 @@ class Snap(object):
             raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
         return datetime.fromtimestamp(snap_time.value)
 
+class Completion(object):
+    """completion object"""
+    def __init__(self, ioctx, rados_comp, oncomplete, onsafe):
+        self.rados_comp = rados_comp
+        self.oncomplete = oncomplete
+        self.onsafe = onsafe
+        self.ioctx = ioctx
+
+    def wait_for_safe(self):
+        return self.ioctx.librados.rados_aio_is_safe(
+            self.rados_comp
+            )
+
+    def wait_for_complete(self):
+        return self.ioctx.librados.rados_aio_is_complete(
+            self.rados_comp
+            )
+
+    def get_return_value(self):
+        return self.ioctx.librados.rados_aio_get_return_value(
+            self.rados_comp)
+
+    def __del__(self):
+        self.ioctx.librados.rados_aio_release(
+            self.rados_comp
+            )
+
 class Ioctx(object):
     """rados.Ioctx object"""
     def __init__(self, name, librados, io):
@@ -371,6 +400,12 @@ class Ioctx(object):
         self.io = io
         self.state = "open"
         self.locator_key = ""
+        self.safe_cbs = {}
+        self.complete_cbs = {}
+        RADOS_CB = CFUNCTYPE(c_int, c_void_p, c_void_p)
+        self.__aio_safe_cb_c = RADOS_CB(self.__aio_safe_cb)
+        self.__aio_complete_cb_c = RADOS_CB(self.__aio_complete_cb)
+        self.lock = threading.Lock()
 
     def __enter__(self):
         return self
@@ -382,6 +417,113 @@ class Ioctx(object):
     def __del__(self):
         self.close()
 
+    def __aio_safe_cb(self, completion, _):
+        cb = None
+        with self.lock:
+            cb = self.safe_cbs[completion]
+            del self.safe_cbs[completion]
+        cb.onsafe(cb)
+        return 0
+
+    def __aio_complete_cb(self, completion, _):
+        cb = None
+        with self.lock:
+            cb = self.complete_cbs[completion]
+            del self.complete_cbs[completion]
+        cb.oncomplete(cb)
+        return 0
+
+    def __get_completion(self, oncomplete, onsafe):
+        completion = c_void_p(0)
+        complete_cb = None
+        safe_cb = None
+        if oncomplete:
+            complete_cb = self.__aio_complete_cb_c
+        if onsafe:
+            safe_cb = self.__aio_safe_cb_c
+        ret = self.librados.rados_aio_create_completion(
+            c_void_p(0),
+            complete_cb,
+            safe_cb,
+            byref(completion)
+            )
+        if ret < 0:
+            raise make_ex(ret, "error getting a completion")
+        with self.lock:
+            completion_obj = Completion(self, completion, oncomplete, onsafe)
+            if oncomplete:
+                self.complete_cbs[completion.value] = completion_obj
+            if onsafe:
+                self.safe_cbs[completion.value] = completion_obj
+        return completion_obj
+
+    def aio_write(self, object_name, to_write, offset=0,
+                  oncomplete=None, onsafe=None):
+        completion = self.__get_completion(oncomplete, onsafe)
+        ret = self.librados.rados_aio_write(
+            self.io,
+            c_char_p(object_name),
+            completion.rados_comp,
+            c_char_p(to_write),
+            c_size_t(len(to_write)),
+            c_uint64(offset))
+        if ret < 0:
+            raise make_ex(ret, "error writing object %s" % object_name)
+        return completion
+
+    def aio_write_full(self, object_name, to_write,
+                       oncomplete=None, onsafe=None):
+        completion = self.__get_completion(oncomplete, onsafe)
+        ret = self.librados.rados_aio_write_full(
+            self.io,
+            c_char_p(object_name),
+            completion.rados_comp,
+            c_char_p(to_write),
+            c_size_t(len(to_write)))
+        if ret < 0:
+            raise make_ex(ret, "error writing object %s" % object_name)
+        return completion
+
+    def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
+        completion = self.__get_completion(oncomplete, onsafe)
+        ret = self.librados.rados_aio_append(
+            self.io,
+            c_char_p(object_name),
+            completion.rados_comp,
+            c_char_p(to_append),
+            c_size_t(len(to_append)))
+        if ret < 0:
+            raise make_ex(ret, "error appending to object %s" % object_name)
+        return completion
+
+    def aio_flush(self):
+        ret = self.librados.rados_aio_flush(
+            self.io)
+        if ret < 0:
+            raise make_ex(ret, "error flushing")
+
+    def aio_read(self, object_name, length, offset, oncomplete):
+        """
+        oncomplete will be called with the returned read value as
+        well as the completion:
+
+        oncomplete(completion, data_read)
+        """
+        buf = create_string_buffer(length)
+        def oncomplete_(completion):
+            return oncomplete(completion, buf.value)
+        completion = self.__get_completion(oncomplete_, None)
+        ret = self.librados.rados_aio_read(
+            self.io,
+            c_char_p(object_name),
+            completion.rados_comp,
+            buf,
+            c_size_t(length),
+            c_uint64(offset))
+        if ret < 0:
+            raise make_ex(ret, "error reading %s" % object_name)
+        return completion
+
     def require_ioctx_open(self):
         if self.state != "open":
             raise IoctxStateError("The pool is %s" % self.state)
index 279ba4f3e4b476191b352316e2a7472ef4de8710..a2bb871b6d863e4aec6aa2ca63c00de1c63daeb9 100644 (file)
@@ -1,5 +1,6 @@
 from nose.tools import eq_ as eq, assert_raises
 from rados import Rados, ObjectExists, ObjectNotFound, ANONYMOUS_AUID, ADMIN_AUID
+import threading
 
 class TestPool(object):
 
@@ -152,3 +153,77 @@ class TestIoctx(object):
         objects = [i for i in self.ioctx.list_objects()]
         eq(objects, [])
         self.ioctx.set_locator_key("")
+
+    def test_aio_write(self):
+        lock = threading.Condition()
+        count = [0]
+        def cb(blah):
+            with lock:
+                count[0] += 1
+                lock.notify()
+            return 0
+        comp = self.ioctx.aio_write("foo", "bar", 0, cb, cb)
+        comp.wait_for_complete()
+        comp.wait_for_safe()
+        with lock:
+            while count[0] < 2:
+                lock.wait()
+        eq(comp.get_return_value(), 0)
+        contents = self.ioctx.read("foo")
+        eq(contents, "bar")
+        [i.remove() for i in self.ioctx.list_objects()]
+
+    def test_aio_append(self):
+        lock = threading.Condition()
+        count = [0]
+        def cb(blah):
+            with lock:
+                count[0] += 1
+                lock.notify()
+            return 0
+        comp = self.ioctx.aio_write("foo", "bar", 0, cb, cb)
+        comp2 = self.ioctx.aio_append("foo", "baz", cb, cb)
+        comp.wait_for_complete()
+        contents = self.ioctx.read("foo")
+        eq(contents, "barbaz")
+        with lock:
+            while count[0] < 4:
+                lock.wait()
+        eq(comp.get_return_value(), 0)
+        [i.remove() for i in self.ioctx.list_objects()]
+
+    def test_aio_write_full(self):
+        lock = threading.Condition()
+        count = [0]
+        def cb(blah):
+            with lock:
+                count[0] += 1
+                lock.notify()
+            return 0
+        self.ioctx.aio_write("foo", "barbaz", 0, cb, cb)
+        comp = self.ioctx.aio_write_full("foo", "bar", cb, cb)
+        comp.wait_for_complete()
+        comp.wait_for_safe()
+        with lock:
+            while count[0] < 2:
+                lock.wait()
+        eq(comp.get_return_value(), 0)
+        contents = self.ioctx.read("foo")
+        eq(contents, "bar")
+        [i.remove() for i in self.ioctx.list_objects()]
+
+    def test_aio_read(self):
+        retval = [None]
+        lock = threading.Condition()
+        def cb(_, buf):
+            with lock:
+                retval[0] = buf
+                lock.notify()
+        self.ioctx.write("foo", "bar")
+        self.ioctx.aio_read("foo", 3, 0, cb)
+        with lock:
+            while retval[0] is None:
+                lock.wait()
+        eq(retval[0], "bar")
+        [i.remove() for i in self.ioctx.list_objects()]
+