From: Jason Dillaman Date: Mon, 19 Jan 2015 22:33:41 +0000 (-0500) Subject: librbd: throttle async progress callbacks X-Git-Tag: v0.93~193^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4ac3cd729cfb967ed4caf7af0d86e17b90e17724;p=ceph.git librbd: throttle async progress callbacks Ensure that no more than one outstanding progress callback is queued for notification. This will allow remote progress updates to be sent at a rate in which all watch/notify clients can support. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc index 13cf69bee74..b1fea4af62f 100644 --- a/src/librbd/ImageWatcher.cc +++ b/src/librbd/ImageWatcher.cc @@ -59,58 +59,6 @@ enum { NOTIFY_OP_SNAP_CREATE = 8 }; -class RemoteProgressContext : public ProgressContext { -public: - RemoteProgressContext(ImageWatcher &image_watcher, Finisher &finisher, - const RemoteAsyncRequest &remote_async_request) - : m_image_watcher(image_watcher), m_finisher(finisher), - m_remote_async_request(remote_async_request) - { - } - - virtual int update_progress(uint64_t offset, uint64_t total) { - // TODO: JD throttle notify updates(?) - FunctionContext *ctx = new FunctionContext( - boost::bind(&ImageWatcher::notify_async_progress, - &m_image_watcher, m_remote_async_request, offset, total)); - m_finisher.queue(ctx); - return 0; - } - -private: - ImageWatcher &m_image_watcher; - Finisher &m_finisher; - RemoteAsyncRequest m_remote_async_request; -}; - -class RemoteContext : public Context { -public: - RemoteContext(ImageWatcher &image_watcher, Finisher &finisher, - const RemoteAsyncRequest &remote_async_request, - RemoteProgressContext *prog_ctx) - : m_image_watcher(image_watcher), m_finisher(finisher), - m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx) - { - } - - ~RemoteContext() { - delete m_prog_ctx; - } - - virtual void finish(int r) { - FunctionContext *ctx = new FunctionContext( - boost::bind(&ImageWatcher::notify_async_complete, - &m_image_watcher, m_remote_async_request, r)); - m_finisher.queue(ctx); - } - -private: - ImageWatcher &m_image_watcher; - Finisher &m_finisher; - RemoteAsyncRequest m_remote_async_request; - RemoteProgressContext *m_prog_ctx; -}; - ImageWatcher::ImageWatcher(ImageCtx &image_ctx) : m_image_ctx(image_ctx), m_watch_ctx(*this), m_handle(0), m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED), @@ -443,6 +391,9 @@ int ImageWatcher::notify_async_progress(const RemoteAsyncRequest &request, ENCODE_FINISH(bl); m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL); + + RWLock::WLocker l(m_async_request_lock); + m_async_progress.erase(request); return 0; } @@ -757,6 +708,19 @@ int ImageWatcher::notify_async_request(uint64_t async_request_id, return r; } +void ImageWatcher::schedule_update_progress( + const RemoteAsyncRequest &remote_async_request, + uint64_t offset, uint64_t total) { + RWLock::WLocker l(m_async_request_lock); + if (m_async_progress.count(remote_async_request) == 0) { + m_async_progress.insert(remote_async_request); + FunctionContext *ctx = new FunctionContext( + boost::bind(&ImageWatcher::notify_async_progress, + this, remote_async_request, offset, total)); + m_finisher->queue(ctx); + } +} + void ImageWatcher::handle_header_update() { ldout(m_image_ctx.cct, 1) << "image header updated" << dendl; @@ -857,13 +821,11 @@ void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) { RemoteAsyncRequest request; ::decode(request, iter); - RemoteProgressContext *prog_ctx = - new RemoteProgressContext(*this, *m_finisher, request); - RemoteContext *ctx = new RemoteContext(*this, *m_finisher, request, - prog_ctx); + RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this, + request); + RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx); ldout(m_image_ctx.cct, 20) << "remote flatten request: " << request << dendl; - int r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx); if (r < 0) { delete ctx; @@ -884,14 +846,12 @@ void ImageWatcher::handle_resize(bufferlist::iterator iter, bufferlist *out) { RemoteAsyncRequest request; ::decode(request, iter); - RemoteProgressContext *prog_ctx = - new RemoteProgressContext(*this, *m_finisher, request); - RemoteContext *ctx = new RemoteContext(*this, *m_finisher, request, - prog_ctx); + RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this, + request); + RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx); ldout(m_image_ctx.cct, 20) << "remote resize request: " << request << " " << size << dendl; - int r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx); if (r < 0) { delete ctx; @@ -1076,4 +1036,11 @@ void ImageWatcher::WatchCtx::handle_error(uint64_t handle, int err) { image_watcher.handle_error(handle, err); } +void ImageWatcher::RemoteContext::finish(int r) { + FunctionContext *ctx = new FunctionContext( + boost::bind(&ImageWatcher::notify_async_complete, + &m_image_watcher, m_remote_async_request, r)); + m_image_watcher.m_finisher->queue(ctx); +} + } diff --git a/src/librbd/ImageWatcher.h b/src/librbd/ImageWatcher.h index 1ef02f140f8..e2a73a7931d 100644 --- a/src/librbd/ImageWatcher.h +++ b/src/librbd/ImageWatcher.h @@ -7,6 +7,8 @@ #include "common/Mutex.h" #include "common/RWLock.h" #include "include/rados/librados.hpp" +#include "include/rbd/librbd.hpp" +#include #include #include #include @@ -22,7 +24,6 @@ namespace librbd { class AioCompletion; class ImageCtx; - class ProgressContext; struct RemoteAsyncRequest { uint64_t gid; @@ -32,12 +33,21 @@ namespace librbd { RemoteAsyncRequest() : gid(), handle(), request_id() {} RemoteAsyncRequest(uint64_t gid_, uint64_t handle_, uint64_t request_id_) : gid(gid_), handle(handle_), request_id(request_id_) {} + + inline bool operator<(const RemoteAsyncRequest &rhs) const { + if (gid != rhs.gid) { + return gid < rhs.gid; + } else if (handle != rhs.handle) { + return handle < rhs.handle; + } else { + return request_id < request_id; + } + } }; class ImageWatcher { public: - ImageWatcher(ImageCtx& image_ctx); ~ImageWatcher(); @@ -97,6 +107,48 @@ namespace librbd { virtual void handle_error(uint64_t handle, int err); }; + class RemoteProgressContext : public ProgressContext { + public: + RemoteProgressContext(ImageWatcher &image_watcher, + const RemoteAsyncRequest &remote_async_request) + : m_image_watcher(image_watcher), + m_remote_async_request(remote_async_request) + { + } + + virtual int update_progress(uint64_t offset, uint64_t total) { + m_image_watcher.schedule_update_progress( + m_remote_async_request, offset, total); + return 0; + } + + private: + ImageWatcher &m_image_watcher; + RemoteAsyncRequest m_remote_async_request; + }; + + class RemoteContext : public Context { + public: + RemoteContext(ImageWatcher &image_watcher, + const RemoteAsyncRequest &remote_async_request, + RemoteProgressContext *prog_ctx) + : m_image_watcher(image_watcher), + m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx) + { + } + + ~RemoteContext() { + delete m_prog_ctx; + } + + virtual void finish(int r); + + private: + ImageWatcher &m_image_watcher; + RemoteAsyncRequest m_remote_async_request; + RemoteProgressContext *m_prog_ctx; + }; + ImageCtx &m_image_ctx; WatchCtx m_watch_ctx; @@ -115,6 +167,7 @@ namespace librbd { RWLock m_async_request_lock; uint64_t m_async_request_id; std::map m_async_requests; + std::set m_async_progress; Mutex m_aio_request_lock; Cond m_aio_request_cond; @@ -152,6 +205,9 @@ namespace librbd { ProgressContext& prog_ctx); void notify_request_leadership(); + void schedule_update_progress(const RemoteAsyncRequest &remote_async_request, + uint64_t offset, uint64_t total); + void handle_header_update(); void handle_acquired_lock(); void handle_released_lock();