aio_write_list_lock.Lock();
assert(c->io == this);
c->aio_write_seq = ++aio_write_seq;
+ ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
+ << " write_seq " << aio_write_seq << dendl;
aio_write_list.push_back(&c->aio_write_list_item);
aio_write_list_lock.Unlock();
}
void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
{
+ ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
aio_write_list_lock.Lock();
assert(c->io == this);
c->aio_write_list_item.remove_myself();
+ // queue async flush waiters
+ map<tid_t, std::list<AioCompletionImpl*> >::iterator waiters =
+ aio_write_waiters.find(c->aio_write_seq);
+ if (waiters != aio_write_waiters.end()) {
+ ldout(client->cct, 20) << "found " << waiters->second.size()
+ << " waiters" << dendl;
+ for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
+ it != waiters->second.end(); ++it) {
+ client->finisher.queue(new C_AioCompleteAndSafe(*it));
+ (*it)->put();
+ }
+ aio_write_waiters.erase(waiters);
+ } else {
+ ldout(client->cct, 20) << "found no waiters for tid "
+ << c->aio_write_seq << dendl;
+ }
aio_write_cond.Signal();
aio_write_list_lock.Unlock();
put();
}
+void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
+{
+ ldout(client->cct, 20) << "flush_aio_writes_async " << this
+ << " completion " << c << dendl;
+ Mutex::Locker l(aio_write_list_lock);
+ tid_t seq = aio_write_seq;
+ ldout(client->cct, 20) << "flush_aio_writes_async waiting on tid "
+ << seq << dendl;
+ if (aio_write_list.empty()) {
+ client->finisher.queue(new C_AioCompleteAndSafe(c));
+ } else {
+ c->get();
+ aio_write_waiters[seq].push_back(c);
+ }
+}
+
void librados::IoCtxImpl::flush_aio_writes()
{
+ ldout(client->cct, 20) << "flush_aio_writes" << dendl;
aio_write_list_lock.Lock();
tid_t seq = aio_write_seq;
while (!aio_write_list.empty() &&
delete my_completion2;
}
+TEST(LibRadosAio, FlushAsync) {
+ AioTestData test_data;
+ rados_completion_t my_completion;
+ ASSERT_EQ("", test_data.init());
+ ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
+ set_completion_complete, set_completion_safe, &my_completion));
+ rados_completion_t flush_completion;
+ ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &flush_completion));
+ char buf[128];
+ memset(buf, 0xee, sizeof(buf));
+ ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo",
+ my_completion, buf, sizeof(buf), 0));
+ ASSERT_EQ(0, rados_aio_flush_async(test_data.m_ioctx, flush_completion));
+ {
+ TestAlarm alarm;
+ ASSERT_EQ(0, rados_aio_wait_for_complete(flush_completion));
+ ASSERT_EQ(0, rados_aio_wait_for_safe(flush_completion));
+ }
+ ASSERT_EQ(1, rados_aio_is_complete(my_completion));
+ ASSERT_EQ(1, rados_aio_is_safe(my_completion));
+ ASSERT_EQ(1, rados_aio_is_complete(flush_completion));
+ ASSERT_EQ(1, rados_aio_is_safe(flush_completion));
+ char buf2[128];
+ memset(buf2, 0, sizeof(buf2));
+ rados_completion_t my_completion2;
+ ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
+ set_completion_complete, set_completion_safe, &my_completion2));
+ ASSERT_EQ(0, rados_aio_read(test_data.m_ioctx, "foo",
+ my_completion2, buf2, sizeof(buf2), 0));
+ {
+ TestAlarm alarm;
+ ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2));
+ }
+ ASSERT_EQ(0, memcmp(buf, buf2, sizeof(buf)));
+ rados_aio_release(my_completion);
+ rados_aio_release(my_completion2);
+ rados_aio_release(flush_completion);
+}
+
+TEST(LibRadosAio, FlushAsyncPP) {
+ AioTestDataPP test_data;
+ ASSERT_EQ("", test_data.init());
+ AioCompletion *my_completion = test_data.m_cluster.aio_create_completion(
+ (void*)&test_data, set_completion_complete, set_completion_safe);
+ AioCompletion *flush_completion =
+ test_data.m_cluster.aio_create_completion(NULL, NULL, NULL);
+ AioCompletion *my_completion_null = NULL;
+ ASSERT_NE(my_completion, my_completion_null);
+ char buf[128];
+ memset(buf, 0xee, sizeof(buf));
+ bufferlist bl1;
+ bl1.append(buf, sizeof(buf));
+ ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion,
+ bl1, sizeof(buf), 0));
+ ASSERT_EQ(0, test_data.m_ioctx.aio_flush_async(flush_completion));
+ {
+ TestAlarm alarm;
+ ASSERT_EQ(0, flush_completion->wait_for_complete());
+ ASSERT_EQ(0, flush_completion->wait_for_safe());
+ }
+ ASSERT_EQ(1, my_completion->is_complete());
+ ASSERT_EQ(1, my_completion->is_safe());
+ ASSERT_EQ(1, flush_completion->is_complete());
+ ASSERT_EQ(1, flush_completion->is_safe());
+ bufferlist bl2;
+ AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion(
+ (void*)&test_data, set_completion_complete, set_completion_safe);
+ ASSERT_NE(my_completion2, my_completion_null);
+ ASSERT_EQ(0, test_data.m_ioctx.aio_read("foo", my_completion2,
+ &bl2, sizeof(buf), 0));
+ {
+ TestAlarm alarm;
+ ASSERT_EQ(0, my_completion2->wait_for_complete());
+ }
+ ASSERT_EQ(0, memcmp(buf, bl2.c_str(), sizeof(buf)));
+ delete my_completion;
+ delete my_completion2;
+ delete flush_completion;
+}
+
TEST(LibRadosAio, RoundTripWriteFull) {
AioTestData test_data;
rados_completion_t my_completion, my_completion2, my_completion3;