#include "librbd/Operations.h"
#include "librbd/TaskFinisher.h"
#include "librbd/Utils.h"
+#include "librbd/image_watcher/Notifier.h"
#include "include/encoding.h"
#include "include/stringify.h"
#include "common/errno.h"
namespace librbd {
+using namespace image_watcher;
using namespace watch_notify;
-static const uint64_t NOTIFY_TIMEOUT = 5000;
static const double RETRY_DELAY_SECONDS = 1.0;
ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
m_watch_ctx(*this), m_watch_handle(0),
m_watch_state(WATCH_STATE_UNREGISTERED),
m_async_request_lock(util::unique_lock_name("librbd::ImageWatcher::m_async_request_lock", this)),
- m_owner_client_id_lock(util::unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this))
+ m_owner_client_id_lock(util::unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this)),
+ m_notifier(image_ctx)
{
m_image_ctx.cct->lookup_or_create_singleton_object<TaskFinisher<Task> >(
m_task_finisher, "librbd::task_finisher");
return r;
}
+void ImageWatcher::flush(Context *on_finish) {
+ m_notifier.flush(on_finish);
+}
+
void ImageWatcher::schedule_async_progress(const AsyncRequestId &request,
uint64_t offset, uint64_t total) {
FunctionContext *ctx = new FunctionContext(
&m_image_ctx);
}
int ret = m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl,
- NOTIFY_TIMEOUT, NULL);
+ Notifier::NOTIFY_TIMEOUT, NULL);
if (ret < 0) {
lderr(m_image_ctx.cct) << this << " failed to notify async complete: "
<< cpp_strerror(ret) << dendl;
// case another notification occurs before this one and it requires the lock
bufferlist response_bl;
m_image_ctx.owner_lock.put_read();
- int r = m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT,
- &response_bl);
+ int r = m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl,
+ Notifier::NOTIFY_TIMEOUT, &response_bl);
m_image_ctx.owner_lock.get_read();
if (r < 0 && r != -ETIMEDOUT) {
#ifndef CEPH_LIBRBD_IMAGE_WATCHER_H
#define CEPH_LIBRBD_IMAGE_WATCHER_H
-#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RWLock.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "include/rbd/librbd.hpp"
+#include "librbd/image_watcher/Notifier.h"
#include "librbd/WatchNotifyTypes.h"
#include <set>
#include <string>
int register_watch();
int unregister_watch();
+ void flush(Context *on_finish);
int notify_flatten(uint64_t request_id, ProgressContext &prog_ctx);
int notify_resize(uint64_t request_id, uint64_t size,
};
enum TaskCode {
- TASK_CODE_ACQUIRED_LOCK,
TASK_CODE_REQUEST_LOCK,
- TASK_CODE_RELEASED_LOCK,
TASK_CODE_CANCEL_ASYNC_REQUESTS,
TASK_CODE_REREGISTER_WATCH,
TASK_CODE_ASYNC_REQUEST,
Mutex m_owner_client_id_lock;
watch_notify::ClientId m_owner_client_id;
+ image_watcher::Notifier m_notifier;
+
void schedule_cancel_async_requests();
void cancel_async_requests();
librbd/image/RefreshParentRequest.cc \
librbd/image/RefreshRequest.cc \
librbd/image/SetSnapRequest.cc \
+ librbd/image_watcher/Notifier.cc \
librbd/journal/Replay.cc \
librbd/object_map/InvalidateRequest.cc \
librbd/object_map/LockRequest.cc \
librbd/image/RefreshParentRequest.h \
librbd/image/RefreshRequest.h \
librbd/image/SetSnapRequest.h \
+ librbd/image_watcher/Notifier.h \
librbd/journal/Replay.h \
librbd/journal/Types.h \
librbd/object_map/InvalidateRequest.h \
return name + " (" + stringify(address) + ")";
}
+librados::AioCompletion *create_rados_ack_callback(Context *on_finish) {
+ return create_rados_ack_callback<Context, &Context::complete>(on_finish);
+}
+
} // namespace util
} // namespace librbd
reinterpret_cast<T*>(arg)->complete(rados_aio_get_return_value(c));
}
+template <typename T, void(T::*MF)(int)>
+void rados_callback(rados_completion_t c, void *arg) {
+ T *obj = reinterpret_cast<T*>(arg);
+ int r = rados_aio_get_return_value(c);
+ (obj->*MF)(r);
+}
+
template <typename T, Context*(T::*MF)(int*), bool destroy>
void rados_state_callback(rados_completion_t c, void *arg) {
T *obj = reinterpret_cast<T*>(arg);
const std::string old_header_name(const std::string &image_name);
std::string unique_lock_name(const std::string &name, void *address);
+librados::AioCompletion *create_rados_ack_callback(Context *on_finish);
+
template <typename T>
librados::AioCompletion *create_rados_ack_callback(T *obj) {
return librados::Rados::aio_create_completion(
obj, &detail::rados_callback<T>, nullptr);
}
+template <typename T, void(T::*MF)(int)>
+librados::AioCompletion *create_rados_ack_callback(T *obj) {
+ return librados::Rados::aio_create_completion(
+ obj, &detail::rados_callback<T, MF>, nullptr);
+}
+
template <typename T, Context*(T::*MF)(int*), bool destroy=true>
librados::AioCompletion *create_rados_ack_callback(T *obj) {
return librados::Rados::aio_create_completion(
namespace librbd {
namespace image {
-
using util::create_async_context_callback;
using util::create_context_callback;
template <typename I>
void CloseRequest<I>::send_close_parent() {
if (m_image_ctx->parent == nullptr) {
- finish();
+ send_flush_image_watcher();
return;
}
if (r < 0) {
lderr(cct) << "error closing parent image: " << cpp_strerror(r) << dendl;
}
+ send_flush_image_watcher();
+}
+
+template <typename I>
+void CloseRequest<I>::send_flush_image_watcher() {
+ if (m_image_ctx->image_watcher == nullptr) {
+ finish();
+ return;
+ }
+
+ m_image_ctx->image_watcher->flush(create_context_callback<
+ CloseRequest<I>, &CloseRequest<I>::handle_flush_image_watcher>(this));
+}
+
+template <typename I>
+void CloseRequest<I>::handle_flush_image_watcher(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 10) << this << " " << __func__ << dendl;
+
+ assert(r == 0);
finish();
}
template <typename I>
void CloseRequest<I>::finish() {
- if (m_image_ctx->image_watcher) {
+ if (m_image_ctx->image_watcher != nullptr) {
m_image_ctx->unregister_watch();
}
* SHUTDOWN_CACHE
* |
* v
- * FLUSH_OP_WORK_QUEUE . . . . .
- * | .
- * v .
- * CLOSE_PARENT . (no parent)
- * | .
- * v .
- * <finish> < . . . . . . . . . .
+ * FLUSH_OP_WORK_QUEUE . . . . .
+ * | .
+ * v .
+ * CLOSE_PARENT . (no parent)
+ * | .
+ * v .
+ * FLUSH_IMAGE_WATCHER < . . . .
+ * |
+ * v
+ * <finish>
*
* @endverbatim
*/
void send_close_parent();
void handle_close_parent(int r);
+ void send_flush_image_watcher();
+ void handle_flush_image_watcher(int r);
+
void finish();
void save_result(int result) {
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/image_watcher/Notifier.h"
+#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::image_watcher::Notifier: "
+
+namespace librbd {
+namespace image_watcher {
+
+const uint64_t Notifier::NOTIFY_TIMEOUT = 5000;
+
+Notifier::Notifier(ImageCtx &image_ctx)
+ : m_image_ctx(image_ctx),
+ m_aio_notify_lock(util::unique_lock_name(
+ "librbd::image_watcher::Notifier::m_aio_notify_lock", this)) {
+}
+
+Notifier::~Notifier() {
+ Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+ assert(m_pending_aio_notifies == 0);
+}
+
+void Notifier::flush(Context *on_finish) {
+ Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+ if (m_pending_aio_notifies == 0) {
+ m_image_ctx.op_work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ assert(m_aio_notify_flush == nullptr);
+ m_aio_notify_flush = on_finish;
+}
+
+void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) {
+ {
+ Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+ ++m_pending_aio_notifies;
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies
+ << dendl;
+ }
+
+ C_AioNotify *ctx = new C_AioNotify(this, on_finish);
+ librados::AioCompletion *comp = util::create_rados_ack_callback(ctx);
+ int r = m_image_ctx.md_ctx.aio_notify(m_image_ctx.header_oid, comp, bl,
+ NOTIFY_TIMEOUT, nullptr);
+ assert(r == 0);
+ comp->release();
+}
+
+void Notifier::handle_notify(int r, Context *on_finish) {
+ if (on_finish != nullptr) {
+ m_image_ctx.op_work_queue->queue(on_finish, r);
+ }
+
+ Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+ assert(m_pending_aio_notifies > 0);
+ --m_pending_aio_notifies;
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies
+ << dendl;
+ if (m_pending_aio_notifies == 0 && m_aio_notify_flush != nullptr) {
+ m_image_ctx.op_work_queue->queue(m_aio_notify_flush, 0);
+ }
+}
+
+} // namespace image_watcher
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IMAGE_WATCHER_NOTIFIER_H
+#define CEPH_LIBRBD_IMAGE_WATCHER_NOTIFIER_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/Context.h"
+#include "common/Mutex.h"
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace image_watcher {
+
+class Notifier {
+public:
+ static const uint64_t NOTIFY_TIMEOUT;
+
+ Notifier(ImageCtx &image_ctx);
+ ~Notifier();
+
+ void flush(Context *on_finish);
+ void notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish);
+
+private:
+ struct C_AioNotify : public Context {
+ Notifier *notifier;
+ Context *on_finish;
+
+ C_AioNotify(Notifier *notifier, Context *on_finish)
+ : notifier(notifier), on_finish(on_finish) {
+ }
+ virtual void finish(int r) override {
+ notifier->handle_notify(r, on_finish);
+ }
+ };
+
+ ImageCtx &m_image_ctx;
+
+ Mutex m_aio_notify_lock;
+ size_t m_pending_aio_notifies = 0;
+ Context *m_aio_notify_flush = nullptr;
+
+ void handle_notify(int r, Context *on_finish);
+
+};
+
+} // namespace image_watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_IMAGE_WATCHER_NOTIFIER_H