Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
"""
from ctypes import CDLL, c_char_p, c_size_t, c_void_p, c_int, c_long, \
- create_string_buffer, byref, Structure, c_uint64, c_ubyte, pointer
+ create_string_buffer, byref, Structure, c_uint64, c_ubyte, pointer, \
+ CFUNCTYPE
+import threading
import ctypes
import errno
import time
raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
return datetime.fromtimestamp(snap_time.value)
+class Completion(object):
+ """completion object"""
+ def __init__(self, ioctx, rados_comp, oncomplete, onsafe):
+ self.rados_comp = rados_comp
+ self.oncomplete = oncomplete
+ self.onsafe = onsafe
+ self.ioctx = ioctx
+
+ def wait_for_safe(self):
+ return self.ioctx.librados.rados_aio_is_safe(
+ self.rados_comp
+ )
+
+ def wait_for_complete(self):
+ return self.ioctx.librados.rados_aio_is_complete(
+ self.rados_comp
+ )
+
+ def get_return_value(self):
+ return self.ioctx.librados.rados_aio_get_return_value(
+ self.rados_comp)
+
+ def __del__(self):
+ self.ioctx.librados.rados_aio_release(
+ self.rados_comp
+ )
+
class Ioctx(object):
"""rados.Ioctx object"""
def __init__(self, name, librados, io):
self.io = io
self.state = "open"
self.locator_key = ""
+ self.safe_cbs = {}
+ self.complete_cbs = {}
+ RADOS_CB = CFUNCTYPE(c_int, c_void_p, c_void_p)
+ self.__aio_safe_cb_c = RADOS_CB(self.__aio_safe_cb)
+ self.__aio_complete_cb_c = RADOS_CB(self.__aio_complete_cb)
+ self.lock = threading.Lock()
def __enter__(self):
return self
def __del__(self):
self.close()
+ def __aio_safe_cb(self, completion, _):
+ cb = None
+ with self.lock:
+ cb = self.safe_cbs[completion]
+ del self.safe_cbs[completion]
+ cb.onsafe(cb)
+ return 0
+
+ def __aio_complete_cb(self, completion, _):
+ cb = None
+ with self.lock:
+ cb = self.complete_cbs[completion]
+ del self.complete_cbs[completion]
+ cb.oncomplete(cb)
+ return 0
+
+ def __get_completion(self, oncomplete, onsafe):
+ completion = c_void_p(0)
+ complete_cb = None
+ safe_cb = None
+ if oncomplete:
+ complete_cb = self.__aio_complete_cb_c
+ if onsafe:
+ safe_cb = self.__aio_safe_cb_c
+ ret = self.librados.rados_aio_create_completion(
+ c_void_p(0),
+ complete_cb,
+ safe_cb,
+ byref(completion)
+ )
+ if ret < 0:
+ raise make_ex(ret, "error getting a completion")
+ with self.lock:
+ completion_obj = Completion(self, completion, oncomplete, onsafe)
+ if oncomplete:
+ self.complete_cbs[completion.value] = completion_obj
+ if onsafe:
+ self.safe_cbs[completion.value] = completion_obj
+ return completion_obj
+
+ def aio_write(self, object_name, to_write, offset=0,
+ oncomplete=None, onsafe=None):
+ completion = self.__get_completion(oncomplete, onsafe)
+ ret = self.librados.rados_aio_write(
+ self.io,
+ c_char_p(object_name),
+ completion.rados_comp,
+ c_char_p(to_write),
+ c_size_t(len(to_write)),
+ c_uint64(offset))
+ if ret < 0:
+ raise make_ex(ret, "error writing object %s" % object_name)
+ return completion
+
+ def aio_write_full(self, object_name, to_write,
+ oncomplete=None, onsafe=None):
+ completion = self.__get_completion(oncomplete, onsafe)
+ ret = self.librados.rados_aio_write_full(
+ self.io,
+ c_char_p(object_name),
+ completion.rados_comp,
+ c_char_p(to_write),
+ c_size_t(len(to_write)))
+ if ret < 0:
+ raise make_ex(ret, "error writing object %s" % object_name)
+ return completion
+
+ def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
+ completion = self.__get_completion(oncomplete, onsafe)
+ ret = self.librados.rados_aio_append(
+ self.io,
+ c_char_p(object_name),
+ completion.rados_comp,
+ c_char_p(to_append),
+ c_size_t(len(to_append)))
+ if ret < 0:
+ raise make_ex(ret, "error appending to object %s" % object_name)
+ return completion
+
+ def aio_flush(self):
+ ret = self.librados.rados_aio_flush(
+ self.io)
+ if ret < 0:
+ raise make_ex(ret, "error flushing")
+
+ def aio_read(self, object_name, length, offset, oncomplete):
+ """
+ oncomplete will be called with the returned read value as
+ well as the completion:
+
+ oncomplete(completion, data_read)
+ """
+ buf = create_string_buffer(length)
+ def oncomplete_(completion):
+ return oncomplete(completion, buf.value)
+ completion = self.__get_completion(oncomplete_, None)
+ ret = self.librados.rados_aio_read(
+ self.io,
+ c_char_p(object_name),
+ completion.rados_comp,
+ buf,
+ c_size_t(length),
+ c_uint64(offset))
+ if ret < 0:
+ raise make_ex(ret, "error reading %s" % object_name)
+ return completion
+
def require_ioctx_open(self):
if self.state != "open":
raise IoctxStateError("The pool is %s" % self.state)
from nose.tools import eq_ as eq, assert_raises
from rados import Rados, ObjectExists, ObjectNotFound, ANONYMOUS_AUID, ADMIN_AUID
+import threading
class TestPool(object):
objects = [i for i in self.ioctx.list_objects()]
eq(objects, [])
self.ioctx.set_locator_key("")
+
+ def test_aio_write(self):
+ lock = threading.Condition()
+ count = [0]
+ def cb(blah):
+ with lock:
+ count[0] += 1
+ lock.notify()
+ return 0
+ comp = self.ioctx.aio_write("foo", "bar", 0, cb, cb)
+ comp.wait_for_complete()
+ comp.wait_for_safe()
+ with lock:
+ while count[0] < 2:
+ lock.wait()
+ eq(comp.get_return_value(), 0)
+ contents = self.ioctx.read("foo")
+ eq(contents, "bar")
+ [i.remove() for i in self.ioctx.list_objects()]
+
+ def test_aio_append(self):
+ lock = threading.Condition()
+ count = [0]
+ def cb(blah):
+ with lock:
+ count[0] += 1
+ lock.notify()
+ return 0
+ comp = self.ioctx.aio_write("foo", "bar", 0, cb, cb)
+ comp2 = self.ioctx.aio_append("foo", "baz", cb, cb)
+ comp.wait_for_complete()
+ contents = self.ioctx.read("foo")
+ eq(contents, "barbaz")
+ with lock:
+ while count[0] < 4:
+ lock.wait()
+ eq(comp.get_return_value(), 0)
+ [i.remove() for i in self.ioctx.list_objects()]
+
+ def test_aio_write_full(self):
+ lock = threading.Condition()
+ count = [0]
+ def cb(blah):
+ with lock:
+ count[0] += 1
+ lock.notify()
+ return 0
+ self.ioctx.aio_write("foo", "barbaz", 0, cb, cb)
+ comp = self.ioctx.aio_write_full("foo", "bar", cb, cb)
+ comp.wait_for_complete()
+ comp.wait_for_safe()
+ with lock:
+ while count[0] < 2:
+ lock.wait()
+ eq(comp.get_return_value(), 0)
+ contents = self.ioctx.read("foo")
+ eq(contents, "bar")
+ [i.remove() for i in self.ioctx.list_objects()]
+
+ def test_aio_read(self):
+ retval = [None]
+ lock = threading.Condition()
+ def cb(_, buf):
+ with lock:
+ retval[0] = buf
+ lock.notify()
+ self.ioctx.write("foo", "bar")
+ self.ioctx.aio_read("foo", 3, 0, cb)
+ with lock:
+ while retval[0] is None:
+ lock.wait()
+ eq(retval[0], "bar")
+ [i.remove() for i in self.ioctx.list_objects()]
+