From 302b93c478b3f4bc2c82bfb08329e3c98389dd97 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Thu, 21 Mar 2013 16:04:10 -0700 Subject: [PATCH] librbd: add an async flush At this point it's a simple wrapper around the ObjectCacher or librados. This is needed for QEMU so that its main thread can continue while a flush is occurring. Since this will be backported, don't update the librbd version yet, just add a #define that QEMU and others can use to detect the presence of aio_flush(). Refs: #3737 Signed-off-by: Josh Durgin --- src/include/rbd/librbd.h | 10 ++++ src/include/rbd/librbd.hpp | 9 +++ src/librbd/AioCompletion.h | 5 +- src/librbd/ImageCtx.cc | 26 +++++---- src/librbd/ImageCtx.h | 1 + src/librbd/internal.cc | 36 ++++++++++++ src/librbd/internal.h | 3 + src/librbd/librbd.cc | 13 +++++ src/test/librbd/test_librbd.cc | 103 +++++++++++++++++++++++++++++++++ 9 files changed, 194 insertions(+), 12 deletions(-) diff --git a/src/include/rbd/librbd.h b/src/include/rbd/librbd.h index f41641bec7ef4..dcd6d22734779 100644 --- a/src/include/rbd/librbd.h +++ b/src/include/rbd/librbd.h @@ -38,6 +38,7 @@ extern "C" { #define LIBRBD_VERSION_CODE LIBRBD_VERSION(LIBRBD_VER_MAJOR, LIBRBD_VER_MINOR, LIBRBD_VER_EXTRA) #define LIBRBD_SUPPORTS_WATCH 0 +#define LIBRBD_SUPPORTS_AIO_FLUSH 1 typedef void *rbd_snap_t; typedef void *rbd_image_t; @@ -323,6 +324,15 @@ int rbd_aio_wait_for_complete(rbd_completion_t c); ssize_t rbd_aio_get_return_value(rbd_completion_t c); void rbd_aio_release(rbd_completion_t c); int rbd_flush(rbd_image_t image); +/** + * Start a flush if caching is enabled. Get a callback when + * the currently pending writes are on disk. 
+ * + * @param image the image to flush writes to + * @param c what to call when flushing is complete + * @returns 0 on success, negative error code on failure + */ +int rbd_aio_flush(rbd_image_t image, rbd_completion_t c); #ifdef __cplusplus } diff --git a/src/include/rbd/librbd.hpp b/src/include/rbd/librbd.hpp index c256df267f9a9..a7bfcf43233c8 100644 --- a/src/include/rbd/librbd.hpp +++ b/src/include/rbd/librbd.hpp @@ -181,6 +181,15 @@ public: int aio_discard(uint64_t off, uint64_t len, RBD::AioCompletion *c); int flush(); + /** + * Start a flush if caching is enabled. Get a callback when + * the currently pending writes are on disk. + * + * The flush applies to the image this object refers to. + * @param c what to call when flushing is complete + * @returns 0 on success, negative error code on failure + */ + int aio_flush(RBD::AioCompletion *c); private: friend class RBD; diff --git a/src/librbd/AioCompletion.h b/src/librbd/AioCompletion.h index 8420d37c8bd6f..899586d51a7a9 100644 --- a/src/librbd/AioCompletion.h +++ b/src/librbd/AioCompletion.h @@ -24,6 +24,7 @@ namespace librbd { AIO_TYPE_READ = 0, AIO_TYPE_WRITE, AIO_TYPE_DISCARD, + AIO_TYPE_FLUSH, AIO_TYPE_NONE, } aio_type_t; @@ -104,12 +105,14 @@ namespace librbd { complete_cb(rbd_comp, complete_arg); } switch (aio_type) { - case AIO_TYPE_READ: + case AIO_TYPE_READ: ictx->perfcounter->tinc(l_librbd_aio_rd_latency, elapsed); break; case AIO_TYPE_WRITE: ictx->perfcounter->tinc(l_librbd_aio_wr_latency, elapsed); break; case AIO_TYPE_DISCARD: ictx->perfcounter->tinc(l_librbd_aio_discard_latency, elapsed); break; + case AIO_TYPE_FLUSH: + ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break; default: lderr(ictx->cct) << "completed invalid aio_type: " << aio_type << dendl; break; diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc index cddd0d44ac67c..08c72ef4a60ba 100644 --- a/src/librbd/ImageCtx.cc +++ b/src/librbd/ImageCtx.cc @@ -220,6 +220,8 @@ namespace librbd { 
plb.add_u64_counter(l_librbd_aio_discard, "aio_discard"); plb.add_u64_counter(l_librbd_aio_discard_bytes, "aio_discard_bytes"); plb.add_time_avg(l_librbd_aio_discard_latency, "aio_discard_latency"); + plb.add_u64_counter(l_librbd_aio_flush, "aio_flush"); + plb.add_time_avg(l_librbd_aio_flush_latency, "aio_flush_latency"); plb.add_u64_counter(l_librbd_snap_create, "snap_create"); plb.add_u64_counter(l_librbd_snap_remove, "snap_remove"); plb.add_u64_counter(l_librbd_snap_rollback, "snap_rollback"); @@ -522,24 +524,26 @@ namespace librbd { } } + void ImageCtx::flush_cache_aio(Context *onfinish) { + cache_lock.Lock(); + object_cacher->flush_set(object_set, onfinish); + cache_lock.Unlock(); + } + int ImageCtx::flush_cache() { int r = 0; Mutex mylock("librbd::ImageCtx::flush_cache"); Cond cond; bool done; Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r); - cache_lock.Lock(); - bool already_flushed = object_cacher->flush_set(object_set, onfinish); - cache_lock.Unlock(); - if (!already_flushed) { - mylock.Lock(); - while (!done) { - ldout(cct, 20) << "waiting for cache to be flushed" << dendl; - cond.Wait(mylock); - } - mylock.Unlock(); - ldout(cct, 20) << "finished flushing cache" << dendl; + flush_cache_aio(onfinish); + mylock.Lock(); + while (!done) { + ldout(cct, 20) << "waiting for cache to be flushed" << dendl; + cond.Wait(mylock); } + mylock.Unlock(); + ldout(cct, 20) << "finished flushing cache" << dendl; return r; } diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h index 550d747524be1..fde24fb3e2475 100644 --- a/src/librbd/ImageCtx.h +++ b/src/librbd/ImageCtx.h @@ -132,6 +132,7 @@ namespace librbd { Context *onfinish); int read_from_cache(object_t o, bufferlist *bl, size_t len, uint64_t off); void user_flushed(); + void flush_cache_aio(Context *onfinish); int flush_cache(); void shutdown_cache(); void invalidate_cache(); diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc index fcefdc43161e7..09348171eb685 100644 --- 
a/src/librbd/internal.cc +++ b/src/librbd/internal.cc @@ -2435,6 +2435,12 @@ reprotect_and_return_err: req->complete(rados_aio_get_return_value(c)); } + void rados_ctx_cb(rados_completion_t c, void *arg) + { + Context *comp = reinterpret_cast<Context *>(arg); + comp->complete(rados_aio_get_return_value(c)); + } + // validate extent against image size; clip to image size if necessary int clip_io(ImageCtx *ictx, uint64_t off, uint64_t *len) { @@ -2463,6 +2469,36 @@ reprotect_and_return_err: return 0; } + int aio_flush(ImageCtx *ictx, AioCompletion *c) + { + CephContext *cct = ictx->cct; + ldout(cct, 20) << "aio_flush " << ictx << " completion " << c << dendl; + + int r = ictx_check(ictx); + if (r < 0) + return r; + + ictx->user_flushed(); + + c->get(); + c->add_request(); + c->init_time(ictx, AIO_TYPE_FLUSH); + C_AioWrite *req_comp = new C_AioWrite(cct, c); + if (ictx->object_cacher) { + ictx->flush_cache_aio(req_comp); + } else { + librados::AioCompletion *rados_completion = + librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb); + ictx->data_ctx.aio_flush_async(rados_completion); + rados_completion->release(); + } + c->finish_adding_requests(cct); + c->put(); + ictx->perfcounter->inc(l_librbd_aio_flush); + + return 0; + } + int flush(ImageCtx *ictx) { CephContext *cct = ictx->cct; diff --git a/src/librbd/internal.h b/src/librbd/internal.h index 7c06bf9e7682f..f1392f690a257 100644 --- a/src/librbd/internal.h +++ b/src/librbd/internal.h @@ -37,6 +37,8 @@ enum { l_librbd_aio_discard, l_librbd_aio_discard_bytes, l_librbd_aio_discard_latency, + l_librbd_aio_flush, + l_librbd_aio_flush_latency, l_librbd_snap_create, l_librbd_snap_remove, @@ -177,6 +179,7 @@ namespace librbd { char *buf, bufferlist *pbl, AioCompletion *c); int aio_read(ImageCtx *ictx, const vector >& image_extents, char *buf, bufferlist *pbl, AioCompletion *c); + int aio_flush(ImageCtx *ictx, AioCompletion *c); int flush(ImageCtx *ictx); int _flush(ImageCtx *ictx); diff --git a/src/librbd/librbd.cc 
b/src/librbd/librbd.cc index a155abb03764d..02d8fbf321167 100644 --- a/src/librbd/librbd.cc +++ b/src/librbd/librbd.cc @@ -481,6 +481,12 @@ namespace librbd { return librbd::flush(ictx); } + int Image::aio_flush(RBD::AioCompletion *c) + { + ImageCtx *ictx = (ImageCtx *)ctx; + return librbd::aio_flush(ictx, (librbd::AioCompletion *)c->pc); + } + } // namespace librbd extern "C" void rbd_version(int *major, int *minor, int *extra) @@ -1066,6 +1072,13 @@ extern "C" int rbd_flush(rbd_image_t image) return librbd::flush(ictx); } +extern "C" int rbd_aio_flush(rbd_image_t image, rbd_completion_t c) +{ + librbd::ImageCtx *ictx = (librbd::ImageCtx *)image; + librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c; + return librbd::aio_flush(ictx, (librbd::AioCompletion *)comp->pc); +} + extern "C" int rbd_aio_is_complete(rbd_completion_t c) { librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c; diff --git a/src/test/librbd/test_librbd.cc b/src/test/librbd/test_librbd.cc index 9cde09dcdbe79..e38d317485fc2 100644 --- a/src/test/librbd/test_librbd.cc +++ b/src/test/librbd/test_librbd.cc @@ -1391,3 +1391,106 @@ TEST(LibRBD, LockingPP) ioctx.close(); ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados)); } + +TEST(LibRBD, FlushAio) +{ + rados_t cluster; + rados_ioctx_t ioctx; + string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool(pool_name, &cluster)); + rados_ioctx_create(cluster, pool_name.c_str(), &ioctx); + + rbd_image_t image; + int order = 0; + const char *name = "testimg"; + uint64_t size = 2 << 20; + size_t num_aios = 256; + + ASSERT_EQ(0, create_image(ioctx, name, size, &order)); + ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL)); + + char test_data[TEST_IO_SIZE + 1]; + size_t i; + for (i = 0; i < TEST_IO_SIZE; ++i) { + test_data[i] = (char) (rand() % (126 - 33) + 33); + } + + rbd_completion_t write_comps[num_aios]; + for (i = 0; i < num_aios; ++i) { + ASSERT_EQ(0, rbd_aio_create_completion(NULL, NULL, &write_comps[i])); + 
uint64_t offset = rand() % (size - TEST_IO_SIZE); + ASSERT_EQ(0, rbd_aio_write(image, offset, TEST_IO_SIZE, test_data, + write_comps[i])); + } + + rbd_completion_t flush_comp; + ASSERT_EQ(0, rbd_aio_create_completion(NULL, NULL, &flush_comp)); + ASSERT_EQ(0, rbd_aio_flush(image, flush_comp)); + ASSERT_EQ(0, rbd_aio_wait_for_complete(flush_comp)); + ASSERT_EQ(1, rbd_aio_is_complete(flush_comp)); + rbd_aio_release(flush_comp); + + for (i = 0; i < num_aios; ++i) { + ASSERT_EQ(1, rbd_aio_is_complete(write_comps[i])); + rbd_aio_release(write_comps[i]); + } + + ASSERT_EQ(0, rbd_close(image)); + ASSERT_EQ(0, rbd_remove(ioctx, name)); + rados_ioctx_destroy(ioctx); + ASSERT_EQ(0, destroy_one_pool(pool_name, &cluster)); +} + +TEST(LibRBD, FlushAioPP) +{ + librados::Rados rados; + librados::IoCtx ioctx; + string pool_name = get_temp_pool_name(); + + ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); + ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); + + { + librbd::RBD rbd; + librbd::Image image; + int order = 0; + const char *name = "testimg"; + uint64_t size = 2 << 20; + size_t num_aios = 256; + + ASSERT_EQ(0, create_image_pp(rbd, ioctx, name, size, &order)); + ASSERT_EQ(0, rbd.open(ioctx, image, name, NULL)); + + char test_data[TEST_IO_SIZE + 1]; + size_t i; + for (i = 0; i < TEST_IO_SIZE; ++i) { + test_data[i] = (char) (rand() % (126 - 33) + 33); + } + + librbd::RBD::AioCompletion *write_comps[num_aios]; + for (i = 0; i < num_aios; ++i) { + ceph::bufferlist bl; + bl.append(test_data, TEST_IO_SIZE); /* test_data is not NUL-terminated, so strlen() must not be used */ + write_comps[i] = new librbd::RBD::AioCompletion(NULL, NULL); + uint64_t offset = rand() % (size - TEST_IO_SIZE); + ASSERT_EQ(0, image.aio_write(offset, TEST_IO_SIZE, bl, + write_comps[i])); + } + + librbd::RBD::AioCompletion *flush_comp = + new librbd::RBD::AioCompletion(NULL, NULL); + ASSERT_EQ(0, image.aio_flush(flush_comp)); + ASSERT_EQ(0, flush_comp->wait_for_complete()); + ASSERT_EQ(1, flush_comp->is_complete()); + delete flush_comp; + + for (i = 0; i < 
num_aios; ++i) { + librbd::RBD::AioCompletion *comp = write_comps[i]; + ASSERT_EQ(1, comp->is_complete()); + delete comp; + } + } + + ioctx.close(); + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados)); +} -- 2.39.5