From: Jason Dillaman Date: Fri, 31 Jul 2015 02:31:55 +0000 (-0400) Subject: librbd: simplify IO flush handling X-Git-Tag: v10.0.1~52^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ee7c6f73992d3b09c6b401fbb782b2151f2399c7;p=ceph.git librbd: simplify IO flush handling Add a new convenience method to ImageCtx for handling flush requests and clean up flush handling when dealing with the cache. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/AioImageRequest.cc b/src/librbd/AioImageRequest.cc index f594f61f9cad..49632b6d68db 100644 --- a/src/librbd/AioImageRequest.cc +++ b/src/librbd/AioImageRequest.cc @@ -432,22 +432,10 @@ void AioImageFlush::send_request() { } } - // TODO race condition between registering op and submitting to cache - // (might not be flushed -- backport needed) - C_AioRequest *flush_ctx = new C_AioRequest(cct, m_aio_comp); - m_image_ctx.flush_async_operations(flush_ctx); - - m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH); C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); - if (m_image_ctx.object_cacher != NULL) { - m_image_ctx.flush_cache_aio(req_comp); - } else { - librados::AioCompletion *rados_completion = - librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb); - m_image_ctx.data_ctx.aio_flush_async(rados_completion); - rados_completion->release(); - } + m_image_ctx.flush(req_comp); + m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH); m_aio_comp->finish_adding_requests(cct); m_aio_comp->put(); diff --git a/src/librbd/AsyncOperation.cc b/src/librbd/AsyncOperation.cc index 3114d549f771..7cfc2d74f7fa 100644 --- a/src/librbd/AsyncOperation.cc +++ b/src/librbd/AsyncOperation.cc @@ -3,6 +3,7 @@ #include "librbd/AsyncOperation.h" #include "librbd/ImageCtx.h" #include "common/dout.h" +#include "common/WorkQueue.h" #include "include/assert.h" #define dout_subsys ceph_subsys_rbd @@ -11,6 +12,29 @@ namespace librbd { +namespace { + +struct C_CompleteFlushes : public Context { +
ImageCtx *image_ctx; + std::list flush_contexts; + + C_CompleteFlushes(ImageCtx *image_ctx, std::list &&flush_contexts) + : image_ctx(image_ctx), flush_contexts(std::move(flush_contexts)) { + } + virtual void finish(int r) { + RWLock::RLocker owner_locker(image_ctx->owner_lock); + while (!flush_contexts.empty()) { + Context *flush_ctx = flush_contexts.front(); + flush_contexts.pop_front(); + + ldout(image_ctx->cct, 20) << "completed flush: " << flush_ctx << dendl; + flush_ctx->complete(0); + } + } +}; + +} // anonymous namespace + void AsyncOperation::start_op(ImageCtx &image_ctx) { assert(m_image_ctx == NULL); m_image_ctx = &image_ctx; @@ -39,13 +63,9 @@ void AsyncOperation::finish_op() { } } - while (!m_flush_contexts.empty()) { - Context *flush_ctx = m_flush_contexts.front(); - m_flush_contexts.pop_front(); - - ldout(m_image_ctx->cct, 20) << "completed flush: " << flush_ctx << dendl; - flush_ctx->complete(0); - } + C_CompleteFlushes *ctx = new C_CompleteFlushes(m_image_ctx, + std::move(m_flush_contexts)); + m_image_ctx->op_work_queue->queue(ctx); } void AsyncOperation::add_flush_context(Context *on_finish) { diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc index a7ce73a168b6..8fd13e4f7423 100644 --- a/src/librbd/ImageCtx.cc +++ b/src/librbd/ImageCtx.cc @@ -53,6 +53,65 @@ public: } }; +struct C_FlushCache : public Context { + ImageCtx *image_ctx; + Context *on_safe; + + C_FlushCache(ImageCtx *_image_ctx, Context *_on_safe) + : image_ctx(_image_ctx), on_safe(_on_safe) { + } + virtual void finish(int r) { + // successful cache flush indicates all IO is now safe + assert(image_ctx->owner_lock.is_locked()); + image_ctx->flush_cache(on_safe); + } +}; + +struct C_InvalidateCache : public Context { + ImageCtx *image_ctx; + bool purge_on_error; + bool reentrant_safe; + Context *on_finish; + + C_InvalidateCache(ImageCtx *_image_ctx, bool _purge_on_error, + bool _reentrant_safe, Context *_on_finish) + : image_ctx(_image_ctx), purge_on_error(_purge_on_error), + 
reentrant_safe(_reentrant_safe), on_finish(_on_finish) { + } + virtual void finish(int r) { + assert(image_ctx->cache_lock.is_locked()); + CephContext *cct = image_ctx->cct; + + if (r == -EBLACKLISTED) { + lderr(cct) << "Blacklisted during flush! Purging cache..." << dendl; + image_ctx->object_cacher->purge_set(image_ctx->object_set); + } else if (r != 0 && purge_on_error) { + lderr(cct) << "invalidate cache encountered error " + << cpp_strerror(r) << " !Purging cache..." << dendl; + image_ctx->object_cacher->purge_set(image_ctx->object_set); + } else if (r != 0) { + lderr(cct) << "flush_cache returned " << r << dendl; + } + + loff_t unclean = image_ctx->object_cacher->release_set( + image_ctx->object_set); + if (unclean == 0) { + r = 0; + } else { + lderr(cct) << "could not release all objects from cache: " + << unclean << " bytes remain" << dendl; + r = -EBUSY; + } + + if (reentrant_safe) { + on_finish->complete(r); + } else { + image_ctx->op_work_queue->queue(on_finish, r); + } + } + +}; + } // anonymous namespace const string ImageCtx::METADATA_CONF_PREFIX = "conf_"; @@ -663,30 +722,24 @@ public: } } - void ImageCtx::flush_cache_aio(Context *onfinish) { + int ImageCtx::flush_cache() { + C_SaferCond cond_ctx; + flush_cache(&cond_ctx); + + ldout(cct, 20) << "waiting for cache to be flushed" << dendl; + int r = cond_ctx.wait(); + ldout(cct, 20) << "finished flushing cache" << dendl; + + return r; + } + + void ImageCtx::flush_cache(Context *onfinish) { assert(owner_lock.is_locked()); cache_lock.Lock(); object_cacher->flush_set(object_set, onfinish); cache_lock.Unlock(); } - int ImageCtx::flush_cache() { - int r = 0; - Mutex mylock("librbd::ImageCtx::flush_cache"); - Cond cond; - bool done; - Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r); - flush_cache_aio(onfinish); - mylock.Lock(); - while (!done) { - ldout(cct, 20) << "waiting for cache to be flushed" << dendl; - cond.Wait(mylock); - } - mylock.Unlock(); - ldout(cct, 20) << "finished flushing 
cache" << dendl; - return r; - } - int ImageCtx::shutdown_cache() { flush_async_operations(); @@ -697,21 +750,19 @@ public: } int ImageCtx::invalidate_cache(bool purge_on_error) { - int result; - C_SaferCond ctx; - invalidate_cache(&ctx); - result = ctx.wait(); - ldout(cct, 20) << "finished invalidating cache" << dendl; - - if (result && purge_on_error) { - cache_lock.Lock(); - if (object_cacher != NULL) { - lderr(cct) << "invalidate cache met error " << cpp_strerror(result) << " !Purging cache..." << dendl; - object_cacher->purge_set(object_set); - } - cache_lock.Unlock(); + flush_async_operations(); + if (object_cacher == NULL) { + return 0; } + cache_lock.Lock(); + object_cacher->release_set(object_set); + cache_lock.Unlock(); + + C_SaferCond ctx; + flush_cache(new C_InvalidateCache(this, purge_on_error, true, &ctx)); + + int result = ctx.wait(); return result; } @@ -725,29 +776,7 @@ public: object_cacher->release_set(object_set); cache_lock.Unlock(); - flush_cache_aio(new FunctionContext(boost::bind( - &ImageCtx::invalidate_cache_completion, this, _1, on_finish))); - } - - void ImageCtx::invalidate_cache_completion(int r, Context *on_finish) { - assert(cache_lock.is_locked()); - if (r == -EBLACKLISTED) { - lderr(cct) << "Blacklisted during flush! Purging cache..." 
<< dendl; - object_cacher->purge_set(object_set); - } else if (r != 0) { - lderr(cct) << "flush_cache returned " << r << dendl; - } - - loff_t unclean = object_cacher->release_set(object_set); - if (unclean == 0) { - r = 0; - } else { - lderr(cct) << "could not release all objects from cache: " - << unclean << " bytes remain" << dendl; - r = -EBUSY; - } - - op_work_queue->queue(on_finish, r); + flush_cache(new C_InvalidateCache(this, false, false, on_finish)); } void ImageCtx::clear_nonexistence_cache() { @@ -800,15 +829,31 @@ public: } void ImageCtx::flush_async_operations(Context *on_finish) { - Mutex::Locker l(async_ops_lock); - if (async_ops.empty()) { - on_finish->complete(0); - return; + { + Mutex::Locker l(async_ops_lock); + if (!async_ops.empty()) { + ldout(cct, 20) << "flush async operations: " << on_finish << " " + << "count=" << async_ops.size() << dendl; + async_ops.front()->add_flush_context(on_finish); + return; + } } + on_finish->complete(0); + } - ldout(cct, 20) << "flush async operations: " << on_finish << " " - << "count=" << async_ops.size() << dendl; - async_ops.front()->add_flush_context(on_finish); + int ImageCtx::flush() { + C_SaferCond cond_ctx; + flush(&cond_ctx); + return cond_ctx.wait(); + } + + void ImageCtx::flush(Context *on_safe) { + assert(owner_lock.is_locked()); + if (object_cacher != NULL) { + // flush cache after completing all in-flight AIO ops + on_safe = new C_FlushCache(this, on_safe); + } + flush_async_operations(on_safe); } void ImageCtx::cancel_async_requests() { diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h index df3784a770cd..f61929c1a6df 100644 --- a/src/librbd/ImageCtx.h +++ b/src/librbd/ImageCtx.h @@ -229,12 +229,11 @@ namespace librbd { uint64_t off, Context *onfinish, int fadvise_flags, uint64_t journal_tid); void user_flushed(); - void flush_cache_aio(Context *onfinish); int flush_cache(); + void flush_cache(Context *onfinish); int shutdown_cache(); int invalidate_cache(bool purge_on_error=false); void 
invalidate_cache(Context *on_finish); - void invalidate_cache_completion(int r, Context *on_finish); void clear_nonexistence_cache(); int register_watch(); void unregister_watch(); @@ -244,6 +243,9 @@ namespace librbd { void flush_async_operations(); void flush_async_operations(Context *on_finish); + int flush(); + void flush(Context *on_safe); + void cancel_async_requests(); void apply_metadata_confs(); diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc index bedc5a50c653..014ce9407217 100644 --- a/src/librbd/ImageWatcher.cc +++ b/src/librbd/ImageWatcher.cc @@ -403,7 +403,7 @@ int ImageWatcher::release_lock() notify_listeners_updated_lock(LOCK_UPDATE_STATE_RELEASING); RWLock::WLocker md_locker(m_image_ctx.md_lock); - r = librbd::_flush(&m_image_ctx); + r = m_image_ctx.flush(); if (r < 0) { lderr(cct) << this << " failed to flush: " << cpp_strerror(r) << dendl; goto err_cancel_unlock; diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc index d4e1e6ed8169..25d99f43f318 100644 --- a/src/librbd/internal.cc +++ b/src/librbd/internal.cc @@ -775,7 +775,7 @@ int invoke_async_request(ImageCtx *ictx, const std::string& request_type, } RWLock::WLocker md_locker(ictx->md_lock); - r = _flush(ictx); + r = ictx->flush(); if (r < 0) { return r; } @@ -1843,7 +1843,7 @@ reprotect_and_return_err: RWLock::RLocker owner_locker(ictx->owner_lock); RWLock::WLocker md_locker(ictx->md_lock); - r = _flush(ictx); + r = ictx->flush(); if (r < 0) { return r; } @@ -2779,7 +2779,7 @@ reprotect_and_return_err: } if (new_snap) { - _flush(ictx); + ictx->flush(); } return 0; } @@ -2834,7 +2834,6 @@ reprotect_and_return_err: // writes might create new snapshots. Rolling back will replace // the current version, so we have to invalidate that too. 
RWLock::WLocker md_locker(ictx->md_lock); - ictx->flush_async_operations(); r = ictx->invalidate_cache(); if (r < 0) { return r; @@ -3079,14 +3078,9 @@ reprotect_and_return_err: } ictx->cancel_async_requests(); - ictx->flush_async_operations(); - - if (ictx->object_cacher) { + { RWLock::RLocker owner_locker(ictx->owner_lock); - r = _flush(ictx); - if (r < 0) { - return r; - } + r = ictx->flush(); } { @@ -3191,18 +3185,14 @@ reprotect_and_return_err: ictx->flush_async_operations(); ictx->readahead.wait_for_pending(); - int flush_r; if (ictx->object_cacher) { - flush_r = ictx->shutdown_cache(); // implicitly flushes - } else { - RWLock::RLocker owner_locker(ictx->owner_lock); - flush_r = _flush(ictx); - } - if (flush_r< 0) { - lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(flush_r) - << dendl; - if (r == 0) { - r = flush_r; + int flush_r = ictx->shutdown_cache(); // implicitly flushes + if (flush_r < 0) { + lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(flush_r) + << dendl; + if (r == 0) { + r = flush_r; + } } } @@ -3628,7 +3618,7 @@ reprotect_and_return_err: // ensure previous writes are visible to listsnaps { RWLock::RLocker owner_locker(ictx->owner_lock); - _flush(ictx); + ictx->flush(); } int r = ictx_check(ictx); @@ -3699,31 +3689,12 @@ reprotect_and_return_err: ictx->user_flushed(); { RWLock::RLocker owner_locker(ictx->owner_lock); - r = _flush(ictx); + r = ictx->flush(); } ictx->perfcounter->inc(l_librbd_flush); return r; } - int _flush(ImageCtx *ictx) - { - assert(ictx->owner_lock.is_locked()); - CephContext *cct = ictx->cct; - int r; - // flush any outstanding writes - if (ictx->object_cacher) { - r = ictx->flush_cache(); - } else { - r = ictx->data_ctx.aio_flush(); - ictx->flush_async_operations(); - } - - if (r) - lderr(cct) << "_flush " << ictx << " r = " << r << dendl; - - return r; - } - int invalidate_cache(ImageCtx *ictx) { CephContext *cct = ictx->cct; @@ -3734,8 +3705,6 @@ reprotect_and_return_err: return r; } - 
ictx->flush_async_operations(); - RWLock::RLocker owner_locker(ictx->owner_lock); RWLock::WLocker md_locker(ictx->md_lock); r = ictx->invalidate_cache(); diff --git a/src/librbd/internal.h b/src/librbd/internal.h index e94cc2b0accb..3de90a66fd03 100644 --- a/src/librbd/internal.h +++ b/src/librbd/internal.h @@ -212,7 +212,6 @@ namespace librbd { ProgressContext &prog_ctx); int flush(ImageCtx *ictx); - int _flush(ImageCtx *ictx); int invalidate_cache(ImageCtx *ictx); int metadata_list(ImageCtx *ictx, const string &last, uint64_t max, map *pairs); int metadata_get(ImageCtx *ictx, const std::string &key, std::string *value);