Utils.cc
cache/ImageWriteback.cc
cache/PassthroughImageCache.cc
+ Watcher.cc
+ watcher/Types.cc
+ watcher/RewatchRequest.cc
exclusive_lock/AcquireRequest.cc
exclusive_lock/AutomaticPolicy.cc
exclusive_lock/ReacquireRequest.cc
object_map/SnapshotRollbackRequest.cc
object_map/UnlockRequest.cc
object_map/UpdateRequest.cc
- object_watcher/Notifier.cc
+ watcher/Notifier.cc
operation/DisableFeaturesRequest.cc
operation/EnableFeaturesRequest.cc
operation/FlattenRequest.cc
// unregister watch before and register back after rename
on_finish = new C_NotifyUpdate<I>(m_image_ctx, on_finish);
on_finish = new FunctionContext([this, on_finish](int r) {
+ if (m_image_ctx.old_format) {
+ m_image_ctx.image_watcher->set_oid(m_image_ctx.header_oid);
+ }
m_image_ctx.image_watcher->register_watch(on_finish);
});
on_finish = new FunctionContext([this, dest_name, on_finish](int r) {
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/Watcher.h"
+#include "librbd/watcher/RewatchRequest.h"
+#include "librbd/Utils.h"
+#include "librbd/TaskFinisher.h"
+#include "include/encoding.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include <boost/bind.hpp>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::Watcher: "
+
+namespace librbd {
+
+using namespace watcher;
+
+using util::create_context_callback;
+using util::create_rados_safe_callback;
+using std::string;
+
+namespace {
+
+struct C_UnwatchAndFlush : public Context {
+ librados::Rados rados;
+ Context *on_finish;
+ bool flushing = false;
+ int ret_val = 0;
+
+ C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
+ : rados(io_ctx), on_finish(on_finish) {
+ }
+
+ virtual void complete(int r) override {
+ if (ret_val == 0 && r < 0) {
+ ret_val = r;
+ }
+
+ if (!flushing) {
+ flushing = true;
+
+ librados::AioCompletion *aio_comp = create_rados_safe_callback(this);
+ r = rados.aio_watch_flush(aio_comp);
+ assert(r == 0);
+ aio_comp->release();
+ return;
+ }
+
+ // ensure our reference to the RadosClient is released prior
+ // to completing the callback to avoid racing an explicit
+ // librados shutdown
+ Context *ctx = on_finish;
+ r = ret_val;
+ delete this;
+
+ ctx->complete(r);
+ }
+
+ virtual void finish(int r) override {
+ }
+};
+
+} // anonymous namespace
+
+Watcher::Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
+ const string& oid)
+ : m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
+ m_watch_lock(util::unique_lock_name("librbd::Watcher::m_watch_lock", this)),
+ m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
+ m_watch_state(WATCH_STATE_UNREGISTERED), m_ioctx(ioctx),
+ m_work_queue(work_queue), m_oid(oid), m_watch_ctx(*this)
+{
+}
+
+Watcher::~Watcher()
+{
+ RWLock::RLocker l(m_watch_lock);
+ assert(m_watch_state != WATCH_STATE_REGISTERED);
+}
+
+void Watcher::register_watch(Context *on_finish) {
+ ldout(m_cct, 10) << this << " registering watcher" << dendl;
+
+ RWLock::RLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+ librados::AioCompletion *aio_comp = create_rados_safe_callback(
+ new C_RegisterWatch(this, on_finish));
+ int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+void Watcher::handle_register_watch(int r) {
+ ldout(m_cct, 10) << this << " handle register r=" << r << dendl;
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+ if (r < 0) {
+ lderr(m_cct) << ": failed to register watch: " << cpp_strerror(r) << dendl;
+ m_watch_handle = 0;
+ } else if (r >= 0) {
+ m_watch_state = WATCH_STATE_REGISTERED;
+ }
+}
+
+void Watcher::unregister_watch(Context *on_finish) {
+ ldout(m_cct, 10) << this << " unregistering watcher" << dendl;
+
+ RWLock::WLocker watch_locker(m_watch_lock);
+ if (m_watch_state == WATCH_STATE_REWATCHING) {
+ ldout(m_cct, 10) << this << " delaying unregister until rewatch completed"
+ << dendl;
+
+ assert(m_unregister_watch_ctx == nullptr);
+ m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
+ unregister_watch(on_finish);
+ });
+ return;
+ }
+
+ if (m_watch_state == WATCH_STATE_REGISTERED ||
+ m_watch_state == WATCH_STATE_ERROR) {
+ m_watch_state = WATCH_STATE_UNREGISTERED;
+
+ librados::AioCompletion *aio_comp = create_rados_safe_callback(
+ new C_UnwatchAndFlush(m_ioctx, on_finish));
+ int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
+ assert(r == 0);
+ aio_comp->release();
+ }
+}
+
+void Watcher::flush(Context *on_finish) {
+ m_notifier.flush(on_finish);
+}
+
+void Watcher::set_oid(const string& oid) {
+ RWLock::WLocker l(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+
+ m_oid = oid;
+}
+
+void Watcher::handle_error(uint64_t handle, int err) {
+ lderr(m_cct) << this << " watch failed: " << handle << ", "
+ << cpp_strerror(err) << dendl;
+
+ RWLock::WLocker l(m_watch_lock);
+ if (m_watch_state == WATCH_STATE_REGISTERED) {
+ m_watch_state = WATCH_STATE_ERROR;
+
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&Watcher::rewatch, this));
+ m_work_queue->queue(ctx);
+ }
+}
+
+void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
+ bufferlist &out) {
+ m_ioctx.notify_ack(m_oid, notify_id, handle, out);
+}
+
+void Watcher::rewatch() {
+ ldout(m_cct, 10) << this << " re-registering watch" << dendl;
+
+ RWLock::WLocker l(m_watch_lock);
+ if (m_watch_state != WATCH_STATE_ERROR) {
+ return;
+ }
+ m_watch_state = WATCH_STATE_REWATCHING;
+
+ Context *ctx = create_context_callback<Watcher,
+ &Watcher::handle_rewatch>(this);
+ RewatchRequest *req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
+ &m_watch_ctx,
+ &m_watch_handle, ctx);
+ req->send();
+}
+
+void Watcher::handle_rewatch(int r) {
+ ldout(m_cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
+
+ WatchState next_watch_state = WATCH_STATE_REGISTERED;
+ if (r < 0) {
+ // only EBLACKLISTED or ENOENT can be returned
+ assert(r == -EBLACKLISTED || r == -ENOENT);
+ next_watch_state = WATCH_STATE_UNREGISTERED;
+ }
+
+ Context *unregister_watch_ctx = nullptr;
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_REWATCHING);
+ m_watch_state = next_watch_state;
+
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+
+ handle_rewatch_complete(r);
+ m_work_queue->queue(
+ create_context_callback<Watcher,
+ &Watcher::handle_rewatch_complete>(this));
+ }
+
+ // wake up pending unregister request
+ if (unregister_watch_ctx != nullptr) {
+ unregister_watch_ctx->complete(0);
+ }
+}
+
+void Watcher::send_notify(bufferlist& payload, bufferlist *out_bl,
+ Context *on_finish) {
+ m_notifier.notify(payload, out_bl, on_finish);
+}
+
+void Watcher::WatchCtx::handle_notify(uint64_t notify_id,
+ uint64_t handle,
+ uint64_t notifier_id,
+ bufferlist& bl) {
+ watcher.handle_notify(notify_id, handle, bl);
+}
+
+void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
+ watcher.handle_error(handle, err);
+}
+
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_H
+#define CEPH_LIBRBD_WATCHER_H
+
+#include "common/Mutex.h"
+#include "common/RWLock.h"
+#include "include/rados/librados.hpp"
+#include "librbd/watcher/Notifier.h"
+#include "librbd/watcher/Types.h"
+#include <string>
+#include <utility>
+
+class ContextWQ;
+
+namespace librbd {
+
+class Watcher {
+ friend struct watcher::C_NotifyAck;
+
+public:
+ Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
+ const std::string& oid);
+ virtual ~Watcher();
+
+ void register_watch(Context *on_finish);
+ void unregister_watch(Context *on_finish);
+ void flush(Context *on_finish);
+
+ void set_oid(const string& oid);
+
+ uint64_t get_watch_handle() const {
+ RWLock::RLocker watch_locker(m_watch_lock);
+ return m_watch_handle;
+ }
+
+ bool is_registered() const {
+ RWLock::RLocker locker(m_watch_lock);
+ return m_watch_state == WATCH_STATE_REGISTERED;
+ }
+
+protected:
+ enum WatchState {
+ WATCH_STATE_UNREGISTERED,
+ WATCH_STATE_REGISTERED,
+ WATCH_STATE_ERROR,
+ WATCH_STATE_REWATCHING
+ };
+
+ CephContext *m_cct;
+ mutable RWLock m_watch_lock;
+ uint64_t m_watch_handle;
+ watcher::Notifier m_notifier;
+ WatchState m_watch_state;
+
+ void send_notify(bufferlist &payload, bufferlist *out_bl = nullptr,
+ Context *on_finish = nullptr);
+
+ virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+ bufferlist &bl) = 0;
+
+ virtual void handle_error(uint64_t cookie, int err);
+
+ void acknowledge_notify(uint64_t notify_id, uint64_t handle,
+ bufferlist &out);
+
+ virtual void handle_rewatch_complete(int r) { }
+
+private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * UNREGISTERED
+ * |
+ * | (register_watch)
+ * |
+ * v (watch error)
+ * REGISTERED * * * * * * * > ERROR
+ * | ^ |
+ * | | | (rewatch)
+ * | | v
+ * | | REWATCHING
+ * | | |
+ * | | |
+ * | \---------------------/
+ * |
+ * | (unregister_watch)
+ * |
+ * v
+ * UNREGISTERED
+ * |
+ * v
+ * <finish>
+ *
+ * @endverbatim
+ */
+
+ struct WatchCtx : public librados::WatchCtx2 {
+ Watcher &watcher;
+
+ WatchCtx(Watcher &parent) : watcher(parent) {}
+
+ virtual void handle_notify(uint64_t notify_id,
+ uint64_t handle,
+ uint64_t notifier_id,
+ bufferlist& bl);
+ virtual void handle_error(uint64_t handle, int err);
+ };
+
+ struct C_RegisterWatch : public Context {
+ Watcher *watcher;
+ Context *on_finish;
+
+ C_RegisterWatch(Watcher *watcher, Context *on_finish)
+ : watcher(watcher), on_finish(on_finish) {
+ }
+ virtual void finish(int r) override {
+ watcher->handle_register_watch(r);
+ on_finish->complete(r);
+ }
+ };
+
+ librados::IoCtx& m_ioctx;
+ ContextWQ *m_work_queue;
+ std::string m_oid;
+
+ WatchCtx m_watch_ctx;
+ Context *m_unregister_watch_ctx = nullptr;
+
+ void handle_register_watch(int r);
+
+ void rewatch();
+ void handle_rewatch(int r);
+
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_H
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "librbd/object_watcher/Notifier.h"
-#include "common/WorkQueue.h"
-#include "librbd/ImageCtx.h"
-#include "librbd/Utils.h"
-
-#define dout_subsys ceph_subsys_rbd
-#undef dout_prefix
-#define dout_prefix *_dout << "librbd::object_watcher::Notifier: "
-
-namespace librbd {
-namespace object_watcher {
-
-const uint64_t Notifier::NOTIFY_TIMEOUT = 5000;
-
-Notifier::Notifier(ContextWQ *work_queue, IoCtx &ioctx, const std::string &oid)
- : m_work_queue(work_queue), m_oid(oid),
- m_aio_notify_lock(util::unique_lock_name(
- "librbd::object_watcher::Notifier::m_aio_notify_lock", this)) {
- m_ioctx.dup(ioctx);
- m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
-}
-
-Notifier::~Notifier() {
- Mutex::Locker aio_notify_locker(m_aio_notify_lock);
- assert(m_pending_aio_notifies == 0);
-}
-
-void Notifier::flush(Context *on_finish) {
- Mutex::Locker aio_notify_locker(m_aio_notify_lock);
- if (m_pending_aio_notifies == 0) {
- m_work_queue->queue(on_finish, 0);
- return;
- }
-
- m_aio_notify_flush_ctxs.push_back(on_finish);
-}
-
-void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) {
- {
- Mutex::Locker aio_notify_locker(m_aio_notify_lock);
- ++m_pending_aio_notifies;
-
- ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies
- << dendl;
- }
-
- C_AioNotify *ctx = new C_AioNotify(this, on_finish);
- librados::AioCompletion *comp = util::create_rados_ack_callback(ctx);
- int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, out_bl);
- assert(r == 0);
- comp->release();
-}
-
-void Notifier::handle_notify(int r, Context *on_finish) {
- if (on_finish != nullptr) {
- m_work_queue->queue(on_finish, r);
- }
-
- Mutex::Locker aio_notify_locker(m_aio_notify_lock);
- assert(m_pending_aio_notifies > 0);
- --m_pending_aio_notifies;
-
- ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies
- << dendl;
- if (m_pending_aio_notifies == 0) {
- for (auto ctx : m_aio_notify_flush_ctxs) {
- m_work_queue->queue(ctx, 0);
- }
- m_aio_notify_flush_ctxs.clear();
- }
-}
-
-} // namespace object_watcher
-} // namespace librbd
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_LIBRBD_OBJECT_WATCHER_NOTIFIER_H
-#define CEPH_LIBRBD_OBJECT_WATCHER_NOTIFIER_H
-
-#include "include/int_types.h"
-#include "include/buffer_fwd.h"
-#include "include/Context.h"
-#include "include/rados/librados.hpp"
-#include "common/Mutex.h"
-#include "common/WorkQueue.h"
-#include <list>
-
-namespace librbd {
-
-namespace object_watcher {
-
-class Notifier {
-public:
- static const uint64_t NOTIFY_TIMEOUT;
-
- Notifier(ContextWQ *work_queue, librados::IoCtx &ioctx,
- const std::string &oid);
- ~Notifier();
-
- void flush(Context *on_finish);
- void notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish);
-
-private:
- typedef std::list<Context*> Contexts;
-
- struct C_AioNotify : public Context {
- Notifier *notifier;
- Context *on_finish;
-
- C_AioNotify(Notifier *notifier, Context *on_finish)
- : notifier(notifier), on_finish(on_finish) {
- }
- virtual void finish(int r) override {
- notifier->handle_notify(r, on_finish);
- }
- };
-
- ContextWQ *m_work_queue;
- librados::IoCtx m_ioctx;
- CephContext *m_cct;
- std::string m_oid;
-
- Mutex m_aio_notify_lock;
- size_t m_pending_aio_notifies = 0;
- Contexts m_aio_notify_flush_ctxs;
-
- void handle_notify(int r, Context *on_finish);
-
-};
-
-} // namespace object_watcher
-} // namespace librbd
-
-#endif // CEPH_LIBRBD_OBJECT_WATCHER_NOTIFIER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/watcher/Notifier.h"
+#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::object_watcher::Notifier: "
+
+namespace librbd {
+namespace watcher {
+
+const uint64_t Notifier::NOTIFY_TIMEOUT = 5000;
+
+Notifier::Notifier(ContextWQ *work_queue, IoCtx &ioctx, const std::string &oid)
+ : m_work_queue(work_queue), m_oid(oid),
+ m_aio_notify_lock(util::unique_lock_name(
+ "librbd::object_watcher::Notifier::m_aio_notify_lock", this)) {
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+}
+
+Notifier::~Notifier() {
+ Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+ assert(m_pending_aio_notifies == 0);
+}
+
+void Notifier::flush(Context *on_finish) {
+ Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+ if (m_pending_aio_notifies == 0) {
+ m_work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ m_aio_notify_flush_ctxs.push_back(on_finish);
+}
+
+void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) {
+ {
+ Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+ ++m_pending_aio_notifies;
+
+ ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies
+ << dendl;
+ }
+
+ C_AioNotify *ctx = new C_AioNotify(this, on_finish);
+ librados::AioCompletion *comp = util::create_rados_ack_callback(ctx);
+ int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, out_bl);
+ assert(r == 0);
+ comp->release();
+}
+
+void Notifier::handle_notify(int r, Context *on_finish) {
+ if (on_finish != nullptr) {
+ m_work_queue->queue(on_finish, r);
+ }
+
+ Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+ assert(m_pending_aio_notifies > 0);
+ --m_pending_aio_notifies;
+
+ ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies
+ << dendl;
+ if (m_pending_aio_notifies == 0) {
+ for (auto ctx : m_aio_notify_flush_ctxs) {
+ m_work_queue->queue(ctx, 0);
+ }
+ m_aio_notify_flush_ctxs.clear();
+ }
+}
+
+} // namespace watcher
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_NOTIFIER_H
+#define CEPH_LIBRBD_WATCHER_NOTIFIER_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/WorkQueue.h"
+#include <list>
+
+namespace librbd {
+
+namespace watcher {
+
+class Notifier {
+public:
+ static const uint64_t NOTIFY_TIMEOUT;
+
+ Notifier(ContextWQ *work_queue, librados::IoCtx &ioctx,
+ const std::string &oid);
+ ~Notifier();
+
+ void flush(Context *on_finish);
+ void notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish);
+
+private:
+ typedef std::list<Context*> Contexts;
+
+ struct C_AioNotify : public Context {
+ Notifier *notifier;
+ Context *on_finish;
+
+ C_AioNotify(Notifier *notifier, Context *on_finish)
+ : notifier(notifier), on_finish(on_finish) {
+ }
+ virtual void finish(int r) override {
+ notifier->handle_notify(r, on_finish);
+ }
+ };
+
+ ContextWQ *m_work_queue;
+ librados::IoCtx m_ioctx;
+ CephContext *m_cct;
+ std::string m_oid;
+
+ Mutex m_aio_notify_lock;
+ size_t m_pending_aio_notifies = 0;
+ Contexts m_aio_notify_flush_ctxs;
+
+ void handle_notify(int r, Context *on_finish);
+
+};
+
+} // namespace watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_NOTIFIER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/watcher/RewatchRequest.h"
+#include "common/RWLock.h"
+#include "common/errno.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::watcher::RewatchRequest: " \
+ << this << ": " << __func__
+
+namespace librbd {
+
+using util::create_context_callback;
+using util::create_rados_safe_callback;
+
+namespace watcher {
+
+using std::string;
+
+RewatchRequest::RewatchRequest(librados::IoCtx& ioctx, const string& oid,
+ RWLock &watch_lock,
+ librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish)
+ : m_ioctx(ioctx), m_oid(oid), m_watch_lock(watch_lock),
+ m_watch_ctx(watch_ctx), m_watch_handle(watch_handle),
+ m_on_finish(on_finish) {
+}
+
+void RewatchRequest::send() {
+ unwatch();
+}
+
+void RewatchRequest::unwatch() {
+ assert(m_watch_lock.is_wlocked());
+ assert(*m_watch_handle != 0);
+
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ ldout(cct, 10) << dendl;
+
+ librados::AioCompletion *aio_comp = create_rados_safe_callback<
+ RewatchRequest, &RewatchRequest::handle_unwatch>(this);
+ int r = m_ioctx.aio_unwatch(*m_watch_handle, aio_comp);
+ assert(r == 0);
+ aio_comp->release();
+
+ *m_watch_handle = 0;
+}
+
+void RewatchRequest::handle_unwatch(int r) {
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ ldout(cct, 10) << "r=" << r << dendl;
+
+ if (r == -EBLACKLISTED) {
+ lderr(cct) << "client blacklisted" << dendl;
+ finish(r);
+ return;
+ } else if (r < 0) {
+ lderr(cct) << "failed to unwatch: " << cpp_strerror(r) << dendl;
+ }
+ rewatch();
+}
+
+void RewatchRequest::rewatch() {
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ ldout(cct, 10) << dendl;
+
+ librados::AioCompletion *aio_comp = create_rados_safe_callback<
+ RewatchRequest, &RewatchRequest::handle_rewatch>(this);
+ int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_rewatch_handle, m_watch_ctx);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+void RewatchRequest::handle_rewatch(int r) {
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ ldout(cct, 10) << "r=" << r << dendl;
+
+ if (r == -EBLACKLISTED) {
+ lderr(cct) << "client blacklisted" << dendl;
+ finish(r);
+ return;
+ } else if (r == -ENOENT) {
+ ldout(cct, 5) << "object deleted" << dendl;
+ finish(r);
+ return;
+ } else if (r < 0) {
+ lderr(cct) << "failed to watch object: " << cpp_strerror(r)
+ << dendl;
+ rewatch();
+ return;
+ }
+
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ *m_watch_handle = m_rewatch_handle;
+ }
+
+ finish(0);
+}
+
+void RewatchRequest::finish(int r) {
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ ldout(cct, 10) << "r=" << r << dendl;
+
+ m_on_finish->complete(r);
+ delete this;
+}
+
+} // namespace watcher
+} // namespace librbd
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H
+#define CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H
+
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+
+struct Context;
+struct RWLock;
+
+namespace librbd {
+
+namespace watcher {
+
+class RewatchRequest {
+public:
+
+ static RewatchRequest *create(librados::IoCtx& ioctx, const std::string& oid,
+ RWLock &watch_lock,
+ librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish) {
+ return new RewatchRequest(ioctx, oid, watch_lock, watch_ctx, watch_handle,
+ on_finish);
+ }
+
+ RewatchRequest(librados::IoCtx& ioctx, const std::string& oid,
+ RWLock &watch_lock, librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish);
+
+ void send();
+
+private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * UNWATCH
+ * |
+ * | . . . .
+ * | . . (recoverable error)
+ * v v .
+ * REWATCH . . .
+ * |
+ * v
+ * <finish>
+ *
+ * @endverbatim
+ */
+
+ librados::IoCtx& m_ioctx;
+ std::string m_oid;
+ RWLock &m_watch_lock;
+ librados::WatchCtx2 *m_watch_ctx;
+ uint64_t *m_watch_handle;
+ Context *m_on_finish;
+
+ uint64_t m_rewatch_handle = 0;
+
+ void unwatch();
+ void handle_unwatch(int r);
+
+ void rewatch();
+ void handle_rewatch(int r);
+
+ void finish(int r);
+};
+
+} // namespace watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/watcher/Types.h"
+#include "librbd/Watcher.h"
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::Watcher: "
+
+namespace librbd {
+namespace watcher {
+
+C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
+ uint64_t handle)
+ : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
+ handle(handle) {
+ ldout(cct, 10) << this << " C_NotifyAck start: id=" << notify_id << ", "
+ << "handle=" << handle << dendl;
+}
+
+void C_NotifyAck::finish(int r) {
+ assert(r == 0);
+ ldout(cct, 10) << this << " C_NotifyAck finish: id=" << notify_id << ", "
+ << "handle=" << handle << dendl;
+ watcher->acknowledge_notify(notify_id, handle, out);
+}
+
+} // namespace watcher
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_TYPES_H
+#define CEPH_LIBRBD_WATCHER_TYPES_H
+
+#include "include/buffer_fwd.h"
+#include "include/encoding.h"
+#include "include/Context.h"
+
+namespace ceph {
+class Formatter;
+}
+
+namespace librbd {
+
+class Watcher;
+
+namespace watcher {
+
+struct C_NotifyAck : public Context {
+ Watcher *watcher;
+ CephContext *cct;
+ uint64_t notify_id;
+ uint64_t handle;
+ bufferlist out;
+
+ C_NotifyAck(Watcher *watcher, uint64_t notify_id, uint64_t handle);
+ void finish(int r);
+};
+
+template <typename Watcher>
+struct HandlePayloadVisitor : public boost::static_visitor<void> {
+ Watcher *watcher;
+ uint64_t notify_id;
+ uint64_t handle;
+
+ HandlePayloadVisitor(Watcher *watcher_, uint64_t notify_id_,
+ uint64_t handle_)
+ : watcher(watcher_), notify_id(notify_id_), handle(handle_)
+ {
+ }
+
+ template <typename P>
+ inline void operator()(const P &payload) const {
+ C_NotifyAck *ctx = new C_NotifyAck(watcher, notify_id, handle);
+ if (watcher->handle_payload(payload, ctx)) {
+ ctx->complete(0);
+ }
+ }
+};
+
+class EncodePayloadVisitor : public boost::static_visitor<void> {
+public:
+ explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
+
+ template <typename P>
+ inline void operator()(const P &payload) const {
+ ::encode(static_cast<uint32_t>(P::NOTIFY_OP), m_bl);
+ payload.encode(m_bl);
+ }
+
+private:
+ bufferlist &m_bl;
+};
+
+class DecodePayloadVisitor : public boost::static_visitor<void> {
+public:
+ DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
+ : m_version(version), m_iter(iter) {}
+
+ template <typename P>
+ inline void operator()(P &payload) const {
+ payload.decode(m_version, m_iter);
+ }
+
+private:
+ __u8 m_version;
+ bufferlist::iterator &m_iter;
+};
+
+
+} // namespace watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_TYPES_H
operation/test_mock_SnapshotRemoveRequest.cc
operation/test_mock_SnapshotRollbackRequest.cc
operation/test_mock_SnapshotUnprotectRequest.cc
+ watcher/test_mock_RewatchRequest.cc
)
add_executable(unittest_librbd
${unittest_librbd_srcs}
--- /dev/null
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/test_mock_fixture.h"
+#include "include/rados/librados.hpp"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librados_test_stub/MockTestMemRadosClient.h"
+#include "test/librbd/test_support.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "librados/AioCompletionImpl.h"
+#include "librbd/watcher/RewatchRequest.h"
+
+namespace librbd {
+namespace watcher {
+
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+using ::testing::WithArg;
+
+struct TestMockWatcherRewatchRequest : public TestMockFixture {
+ typedef RewatchRequest MockRewatchRequest;
+
+ TestMockWatcherRewatchRequest()
+ : m_watch_lock("watch_lock") {
+ }
+
+ void expect_aio_watch(MockImageCtx &mock_image_ctx, int r) {
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
+ mock_image_ctx.md_ctx));
+
+ EXPECT_CALL(mock_io_ctx, aio_watch(mock_image_ctx.header_oid, _, _, _))
+ .WillOnce(DoAll(WithArg<1>(Invoke([&mock_image_ctx, &mock_io_ctx, r](librados::AioCompletionImpl *c) {
+ c->get();
+ mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) {
+ mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
+ }), r);
+ })),
+ Return(0)));
+ }
+
+ void expect_aio_unwatch(MockImageCtx &mock_image_ctx, int r) {
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
+ mock_image_ctx.md_ctx));
+
+ EXPECT_CALL(mock_io_ctx, aio_unwatch(m_watch_handle, _))
+ .WillOnce(DoAll(Invoke([&mock_image_ctx, &mock_io_ctx, r](uint64_t handle,
+ librados::AioCompletionImpl *c) {
+ c->get();
+ mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) {
+ mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
+ }), r);
+ }),
+ Return(0)));
+ }
+
+ struct WatchCtx : public librados::WatchCtx2 {
+ virtual void handle_notify(uint64_t, uint64_t, uint64_t,
+ ceph::bufferlist&) {
+ assert(false);
+ }
+ virtual void handle_error(uint64_t, int) {
+ assert(false);
+ }
+ };
+
+ RWLock m_watch_lock;
+ WatchCtx m_watch_ctx;
+ uint64_t m_watch_handle = 123;
+};
+
+TEST_F(TestMockWatcherRewatchRequest, Success) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+
+ InSequence seq;
+ expect_aio_unwatch(mock_image_ctx, 0);
+ expect_aio_watch(mock_image_ctx, 0);
+
+ C_SaferCond ctx;
+ MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx,
+ mock_image_ctx.header_oid,
+ m_watch_lock,
+ &m_watch_ctx,
+ &m_watch_handle,
+ &ctx);
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ req->send();
+ }
+ ASSERT_EQ(0, ctx.wait());
+}
+
+TEST_F(TestMockWatcherRewatchRequest, UnwatchError) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+
+ InSequence seq;
+ expect_aio_unwatch(mock_image_ctx, -EINVAL);
+ expect_aio_watch(mock_image_ctx, 0);
+
+ C_SaferCond ctx;
+ MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx,
+ mock_image_ctx.header_oid,
+ m_watch_lock,
+ &m_watch_ctx,
+ &m_watch_handle,
+ &ctx);
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ req->send();
+ }
+ ASSERT_EQ(0, ctx.wait());
+}
+
+TEST_F(TestMockWatcherRewatchRequest, WatchBlacklist) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+
+ InSequence seq;
+ expect_aio_unwatch(mock_image_ctx, 0);
+ expect_aio_watch(mock_image_ctx, -EBLACKLISTED);
+
+ C_SaferCond ctx;
+ MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx,
+ mock_image_ctx.header_oid,
+ m_watch_lock,
+ &m_watch_ctx,
+ &m_watch_handle,
+ &ctx);
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ req->send();
+ }
+ ASSERT_EQ(-EBLACKLISTED, ctx.wait());
+}
+
+TEST_F(TestMockWatcherRewatchRequest, WatchDNE) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+
+ InSequence seq;
+ expect_aio_unwatch(mock_image_ctx, 0);
+ expect_aio_watch(mock_image_ctx, -ENOENT);
+
+ C_SaferCond ctx;
+ MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx,
+ mock_image_ctx.header_oid,
+ m_watch_lock,
+ &m_watch_ctx,
+ &m_watch_handle,
+ &ctx);
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ req->send();
+ }
+ ASSERT_EQ(-ENOENT, ctx.wait());
+}
+
+TEST_F(TestMockWatcherRewatchRequest, WatchError) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+
+ InSequence seq;
+ expect_aio_unwatch(mock_image_ctx, 0);
+ expect_aio_watch(mock_image_ctx, -EINVAL);
+ expect_aio_watch(mock_image_ctx, 0);
+
+ C_SaferCond ctx;
+ MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx,
+ mock_image_ctx.header_oid,
+ m_watch_lock,
+ &m_watch_ctx,
+ &m_watch_handle,
+ &ctx);
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ req->send();
+ }
+ ASSERT_EQ(0, ctx.wait());
+}
+
+} // namespace watcher
+} // namespace librbd