eq(contents, "bar")
[i.remove() for i in self.ioctx.list_objects()]
- def test_aio_read(self):
- retval = [None]
- lock = threading.Condition()
- def cb(_, buf):
- with lock:
- retval[0] = buf
- lock.notify()
- payload = "bar\000frob"
- self.ioctx.write("foo", payload)
-
- # find acting_set for 'foo' and take it down; issue read; verify
- # read doesn't complete until OSDs are back up
+ def _take_down_acting_set(self, pool, objectname):
+ # find acting_set for pool:objectname and take it down; used to
+ # verify that async reads don't complete while acting set is missing
cmd = {
"prefix":"osd map",
- "pool":"test_pool",
- "object":"foo",
+ "pool":pool,
+ "object":objectname,
"format":"json",
}
r, jsonout, _ = self.rados.mon_command(json.dumps(cmd), '')
# wait for OSDs to acknowledge the down
eq(self.rados.wait_for_latest_osdmap(), 0)
+
+ def _let_osds_back_up(self):
+ cmd = {"prefix":"osd unset", "key":"noup"}
+ r, _, _ = self.rados.mon_command(json.dumps(cmd), '')
+ eq(r, 0)
+
+ def test_aio_read(self):
+ # this is a list so that the local cb() can modify it
+ retval = [None]
+ lock = threading.Condition()
+ def cb(_, buf):
+ with lock:
+ retval[0] = buf
+ lock.notify()
+ payload = "bar\000frob"
+ self.ioctx.write("foo", payload)
+
+ # test1: use wait_for_complete() and wait for cb by
+ # watching retval[0]
+ self._take_down_acting_set('test_pool', 'foo')
comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
eq(False, comp.is_complete())
-
time.sleep(3)
-
- # read should not yet be complete
eq(False, comp.is_complete())
with lock:
eq(None, retval[0])
-
- # let OSDs come back up, verify read completes
- cmd = {"prefix":"osd unset", "key":"noup"}
- r, _, _ = self.rados.mon_command(json.dumps(cmd), '')
- eq(r, 0)
+ self._let_osds_back_up()
comp.wait_for_complete()
+ loops = 0
with lock:
- while retval[0] is None:
- lock.wait()
+ while retval[0] is None and loops <= 10:
+ lock.wait(timeout=5)
+ loops += 1
+ assert(loops <= 10)
+
+ eq(retval[0], payload)
+
+ # test2: use wait_for_complete_and_cb(), verify retval[0] is
+ # set by the time we regain control
+
+ retval[0] = None
+ self._take_down_acting_set('test_pool', 'foo')
+ comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
+ eq(False, comp.is_complete())
+ time.sleep(3)
+ eq(False, comp.is_complete())
+ with lock:
+ eq(None, retval[0])
+ self._let_osds_back_up()
+
+ comp.wait_for_complete_and_cb()
+ assert(retval[0] is not None)
eq(retval[0], payload)
[i.remove() for i in self.ioctx.list_objects()]