From 7cc0940f89070dadab5b9102b1e78362f762f402 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Wed, 27 Mar 2013 14:48:31 -0700 Subject: [PATCH] librados: add async flush interface Sometimes you don't want flush to block, and can't modify already scheduled aio_writes. This will be useful for a librbd async flush interface. Signed-off-by: Josh Durgin --- src/include/rados/librados.h | 14 +++++- src/include/rados/librados.hpp | 10 ++++ src/librados/AioCompletionImpl.h | 36 ++++++++++++++ src/librados/IoCtxImpl.cc | 36 ++++++++++++++ src/librados/IoCtxImpl.h | 2 + src/librados/librados.cc | 14 ++++++ src/test/librados/aio.cc | 80 ++++++++++++++++++++++++++++++++ 7 files changed, 191 insertions(+), 1 deletion(-) diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index fa6ff982bb779..f2a60add25cf5 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -24,7 +24,7 @@ extern "C" { #endif #define LIBRADOS_VER_MAJOR 0 -#define LIBRADOS_VER_MINOR 50 +#define LIBRADOS_VER_MINOR 51 #define LIBRADOS_VER_EXTRA 0 #define LIBRADOS_VERSION(maj, min, extra) ((maj << 16) + (min << 8) + extra) @@ -1446,6 +1446,18 @@ int rados_aio_read(rados_ioctx_t io, const char *oid, int rados_aio_flush(rados_ioctx_t io); +/** + * Schedule a callback for when all currently pending + * aio writes are safe. This is a non-blocking version of + * rados_aio_flush(). + * + * @param io the context to flush + * @param completion what to do when the writes are safe + * @returns 0 on success, negative error code on failure + */ +int rados_aio_flush_async(rados_ioctx_t io, rados_completion_t completion); + + /** * Asynchronously get object stats (size/mtime) * diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index 887b6aa3e4446..cf3b33d5328aa 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -484,6 +484,16 @@ namespace librados int aio_flush(); + /** + * Schedule a callback for when all currently pending + * aio writes are safe. This is a non-blocking version of + * aio_flush(). + * + * @param c what to do when the writes are safe + * @returns 0 on success, negative error code on failure + */ + int aio_flush_async(AioCompletion *c); + int aio_stat(const std::string& oid, AioCompletion *c, uint64_t *psize, time_t *pmtime); int aio_exec(const std::string& oid, AioCompletion *c, const char *cls, const char *method, diff --git a/src/librados/AioCompletionImpl.h b/src/librados/AioCompletionImpl.h index 66270c92737b1..619e7471605d4 100644 --- a/src/librados/AioCompletionImpl.h +++ b/src/librados/AioCompletionImpl.h @@ -196,6 +196,42 @@ struct C_AioSafe : public Context { } }; +/** + * Fills in all completed request data, and calls both + * complete and safe callbacks if they exist. + * + * Not useful for usual I/O, but for special things like + * flush where we only want to wait for things to be safe, + * but allow users to specify any of the callbacks. + */ +struct C_AioCompleteAndSafe : public Context { + AioCompletionImpl *c; + + C_AioCompleteAndSafe(AioCompletionImpl *cc) : c(cc) { + c->ref++; + } + + void finish(int r) { + c->rval = r; + c->ack = true; + c->safe = true; + rados_callback_t cb_complete = c->callback_complete; + void *cb_arg = c->callback_arg; + if (cb_complete) + cb_complete(c, cb_arg); + + rados_callback_t cb_safe = c->callback_safe; + if (cb_safe) + cb_safe(c, cb_arg); + + c->lock.Lock(); + c->callback_complete = NULL; + c->callback_safe = NULL; + c->cond.Signal(); + c->put_unlock(); + } +}; + } #endif diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index 313e15b23e28e..76daec9c6edfc 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -70,22 +70,58 @@ void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c) aio_write_list_lock.Lock(); assert(c->io == this); c->aio_write_seq = ++aio_write_seq; + ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c + << " write_seq " << aio_write_seq << dendl; aio_write_list.push_back(&c->aio_write_list_item); aio_write_list_lock.Unlock(); } void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c) { + ldout(client->cct, 20) << "complete_aio_write " << c << dendl; aio_write_list_lock.Lock(); assert(c->io == this); c->aio_write_list_item.remove_myself(); + // queue async flush waiters + map >::iterator waiters = + aio_write_waiters.find(c->aio_write_seq); + if (waiters != aio_write_waiters.end()) { + ldout(client->cct, 20) << "found " << waiters->second.size() + << " waiters" << dendl; + for (std::list::iterator it = waiters->second.begin(); + it != waiters->second.end(); ++it) { + client->finisher.queue(new C_AioCompleteAndSafe(*it)); + (*it)->put(); + } + aio_write_waiters.erase(waiters); + } else { + ldout(client->cct, 20) << "found no waiters for tid " + << c->aio_write_seq << dendl; + } aio_write_cond.Signal(); aio_write_list_lock.Unlock(); put(); } +void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c) +{ + ldout(client->cct, 20) << "flush_aio_writes_async " << this + << " completion " << c << dendl; + Mutex::Locker l(aio_write_list_lock); + tid_t seq = aio_write_seq; + ldout(client->cct, 20) << "flush_aio_writes_async waiting on tid " + << seq << dendl; + if (aio_write_list.empty()) { + client->finisher.queue(new C_AioCompleteAndSafe(c)); + } else { + c->get(); + aio_write_waiters[seq].push_back(c); + } +} + void librados::IoCtxImpl::flush_aio_writes() { + ldout(client->cct, 20) << "flush_aio_writes" << dendl; aio_write_list_lock.Lock(); tid_t seq = aio_write_seq; while (!aio_write_list.empty() && diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index c5b14cab8e369..a87a87250254d 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -46,6 +46,7 @@ struct librados::IoCtxImpl { tid_t aio_write_seq; Cond aio_write_cond; xlist aio_write_list; + map > aio_write_waiters; Mutex *lock; Objecter *objecter; @@ -84,6 +85,7 @@ struct librados::IoCtxImpl { void queue_aio_write(struct AioCompletionImpl *c); void complete_aio_write(struct AioCompletionImpl *c); + void flush_aio_writes_async(AioCompletionImpl *c); void flush_aio_writes(); int64_t get_id() { diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 0a00cfc6df2dc..43e377097c2a1 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -1000,6 +1000,12 @@ int librados::IoCtx::aio_remove(const std::string& oid, librados::AioCompletion return io_ctx_impl->aio_remove(oid, c->pc); } +int librados::IoCtx::aio_flush_async(librados::AioCompletion *c) +{ + io_ctx_impl->flush_aio_writes_async(c->pc); + return 0; +} + int librados::IoCtx::aio_flush() { io_ctx_impl->flush_aio_writes(); @@ -2229,6 +2235,14 @@ extern "C" int rados_aio_remove(rados_ioctx_t io, const char *o, return ctx->aio_remove(oid, (librados::AioCompletionImpl*)completion); } +extern "C" int rados_aio_flush_async(rados_ioctx_t io, + rados_completion_t completion) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + ctx->flush_aio_writes_async((librados::AioCompletionImpl*)completion); + return 0; +} + extern "C" int rados_aio_flush(rados_ioctx_t io) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; diff --git a/src/test/librados/aio.cc b/src/test/librados/aio.cc index 5fcc3177f201b..835f28763b4f1 100644 --- a/src/test/librados/aio.cc +++ b/src/test/librados/aio.cc @@ -677,6 +677,86 @@ TEST(LibRadosAio, FlushPP) { delete my_completion2; } +TEST(LibRadosAio, FlushAsync) { + AioTestData test_data; + rados_completion_t my_completion; + ASSERT_EQ("", test_data.init()); + ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data, + set_completion_complete, set_completion_safe, &my_completion)); + rados_completion_t flush_completion; + ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &flush_completion)); + char buf[128]; + memset(buf, 0xee, sizeof(buf)); + ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo", + my_completion, buf, sizeof(buf), 0)); + ASSERT_EQ(0, rados_aio_flush_async(test_data.m_ioctx, flush_completion)); + { + TestAlarm alarm; + ASSERT_EQ(0, rados_aio_wait_for_complete(flush_completion)); + ASSERT_EQ(0, rados_aio_wait_for_safe(flush_completion)); + } + ASSERT_EQ(1, rados_aio_is_complete(my_completion)); + ASSERT_EQ(1, rados_aio_is_safe(my_completion)); + ASSERT_EQ(1, rados_aio_is_complete(flush_completion)); + ASSERT_EQ(1, rados_aio_is_safe(flush_completion)); + char buf2[128]; + memset(buf2, 0, sizeof(buf2)); + rados_completion_t my_completion2; + ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data, + set_completion_complete, set_completion_safe, &my_completion2)); + ASSERT_EQ(0, rados_aio_read(test_data.m_ioctx, "foo", + my_completion2, buf2, sizeof(buf2), 0)); + { + TestAlarm alarm; + ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2)); + } + ASSERT_EQ(0, memcmp(buf, buf2, sizeof(buf))); + rados_aio_release(my_completion); + rados_aio_release(my_completion2); + rados_aio_release(flush_completion); +} + +TEST(LibRadosAio, FlushAsyncPP) { + AioTestDataPP test_data; + ASSERT_EQ("", test_data.init()); + AioCompletion *my_completion = test_data.m_cluster.aio_create_completion( + (void*)&test_data, set_completion_complete, set_completion_safe); + AioCompletion *flush_completion = + test_data.m_cluster.aio_create_completion(NULL, NULL, NULL); + AioCompletion *my_completion_null = NULL; + ASSERT_NE(my_completion, my_completion_null); + char buf[128]; + memset(buf, 0xee, sizeof(buf)); + bufferlist bl1; + bl1.append(buf, sizeof(buf)); + ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion, + bl1, sizeof(buf), 0)); + ASSERT_EQ(0, test_data.m_ioctx.aio_flush_async(flush_completion)); + { + TestAlarm alarm; + ASSERT_EQ(0, flush_completion->wait_for_complete()); + ASSERT_EQ(0, flush_completion->wait_for_safe()); + } + ASSERT_EQ(1, my_completion->is_complete()); + ASSERT_EQ(1, my_completion->is_safe()); + ASSERT_EQ(1, flush_completion->is_complete()); + ASSERT_EQ(1, flush_completion->is_safe()); + bufferlist bl2; + AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion( + (void*)&test_data, set_completion_complete, set_completion_safe); + ASSERT_NE(my_completion2, my_completion_null); + ASSERT_EQ(0, test_data.m_ioctx.aio_read("foo", my_completion2, + &bl2, sizeof(buf), 0)); + { + TestAlarm alarm; + ASSERT_EQ(0, my_completion2->wait_for_complete()); + } + ASSERT_EQ(0, memcmp(buf, bl2.c_str(), sizeof(buf))); + delete my_completion; + delete my_completion2; + delete flush_completion; +} + TEST(LibRadosAio, RoundTripWriteFull) { AioTestData test_data; rados_completion_t my_completion, my_completion2, my_completion3; -- 2.39.5