From c611936c83465ac1d28112eafc3e7a9afe6c934b Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 24 Feb 2015 23:35:31 -0500 Subject: [PATCH] librbd: replace Finisher/SafeTimer use with facade Replace the two Context threading classes used within ImageWatcher with a facade to orchestrate the scheduling and canceling of Context task callbacks. Signed-off-by: Jason Dillaman --- src/librbd/ImageWatcher.cc | 110 +++++++++----------------- src/librbd/ImageWatcher.h | 47 +++++++++--- src/librbd/Makefile.am | 1 + src/librbd/TaskFinisher.h | 141 ++++++++++++++++++++++++++++++++++ src/librbd/WatchNotifyTypes.h | 3 + 5 files changed, 218 insertions(+), 84 deletions(-) create mode 100644 src/librbd/TaskFinisher.h diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc index de30d265b3e8b..de751123681f8 100644 --- a/src/librbd/ImageWatcher.cc +++ b/src/librbd/ImageWatcher.cc @@ -4,12 +4,12 @@ #include "librbd/AioCompletion.h" #include "librbd/ImageCtx.h" #include "librbd/ObjectMap.h" +#include "librbd/TaskFinisher.h" #include "cls/lock/cls_lock_client.h" #include "cls/lock/cls_lock_types.h" #include "include/encoding.h" #include "include/stringify.h" #include "common/errno.h" -#include "common/Timer.h" #include #include #include @@ -35,24 +35,16 @@ ImageWatcher::ImageWatcher(ImageCtx &image_ctx) m_watch_ctx(*this), m_watch_handle(0), m_watch_state(WATCH_STATE_UNREGISTERED), m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED), - m_finisher(new Finisher(image_ctx.cct)), - m_timer_lock("librbd::ImageWatcher::m_timer_lock"), - m_timer(new SafeTimer(image_ctx.cct, m_timer_lock)), + m_task_finisher(new TaskFinisher(*m_image_ctx.cct)), m_async_request_lock("librbd::ImageWatcher::m_async_request_lock"), m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"), - m_retry_aio_context(NULL), m_owner_client_id_lock("librbd::ImageWatcher::m_owner_client_id_lock") { - m_finisher->start(); - m_timer->init(); } ImageWatcher::~ImageWatcher() { - { - Mutex::Locker l(m_timer_lock); - m_timer->shutdown(); - } + delete m_task_finisher; { RWLock::RLocker l(m_watch_lock); assert(m_watch_state != WATCH_STATE_REGISTERED); @@ -61,8 +53,6 @@ ImageWatcher::~ImageWatcher() RWLock::RLocker l(m_image_ctx.owner_lock); assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED); } - m_finisher->stop(); - delete m_finisher; } bool ImageWatcher::is_lock_supported() const { @@ -102,7 +92,7 @@ int ImageWatcher::unregister_watch() { } cancel_async_requests(); - m_finisher->wait_for_empty(); + m_task_finisher->cancel_all(); int r = 0; { @@ -209,7 +199,7 @@ int ImageWatcher::request_lock( // run notify request in finisher to avoid blocking aio path FunctionContext *ctx = new FunctionContext( boost::bind(&ImageWatcher::notify_request_lock, this)); - m_finisher->queue(ctx); + m_task_finisher->queue(TASK_CODE_REQUEST_LOCK, ctx); } return 0; } @@ -325,7 +315,7 @@ int ImageWatcher::lock() { FunctionContext *ctx = new FunctionContext( boost::bind(&IoCtx::notify2, &m_image_ctx.md_ctx, m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, reinterpret_cast(NULL))); - m_finisher->queue(ctx); + m_task_finisher->queue(TASK_CODE_ACQUIRED_LOCK, ctx); return 0; } @@ -366,7 +356,7 @@ int ImageWatcher::unlock() FunctionContext *ctx = new FunctionContext( boost::bind(&ImageWatcher::notify_released_lock, this)); - m_finisher->queue(ctx); + m_task_finisher->queue(TASK_CODE_RELEASED_LOCK, ctx); return 0; } @@ -375,12 +365,19 @@ void ImageWatcher::release_lock() ldout(m_image_ctx.cct, 10) << "releasing exclusive lock by request" << dendl; { RWLock::WLocker l(m_image_ctx.owner_lock); + if (!is_lock_owner()) { + return; + } prepare_unlock(); } m_image_ctx.cancel_async_requests(); RWLock::WLocker l(m_image_ctx.owner_lock); + if (!is_lock_owner()) { + return; + } + { RWLock::WLocker l2(m_image_ctx.md_lock); librbd::_flush(&m_image_ctx); @@ -399,6 +396,14 @@ void ImageWatcher::assert_header_locked(librados::ObjectWriteOperation *op) { encode_lock_cookie(), WATCHER_LOCK_TAG); } +void ImageWatcher::schedule_async_progress(const AsyncRequestId &request, + uint64_t offset, uint64_t total) { + FunctionContext *ctx = new FunctionContext( + boost::bind(&ImageWatcher::notify_async_progress, this, request, offset, + total)); + m_task_finisher->queue(Task(TASK_CODE_ASYNC_PROGRESS, request), ctx); +} + int ImageWatcher::notify_async_progress(const AsyncRequestId &request, uint64_t offset, uint64_t total) { ldout(m_image_ctx.cct, 20) << "remote async request progress: " @@ -409,9 +414,6 @@ int ImageWatcher::notify_async_progress(const AsyncRequestId &request, ::encode(NotifyMessage(AsyncProgressPayload(request, offset, total)), bl); m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL); - - RWLock::WLocker l(m_async_request_lock); - m_async_progress.erase(request); return 0; } @@ -419,7 +421,7 @@ void ImageWatcher::schedule_async_complete(const AsyncRequestId &request, int r) { FunctionContext *ctx = new FunctionContext( boost::bind(&ImageWatcher::notify_async_complete, this, request, r)); - m_finisher->queue(ctx); + m_task_finisher->queue(ctx); } int ImageWatcher::notify_async_complete(const AsyncRequestId &request, @@ -510,38 +512,18 @@ bool ImageWatcher::decode_lock_cookie(const std::string &tag, } void ImageWatcher::schedule_retry_aio_requests(bool use_timer) { - Mutex::Locker l(m_timer_lock); + Context *ctx = new FunctionContext(boost::bind( + &ImageWatcher::retry_aio_requests, this)); if (use_timer) { - if (m_retry_aio_context == NULL) { - m_retry_aio_context = new FunctionContext(boost::bind( - &ImageWatcher::finalize_retry_aio_requests, this)); - m_timer->add_event_after(RETRY_DELAY_SECONDS, m_retry_aio_context); - } + m_task_finisher->add_event_after(TASK_CODE_RETRY_AIO_REQUESTS, + RETRY_DELAY_SECONDS, ctx); } else { - m_timer->cancel_event(m_retry_aio_context); - m_retry_aio_context = NULL; - - Context *ctx = new FunctionContext(boost::bind( - &ImageWatcher::retry_aio_requests, this)); - m_finisher->queue(ctx); + m_task_finisher->queue(TASK_CODE_RETRY_AIO_REQUESTS, ctx); } } -void ImageWatcher::cancel_retry_aio_requests() { - Mutex::Locker l(m_timer_lock); - if (m_retry_aio_context != NULL) { - m_timer->cancel_event(m_retry_aio_context); - m_retry_aio_context = NULL; - } -} - -void ImageWatcher::finalize_retry_aio_requests() { - assert(m_timer_lock.is_locked()); - m_retry_aio_context = NULL; - retry_aio_requests(); -} - void ImageWatcher::retry_aio_requests() { + m_task_finisher->cancel(TASK_CODE_RETRY_AIO_REQUESTS); std::vector lock_request_restarts; { Mutex::Locker l(m_aio_request_lock); @@ -560,7 +542,7 @@ void ImageWatcher::retry_aio_requests() { void ImageWatcher::schedule_cancel_async_requests() { FunctionContext *ctx = new FunctionContext( boost::bind(&ImageWatcher::cancel_async_requests, this)); - m_finisher->queue(ctx); + m_task_finisher->queue(TASK_CODE_CANCEL_ASYNC_REQUESTS, ctx); } void ImageWatcher::cancel_async_requests() { @@ -587,7 +569,7 @@ void ImageWatcher::notify_released_lock() { void ImageWatcher::notify_request_lock() { ldout(m_image_ctx.cct, 10) << "notify request lock" << dendl; - cancel_retry_aio_requests(); + m_task_finisher->cancel(TASK_CODE_RETRY_AIO_REQUESTS); m_image_ctx.owner_lock.get_read(); if (try_request_lock()) { @@ -710,18 +692,6 @@ int ImageWatcher::notify_async_request(const AsyncRequestId &async_request_id, return r; } -void ImageWatcher::schedule_async_progress(const AsyncRequestId &request, - uint64_t offset, uint64_t total) { - RWLock::WLocker l(m_async_request_lock); - if (m_async_progress.count(request) == 0) { - m_async_progress.insert(request); - FunctionContext *ctx = new FunctionContext( - boost::bind(&ImageWatcher::notify_async_progress, this, request, offset, - total)); - m_finisher->queue(ctx); - } -} - void ImageWatcher::handle_payload(const HeaderUpdatePayload &payload, bufferlist *out) { ldout(m_image_ctx.cct, 10) << "image header updated" << dendl; @@ -791,7 +761,7 @@ void ImageWatcher::handle_payload(const RequestLockPayload &payload, ldout(m_image_ctx.cct, 10) << "queuing release of exclusive lock" << dendl; FunctionContext *ctx = new FunctionContext( boost::bind(&ImageWatcher::release_lock, this)); - m_finisher->queue(ctx); + m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx); } } @@ -925,7 +895,7 @@ void ImageWatcher::handle_payload(const SnapCreatePayload &payload, // cannot notify within a notificiation FunctionContext *ctx = new FunctionContext( boost::bind(&ImageWatcher::finalize_header_update, this)); - m_finisher->queue(ctx); + m_task_finisher->queue(TASK_CODE_HEADER_UPDATE, ctx); } } } @@ -975,7 +945,7 @@ void ImageWatcher::handle_error(uint64_t handle, int err) { FunctionContext *ctx = new FunctionContext( boost::bind(&ImageWatcher::reregister_watch, this)); - m_finisher->queue(ctx); + m_task_finisher->queue(TASK_CODE_REREGISTER_WATCH, ctx); } } @@ -984,13 +954,6 @@ void ImageWatcher::acknowledge_notify(uint64_t notify_id, uint64_t handle, m_image_ctx.md_ctx.notify_ack(m_image_ctx.header_oid, notify_id, handle, out); } -void ImageWatcher::schedule_reregister_watch() { - Mutex::Locker l(m_timer_lock); - Context *ctx = new FunctionContext(boost::bind( - &ImageWatcher::reregister_watch, this)); - m_timer->add_event_after(RETRY_DELAY_SECONDS, ctx); -} - void ImageWatcher::reregister_watch() { ldout(m_image_ctx.cct, 10) << "re-registering image watch" << dendl; @@ -1015,8 +978,9 @@ void ImageWatcher::reregister_watch() { << cpp_strerror(r) << dendl; if (r != -ESHUTDOWN) { FunctionContext *ctx = new FunctionContext(boost::bind( - &ImageWatcher::schedule_reregister_watch, this)); - m_finisher->queue(ctx); + &ImageWatcher::reregister_watch, this)); + m_task_finisher->add_event_after(TASK_CODE_REREGISTER_WATCH, + RETRY_DELAY_SECONDS, ctx); } return; } diff --git a/src/librbd/ImageWatcher.h b/src/librbd/ImageWatcher.h index 17573d74daac3..d159d59a2af4e 100644 --- a/src/librbd/ImageWatcher.h +++ b/src/librbd/ImageWatcher.h @@ -17,13 +17,12 @@ #include "include/assert.h" class entity_name_t; -class Finisher; -class SafeTimer; namespace librbd { class AioCompletion; class ImageCtx; + template class TaskFinisher; class ImageWatcher { public: @@ -68,10 +67,44 @@ namespace librbd { WATCH_STATE_ERROR }; + enum TaskCode { + TASK_CODE_ACQUIRED_LOCK, + TASK_CODE_REQUEST_LOCK, + TASK_CODE_RELEASING_LOCK, + TASK_CODE_RELEASED_LOCK, + TASK_CODE_RETRY_AIO_REQUESTS, + TASK_CODE_CANCEL_ASYNC_REQUESTS, + TASK_CODE_HEADER_UPDATE, + TASK_CODE_REREGISTER_WATCH, + TASK_CODE_ASYNC_REQUEST, + TASK_CODE_ASYNC_PROGRESS + }; + typedef std::pair AsyncRequest; typedef std::pair, AioCompletion *> AioRequest; + class Task { + public: + Task(TaskCode task_code) : m_task_code(task_code) {} + Task(TaskCode task_code, const WatchNotify::AsyncRequestId &id) + : m_task_code(task_code), m_async_request_id(id) {} + + inline bool operator<(const Task& rhs) const { + if (m_task_code != rhs.m_task_code) { + return m_task_code < rhs.m_task_code; + } else if ((m_task_code == TASK_CODE_ASYNC_REQUEST || + m_task_code == TASK_CODE_ASYNC_PROGRESS) && + m_async_request_id != rhs.m_async_request_id) { + return m_async_request_id < rhs.m_async_request_id; + } + return false; + } + private: + TaskCode m_task_code; + WatchNotify::AsyncRequestId m_async_request_id; + }; + struct WatchCtx : public librados::WatchCtx2 { ImageWatcher &image_watcher; @@ -159,19 +192,14 @@ namespace librbd { LockOwnerState m_lock_owner_state; - Finisher *m_finisher; - - Mutex m_timer_lock; - SafeTimer *m_timer; + TaskFinisher *m_task_finisher; RWLock m_async_request_lock; std::map m_async_requests; std::set m_async_pending; - std::set m_async_progress; Mutex m_aio_request_lock; std::vector m_aio_requests; - Context *m_retry_aio_context; Mutex m_owner_client_id_lock; WatchNotify::ClientId m_owner_client_id; @@ -187,8 +215,6 @@ namespace librbd { void finalize_header_update(); void schedule_retry_aio_requests(bool use_timer); - void cancel_retry_aio_requests(); - void finalize_retry_aio_requests(); void retry_aio_requests(); void schedule_cancel_async_requests(); @@ -239,7 +265,6 @@ namespace librbd { void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out); - void schedule_reregister_watch(); void reregister_watch(); }; diff --git a/src/librbd/Makefile.am b/src/librbd/Makefile.am index ffed797760f91..a4b21d97db131 100644 --- a/src/librbd/Makefile.am +++ b/src/librbd/Makefile.am @@ -61,4 +61,5 @@ noinst_HEADERS += \ librbd/ObjectMap.h \ librbd/parent_types.h \ librbd/SnapInfo.h \ + librbd/TaskFinisher.h \ librbd/WatchNotifyTypes.h diff --git a/src/librbd/TaskFinisher.h b/src/librbd/TaskFinisher.h new file mode 100644 index 0000000000000..14dcd3011fd0c --- /dev/null +++ b/src/librbd/TaskFinisher.h @@ -0,0 +1,141 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef LIBRBD_TASK_FINISHER_H +#define LIBRBD_TASK_FINISHER_H + +#include "include/int_types.h" +#include "include/Context.h" +#include "common/Finisher.h" +#include "common/Mutex.h" +#include "common/Timer.h" +#include +#include + +class CephContext; +class Context; + +namespace librbd { + +template +class TaskFinisher { +public: + TaskFinisher(CephContext &cct) + : m_cct(cct), m_lock("librbd::TaskFinisher::m_lock"), + m_finisher(new Finisher(&cct)), + m_safe_timer(new SafeTimer(&cct, m_lock, false)) + { + m_finisher->start(); + m_safe_timer->init(); + } + + ~TaskFinisher() { + { + Mutex::Locker l(m_lock); + m_safe_timer->shutdown(); + delete m_safe_timer; + } + + m_finisher->stop(); + delete m_finisher; + } + + void cancel(const Task& task) { + Mutex::Locker l(m_lock); + typename TaskContexts::iterator it = m_task_contexts.find(task); + if (it != m_task_contexts.end()) { + delete it->second.first; + m_task_contexts.erase(it); + } + } + + void cancel_all() { + Mutex::Locker l(m_lock); + for (typename TaskContexts::iterator it = m_task_contexts.begin(); + it != m_task_contexts.end(); ++it) { + delete it->second.first; + } + m_task_contexts.clear(); + } + + bool add_event_after(const Task& task, double seconds, Context *ctx) { + Mutex::Locker l(m_lock); + if (m_task_contexts.count(task) != 0) { + // task already scheduled on finisher or timer + delete ctx; + return false; + } + C_Task *timer_ctx = new C_Task(this, task); + m_task_contexts[task] = std::make_pair(ctx, timer_ctx); + + m_safe_timer->add_event_after(seconds, timer_ctx); + return true; + } + + void queue(Context *ctx) { + m_finisher->queue(ctx); + } + + bool queue(const Task& task, Context *ctx) { + Mutex::Locker l(m_lock); + typename TaskContexts::iterator it = m_task_contexts.find(task); + if (it != m_task_contexts.end()) { + if (it->second.second != NULL) { + assert(m_safe_timer->cancel_event(it->second.second)); + delete it->second.first; + } else { + // task already scheduled on the finisher + delete ctx; + return false; + } + } + m_task_contexts[task] = std::make_pair(ctx, reinterpret_cast(NULL)); + + m_finisher->queue(new C_Task(this, task)); + return true; + } + +private: + class C_Task : public Context { + public: + C_Task(TaskFinisher *task_finisher, const Task& task) + : m_task_finisher(task_finisher), m_task(task) + { + } + protected: + virtual void finish(int r) { + m_task_finisher->complete(m_task); + } + private: + TaskFinisher *m_task_finisher; + Task m_task; + }; + + CephContext &m_cct; + + Mutex m_lock; + Finisher *m_finisher; + SafeTimer *m_safe_timer; + + typedef std::map > TaskContexts; + TaskContexts m_task_contexts; + + void complete(const Task& task) { + Context *ctx = NULL; + { + Mutex::Locker l(m_lock); + typename TaskContexts::iterator it = m_task_contexts.find(task); + if (it != m_task_contexts.end()) { + ctx = it->second.first; + m_task_contexts.erase(it); + } + } + + if (ctx != NULL) { + ctx->complete(0); + } + } +}; + +} // namespace librbd + +#endif // LIBRBD_TASK_FINISHER diff --git a/src/librbd/WatchNotifyTypes.h b/src/librbd/WatchNotifyTypes.h index 3f54a1c802c1b..2b3c34b61633e 100644 --- a/src/librbd/WatchNotifyTypes.h +++ b/src/librbd/WatchNotifyTypes.h @@ -67,6 +67,9 @@ struct AsyncRequestId { return request_id < rhs.request_id; } } + inline bool operator!=(const AsyncRequestId &rhs) const { + return (client_id != rhs.client_id || request_id != rhs.request_id); + } }; enum NotifyOp { -- 2.39.5