self.onsafe = onsafe
self.ioctx = ioctx
- def wait_for_safe(self):
+ def is_safe(self):
"""
Is an asynchronous operation safe?
This does not imply that the safe callback has finished.
- :returns: whether the operation is safe
+ :returns: True if the operation is safe
"""
return run_in_thread(self.ioctx.librados.rados_aio_is_safe,
- (self.rados_comp,))
+ (self.rados_comp,)) == 1
- def wait_for_complete(self):
+ def is_complete(self):
"""
Has an asynchronous operation completed?
This does not imply that the safe callback has finished.
- :returns: whether the operation is completed
+ :returns: True if the operation is completed
"""
return run_in_thread(self.ioctx.librados.rados_aio_is_complete,
- (self.rados_comp,))
+ (self.rados_comp,)) == 1
+
+ def wait_for_safe(self):
+ """
+ Wait for an asynchronous operation to be marked safe
+
+ This does not imply that the safe callback has finished.
+ """
+ run_in_thread(self.ioctx.librados.rados_aio_wait_for_safe,
+ (self.rados_comp,))
+
+ def wait_for_complete(self):
+ """
+ Wait for an asynchronous operation to complete
+
+ This does not imply that the complete callback has finished.
+ """
+ run_in_thread(self.ioctx.librados.rados_aio_wait_for_complete,
+ (self.rados_comp,))
+
+ def wait_for_safe_and_cb(self):
+ """
+ Wait for an asynchronous operation to be marked safe and for
+ the safe callback to have returned
+ """
+ run_in_thread(self.ioctx.librados.rados_aio_wait_for_safe_and_cb,
+ (self.rados_comp,))
+
+ def wait_for_complete_and_cb(self):
+ """
+ Wait for an asynchronous operation to complete and for the
+ complete callback to have returned
+
+ :returns: whether the operation is completed
+ """
+ return run_in_thread(
+ self.ioctx.librados.rados_aio_wait_for_complete_and_cb,
+ (self.rados_comp,)
+ )
def get_return_value(self):
"""
from nose.tools import eq_ as eq, assert_raises
from rados import (Rados, Error, Object, ObjectExists, ObjectNotFound,
ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES)
+import time
import threading
import json
import errno
self.ioctx = self.rados.open_ioctx('test_pool')
def tearDown(self):
+ cmd = {"prefix":"osd unset", "key":"noup"}
+ self.rados.mon_command(json.dumps(cmd), '')
self.ioctx.close()
self.rados.delete_pool('test_pool')
self.rados.shutdown()
lock.notify()
payload = "bar\000frob"
self.ioctx.write("foo", payload)
- self.ioctx.aio_read("foo", len(payload), 0, cb)
+
+ # find acting_set for 'foo' and take it down; issue read; verify
+ # read doesn't complete until OSDs are back up
+ cmd = {
+ "prefix":"osd map",
+ "pool":"test_pool",
+ "object":"foo",
+ "format":"json",
+ }
+ r, jsonout, _ = self.rados.mon_command(json.dumps(cmd), '')
+ objmap = json.loads(jsonout)
+ acting_set = objmap['acting']
+ cmd = {"prefix":"osd set", "key":"noup"}
+ r, _, _ = self.rados.mon_command(json.dumps(cmd), '')
+ eq(r, 0)
+ cmd = {"prefix":"osd down", "ids":[str(i) for i in acting_set]}
+ r, _, _ = self.rados.mon_command(json.dumps(cmd), '')
+ eq(r, 0)
+
+ # wait for OSDs to acknowledge the down
+ eq(self.rados.wait_for_latest_osdmap(), 0)
+ comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
+ eq(False, comp.is_complete())
+
+ time.sleep(3)
+
+ # read should not yet be complete
+ eq(False, comp.is_complete())
+ with lock:
+ eq(None, retval[0])
+
+ # let OSDs come back up, verify read completes
+ cmd = {"prefix":"osd unset", "key":"noup"}
+ r, _, _ = self.rados.mon_command(json.dumps(cmd), '')
+ eq(r, 0)
+ comp.wait_for_complete()
with lock:
while retval[0] is None:
lock.wait()
eq(retval[0], payload)
+
[i.remove() for i in self.ioctx.list_objects()]
class TestObject(object):