#define LIBRBD_VERSION_CODE LIBRBD_VERSION(LIBRBD_VER_MAJOR, LIBRBD_VER_MINOR, LIBRBD_VER_EXTRA)
#define LIBRBD_SUPPORTS_WATCH 0
+#define LIBRBD_SUPPORTS_AIO_FLUSH 1
typedef void *rbd_snap_t;
typedef void *rbd_image_t;
ssize_t rbd_aio_get_return_value(rbd_completion_t c);
void rbd_aio_release(rbd_completion_t c);
int rbd_flush(rbd_image_t image);
+/**
+ * Start a flush if caching is enabled. Get a callback when
+ * the currently pending writes are on disk.
+ *
+ * @param image the image to flush writes to
+ * @param c what to call when flushing is complete
+ * @returns 0 on success, negative error code on failure
+ */
+int rbd_aio_flush(rbd_image_t image, rbd_completion_t c);
#ifdef __cplusplus
}
int aio_discard(uint64_t off, uint64_t len, RBD::AioCompletion *c);
int flush();
+ /**
+ * Start a flush of this image if caching is enabled. Get a callback
+ * when the currently pending writes are on disk.
+ *
+ * @param c what to call when flushing is complete
+ * @returns 0 on success, negative error code on failure
+ */
+ int aio_flush(RBD::AioCompletion *c);
private:
friend class RBD;
AIO_TYPE_READ = 0,
AIO_TYPE_WRITE,
AIO_TYPE_DISCARD,
+ AIO_TYPE_FLUSH,
AIO_TYPE_NONE,
} aio_type_t;
complete_cb(rbd_comp, complete_arg);
}
switch (aio_type) {
- case AIO_TYPE_READ:
+ case AIO_TYPE_READ:
ictx->perfcounter->tinc(l_librbd_aio_rd_latency, elapsed); break;
case AIO_TYPE_WRITE:
ictx->perfcounter->tinc(l_librbd_aio_wr_latency, elapsed); break;
case AIO_TYPE_DISCARD:
ictx->perfcounter->tinc(l_librbd_aio_discard_latency, elapsed); break;
+ case AIO_TYPE_FLUSH:
+ ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break;
default:
lderr(ictx->cct) << "completed invalid aio_type: " << aio_type << dendl;
break;
plb.add_u64_counter(l_librbd_aio_discard, "aio_discard");
plb.add_u64_counter(l_librbd_aio_discard_bytes, "aio_discard_bytes");
plb.add_time_avg(l_librbd_aio_discard_latency, "aio_discard_latency");
+ plb.add_u64_counter(l_librbd_aio_flush, "aio_flush");
+ plb.add_time_avg(l_librbd_aio_flush_latency, "aio_flush_latency");
plb.add_u64_counter(l_librbd_snap_create, "snap_create");
plb.add_u64_counter(l_librbd_snap_remove, "snap_remove");
plb.add_u64_counter(l_librbd_snap_rollback, "snap_rollback");
}
}
+ /**
+  * Ask the object cacher to flush this image's object set without
+  * waiting for the result.
+  *
+  * @param onfinish context handed to the object cacher's flush_set()
+  */
+ void ImageCtx::flush_cache_aio(Context *onfinish) {
+   // cache_lock is held only for the duration of the flush_set() call
+   Mutex::Locker cache_locker(cache_lock);
+   object_cacher->flush_set(object_set, onfinish);
+ }
+
int ImageCtx::flush_cache() {
int r = 0;
Mutex mylock("librbd::ImageCtx::flush_cache");
Cond cond;
bool done;
Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
- cache_lock.Lock();
- bool already_flushed = object_cacher->flush_set(object_set, onfinish);
- cache_lock.Unlock();
- if (!already_flushed) {
- mylock.Lock();
- while (!done) {
- ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
- cond.Wait(mylock);
- }
- mylock.Unlock();
- ldout(cct, 20) << "finished flushing cache" << dendl;
+ // Blocking flush: start the asynchronous flush with onfinish, then wait
+ // on cond until done becomes true and return the result stored in r.
+ // The wait is unconditional, so onfinish must be completed even when
+ // nothing needed flushing.
+ // NOTE(review): confirm flush_set() completes the context in that case,
+ // and that C_SafeCond initializes done (it is not initialized here).
+ flush_cache_aio(onfinish);
+ mylock.Lock();
+ while (!done) {
+ ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
+ cond.Wait(mylock);
}
+ mylock.Unlock();
+ ldout(cct, 20) << "finished flushing cache" << dendl;
return r;
}
Context *onfinish);
int read_from_cache(object_t o, bufferlist *bl, size_t len, uint64_t off);
void user_flushed();
+ void flush_cache_aio(Context *onfinish);
int flush_cache();
void shutdown_cache();
void invalidate_cache();
req->complete(rados_aio_get_return_value(c));
}
+ /**
+  * librados completion callback that forwards the operation's return
+  * value to a Context.
+  *
+  * @param c   the rados completion that fired
+  * @param arg the Context to complete, passed as an opaque pointer
+  */
+ void rados_ctx_cb(rados_completion_t c, void *arg)
+ {
+   const int result = rados_aio_get_return_value(c);
+   static_cast<Context *>(arg)->complete(result);
+ }
+
// validate extent against image size; clip to image size if necessary
int clip_io(ImageCtx *ictx, uint64_t off, uint64_t *len)
{
return 0;
}
+ /**
+  * Begin an asynchronous flush of an image.
+  *
+  * When the image has an object cacher its cache is flushed; otherwise
+  * aio_flush_async() is issued on the image's data ioctx. In both cases
+  * the flush is tracked as a single C_AioWrite request on the completion.
+  *
+  * @param ictx the image context to flush
+  * @param c    completion the flush request is attached to
+  * @returns 0 once the flush has been started, or the negative value
+  *          returned by ictx_check()
+  */
+ int aio_flush(ImageCtx *ictx, AioCompletion *c)
+ {
+   CephContext *cct = ictx->cct;
+   ldout(cct, 20) << "aio_flush " << ictx << " completion " << c << dendl;
+
+   const int check = ictx_check(ictx);
+   if (check < 0) {
+     return check;
+   }
+
+   ictx->user_flushed();
+
+   // get()/put() bracket the setup of the single flush request
+   c->get();
+   c->add_request();
+   c->init_time(ictx, AIO_TYPE_FLUSH);
+
+   C_AioWrite *flush_ctx = new C_AioWrite(cct, c);
+   if (!ictx->object_cacher) {
+     // no cache: flush through librados and complete flush_ctx from
+     // the rados callback
+     librados::AioCompletion *rados_comp =
+       librados::Rados::aio_create_completion(flush_ctx, NULL, rados_ctx_cb);
+     ictx->data_ctx.aio_flush_async(rados_comp);
+     rados_comp->release();
+   } else {
+     ictx->flush_cache_aio(flush_ctx);
+   }
+
+   c->finish_adding_requests(cct);
+   c->put();
+   ictx->perfcounter->inc(l_librbd_aio_flush);
+
+   return 0;
+ }
+
int flush(ImageCtx *ictx)
{
CephContext *cct = ictx->cct;
l_librbd_aio_discard,
l_librbd_aio_discard_bytes,
l_librbd_aio_discard_latency,
+ l_librbd_aio_flush,
+ l_librbd_aio_flush_latency,
l_librbd_snap_create,
l_librbd_snap_remove,
char *buf, bufferlist *pbl, AioCompletion *c);
int aio_read(ImageCtx *ictx, const vector<pair<uint64_t,uint64_t> >& image_extents,
char *buf, bufferlist *pbl, AioCompletion *c);
+ int aio_flush(ImageCtx *ictx, AioCompletion *c);
int flush(ImageCtx *ictx);
int _flush(ImageCtx *ictx);
return librbd::flush(ictx);
}
+ /**
+  * C++ API entry point for an asynchronous flush; forwards to
+  * librbd::aio_flush() with the internal completion stored in c->pc.
+  *
+  * @param c completion wrapper supplied by the caller
+  * @returns the value returned by librbd::aio_flush()
+  */
+ int Image::aio_flush(RBD::AioCompletion *c)
+ {
+   librbd::AioCompletion *internal_comp =
+     reinterpret_cast<librbd::AioCompletion *>(c->pc);
+   return librbd::aio_flush(reinterpret_cast<ImageCtx *>(ctx), internal_comp);
+ }
+
} // namespace librbd
extern "C" void rbd_version(int *major, int *minor, int *extra)
return librbd::flush(ictx);
}
+/**
+ * C API entry point for an asynchronous flush; unwraps the opaque image
+ * and completion handles and forwards to librbd::aio_flush().
+ */
+extern "C" int rbd_aio_flush(rbd_image_t image, rbd_completion_t c)
+{
+  librbd::RBD::AioCompletion *wrapper =
+    reinterpret_cast<librbd::RBD::AioCompletion *>(c);
+  return librbd::aio_flush(
+    reinterpret_cast<librbd::ImageCtx *>(image),
+    reinterpret_cast<librbd::AioCompletion *>(wrapper->pc));
+}
+
extern "C" int rbd_aio_is_complete(rbd_completion_t c)
{
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
ioctx.close();
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
}
+
+/**
+ * Queue a batch of asynchronous writes through the C API, then check that
+ * once an rbd_aio_flush() completion has fired every write issued before
+ * it reports complete.
+ */
+TEST(LibRBD, FlushAio)
+{
+  rados_t cluster;
+  rados_ioctx_t ioctx;
+  string pool_name = get_temp_pool_name();
+  ASSERT_EQ("", create_one_pool(pool_name, &cluster));
+  // fail here rather than run the rest of the test on an unusable ioctx
+  ASSERT_EQ(0, rados_ioctx_create(cluster, pool_name.c_str(), &ioctx));
+
+  rbd_image_t image;
+  int order = 0;
+  const char *name = "testimg";
+  uint64_t size = 2 << 20;
+  // const so that write_comps is an ordinary array, not a VLA extension
+  const size_t num_aios = 256;
+
+  ASSERT_EQ(0, create_image(ioctx, name, size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
+
+  char test_data[TEST_IO_SIZE + 1];
+  size_t i;
+  for (i = 0; i < TEST_IO_SIZE; ++i) {
+    test_data[i] = (char) (rand() % (126 - 33) + 33);
+  }
+
+  // issue the writes without waiting on any of them
+  rbd_completion_t write_comps[num_aios];
+  for (i = 0; i < num_aios; ++i) {
+    ASSERT_EQ(0, rbd_aio_create_completion(NULL, NULL, &write_comps[i]));
+    uint64_t offset = rand() % (size - TEST_IO_SIZE);
+    ASSERT_EQ(0, rbd_aio_write(image, offset, TEST_IO_SIZE, test_data,
+                               write_comps[i]));
+  }
+
+  rbd_completion_t flush_comp;
+  ASSERT_EQ(0, rbd_aio_create_completion(NULL, NULL, &flush_comp));
+  ASSERT_EQ(0, rbd_aio_flush(image, flush_comp));
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(flush_comp));
+  ASSERT_EQ(1, rbd_aio_is_complete(flush_comp));
+  rbd_aio_release(flush_comp);
+
+  // the flush finished, so every earlier write must be complete too
+  for (i = 0; i < num_aios; ++i) {
+    ASSERT_EQ(1, rbd_aio_is_complete(write_comps[i]));
+    rbd_aio_release(write_comps[i]);
+  }
+
+  ASSERT_EQ(0, rbd_close(image));
+  ASSERT_EQ(0, rbd_remove(ioctx, name));
+  rados_ioctx_destroy(ioctx);
+  ASSERT_EQ(0, destroy_one_pool(pool_name, &cluster));
+}
+
+/**
+ * C++ API variant of the asynchronous flush test: queue a batch of
+ * asynchronous writes, then check that once an aio_flush() completion has
+ * fired every write issued before it reports complete.
+ */
+TEST(LibRBD, FlushAioPP)
+{
+  librados::Rados rados;
+  librados::IoCtx ioctx;
+  string pool_name = get_temp_pool_name();
+
+  ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+  ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    const char *name = "testimg";
+    uint64_t size = 2 << 20;
+    // const so that write_comps is an ordinary array, not a VLA extension
+    const size_t num_aios = 256;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name, size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name, NULL));
+
+    char test_data[TEST_IO_SIZE + 1];
+    size_t i;
+    for (i = 0; i < TEST_IO_SIZE; ++i) {
+      test_data[i] = (char) (rand() % (126 - 33) + 33);
+    }
+    // strlen() is applied to this buffer below, so it must be terminated
+    test_data[TEST_IO_SIZE] = '\0';
+
+    // issue the writes without waiting on any of them
+    librbd::RBD::AioCompletion *write_comps[num_aios];
+    for (i = 0; i < num_aios; ++i) {
+      ceph::bufferlist bl;
+      bl.append(test_data, strlen(test_data));
+      write_comps[i] = new librbd::RBD::AioCompletion(NULL, NULL);
+      uint64_t offset = rand() % (size - TEST_IO_SIZE);
+      ASSERT_EQ(0, image.aio_write(offset, TEST_IO_SIZE, bl,
+                                   write_comps[i]));
+    }
+
+    librbd::RBD::AioCompletion *flush_comp =
+      new librbd::RBD::AioCompletion(NULL, NULL);
+    ASSERT_EQ(0, image.aio_flush(flush_comp));
+    ASSERT_EQ(0, flush_comp->wait_for_complete());
+    ASSERT_EQ(1, flush_comp->is_complete());
+    // release() rather than delete, mirroring rbd_aio_release() in the
+    // C API, so the internal completion is dropped with the wrapper
+    flush_comp->release();
+
+    // the flush finished, so every earlier write must be complete too
+    for (i = 0; i < num_aios; ++i) {
+      librbd::RBD::AioCompletion *comp = write_comps[i];
+      ASSERT_EQ(1, comp->is_complete());
+      comp->release();
+    }
+  }
+
+  ioctx.close();
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}