#endif
#define LIBRADOS_VER_MAJOR 0
-#define LIBRADOS_VER_MINOR 50
+#define LIBRADOS_VER_MINOR 51
#define LIBRADOS_VER_EXTRA 0
#define LIBRADOS_VERSION(maj, min, extra) ((maj << 16) + (min << 8) + extra)
int rados_aio_flush(rados_ioctx_t io);
+/**
+ * Schedule a callback for when all currently pending
+ * aio writes are safe. This is a non-blocking version of
+ * rados_aio_flush().
+ *
+ * @param io the context to flush
+ * @param completion what to do when the writes are safe
+ * @returns 0 on success, negative error code on failure
+ */
+int rados_aio_flush_async(rados_ioctx_t io, rados_completion_t completion);
+
+
/**
* Asynchronously get object stats (size/mtime)
*
int aio_flush();
+ /**
+ * Schedule a callback for when all currently pending
+ * aio writes are safe. This is a non-blocking version of
+ * aio_flush().
+ *
+ * @param c what to do when the writes are safe
+ * @returns 0 on success, negative error code on failure
+ */
+ int aio_flush_async(AioCompletion *c);
+
int aio_stat(const std::string& oid, AioCompletion *c, uint64_t *psize, time_t *pmtime);
int aio_exec(const std::string& oid, AioCompletion *c, const char *cls, const char *method,
}
};
+/**
+ * Fills in all completed request data, and calls both
+ * complete and safe callbacks if they exist.
+ *
+ * Not useful for usual I/O, but for special things like
+ * flush where we only want to wait for things to be safe,
+ * but allow users to specify any of the callbacks.
+ */
+struct C_AioCompleteAndSafe : public Context {
+ AioCompletionImpl *c;
+
+ C_AioCompleteAndSafe(AioCompletionImpl *cc) : c(cc) {
+ c->ref++;
+ }
+
+ void finish(int r) {
+ c->rval = r;
+ c->ack = true;
+ c->safe = true;
+ rados_callback_t cb_complete = c->callback_complete;
+ void *cb_arg = c->callback_arg;
+ if (cb_complete)
+ cb_complete(c, cb_arg);
+
+ rados_callback_t cb_safe = c->callback_safe;
+ if (cb_safe)
+ cb_safe(c, cb_arg);
+
+ c->lock.Lock();
+ c->callback_complete = NULL;
+ c->callback_safe = NULL;
+ c->cond.Signal();
+ c->put_unlock();
+ }
+};
+
}
#endif
aio_write_list_lock.Lock();
assert(c->io == this);
c->aio_write_seq = ++aio_write_seq;
+ ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
+ << " write_seq " << aio_write_seq << dendl;
aio_write_list.push_back(&c->aio_write_list_item);
aio_write_list_lock.Unlock();
}
void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
{
+ ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
aio_write_list_lock.Lock();
assert(c->io == this);
c->aio_write_list_item.remove_myself();
+ // queue async flush waiters
+ map<tid_t, std::list<AioCompletionImpl*> >::iterator waiters =
+ aio_write_waiters.find(c->aio_write_seq);
+ if (waiters != aio_write_waiters.end()) {
+ ldout(client->cct, 20) << "found " << waiters->second.size()
+ << " waiters" << dendl;
+ for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
+ it != waiters->second.end(); ++it) {
+ client->finisher.queue(new C_AioCompleteAndSafe(*it));
+ (*it)->put();
+ }
+ aio_write_waiters.erase(waiters);
+ } else {
+ ldout(client->cct, 20) << "found no waiters for tid "
+ << c->aio_write_seq << dendl;
+ }
aio_write_cond.Signal();
aio_write_list_lock.Unlock();
put();
}
+void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
+{
+ ldout(client->cct, 20) << "flush_aio_writes_async " << this
+ << " completion " << c << dendl;
+ Mutex::Locker l(aio_write_list_lock);
+ tid_t seq = aio_write_seq;
+ ldout(client->cct, 20) << "flush_aio_writes_async waiting on tid "
+ << seq << dendl;
+ if (aio_write_list.empty()) {
+ client->finisher.queue(new C_AioCompleteAndSafe(c));
+ } else {
+ c->get();
+ aio_write_waiters[seq].push_back(c);
+ }
+}
+
void librados::IoCtxImpl::flush_aio_writes()
{
+ ldout(client->cct, 20) << "flush_aio_writes" << dendl;
aio_write_list_lock.Lock();
tid_t seq = aio_write_seq;
while (!aio_write_list.empty() &&
tid_t aio_write_seq;
Cond aio_write_cond;
xlist<AioCompletionImpl*> aio_write_list;
+ map<tid_t, std::list<AioCompletionImpl*> > aio_write_waiters;
Mutex *lock;
Objecter *objecter;
void queue_aio_write(struct AioCompletionImpl *c);
void complete_aio_write(struct AioCompletionImpl *c);
+ void flush_aio_writes_async(AioCompletionImpl *c);
void flush_aio_writes();
int64_t get_id() {
return io_ctx_impl->aio_remove(oid, c->pc);
}
+int librados::IoCtx::aio_flush_async(librados::AioCompletion *c)
+{
+ io_ctx_impl->flush_aio_writes_async(c->pc);
+ return 0;
+}
+
int librados::IoCtx::aio_flush()
{
io_ctx_impl->flush_aio_writes();
return ctx->aio_remove(oid, (librados::AioCompletionImpl*)completion);
}
+extern "C" int rados_aio_flush_async(rados_ioctx_t io,
+ rados_completion_t completion)
+{
+ librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+ ctx->flush_aio_writes_async((librados::AioCompletionImpl*)completion);
+ return 0;
+}
+
extern "C" int rados_aio_flush(rados_ioctx_t io)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
delete my_completion2;
}
+TEST(LibRadosAio, FlushAsync) {
+ AioTestData test_data;
+ rados_completion_t my_completion;
+ ASSERT_EQ("", test_data.init());
+ ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
+ set_completion_complete, set_completion_safe, &my_completion));
+ rados_completion_t flush_completion;
+ ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &flush_completion));
+ char buf[128];
+ memset(buf, 0xee, sizeof(buf));
+ ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo",
+ my_completion, buf, sizeof(buf), 0));
+ ASSERT_EQ(0, rados_aio_flush_async(test_data.m_ioctx, flush_completion));
+ {
+ TestAlarm alarm;
+ ASSERT_EQ(0, rados_aio_wait_for_complete(flush_completion));
+ ASSERT_EQ(0, rados_aio_wait_for_safe(flush_completion));
+ }
+ ASSERT_EQ(1, rados_aio_is_complete(my_completion));
+ ASSERT_EQ(1, rados_aio_is_safe(my_completion));
+ ASSERT_EQ(1, rados_aio_is_complete(flush_completion));
+ ASSERT_EQ(1, rados_aio_is_safe(flush_completion));
+ char buf2[128];
+ memset(buf2, 0, sizeof(buf2));
+ rados_completion_t my_completion2;
+ ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
+ set_completion_complete, set_completion_safe, &my_completion2));
+ ASSERT_EQ(0, rados_aio_read(test_data.m_ioctx, "foo",
+ my_completion2, buf2, sizeof(buf2), 0));
+ {
+ TestAlarm alarm;
+ ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2));
+ }
+ ASSERT_EQ(0, memcmp(buf, buf2, sizeof(buf)));
+ rados_aio_release(my_completion);
+ rados_aio_release(my_completion2);
+ rados_aio_release(flush_completion);
+}
+
+TEST(LibRadosAio, FlushAsyncPP) {
+ AioTestDataPP test_data;
+ ASSERT_EQ("", test_data.init());
+ AioCompletion *my_completion = test_data.m_cluster.aio_create_completion(
+ (void*)&test_data, set_completion_complete, set_completion_safe);
+ AioCompletion *flush_completion =
+ test_data.m_cluster.aio_create_completion(NULL, NULL, NULL);
+ AioCompletion *my_completion_null = NULL;
+ ASSERT_NE(my_completion, my_completion_null);
+ char buf[128];
+ memset(buf, 0xee, sizeof(buf));
+ bufferlist bl1;
+ bl1.append(buf, sizeof(buf));
+ ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion,
+ bl1, sizeof(buf), 0));
+ ASSERT_EQ(0, test_data.m_ioctx.aio_flush_async(flush_completion));
+ {
+ TestAlarm alarm;
+ ASSERT_EQ(0, flush_completion->wait_for_complete());
+ ASSERT_EQ(0, flush_completion->wait_for_safe());
+ }
+ ASSERT_EQ(1, my_completion->is_complete());
+ ASSERT_EQ(1, my_completion->is_safe());
+ ASSERT_EQ(1, flush_completion->is_complete());
+ ASSERT_EQ(1, flush_completion->is_safe());
+ bufferlist bl2;
+ AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion(
+ (void*)&test_data, set_completion_complete, set_completion_safe);
+ ASSERT_NE(my_completion2, my_completion_null);
+ ASSERT_EQ(0, test_data.m_ioctx.aio_read("foo", my_completion2,
+ &bl2, sizeof(buf), 0));
+ {
+ TestAlarm alarm;
+ ASSERT_EQ(0, my_completion2->wait_for_complete());
+ }
+ ASSERT_EQ(0, memcmp(buf, bl2.c_str(), sizeof(buf)));
+ delete my_completion;
+ delete my_completion2;
+ delete flush_completion;
+}
+
TEST(LibRadosAio, RoundTripWriteFull) {
AioTestData test_data;
rados_completion_t my_completion, my_completion2, my_completion3;