From: Jason Dillaman Date: Fri, 31 Jul 2015 02:31:55 +0000 (-0400) Subject: librbd: simplify IO flush handling X-Git-Tag: v9.2.1~1^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f3987367920368cbce865533cf11a1eb207fb9c7;p=ceph.git librbd: simplify IO flush handling Add a new convenience method to ImageCtx for handling flush requests and cleanup flush handling with dealing with the cache. Signed-off-by: Jason Dillaman (based on commit ee7c6f73992d3b09c6b401fbb782b2151f2399c7) --- diff --git a/src/librbd/AsyncOperation.cc b/src/librbd/AsyncOperation.cc index dfb1e61a1009..2402b4879524 100644 --- a/src/librbd/AsyncOperation.cc +++ b/src/librbd/AsyncOperation.cc @@ -3,6 +3,7 @@ #include "librbd/AsyncOperation.h" #include "librbd/ImageCtx.h" #include "common/dout.h" +#include "common/WorkQueue.h" #include "include/assert.h" #define dout_subsys ceph_subsys_rbd diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc index 7c186a2d7d20..d04b0970059e 100644 --- a/src/librbd/ImageCtx.cc +++ b/src/librbd/ImageCtx.cc @@ -49,6 +49,89 @@ public: } }; +struct C_FlushCache : public Context { + ImageCtx *image_ctx; + Context *on_safe; + + C_FlushCache(ImageCtx *_image_ctx, Context *_on_safe) + : image_ctx(_image_ctx), on_safe(_on_safe) { + } + virtual void finish(int r) { + // successful cache flush indicates all IO is now safe + RWLock::RLocker owner_locker(image_ctx->owner_lock); + image_ctx->flush_cache(on_safe); + } +}; + +struct C_InvalidateCache : public Context { + ImageCtx *image_ctx; + bool purge_on_error; + bool reentrant_safe; + Context *on_finish; + + C_InvalidateCache(ImageCtx *_image_ctx, bool _purge_on_error, + bool _reentrant_safe, Context *_on_finish) + : image_ctx(_image_ctx), purge_on_error(_purge_on_error), + reentrant_safe(_reentrant_safe), on_finish(_on_finish) { + } + virtual void finish(int r) { + assert(image_ctx->cache_lock.is_locked()); + CephContext *cct = image_ctx->cct; + + if (r == -EBLACKLISTED) { + lderr(cct) << "Blacklisted during flush! Purging cache..." << dendl; + image_ctx->object_cacher->purge_set(image_ctx->object_set); + } else if (r != 0 && purge_on_error) { + lderr(cct) << "invalidate cache encountered error " + << cpp_strerror(r) << " !Purging cache..." << dendl; + image_ctx->object_cacher->purge_set(image_ctx->object_set); + } else if (r != 0) { + lderr(cct) << "flush_cache returned " << r << dendl; + } + + loff_t unclean = image_ctx->object_cacher->release_set( + image_ctx->object_set); + if (unclean == 0) { + r = 0; + } else { + lderr(cct) << "could not release all objects from cache: " + << unclean << " bytes remain" << dendl; + r = -EBUSY; + } + + if (reentrant_safe) { + on_finish->complete(r); + } else { + image_ctx->op_work_queue->queue(on_finish, r); + } + } + +}; + +struct C_AsyncCallback : public Context { + ImageCtx *image_ctx; + Context *on_finish; + C_AsyncCallback(ImageCtx *image_ctx, Context *on_finish) + : image_ctx(image_ctx), on_finish(on_finish) { + } + virtual void finish(int r) { + image_ctx->op_work_queue->queue(on_finish, r); + } +}; + +void _flush_async_operations(ImageCtx *ictx, Context *on_finish) { + { + Mutex::Locker async_ops_locker(ictx->async_ops_lock); + if (!ictx->async_ops.empty()) { + ldout(ictx->cct, 20) << "flush async operations: " << on_finish << " " + << "count=" << ictx->async_ops.size() << dendl; + ictx->async_ops.front()->add_flush_context(on_finish); + return; + } + } + on_finish->complete(0); +} + } // anonymous namespace const string ImageCtx::METADATA_CONF_PREFIX = "conf_"; @@ -647,30 +730,24 @@ public: } } - void ImageCtx::flush_cache_aio(Context *onfinish) { + int ImageCtx::flush_cache() { + C_SaferCond cond_ctx; + flush_cache(&cond_ctx); + + ldout(cct, 20) << "waiting for cache to be flushed" << dendl; + int r = cond_ctx.wait(); + ldout(cct, 20) << "finished flushing cache" << dendl; + + return r; + } + + void ImageCtx::flush_cache(Context *onfinish) { assert(owner_lock.is_locked()); cache_lock.Lock(); object_cacher->flush_set(object_set, onfinish); cache_lock.Unlock(); } - int ImageCtx::flush_cache() { - int r = 0; - Mutex mylock("librbd::ImageCtx::flush_cache"); - Cond cond; - bool done; - Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r); - flush_cache_aio(onfinish); - mylock.Lock(); - while (!done) { - ldout(cct, 20) << "waiting for cache to be flushed" << dendl; - cond.Wait(mylock); - } - mylock.Unlock(); - ldout(cct, 20) << "finished flushing cache" << dendl; - return r; - } - int ImageCtx::shutdown_cache() { flush_async_operations(); @@ -681,20 +758,19 @@ public: } int ImageCtx::invalidate_cache(bool purge_on_error) { - int result; - C_SaferCond ctx; - invalidate_cache(&ctx); - result = ctx.wait(); - - if (result && purge_on_error) { - cache_lock.Lock(); - if (object_cacher != NULL) { - lderr(cct) << "invalidate cache met error " << cpp_strerror(result) << " !Purging cache..." << dendl; - object_cacher->purge_set(object_set); - } - cache_lock.Unlock(); + flush_async_operations(); + if (object_cacher == NULL) { + return 0; } + cache_lock.Lock(); + object_cacher->release_set(object_set); + cache_lock.Unlock(); + + C_SaferCond ctx; + flush_cache(new C_InvalidateCache(this, purge_on_error, true, &ctx)); + + int result = ctx.wait(); return result; } @@ -708,29 +784,7 @@ public: object_cacher->release_set(object_set); cache_lock.Unlock(); - flush_cache_aio(new FunctionContext(boost::bind( - &ImageCtx::invalidate_cache_completion, this, _1, on_finish))); - } - - void ImageCtx::invalidate_cache_completion(int r, Context *on_finish) { - assert(cache_lock.is_locked()); - if (r == -EBLACKLISTED) { - lderr(cct) << "Blacklisted during flush! Purging cache..." << dendl; - object_cacher->purge_set(object_set); - } else if (r != 0) { - lderr(cct) << "flush_cache returned " << r << dendl; - } - - loff_t unclean = object_cacher->release_set(object_set); - if (unclean == 0) { - r = 0; - } else { - lderr(cct) << "could not release all objects from cache: " - << unclean << " bytes remain" << dendl; - r = -EBUSY; - } - - op_work_queue->queue(on_finish, r); + flush_cache(new C_InvalidateCache(this, false, false, on_finish)); } void ImageCtx::clear_nonexistence_cache() { @@ -777,20 +831,35 @@ public: void ImageCtx::flush_async_operations() { C_SaferCond ctx; - flush_async_operations(&ctx); + _flush_async_operations(this, &ctx); ctx.wait(); } void ImageCtx::flush_async_operations(Context *on_finish) { - Mutex::Locker l(async_ops_lock); - if (async_ops.empty()) { - on_finish->complete(0); - return; + // complete context in clean thread context + _flush_async_operations(this, new C_AsyncCallback(this, on_finish)); + } + + int ImageCtx::flush() { + assert(owner_lock.is_locked()); + + flush_async_operations(); + if (object_cacher != NULL) { + int r = flush_cache(); + if (r < 0) { + return r; + } } + return 0; + } - ldout(cct, 20) << "flush async operations: " << on_finish << " " - << "count=" << async_ops.size() << dendl; - async_ops.front()->add_flush_context(on_finish); + void ImageCtx::flush(Context *on_safe) { + assert(owner_lock.is_locked()); + if (object_cacher != NULL) { + // flush cache after completing all in-flight AIO ops + on_safe = new C_FlushCache(this, on_safe); + } + flush_async_operations(on_safe); } void ImageCtx::cancel_async_requests() { diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h index 3c7f170b5843..4e01cba4c361 100644 --- a/src/librbd/ImageCtx.h +++ b/src/librbd/ImageCtx.h @@ -218,12 +218,11 @@ namespace librbd { void write_to_cache(object_t o, const bufferlist& bl, size_t len, uint64_t off, Context *onfinish, int fadvise_flags); void user_flushed(); - void flush_cache_aio(Context *onfinish); int flush_cache(); + void flush_cache(Context *onfinish); int shutdown_cache(); int invalidate_cache(bool purge_on_error=false); void invalidate_cache(Context *on_finish); - void invalidate_cache_completion(int r, Context *on_finish); void clear_nonexistence_cache(); int register_watch(); void unregister_watch(); @@ -233,6 +232,9 @@ namespace librbd { void flush_async_operations(); void flush_async_operations(Context *on_finish); + int flush(); + void flush(Context *on_safe); + void cancel_async_requests(); void apply_metadata_confs(); }; diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc index 43644f8854cd..643e3870edba 100644 --- a/src/librbd/ImageWatcher.cc +++ b/src/librbd/ImageWatcher.cc @@ -389,7 +389,7 @@ bool ImageWatcher::release_lock() { RWLock::RLocker owner_locker(m_image_ctx.owner_lock); RWLock::WLocker md_locker(m_image_ctx.md_lock); - librbd::_flush(&m_image_ctx); + m_image_ctx.flush(); } m_image_ctx.owner_lock.get_write(); diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc index 1eaa50f81f3b..c714244ab229 100644 --- a/src/librbd/internal.cc +++ b/src/librbd/internal.cc @@ -655,7 +655,7 @@ int invoke_async_request(ImageCtx *ictx, const std::string& request_type, } RWLock::WLocker md_locker(ictx->md_lock); - r = _flush(ictx); + r = ictx->flush(); if (r < 0) { return r; } @@ -2384,7 +2384,7 @@ reprotect_and_return_err: } // release snap_lock and cache_lock if (new_snap) { - _flush(ictx); + ictx->flush(); } ictx->refresh_lock.Lock(); @@ -2444,7 +2444,6 @@ reprotect_and_return_err: // writes might create new snapshots. Rolling back will replace // the current version, so we have to invalidate that too. RWLock::WLocker md_locker(ictx->md_lock); - ictx->flush_async_operations(); r = ictx->invalidate_cache(); if (r < 0) { return r; @@ -2683,7 +2682,7 @@ reprotect_and_return_err: // get -EROFS for writes RWLock::RLocker owner_locker(ictx->owner_lock); RWLock::WLocker md_locker(ictx->md_lock); - ictx->flush_cache(); + ictx->flush(); } int r = _snap_set(ictx, snap_name); if (r < 0) { @@ -3177,7 +3176,7 @@ reprotect_and_return_err: // ensure previous writes are visible to listsnaps { RWLock::RLocker owner_locker(ictx->owner_lock); - _flush(ictx); + ictx->flush(); } int r = ictx_check(ictx); @@ -3427,19 +3426,9 @@ reprotect_and_return_err: C_AioWrite *flush_ctx = new C_AioWrite(cct, c); c->add_request(); - ictx->flush_async_operations(flush_ctx); + ictx->flush(flush_ctx); c->start_op(ictx, AIO_TYPE_FLUSH); - C_AioWrite *req_comp = new C_AioWrite(cct, c); - c->add_request(); - if (ictx->object_cacher) { - ictx->flush_cache_aio(req_comp); - } else { - librados::AioCompletion *rados_completion = - librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb); - ictx->data_ctx.aio_flush_async(rados_completion); - rados_completion->release(); - } c->finish_adding_requests(cct); c->put(); ictx->perfcounter->inc(l_librbd_aio_flush); @@ -3458,31 +3447,12 @@ reprotect_and_return_err: ictx->user_flushed(); { RWLock::RLocker owner_locker(ictx->owner_lock); - r = _flush(ictx); + r = ictx->flush(); } ictx->perfcounter->inc(l_librbd_flush); return r; } - int _flush(ImageCtx *ictx) - { - assert(ictx->owner_lock.is_locked()); - CephContext *cct = ictx->cct; - int r; - // flush any outstanding writes - if (ictx->object_cacher) { - r = ictx->flush_cache(); - } else { - r = ictx->data_ctx.aio_flush(); - ictx->flush_async_operations(); - } - - if (r) - lderr(cct) << "_flush " << ictx << " r = " << r << dendl; - - return r; - } - int invalidate_cache(ImageCtx *ictx) { CephContext *cct = ictx->cct; @@ -3493,8 +3463,6 @@ reprotect_and_return_err: return r; } - ictx->flush_async_operations(); - RWLock::RLocker owner_locker(ictx->owner_lock); RWLock::WLocker md_locker(ictx->md_lock); r = ictx->invalidate_cache(); diff --git a/src/librbd/internal.h b/src/librbd/internal.h index 7eaa3c5614fe..220cad8633cc 100644 --- a/src/librbd/internal.h +++ b/src/librbd/internal.h @@ -204,7 +204,6 @@ namespace librbd { char *buf, bufferlist *pbl, AioCompletion *c, int op_flags); void aio_flush(ImageCtx *ictx, AioCompletion *c); int flush(ImageCtx *ictx); - int _flush(ImageCtx *ictx); int invalidate_cache(ImageCtx *ictx); int metadata_list(ImageCtx *ictx, const string &last, uint64_t max, map *pairs); int metadata_get(ImageCtx *ictx, const std::string &key, std::string *value);