#include "librbd/AioCompletion.h"
#include "librbd/ImageCtx.h"
#include "librbd/ObjectMap.h"
+#include "librbd/TaskFinisher.h"
#include "cls/lock/cls_lock_client.h"
#include "cls/lock/cls_lock_types.h"
#include "include/encoding.h"
#include "include/stringify.h"
#include "common/errno.h"
-#include "common/Timer.h"
#include <sstream>
#include <boost/bind.hpp>
#include <boost/function.hpp>
m_watch_ctx(*this), m_watch_handle(0),
m_watch_state(WATCH_STATE_UNREGISTERED),
m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED),
- m_finisher(new Finisher(image_ctx.cct)),
- m_timer_lock("librbd::ImageWatcher::m_timer_lock"),
- m_timer(new SafeTimer(image_ctx.cct, m_timer_lock)),
+ m_task_finisher(new TaskFinisher<Task>(*m_image_ctx.cct)),
m_async_request_lock("librbd::ImageWatcher::m_async_request_lock"),
m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"),
- m_retry_aio_context(NULL),
m_owner_client_id_lock("librbd::ImageWatcher::m_owner_client_id_lock")
{
- m_finisher->start();
- m_timer->init();
}
ImageWatcher::~ImageWatcher()
{
- {
- Mutex::Locker l(m_timer_lock);
- m_timer->shutdown();
- }
+ delete m_task_finisher;
{
RWLock::RLocker l(m_watch_lock);
assert(m_watch_state != WATCH_STATE_REGISTERED);
RWLock::RLocker l(m_image_ctx.owner_lock);
assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
}
- m_finisher->stop();
- delete m_finisher;
}
bool ImageWatcher::is_lock_supported() const {
}
cancel_async_requests();
- m_finisher->wait_for_empty();
+ m_task_finisher->cancel_all();
int r = 0;
{
// run notify request in finisher to avoid blocking aio path
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::notify_request_lock, this));
- m_finisher->queue(ctx);
+ m_task_finisher->queue(TASK_CODE_REQUEST_LOCK, ctx);
}
return 0;
}
FunctionContext *ctx = new FunctionContext(
boost::bind(&IoCtx::notify2, &m_image_ctx.md_ctx, m_image_ctx.header_oid,
bl, NOTIFY_TIMEOUT, reinterpret_cast<bufferlist *>(NULL)));
- m_finisher->queue(ctx);
+ m_task_finisher->queue(TASK_CODE_ACQUIRED_LOCK, ctx);
return 0;
}
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::notify_released_lock, this));
- m_finisher->queue(ctx);
+ m_task_finisher->queue(TASK_CODE_RELEASED_LOCK, ctx);
return 0;
}
ldout(m_image_ctx.cct, 10) << "releasing exclusive lock by request" << dendl;
{
RWLock::WLocker l(m_image_ctx.owner_lock);
+ if (!is_lock_owner()) {
+ return;
+ }
prepare_unlock();
}
m_image_ctx.cancel_async_requests();
RWLock::WLocker l(m_image_ctx.owner_lock);
+ if (!is_lock_owner()) {
+ return;
+ }
+
{
RWLock::WLocker l2(m_image_ctx.md_lock);
librbd::_flush(&m_image_ctx);
encode_lock_cookie(), WATCHER_LOCK_TAG);
}
+void ImageWatcher::schedule_async_progress(const AsyncRequestId &request,
+ uint64_t offset, uint64_t total) {
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::notify_async_progress, this, request, offset,
+ total));
+ m_task_finisher->queue(Task(TASK_CODE_ASYNC_PROGRESS, request), ctx);
+}
+
int ImageWatcher::notify_async_progress(const AsyncRequestId &request,
uint64_t offset, uint64_t total) {
ldout(m_image_ctx.cct, 20) << "remote async request progress: "
::encode(NotifyMessage(AsyncProgressPayload(request, offset, total)), bl);
m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
-
- RWLock::WLocker l(m_async_request_lock);
- m_async_progress.erase(request);
return 0;
}
int r) {
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::notify_async_complete, this, request, r));
- m_finisher->queue(ctx);
+ m_task_finisher->queue(ctx);
}
int ImageWatcher::notify_async_complete(const AsyncRequestId &request,
}
void ImageWatcher::schedule_retry_aio_requests(bool use_timer) {
- Mutex::Locker l(m_timer_lock);
+ Context *ctx = new FunctionContext(boost::bind(
+ &ImageWatcher::retry_aio_requests, this));
if (use_timer) {
- if (m_retry_aio_context == NULL) {
- m_retry_aio_context = new FunctionContext(boost::bind(
- &ImageWatcher::finalize_retry_aio_requests, this));
- m_timer->add_event_after(RETRY_DELAY_SECONDS, m_retry_aio_context);
- }
+ m_task_finisher->add_event_after(TASK_CODE_RETRY_AIO_REQUESTS,
+ RETRY_DELAY_SECONDS, ctx);
} else {
- m_timer->cancel_event(m_retry_aio_context);
- m_retry_aio_context = NULL;
-
- Context *ctx = new FunctionContext(boost::bind(
- &ImageWatcher::retry_aio_requests, this));
- m_finisher->queue(ctx);
+ m_task_finisher->queue(TASK_CODE_RETRY_AIO_REQUESTS, ctx);
}
}
-void ImageWatcher::cancel_retry_aio_requests() {
- Mutex::Locker l(m_timer_lock);
- if (m_retry_aio_context != NULL) {
- m_timer->cancel_event(m_retry_aio_context);
- m_retry_aio_context = NULL;
- }
-}
-
-void ImageWatcher::finalize_retry_aio_requests() {
- assert(m_timer_lock.is_locked());
- m_retry_aio_context = NULL;
- retry_aio_requests();
-}
-
void ImageWatcher::retry_aio_requests() {
+ m_task_finisher->cancel(TASK_CODE_RETRY_AIO_REQUESTS);
std::vector<AioRequest> lock_request_restarts;
{
Mutex::Locker l(m_aio_request_lock);
void ImageWatcher::schedule_cancel_async_requests() {
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::cancel_async_requests, this));
- m_finisher->queue(ctx);
+ m_task_finisher->queue(TASK_CODE_CANCEL_ASYNC_REQUESTS, ctx);
}
void ImageWatcher::cancel_async_requests() {
void ImageWatcher::notify_request_lock() {
ldout(m_image_ctx.cct, 10) << "notify request lock" << dendl;
- cancel_retry_aio_requests();
+ m_task_finisher->cancel(TASK_CODE_RETRY_AIO_REQUESTS);
m_image_ctx.owner_lock.get_read();
if (try_request_lock()) {
return r;
}
-void ImageWatcher::schedule_async_progress(const AsyncRequestId &request,
- uint64_t offset, uint64_t total) {
- RWLock::WLocker l(m_async_request_lock);
- if (m_async_progress.count(request) == 0) {
- m_async_progress.insert(request);
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::notify_async_progress, this, request, offset,
- total));
- m_finisher->queue(ctx);
- }
-}
-
void ImageWatcher::handle_payload(const HeaderUpdatePayload &payload,
bufferlist *out) {
ldout(m_image_ctx.cct, 10) << "image header updated" << dendl;
ldout(m_image_ctx.cct, 10) << "queuing release of exclusive lock" << dendl;
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::release_lock, this));
- m_finisher->queue(ctx);
+ m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx);
}
}
// cannot notify within a notificiation
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::finalize_header_update, this));
- m_finisher->queue(ctx);
+ m_task_finisher->queue(TASK_CODE_HEADER_UPDATE, ctx);
}
}
}
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::reregister_watch, this));
- m_finisher->queue(ctx);
+ m_task_finisher->queue(TASK_CODE_REREGISTER_WATCH, ctx);
}
}
m_image_ctx.md_ctx.notify_ack(m_image_ctx.header_oid, notify_id, handle, out);
}
-void ImageWatcher::schedule_reregister_watch() {
- Mutex::Locker l(m_timer_lock);
- Context *ctx = new FunctionContext(boost::bind(
- &ImageWatcher::reregister_watch, this));
- m_timer->add_event_after(RETRY_DELAY_SECONDS, ctx);
-}
-
void ImageWatcher::reregister_watch() {
ldout(m_image_ctx.cct, 10) << "re-registering image watch" << dendl;
<< cpp_strerror(r) << dendl;
if (r != -ESHUTDOWN) {
FunctionContext *ctx = new FunctionContext(boost::bind(
- &ImageWatcher::schedule_reregister_watch, this));
- m_finisher->queue(ctx);
+ &ImageWatcher::reregister_watch, this));
+ m_task_finisher->add_event_after(TASK_CODE_REREGISTER_WATCH,
+ RETRY_DELAY_SECONDS, ctx);
}
return;
}
#include "include/assert.h"
class entity_name_t;
-class Finisher;
-class SafeTimer;
namespace librbd {
class AioCompletion;
class ImageCtx;
+ template <typename T> class TaskFinisher;
class ImageWatcher {
public:
WATCH_STATE_ERROR
};
+ enum TaskCode {
+ TASK_CODE_ACQUIRED_LOCK,
+ TASK_CODE_REQUEST_LOCK,
+ TASK_CODE_RELEASING_LOCK,
+ TASK_CODE_RELEASED_LOCK,
+ TASK_CODE_RETRY_AIO_REQUESTS,
+ TASK_CODE_CANCEL_ASYNC_REQUESTS,
+ TASK_CODE_HEADER_UPDATE,
+ TASK_CODE_REREGISTER_WATCH,
+ TASK_CODE_ASYNC_REQUEST,
+ TASK_CODE_ASYNC_PROGRESS
+ };
+
typedef std::pair<Context *, ProgressContext *> AsyncRequest;
typedef std::pair<boost::function<int(AioCompletion *)>,
AioCompletion *> AioRequest;
+ class Task {
+ public:
+ Task(TaskCode task_code) : m_task_code(task_code) {}
+ Task(TaskCode task_code, const WatchNotify::AsyncRequestId &id)
+ : m_task_code(task_code), m_async_request_id(id) {}
+
+ inline bool operator<(const Task& rhs) const {
+ if (m_task_code != rhs.m_task_code) {
+ return m_task_code < rhs.m_task_code;
+ } else if ((m_task_code == TASK_CODE_ASYNC_REQUEST ||
+ m_task_code == TASK_CODE_ASYNC_PROGRESS) &&
+ m_async_request_id != rhs.m_async_request_id) {
+ return m_async_request_id < rhs.m_async_request_id;
+ }
+ return false;
+ }
+ private:
+ TaskCode m_task_code;
+ WatchNotify::AsyncRequestId m_async_request_id;
+ };
+
struct WatchCtx : public librados::WatchCtx2 {
ImageWatcher &image_watcher;
LockOwnerState m_lock_owner_state;
- Finisher *m_finisher;
-
- Mutex m_timer_lock;
- SafeTimer *m_timer;
+ TaskFinisher<Task> *m_task_finisher;
RWLock m_async_request_lock;
std::map<WatchNotify::AsyncRequestId, AsyncRequest> m_async_requests;
std::set<WatchNotify::AsyncRequestId> m_async_pending;
- std::set<WatchNotify::AsyncRequestId> m_async_progress;
Mutex m_aio_request_lock;
std::vector<AioRequest> m_aio_requests;
- Context *m_retry_aio_context;
Mutex m_owner_client_id_lock;
WatchNotify::ClientId m_owner_client_id;
void finalize_header_update();
void schedule_retry_aio_requests(bool use_timer);
- void cancel_retry_aio_requests();
- void finalize_retry_aio_requests();
void retry_aio_requests();
void schedule_cancel_async_requests();
void acknowledge_notify(uint64_t notify_id, uint64_t handle,
bufferlist &out);
- void schedule_reregister_watch();
void reregister_watch();
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef LIBRBD_TASK_FINISHER_H
+#define LIBRBD_TASK_FINISHER_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "common/Finisher.h"
+#include "common/Mutex.h"
+#include "common/Timer.h"
+#include <map>
+#include <utility>
+
+class CephContext;
+class Context;
+
+namespace librbd {
+
+template <typename Task>
+class TaskFinisher {
+public:
+ TaskFinisher(CephContext &cct)
+ : m_cct(cct), m_lock("librbd::TaskFinisher::m_lock"),
+ m_finisher(new Finisher(&cct)),
+ m_safe_timer(new SafeTimer(&cct, m_lock, false))
+ {
+ m_finisher->start();
+ m_safe_timer->init();
+ }
+
+ ~TaskFinisher() {
+ {
+ Mutex::Locker l(m_lock);
+ m_safe_timer->shutdown();
+ delete m_safe_timer;
+ }
+
+ m_finisher->stop();
+ delete m_finisher;
+ }
+
+ void cancel(const Task& task) {
+ Mutex::Locker l(m_lock);
+ typename TaskContexts::iterator it = m_task_contexts.find(task);
+ if (it != m_task_contexts.end()) {
+ delete it->second.first;
+ m_task_contexts.erase(it);
+ }
+ }
+
+ void cancel_all() {
+ Mutex::Locker l(m_lock);
+ for (typename TaskContexts::iterator it = m_task_contexts.begin();
+ it != m_task_contexts.end(); ++it) {
+ delete it->second.first;
+ }
+ m_task_contexts.clear();
+ }
+
+ bool add_event_after(const Task& task, double seconds, Context *ctx) {
+ Mutex::Locker l(m_lock);
+ if (m_task_contexts.count(task) != 0) {
+ // task already scheduled on finisher or timer
+ delete ctx;
+ return false;
+ }
+ C_Task *timer_ctx = new C_Task(this, task);
+ m_task_contexts[task] = std::make_pair(ctx, timer_ctx);
+
+ m_safe_timer->add_event_after(seconds, timer_ctx);
+ return true;
+ }
+
+ void queue(Context *ctx) {
+ m_finisher->queue(ctx);
+ }
+
+ bool queue(const Task& task, Context *ctx) {
+ Mutex::Locker l(m_lock);
+ typename TaskContexts::iterator it = m_task_contexts.find(task);
+ if (it != m_task_contexts.end()) {
+ if (it->second.second != NULL) {
+ assert(m_safe_timer->cancel_event(it->second.second));
+ delete it->second.first;
+ } else {
+ // task already scheduled on the finisher
+ delete ctx;
+ return false;
+ }
+ }
+ m_task_contexts[task] = std::make_pair(ctx, reinterpret_cast<Context *>(NULL));
+
+ m_finisher->queue(new C_Task(this, task));
+ return true;
+ }
+
+private:
+ class C_Task : public Context {
+ public:
+ C_Task(TaskFinisher *task_finisher, const Task& task)
+ : m_task_finisher(task_finisher), m_task(task)
+ {
+ }
+ protected:
+ virtual void finish(int r) {
+ m_task_finisher->complete(m_task);
+ }
+ private:
+ TaskFinisher *m_task_finisher;
+ Task m_task;
+ };
+
+ CephContext &m_cct;
+
+ Mutex m_lock;
+ Finisher *m_finisher;
+ SafeTimer *m_safe_timer;
+
+ typedef std::map<Task, std::pair<Context *, Context *> > TaskContexts;
+ TaskContexts m_task_contexts;
+
+ void complete(const Task& task) {
+ Context *ctx = NULL;
+ {
+ Mutex::Locker l(m_lock);
+ typename TaskContexts::iterator it = m_task_contexts.find(task);
+ if (it != m_task_contexts.end()) {
+ ctx = it->second.first;
+ m_task_contexts.erase(it);
+ }
+ }
+
+ if (ctx != NULL) {
+ ctx->complete(0);
+ }
+ }
+};
+
+} // namespace librbd
+
+#endif // LIBRBD_TASK_FINISHER