#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
-#define dout_prefix *_dout << "librbd::Watcher: "
+#define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
+ << ": "
namespace librbd {
}
void Watcher::register_watch(Context *on_finish) {
- ldout(m_cct, 10) << this << " registering watcher" << dendl;
+ ldout(m_cct, 10) << dendl;
RWLock::RLocker watch_locker(m_watch_lock);
assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+ m_watch_state = WATCH_STATE_REGISTERING;
+
librados::AioCompletion *aio_comp = create_rados_safe_callback(
new C_RegisterWatch(this, on_finish));
int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
aio_comp->release();
}
-void Watcher::handle_register_watch(int r) {
- ldout(m_cct, 10) << this << " handle register r=" << r << dendl;
- RWLock::WLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_UNREGISTERED);
- if (r < 0) {
- lderr(m_cct) << ": failed to register watch: " << cpp_strerror(r) << dendl;
- m_watch_handle = 0;
- } else if (r >= 0) {
- m_watch_state = WATCH_STATE_REGISTERED;
+void Watcher::handle_register_watch(int r, Context *on_finish) {
+ ldout(m_cct, 10) << "r=" << r << dendl;
+ Context *unregister_watch_ctx = nullptr;
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_REGISTERING);
+
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ if (r < 0) {
+ lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
+ << dendl;
+ m_watch_handle = 0;
+ m_watch_state = WATCH_STATE_UNREGISTERED;
+ } else if (r >= 0) {
+ m_watch_state = WATCH_STATE_REGISTERED;
+ }
+ }
+
+ on_finish->complete(r);
+
+ // wake up pending unregister request
+ if (unregister_watch_ctx != nullptr) {
+ unregister_watch_ctx->complete(0);
}
}
void Watcher::unregister_watch(Context *on_finish) {
- ldout(m_cct, 10) << this << " unregistering watcher" << dendl;
+ ldout(m_cct, 10) << dendl;
- RWLock::WLocker watch_locker(m_watch_lock);
- if (m_watch_state == WATCH_STATE_REWATCHING) {
- ldout(m_cct, 10) << this << " delaying unregister until rewatch completed"
- << dendl;
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ if (m_watch_state == WATCH_STATE_REGISTERING ||
+ m_watch_state == WATCH_STATE_REWATCHING) {
+ ldout(m_cct, 10) << "delaying unregister until register completed"
+ << dendl;
+
+ assert(m_unregister_watch_ctx == nullptr);
+ m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
+ unregister_watch(on_finish);
+ });
+ return;
+ }
- assert(m_unregister_watch_ctx == nullptr);
- m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
- unregister_watch(on_finish);
- });
- return;
- }
+ if (m_watch_state == WATCH_STATE_REGISTERED ||
+ m_watch_state == WATCH_STATE_ERROR) {
+ m_watch_state = WATCH_STATE_UNREGISTERED;
- if (m_watch_state == WATCH_STATE_REGISTERED ||
- m_watch_state == WATCH_STATE_ERROR) {
- m_watch_state = WATCH_STATE_UNREGISTERED;
-
- librados::AioCompletion *aio_comp = create_rados_safe_callback(
- new C_UnwatchAndFlush(m_ioctx, on_finish));
- int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
- assert(r == 0);
- aio_comp->release();
- } else {
- on_finish->complete(0);
+ librados::AioCompletion *aio_comp = create_rados_safe_callback(
+ new C_UnwatchAndFlush(m_ioctx, on_finish));
+ int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
+ assert(r == 0);
+ aio_comp->release();
+ return;
+ }
}
+
+ on_finish->complete(0);
}
void Watcher::flush(Context *on_finish) {
}
void Watcher::handle_error(uint64_t handle, int err) {
- lderr(m_cct) << this << " watch failed: " << handle << ", "
- << cpp_strerror(err) << dendl;
+ lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;
RWLock::WLocker l(m_watch_lock);
if (m_watch_state == WATCH_STATE_REGISTERED) {
}
void Watcher::rewatch() {
- ldout(m_cct, 10) << this << " re-registering watch" << dendl;
+ ldout(m_cct, 10) << dendl;
RWLock::WLocker l(m_watch_lock);
if (m_watch_state != WATCH_STATE_ERROR) {
}
void Watcher::handle_rewatch(int r) {
- ldout(m_cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
+ ldout(m_cct, 10) "r=" << r << dendl;
WatchState next_watch_state = WATCH_STATE_REGISTERED;
if (r < 0) {
RWLock::RLocker locker(m_watch_lock);
return m_watch_state == WATCH_STATE_REGISTERED;
}
+ bool is_unregistered() const {
+ RWLock::RLocker locker(m_watch_lock);
+ return m_watch_state == WATCH_STATE_UNREGISTERED;
+ }
protected:
enum WatchState {
WATCH_STATE_UNREGISTERED,
+ WATCH_STATE_REGISTERING,
WATCH_STATE_REGISTERED,
WATCH_STATE_ERROR,
WATCH_STATE_REWATCHING
* |
* | (register_watch)
* |
+ * REGISTERING
+ * |
* v (watch error)
* REGISTERED * * * * * * * > ERROR
* | ^ |
: watcher(watcher), on_finish(on_finish) {
}
virtual void finish(int r) override {
- watcher->handle_register_watch(r);
- on_finish->complete(r);
+ watcher->handle_register_watch(r, on_finish);
}
};
WatchCtx m_watch_ctx;
Context *m_unregister_watch_ctx = nullptr;
- void handle_register_watch(int r);
+ void handle_register_watch(int r, Context *on_finish);
void rewatch();
void handle_rewatch(int r);