eq(contents, b"bar")
[i.remove() for i in self.ioctx.list_objects()]
+ def test_aio_cmpext(self):
+ lock = threading.Condition()
+ count = [0]
+ def cb(blah):
+ with lock:
+ count[0] += 1
+ lock.notify()
+ return 0
+
+ self.ioctx.write('test_object', b'abcdefghi')
+ comp = self.ioctx.aio_cmpext('test_object', b'abcdefghi', 0, cb)
+ comp.wait_for_complete()
+ with lock:
+ while count[0] < 1:
+ lock.wait()
+ eq(comp.get_return_value(), 0)
+
def test_aio_write_no_comp_ref(self):
lock = threading.Condition()
count = [0]