]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: minor cleanup of ImageWatcher messages
authorJason Dillaman <dillaman@redhat.com>
Thu, 19 Feb 2015 02:56:34 +0000 (21:56 -0500)
committerJason Dillaman <dillaman@redhat.com>
Mon, 23 Feb 2015 16:07:52 +0000 (11:07 -0500)
Moved all RPC messages to their own classes to facilitate cleaner
version control and backward compatibility.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/ImageWatcher.cc
src/librbd/ImageWatcher.h
src/librbd/Makefile.am
src/librbd/WatchNotifyTypes.cc [new file with mode: 0644]
src/librbd/WatchNotifyTypes.h [new file with mode: 0644]

index 6e6fa4d76afefb77664fb50a45911e8d281af2c6..5b31c29e942405c808d6016be787a03dd2d0b665 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::ImageWatcher: "
 
-static void decode(librbd::RemoteAsyncRequest &request,
-                  bufferlist::iterator &iter) {
-  ::decode(request.gid, iter);
-  ::decode(request.handle, iter);
-  ::decode(request.request_id, iter);
-}
-
-static void encode(const librbd::RemoteAsyncRequest &request, bufferlist &bl) {
-  ::encode(request.gid, bl);
-  ::encode(request.handle, bl);
-  ::encode(request.request_id, bl);
-}
-
-static std::ostream &operator<<(std::ostream &out,
-                               const librbd::RemoteAsyncRequest &request) {
-  out << "[" << request.gid << "," << request.handle << ","
-      << request.request_id << "]";
-  return out;
-}
-
 namespace librbd {
 
+using namespace WatchNotify;
+
 static const std::string WATCHER_LOCK_TAG = "internal";
 static const std::string WATCHER_LOCK_COOKIE_PREFIX = "auto";
 
 static const uint64_t  NOTIFY_TIMEOUT = 5000;
-static const uint8_t   NOTIFY_VERSION = 1;
 static const double    RETRY_DELAY_SECONDS = 1.0;
 
-enum {
-  NOTIFY_OP_ACQUIRED_LOCK  = 0,
-  NOTIFY_OP_RELEASED_LOCK  = 1,
-  NOTIFY_OP_REQUEST_LOCK   = 2,
-  NOTIFY_OP_HEADER_UPDATE  = 3,
-  NOTIFY_OP_ASYNC_PROGRESS = 4,
-  NOTIFY_OP_ASYNC_COMPLETE = 5,
-  NOTIFY_OP_FLATTEN       = 6,
-  NOTIFY_OP_RESIZE        = 7,
-  NOTIFY_OP_SNAP_CREATE    = 8
-};
-
 ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
   : m_image_ctx(image_ctx),
     m_watch_lock("librbd::ImageWatcher::m_watch_lock"),
@@ -87,6 +56,10 @@ ImageWatcher::~ImageWatcher()
     RWLock::RLocker l(m_watch_lock);
     assert(m_watch_state != WATCH_STATE_REGISTERED);
   }
+  {
+    RWLock::RLocker l(m_image_ctx.owner_lock);
+    assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+  }
   m_finisher->stop();
   delete m_finisher;
 }
@@ -99,7 +72,8 @@ bool ImageWatcher::is_lock_supported() const {
 
 bool ImageWatcher::is_lock_owner() const {
   assert(m_image_ctx.owner_lock.is_locked());
-  return m_lock_owner_state == LOCK_OWNER_STATE_LOCKED;
+  return (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED ||
+         m_lock_owner_state == LOCK_OWNER_STATE_RELEASING);
 }
 
 int ImageWatcher::register_watch() {
@@ -269,21 +243,6 @@ bool ImageWatcher::try_request_lock() {
   return is_lock_owner();
 }
 
-void ImageWatcher::finalize_request_lock() {
-  cancel_retry_aio_requests();
-
-  bool owned_lock;
-  {
-    RWLock::RLocker l(m_image_ctx.owner_lock);
-    owned_lock = try_request_lock();
-  }
-  if (owned_lock) {
-    retry_aio_requests();
-  } else {
-    schedule_retry_aio_requests(true);
-  }
-}
-
 int ImageWatcher::get_lock_owner_info(entity_name_t *locker, std::string *cookie,
                                      std::string *address, uint64_t *handle) {
   std::map<rados::cls::lock::locker_id_t,
@@ -352,9 +311,7 @@ int ImageWatcher::lock() {
   }
 
   bufferlist bl;
-  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
-  ::encode(NOTIFY_OP_ACQUIRED_LOCK, bl);
-  ENCODE_FINISH(bl);
+  ::encode(NotifyMessage(AcquiredLockPayload()), bl);
 
   // send the notification when we aren't holding locks
   FunctionContext *ctx = new FunctionContext(
@@ -412,19 +369,14 @@ void ImageWatcher::assert_header_locked(librados::ObjectWriteOperation *op) {
                                   encode_lock_cookie(), WATCHER_LOCK_TAG);
 }
 
-int ImageWatcher::notify_async_progress(const RemoteAsyncRequest &request,
+int ImageWatcher::notify_async_progress(const AsyncRequestId &request,
                                        uint64_t offset, uint64_t total) {
   ldout(m_image_ctx.cct, 20) << "remote async request progress: "
                             << request << " @ " << offset
                             << "/" << total << dendl;
 
   bufferlist bl;
-  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
-  ::encode(NOTIFY_OP_ASYNC_PROGRESS, bl);
-  ::encode(request, bl);
-  ::encode(offset, bl);
-  ::encode(total, bl);
-  ENCODE_FINISH(bl);
+  ::encode(NotifyMessage(AsyncProgressPayload(request, offset, total)), bl);
 
   m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
 
@@ -433,24 +385,20 @@ int ImageWatcher::notify_async_progress(const RemoteAsyncRequest &request,
   return 0;
 }
 
-void ImageWatcher::schedule_async_complete(const RemoteAsyncRequest &request,
+void ImageWatcher::schedule_async_complete(const AsyncRequestId &request,
                                           int r) {
   FunctionContext *ctx = new FunctionContext(
     boost::bind(&ImageWatcher::notify_async_complete, this, request, r));
   m_finisher->queue(ctx);
 }
 
-int ImageWatcher::notify_async_complete(const RemoteAsyncRequest &request,
+int ImageWatcher::notify_async_complete(const AsyncRequestId &request,
                                        int r) {
   ldout(m_image_ctx.cct, 20) << "remote async request finished: "
                             << request << " = " << r << dendl;
 
   bufferlist bl;
-  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
-  ::encode(NOTIFY_OP_ASYNC_COMPLETE, bl);
-  ::encode(request, bl);
-  ::encode(r, bl);
-  ENCODE_FINISH(bl);
+  ::encode(NotifyMessage(AsyncCompletePayload(request, r)), bl);
 
   librbd::notify_change(m_image_ctx.md_ctx, m_image_ctx.header_oid,
                        &m_image_ctx);
@@ -473,13 +421,12 @@ int ImageWatcher::notify_flatten(uint64_t request_id, ProgressContext &prog_ctx)
   assert(m_image_ctx.owner_lock.is_locked());
   assert(!is_lock_owner());
 
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
   bufferlist bl;
-  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
-  ::encode(NOTIFY_OP_FLATTEN, bl);
-  encode_async_request(request_id, bl);
-  ENCODE_FINISH(bl);
+  ::encode(NotifyMessage(FlattenPayload(async_request_id)), bl);
 
-  return notify_async_request(request_id, bl, prog_ctx);
+  return notify_async_request(async_request_id, bl, prog_ctx);
 }
 
 int ImageWatcher::notify_resize(uint64_t request_id, uint64_t size,
@@ -487,14 +434,12 @@ int ImageWatcher::notify_resize(uint64_t request_id, uint64_t size,
   assert(m_image_ctx.owner_lock.is_locked());
   assert(!is_lock_owner());
 
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
   bufferlist bl;
-  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
-  ::encode(NOTIFY_OP_RESIZE, bl);
-  ::encode(size, bl);
-  encode_async_request(request_id, bl);
-  ENCODE_FINISH(bl);
+  ::encode(NotifyMessage(ResizePayload(size, async_request_id)), bl);
 
-  return notify_async_request(request_id, bl, prog_ctx);
+  return notify_async_request(async_request_id, bl, prog_ctx);
 }
 
 int ImageWatcher::notify_snap_create(const std::string &snap_name) {
@@ -502,10 +447,7 @@ int ImageWatcher::notify_snap_create(const std::string &snap_name) {
   assert(!is_lock_owner());
 
   bufferlist bl;
-  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
-  ::encode(NOTIFY_OP_SNAP_CREATE, bl);
-  ::encode(snap_name, bl);
-  ENCODE_FINISH(bl);
+  ::encode(NotifyMessage(SnapCreatePayload(snap_name)), bl);
 
   bufferlist response;
   int r = notify_lock_owner(bl, response);
@@ -520,9 +462,7 @@ void ImageWatcher::notify_header_update(librados::IoCtx &io_ctx,
 {
   // supports legacy (empty buffer) clients
   bufferlist bl;
-  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
-  ::encode(NOTIFY_OP_HEADER_UPDATE, bl);
-  ENCODE_FINISH(bl);
+  ::encode(NotifyMessage(HeaderUpdatePayload()), bl);
 
   io_ctx.notify2(oid, bl, NOTIFY_TIMEOUT, NULL);
 }
@@ -600,28 +540,28 @@ void ImageWatcher::schedule_cancel_async_requests() {
 
 void ImageWatcher::cancel_async_requests() {
   RWLock::WLocker l(m_async_request_lock);
-  for (std::map<uint64_t, AsyncRequest>::iterator iter = m_async_requests.begin();
+  for (std::map<AsyncRequestId, AsyncRequest>::iterator iter =
+        m_async_requests.begin();
        iter != m_async_requests.end(); ++iter) {
     iter->second.first->complete(-ERESTART);
   }
   m_async_requests.clear();
 }
 
-void ImageWatcher::encode_async_request(uint64_t request_id, bufferlist &bl) {
-  RemoteAsyncRequest request(m_image_ctx.md_ctx.get_instance_id(),
-                            reinterpret_cast<uint64_t>(this), request_id);
-  ::encode(request, bl);
-
-  ldout(m_image_ctx.cct, 10) << "async request: " << request << dendl;
+ClientId ImageWatcher::get_client_id() {
+  RWLock::RLocker l(m_watch_lock);
+  return ClientId(m_image_ctx.md_ctx.get_instance_id(), m_watch_handle);
 }
 
 int ImageWatcher::decode_response_code(bufferlist &bl) {
   int r;
   try {
     bufferlist::iterator iter = bl.begin();
-    DECODE_START(NOTIFY_VERSION, iter);
-    ::decode(r, iter);
-    DECODE_FINISH(iter);
+
+    ResponseMessage response_message;
+    ::decode(response_message, iter);
+
+    r = response_message.result;
   } catch (const buffer::error &err) {
     r = -EINVAL;
   }
@@ -631,9 +571,7 @@ int ImageWatcher::decode_response_code(bufferlist &bl) {
 void ImageWatcher::notify_released_lock() {
   ldout(m_image_ctx.cct, 10) << "notify released lock" << dendl;
   bufferlist bl;
-  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
-  ::encode(NOTIFY_OP_RELEASED_LOCK, bl);
-  ENCODE_FINISH(bl);
+  ::encode(NotifyMessage(ReleasedLockPayload()), bl);
   m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
 }
 
@@ -649,9 +587,7 @@ void ImageWatcher::notify_request_lock() {
   }
 
   bufferlist bl;
-  ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
-  ::encode(NOTIFY_OP_REQUEST_LOCK, bl);
-  ENCODE_FINISH(bl);
+  ::encode(NotifyMessage(RequestLockPayload()), bl);
 
   bufferlist response;
   int r = notify_lock_owner(bl, response);
@@ -714,11 +650,12 @@ int ImageWatcher::notify_lock_owner(bufferlist &bl, bufferlist& response) {
   return 0;
 }
 
-int ImageWatcher::notify_async_request(uint64_t async_request_id,
+int ImageWatcher::notify_async_request(const AsyncRequestId &async_request_id,
                                       bufferlist &in,
                                       ProgressContext& prog_ctx) {
   assert(m_image_ctx.owner_lock.is_locked());
 
+  ldout(m_image_ctx.cct, 10) << "async request: " << async_request_id << dendl;
   Mutex my_lock("librbd::ImageWatcher::notify_async_request::my_lock");
   Cond cond;
   bool done = false;
@@ -753,7 +690,7 @@ int ImageWatcher::notify_async_request(uint64_t async_request_id,
   return r;
 }
 
-void ImageWatcher::schedule_async_progress(const RemoteAsyncRequest &request,
+void ImageWatcher::schedule_async_progress(const AsyncRequestId &request,
                                           uint64_t offset, uint64_t total) {
   RWLock::WLocker l(m_async_request_lock);
   if (m_async_progress.count(request) == 0) {
@@ -765,7 +702,8 @@ void ImageWatcher::schedule_async_progress(const RemoteAsyncRequest &request,
   }
 }
 
-void ImageWatcher::handle_header_update() {
+void ImageWatcher::handle_payload(const HeaderUpdatePayload &payload,
+                                 bufferlist *out) {
   ldout(m_image_ctx.cct, 10) << "image header updated" << dendl;
 
   Mutex::Locker lictx(m_image_ctx.refresh_lock);
@@ -773,7 +711,8 @@ void ImageWatcher::handle_header_update() {
   m_image_ctx.perfcounter->inc(l_librbd_notify);
 }
 
-void ImageWatcher::handle_acquired_lock() {
+void ImageWatcher::handle_payload(const AcquiredLockPayload &payload,
+                                  bufferlist *out) {
   ldout(m_image_ctx.cct, 10) << "image exclusively locked announcement" << dendl;
   RWLock::RLocker l(m_image_ctx.owner_lock);
   if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
@@ -781,7 +720,8 @@ void ImageWatcher::handle_acquired_lock() {
   }
 }
 
-void ImageWatcher::handle_released_lock() {
+void ImageWatcher::handle_payload(const ReleasedLockPayload &payload,
+                                  bufferlist *out) {
   ldout(m_image_ctx.cct, 10) << "exclusive lock released" << dendl;
   RWLock::RLocker l(m_image_ctx.owner_lock);
   if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
@@ -790,92 +730,78 @@ void ImageWatcher::handle_released_lock() {
   }
 }
 
-void ImageWatcher::handle_request_lock(bufferlist *out) {
+void ImageWatcher::handle_payload(const RequestLockPayload &payload,
+                                  bufferlist *out) {
+  ldout(m_image_ctx.cct, 10) << "exclusive lock requested" << dendl;
   RWLock::WLocker l(m_image_ctx.owner_lock);
   if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
     m_lock_owner_state = LOCK_OWNER_STATE_RELEASING;
 
     // need to send something back so the client can detect a missing leader
-    ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
-    ::encode(0, *out);
-    ENCODE_FINISH(*out);
+    ::encode(ResponseMessage(0), *out);
 
-    ldout(m_image_ctx.cct, 10) << "exclusive lock requested, releasing" << dendl;
+    ldout(m_image_ctx.cct, 10) << "queuing release of exclusive lock" << dendl;
     FunctionContext *ctx = new FunctionContext(
       boost::bind(&ImageWatcher::release_lock, this));
     m_finisher->queue(ctx);
   }
 }
 
-void ImageWatcher::handle_async_progress(bufferlist::iterator iter) {
-  RemoteAsyncRequest request;
-  ::decode(request, iter);
-
-  uint64_t offset;
-  uint64_t total;
-  ::decode(offset, iter);
-  ::decode(total, iter);
-  if (request.gid == m_image_ctx.md_ctx.get_instance_id() &&
-      request.handle == reinterpret_cast<uint64_t>(this)) {
-    RWLock::RLocker l(m_async_request_lock);
-    std::map<uint64_t, AsyncRequest>::iterator iter =
-      m_async_requests.find(request.request_id);
-    if (iter != m_async_requests.end()) {
-      ldout(m_image_ctx.cct, 20) << "request progress: "
-                                << request << " @ " << offset
-                                << "/" << total << dendl;
-      iter->second.second->update_progress(offset, total);
-    }
+void ImageWatcher::handle_payload(const AsyncProgressPayload &payload,
+                                  bufferlist *out) {
+  RWLock::RLocker l(m_async_request_lock);
+  std::map<AsyncRequestId, AsyncRequest>::iterator req_it =
+    m_async_requests.find(payload.async_request_id);
+  if (req_it != m_async_requests.end()) {
+    ldout(m_image_ctx.cct, 20) << "request progress: "
+                              << payload.async_request_id << " @ "
+                              << payload.offset << "/" << payload.total
+                              << dendl;
+    req_it->second.second->update_progress(payload.offset, payload.total);
   }
 }
 
-void ImageWatcher::handle_async_complete(bufferlist::iterator iter) {
-  RemoteAsyncRequest request;
-  ::decode(request, iter);
-
-  int r;
-  ::decode(r, iter);
-  if (request.gid == m_image_ctx.md_ctx.get_instance_id() &&
-      request.handle == reinterpret_cast<uint64_t>(this)) {
-    Context *ctx = NULL;
-    {
-      RWLock::RLocker l(m_async_request_lock);
-      std::map<uint64_t, AsyncRequest>::iterator iter =
-        m_async_requests.find(request.request_id);
-      if (iter != m_async_requests.end()) {
-       ctx = iter->second.first;
-      }
-    }
-    if (ctx != NULL) {
-      ldout(m_image_ctx.cct, 10) << "request finished: "
-                                 << request << " = " << r << dendl;
-      ctx->complete(r);
+void ImageWatcher::handle_payload(const AsyncCompletePayload &payload,
+                                  bufferlist *out) {
+  Context *ctx = NULL;
+  {
+    RWLock::RLocker l(m_async_request_lock);
+    std::map<AsyncRequestId, AsyncRequest>::iterator req_it =
+      m_async_requests.find(payload.async_request_id);
+    if (req_it != m_async_requests.end()) {
+      ctx = req_it->second.first;
     }
   }
+  if (ctx != NULL) {
+    ldout(m_image_ctx.cct, 10) << "request finished: "
+                               << payload.async_request_id << "="
+                              << payload.result << dendl;
+    ctx->complete(payload.result);
+  }
 }
 
-void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) {
+void ImageWatcher::handle_payload(const FlattenPayload &payload,
+                                 bufferlist *out) {
   RWLock::RLocker l(m_image_ctx.owner_lock);
   if (is_lock_owner()) {
-    RemoteAsyncRequest request;
-    ::decode(request, iter);
-
     bool new_request;
     {
       RWLock::WLocker l(m_async_request_lock);
-      new_request = (m_async_pending.count(request) == 0);
+      new_request = (m_async_pending.count(payload.async_request_id) == 0);
       if (new_request) {
-       m_async_pending.insert(request);
+       m_async_pending.insert(payload.async_request_id);
       }
     }
 
     int r = 0;
     if (new_request) {
       RemoteProgressContext *prog_ctx =
-       new RemoteProgressContext(*this, request);
-      RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
+       new RemoteProgressContext(*this, payload.async_request_id);
+      RemoteContext *ctx = new RemoteContext(*this, payload.async_request_id,
+                                            prog_ctx);
 
-      ldout(m_image_ctx.cct, 10) << "remote flatten request: " << request << dendl;
+      ldout(m_image_ctx.cct, 10) << "remote flatten request: "
+                                << payload.async_request_id << dendl;
       r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
       if (r < 0) {
        delete ctx;
@@ -883,71 +809,61 @@ void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) {
                               << cpp_strerror(r) << dendl;
 
        RWLock::WLocker l(m_async_request_lock);
-       m_async_pending.erase(request);
+       m_async_pending.erase(payload.async_request_id);
       }
     }
 
-    ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
-    ::encode(r, *out);
-    ENCODE_FINISH(*out);
+    ::encode(ResponseMessage(r), *out);
   }
 }
 
-void ImageWatcher::handle_resize(bufferlist::iterator iter, bufferlist *out) {
+void ImageWatcher::handle_payload(const ResizePayload &payload,
+                                 bufferlist *out) {
   RWLock::RLocker l(m_image_ctx.owner_lock);
   if (is_lock_owner()) {
-    uint64_t size;
-    ::decode(size, iter);
-
-    RemoteAsyncRequest request;
-    ::decode(request, iter);
-
     bool new_request;
     {
       RWLock::WLocker l(m_async_request_lock);
-      new_request = (m_async_pending.count(request) == 0);
+      new_request = (m_async_pending.count(payload.async_request_id) == 0);
       if (new_request) {
-       m_async_pending.insert(request);
+       m_async_pending.insert(payload.async_request_id);
       }
     }
 
     int r = 0;
     if (new_request) {
       RemoteProgressContext *prog_ctx =
-       new RemoteProgressContext(*this, request);
-      RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
-
-      ldout(m_image_ctx.cct, 10) << "remote resize request: " << request
-                                << " " << size << dendl;
-      r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx);
+       new RemoteProgressContext(*this, payload.async_request_id);
+      RemoteContext *ctx = new RemoteContext(*this, payload.async_request_id,
+                                            prog_ctx);
+
+      ldout(m_image_ctx.cct, 10) << "remote resize request: "
+                                << payload.async_request_id << " "
+                                << payload.size << dendl;
+      r = librbd::async_resize(&m_image_ctx, ctx, payload.size, *prog_ctx);
       if (r < 0) {
        lderr(m_image_ctx.cct) << "remove resize request failed: "
                               << cpp_strerror(r) << dendl;
        delete ctx;
 
        RWLock::WLocker l(m_async_request_lock);
-       m_async_pending.erase(request);
+       m_async_pending.erase(payload.async_request_id);
       }
     }
 
-    ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
-    ::encode(r, *out);
-    ENCODE_FINISH(*out);
+    ::encode(ResponseMessage(r), *out);
   }
 }
 
-void ImageWatcher::handle_snap_create(bufferlist::iterator iter, bufferlist *out) {
+void ImageWatcher::handle_payload(const SnapCreatePayload &payload,
+                                 bufferlist *out) {
   RWLock::RLocker l(m_image_ctx.owner_lock);
   if (is_lock_owner()) {
-    std::string snap_name;
-    ::decode(snap_name, iter);
-
-    ldout(m_image_ctx.cct, 10) << "remote snap_create request: " << snap_name << dendl;
-    int r = librbd::snap_create(&m_image_ctx, snap_name.c_str(), false);
-    ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
-    ::encode(r, *out);
-    ENCODE_FINISH(*out);
+    ldout(m_image_ctx.cct, 10) << "remote snap_create request: "
+                              << payload.snap_name << dendl;
+    int r = librbd::snap_create(&m_image_ctx, payload.snap_name.c_str(), false);
 
+    ::encode(ResponseMessage(r), *out);
     if (r == 0) {
       // cannot notify within a notificiation
       FunctionContext *ctx = new FunctionContext(
@@ -957,91 +873,39 @@ void ImageWatcher::handle_snap_create(bufferlist::iterator iter, bufferlist *out
   }
 }
 
-void ImageWatcher::handle_unknown_op(bufferlist *out) {
+void ImageWatcher::handle_payload(const UnknownPayload &payload,
+                                 bufferlist *out) {
   RWLock::RLocker l(m_image_ctx.owner_lock);
   if (is_lock_owner()) {
-    ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
-    ::encode(-EOPNOTSUPP, *out);
-    ENCODE_FINISH(*out);
+    ::encode(ResponseMessage(-EOPNOTSUPP), *out);
   }
 }
 
 void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
                                 bufferlist &bl) {
-  if (bl.length() == 0) {
-    // legacy notification for header updates
-    bufferlist out;
-    acknowledge_notify(notify_id, handle, out);
-    handle_header_update();
-    return;
-  }
-
   bool loopback;
   {
     RWLock::RLocker l(m_watch_lock);
     loopback = (m_watch_handle == handle);
   }
 
-  bufferlist::iterator iter = bl.begin();
-  try {
-    DECODE_START(NOTIFY_VERSION, iter);
-    int op;
-    ::decode(op, iter);
-
-    bufferlist out;
-    if (loopback && op != NOTIFY_OP_HEADER_UPDATE) {
-      acknowledge_notify(notify_id, handle, out);
-    } else {
-      switch (op) {
-      // client ops
-      case NOTIFY_OP_ACQUIRED_LOCK:
-        acknowledge_notify(notify_id, handle, out);
-        handle_acquired_lock();
-        break;
-      case NOTIFY_OP_RELEASED_LOCK:
-        acknowledge_notify(notify_id, handle, out);
-        handle_released_lock();
-        break;
-      case NOTIFY_OP_HEADER_UPDATE:
-        acknowledge_notify(notify_id, handle, out);
-        handle_header_update();
-        break;
-      case NOTIFY_OP_ASYNC_PROGRESS:
-        acknowledge_notify(notify_id, handle, out);
-        handle_async_progress(iter);
-        break;
-      case NOTIFY_OP_ASYNC_COMPLETE:
-        acknowledge_notify(notify_id, handle, out);
-        handle_async_complete(iter);
-        break;
-
-      // lock owner-only ops
-      case NOTIFY_OP_REQUEST_LOCK:
-        handle_request_lock(&out);
-        acknowledge_notify(notify_id, handle, out);
-        break;
-      case NOTIFY_OP_FLATTEN:
-        handle_flatten(iter, &out);
-        acknowledge_notify(notify_id, handle, out);
-        break;
-      case NOTIFY_OP_RESIZE:
-        handle_resize(iter, &out);
-        acknowledge_notify(notify_id, handle, out);
-        break;
-      case NOTIFY_OP_SNAP_CREATE:
-        handle_snap_create(iter, &out);
-        acknowledge_notify(notify_id, handle, out);
-        break;
-      default:
-        handle_unknown_op(&out);
-        acknowledge_notify(notify_id, handle, out);
-        break;
-      }
+  NotifyMessage notify_message;
+  if (bl.length() == 0) {
+    // legacy notification for header updates
+    notify_message = NotifyMessage(HeaderUpdatePayload());
+  } else {
+    try {
+      bufferlist::iterator iter = bl.begin();
+      ::decode(notify_message, iter);
+    } catch (const buffer::error &err) {
+      lderr(m_image_ctx.cct) << "error decoding image notification: "
+                            << err.what() << dendl;
+      return;
     }
-    DECODE_FINISH(iter);
-  } catch (const buffer::error &err) {
-    lderr(m_image_ctx.cct) << "error decoding image notification" << dendl;
   }
+
+  apply_visitor(HandlePayloadVisitor(this, notify_id, handle, loopback),
+               notify_message.payload); 
 }
 
 void ImageWatcher::handle_error(uint64_t handle, int err) {
@@ -1103,7 +967,7 @@ void ImageWatcher::reregister_watch() {
 
       m_watch_state = WATCH_STATE_REGISTERED;
     }
-    handle_header_update();
+    handle_payload(HeaderUpdatePayload(), NULL);
 
     if (lock_owner) {
       r = try_lock();
@@ -1132,7 +996,7 @@ void ImageWatcher::WatchCtx::handle_error(uint64_t handle, int err) {
 }
 
 void ImageWatcher::RemoteContext::finish(int r) {
-  m_image_watcher.schedule_async_complete(m_remote_async_request, r);
+  m_image_watcher.schedule_async_complete(m_async_request_id, r);
 }
 
 }
index 59ef416c7d3d7be8cc35d491887fa5379f6081dc..6230f35d5d3e92dd332d427078ecdf50a854517d 100644 (file)
@@ -8,6 +8,7 @@
 #include "include/Context.h"
 #include "include/rados/librados.hpp"
 #include "include/rbd/librbd.hpp"
+#include "librbd/WatchNotifyTypes.h"
 #include <set>
 #include <string>
 #include <utility>
@@ -24,26 +25,6 @@ namespace librbd {
   class AioCompletion;
   class ImageCtx;
 
-  struct RemoteAsyncRequest {
-    uint64_t gid;
-    uint64_t handle;
-    uint64_t request_id;
-
-    RemoteAsyncRequest() : gid(), handle(), request_id() {}
-    RemoteAsyncRequest(uint64_t gid_, uint64_t handle_, uint64_t request_id_)
-      : gid(gid_), handle(handle_), request_id(request_id_) {}
-
-    inline bool operator<(const RemoteAsyncRequest &rhs) const {
-      if (gid != rhs.gid) {
-       return gid < rhs.gid;
-      } else if (handle != rhs.handle) {
-       return handle < rhs.handle;
-      } else {
-       return request_id < rhs.request_id;
-      }
-    }
-  };
-
   class ImageWatcher {
   public:
 
@@ -104,30 +85,29 @@ namespace librbd {
     class RemoteProgressContext : public ProgressContext {
     public:
       RemoteProgressContext(ImageWatcher &image_watcher,
-                           const RemoteAsyncRequest &remote_async_request)
-        : m_image_watcher(image_watcher),
-          m_remote_async_request(remote_async_request)
+                           const WatchNotify::AsyncRequestId &id)
+        : m_image_watcher(image_watcher), m_async_request_id(id)
       {
       }
 
       virtual int update_progress(uint64_t offset, uint64_t total) {
-       m_image_watcher.schedule_async_progress(m_remote_async_request, offset,
+       m_image_watcher.schedule_async_progress(m_async_request_id, offset,
                                                total);
         return 0;
       }
 
     private:
       ImageWatcher &m_image_watcher;
-      RemoteAsyncRequest m_remote_async_request;
+      WatchNotify::AsyncRequestId m_async_request_id;
     };
 
     class RemoteContext : public Context {
     public:
       RemoteContext(ImageWatcher &image_watcher,
-                   const RemoteAsyncRequest &remote_async_request,
+                   const WatchNotify::AsyncRequestId &id,
                    RemoteProgressContext *prog_ctx)
-        : m_image_watcher(image_watcher),
-          m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx)
+        : m_image_watcher(image_watcher), m_async_request_id(id),
+         m_prog_ctx(prog_ctx)
       {
       }
 
@@ -139,10 +119,37 @@ namespace librbd {
 
     private:
       ImageWatcher &m_image_watcher;
-      RemoteAsyncRequest m_remote_async_request;
+      WatchNotify::AsyncRequestId m_async_request_id;
       RemoteProgressContext *m_prog_ctx;
     };
 
+    struct HandlePayloadVisitor : public boost::static_visitor<void> {
+      ImageWatcher *image_watcher;
+      uint64_t notify_id;
+      uint64_t handle; 
+      bool loopback;
+
+      HandlePayloadVisitor(ImageWatcher *image_watcher_, uint64_t notify_id_,
+                          uint64_t handle_, bool loopback_)
+       : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_),
+         loopback(loopback_) {}
+
+      inline void operator()(const WatchNotify::HeaderUpdatePayload &payload) const {
+       bufferlist out;
+       image_watcher->handle_payload(payload, &out);
+       image_watcher->acknowledge_notify(notify_id, handle, out);
+      }
+
+      template <typename Payload>
+      inline void operator()(const Payload &payload) const {
+       bufferlist out;
+       if (!loopback) {
+         image_watcher->handle_payload(payload, &out);
+       }
+       image_watcher->acknowledge_notify(notify_id, handle, out);
+      }
+    };
+
     ImageCtx &m_image_ctx;
 
     RWLock m_watch_lock;
@@ -158,9 +165,9 @@ namespace librbd {
     SafeTimer *m_timer;
 
     RWLock m_async_request_lock;
-    std::map<uint64_t, AsyncRequest> m_async_requests;
-    std::set<RemoteAsyncRequest> m_async_pending;
-    std::set<RemoteAsyncRequest> m_async_progress;
+    std::map<WatchNotify::AsyncRequestId, AsyncRequest> m_async_requests;
+    std::set<WatchNotify::AsyncRequestId> m_async_pending;
+    std::set<WatchNotify::AsyncRequestId> m_async_progress;
 
     Mutex m_aio_request_lock;
     std::vector<AioRequest> m_aio_requests;
@@ -174,7 +181,6 @@ namespace librbd {
     int lock();
     void release_lock();
     bool try_request_lock();
-    void finalize_request_lock();
     void finalize_header_update();
 
     void schedule_retry_aio_requests(bool use_timer);
@@ -185,37 +191,47 @@ namespace librbd {
     void schedule_cancel_async_requests();
     void cancel_async_requests();
 
-    void encode_async_request(uint64_t request_id, bufferlist &bl);
+    WatchNotify::ClientId get_client_id();
     static int decode_response_code(bufferlist &bl);
 
     void notify_released_lock();
     void notify_request_lock();
     int notify_lock_owner(bufferlist &bl, bufferlist &response);
 
-    int notify_async_request(uint64_t async_request_id, bufferlist &in,
-                            ProgressContext& prog_ctx);
+    int notify_async_request(const WatchNotify::AsyncRequestId &id,
+                            bufferlist &in, ProgressContext& prog_ctx);
     void notify_request_leadership();
 
-    void schedule_async_progress(const RemoteAsyncRequest &remote_async_request,
+    void schedule_async_progress(const WatchNotify::AsyncRequestId &id,
                                 uint64_t offset, uint64_t total);
-    int notify_async_progress(const RemoteAsyncRequest &remote_async_request,
+    int notify_async_progress(const WatchNotify::AsyncRequestId &id,
                              uint64_t offset, uint64_t total);
-    void schedule_async_complete(const RemoteAsyncRequest &remote_async_request,
+    void schedule_async_complete(const WatchNotify::AsyncRequestId &id,
                                 int r);
-    int notify_async_complete(const RemoteAsyncRequest &remote_async_request,
+    int notify_async_complete(const WatchNotify::AsyncRequestId &id,
                              int r);
 
-    void handle_header_update();
-    void handle_acquired_lock();
-    void handle_released_lock();
-    void handle_request_lock(bufferlist *out);
-
-    void handle_async_progress(bufferlist::iterator iter);
-    void handle_async_complete(bufferlist::iterator iter);
-    void handle_flatten(bufferlist::iterator iter, bufferlist *out);
-    void handle_resize(bufferlist::iterator iter, bufferlist *out);
-    void handle_snap_create(bufferlist::iterator iter, bufferlist *out);
-    void handle_unknown_op(bufferlist *out);
+    void handle_payload(const WatchNotify::HeaderUpdatePayload& payload,
+                       bufferlist *out);
+    void handle_payload(const WatchNotify::AcquiredLockPayload& payload,
+                       bufferlist *out);
+    void handle_payload(const WatchNotify::ReleasedLockPayload& payload,
+                       bufferlist *out);
+    void handle_payload(const WatchNotify::RequestLockPayload& payload,
+                       bufferlist *out);
+    void handle_payload(const WatchNotify::AsyncProgressPayload& payload,
+                       bufferlist *out);
+    void handle_payload(const WatchNotify::AsyncCompletePayload& payload,
+                       bufferlist *out);
+    void handle_payload(const WatchNotify::FlattenPayload& payload,
+                       bufferlist *out);
+    void handle_payload(const WatchNotify::ResizePayload& payload,
+                       bufferlist *out);
+    void handle_payload(const WatchNotify::SnapCreatePayload& payload,
+                       bufferlist *out);
+    void handle_payload(const WatchNotify::UnknownPayload& payload,
+                       bufferlist *out);
+
     void handle_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl);
     void handle_error(uint64_t cookie, int err);
     void acknowledge_notify(uint64_t notify_id, uint64_t handle,
index b94061edf167606747065a3b45edd525c3a96e7c..f18d7b00b3307b83db1f8658e4a5e3735025ad6b 100644 (file)
@@ -12,7 +12,8 @@ librbd_internal_la_SOURCES = \
        librbd/ImageWatcher.cc \
        librbd/internal.cc \
        librbd/LibrbdWriteback.cc \
-       librbd/ObjectMap.cc
+       librbd/ObjectMap.cc \
+       librbd/WatchNotifyTypes.cc
 noinst_LTLIBRARIES += librbd_internal.la
 
 librbd_api_la_SOURCES = \
@@ -56,4 +57,5 @@ noinst_HEADERS += \
        librbd/LibrbdWriteback.h \
        librbd/ObjectMap.h \
        librbd/parent_types.h \
-       librbd/SnapInfo.h
+       librbd/SnapInfo.h \
+       librbd/WatchNotifyTypes.h
diff --git a/src/librbd/WatchNotifyTypes.cc b/src/librbd/WatchNotifyTypes.cc
new file mode 100644 (file)
index 0000000..4622ff6
--- /dev/null
@@ -0,0 +1,220 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/WatchNotifyTypes.h"
+#include "include/assert.h"
+
+namespace librbd {
+namespace WatchNotify {
+
+namespace {
+
+class EncodePayloadVisitor : public boost::static_visitor<void> {
+public:
+  EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
+
+  template <typename Payload>
+  inline void operator()(const Payload &payload) const {
+    payload.encode(m_bl);
+  }
+
+private:
+  bufferlist &m_bl;
+};
+
+class DecodePayloadVisitor : public boost::static_visitor<void> {
+public:
+  DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
+    : m_version(version), m_iter(iter) {}
+
+  template <typename Payload>
+  inline void operator()(Payload &payload) const {
+    payload.decode(m_version, m_iter);
+  }
+
+private:
+  __u8 m_version;
+  bufferlist::iterator &m_iter;
+};
+
+}
+
+void ClientId::encode(bufferlist &bl) const {
+  ::encode(gid, bl);
+  ::encode(handle, bl);
+}
+
+void ClientId::decode(bufferlist::iterator &iter) {
+  ::decode(gid, iter);
+  ::decode(handle, iter);
+}
+
+void AsyncRequestId::encode(bufferlist &bl) const {
+  ::encode(client_id, bl);
+  ::encode(request_id, bl);
+}
+
+void AsyncRequestId::decode(bufferlist::iterator &iter) {
+  ::decode(client_id, iter);
+  ::decode(request_id, iter);
+}
+
+void AcquiredLockPayload::encode(bufferlist &bl) const {
+  ::encode(static_cast<uint32_t>(NOTIFY_OP_ACQUIRED_LOCK), bl);
+}
+
+void AcquiredLockPayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void ReleasedLockPayload::encode(bufferlist &bl) const {
+  ::encode(static_cast<uint32_t>(NOTIFY_OP_RELEASED_LOCK), bl);
+}
+
+void ReleasedLockPayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void RequestLockPayload::encode(bufferlist &bl) const {
+  ::encode(static_cast<uint32_t>(NOTIFY_OP_REQUEST_LOCK), bl);
+}
+
+void RequestLockPayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void HeaderUpdatePayload::encode(bufferlist &bl) const {
+  ::encode(static_cast<uint32_t>(NOTIFY_OP_HEADER_UPDATE), bl);
+}
+
+void HeaderUpdatePayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void AsyncProgressPayload::encode(bufferlist &bl) const {
+  ::encode(static_cast<uint32_t>(NOTIFY_OP_ASYNC_PROGRESS), bl);
+  ::encode(async_request_id, bl);
+  ::encode(offset, bl);
+  ::encode(total, bl);
+}
+
+void AsyncProgressPayload::decode(__u8 version, bufferlist::iterator &iter) {
+  ::decode(async_request_id, iter);
+  ::decode(offset, iter);
+  ::decode(total, iter);
+}
+
+void AsyncCompletePayload::encode(bufferlist &bl) const {
+  ::encode(static_cast<uint32_t>(NOTIFY_OP_ASYNC_COMPLETE), bl);
+  ::encode(async_request_id, bl);
+  ::encode(result, bl);
+}
+
+void AsyncCompletePayload::decode(__u8 version, bufferlist::iterator &iter) {
+  ::decode(async_request_id, iter);
+  ::decode(result, iter);
+}
+
+void FlattenPayload::encode(bufferlist &bl) const {
+  ::encode(static_cast<uint32_t>(NOTIFY_OP_FLATTEN), bl);
+  ::encode(async_request_id, bl);
+}
+
+void FlattenPayload::decode(__u8 version, bufferlist::iterator &iter) {
+  ::decode(async_request_id, iter);
+}
+
+void ResizePayload::encode(bufferlist &bl) const {
+  ::encode(static_cast<uint32_t>(NOTIFY_OP_RESIZE), bl);
+  ::encode(size, bl);
+  ::encode(async_request_id, bl);
+}
+
+void ResizePayload::decode(__u8 version, bufferlist::iterator &iter) {
+  ::decode(size, iter);
+  ::decode(async_request_id, iter);
+}
+
+void SnapCreatePayload::encode(bufferlist &bl) const {
+  ::encode(static_cast<uint32_t>(NOTIFY_OP_SNAP_CREATE), bl);
+  ::encode(snap_name, bl);
+}
+
+void SnapCreatePayload::decode(__u8 version, bufferlist::iterator &iter) {
+  ::decode(snap_name, iter);
+}
+
+void UnknownPayload::encode(bufferlist &bl) const {
+  assert(false);
+}
+
+void UnknownPayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void NotifyMessage::encode(bufferlist& bl) const {
+  ENCODE_START(1, 1, bl);
+  boost::apply_visitor(EncodePayloadVisitor(bl), payload);
+  ENCODE_FINISH(bl);
+}
+
+void NotifyMessage::decode(bufferlist::iterator& iter) {
+  DECODE_START(1, iter);
+
+  uint32_t notify_op;
+  ::decode(notify_op, iter);
+
+  // select the correct payload variant based upon the encoded op
+  switch (notify_op) {
+  case NOTIFY_OP_ACQUIRED_LOCK:
+    payload = AcquiredLockPayload();
+    break;
+  case NOTIFY_OP_RELEASED_LOCK:
+    payload = ReleasedLockPayload();
+    break;
+  case NOTIFY_OP_REQUEST_LOCK:
+    payload = RequestLockPayload();
+    break;
+  case NOTIFY_OP_HEADER_UPDATE:
+    payload = HeaderUpdatePayload();
+    break;
+  case NOTIFY_OP_ASYNC_PROGRESS:
+    payload = AsyncProgressPayload();
+    break;
+  case NOTIFY_OP_ASYNC_COMPLETE:
+    payload = AsyncCompletePayload();
+    break;
+  case NOTIFY_OP_FLATTEN:
+    payload = FlattenPayload();
+    break;
+  case NOTIFY_OP_RESIZE:
+    payload = ResizePayload();
+    break;
+  case NOTIFY_OP_SNAP_CREATE:
+    payload = SnapCreatePayload();
+    break;
+  default:
+    payload = UnknownPayload();
+    break;
+  }
+
+  apply_visitor(DecodePayloadVisitor(struct_v, iter), payload);
+  DECODE_FINISH(iter);
+}
+
+void ResponseMessage::encode(bufferlist& bl) const {
+  ENCODE_START(1, 1, bl);
+  ::encode(result, bl);
+  ENCODE_FINISH(bl);
+}
+
+void ResponseMessage::decode(bufferlist::iterator& iter) {
+  DECODE_START(1, iter);
+  ::decode(result, iter);
+  DECODE_FINISH(iter);
+}
+
+} // namespace WatchNotify
+} // namespace librbd
+
+std::ostream &operator<<(std::ostream &out,
+                         const librbd::WatchNotify::AsyncRequestId &request) {
+  out << "[" << request.client_id.gid << "," << request.client_id.handle << ","
+      << request.request_id << "]";
+  return out;
+}
diff --git a/src/librbd/WatchNotifyTypes.h b/src/librbd/WatchNotifyTypes.h
new file mode 100644 (file)
index 0000000..abb36d6
--- /dev/null
@@ -0,0 +1,197 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef LIBRBD_WATCH_NOTIFY_TYPES_H
+#define LIBRBD_WATCH_NOTIFY_TYPES_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include <iostream>
+#include <string>
+#include <boost/variant.hpp>
+
+namespace librbd {
+namespace WatchNotify {
+
+struct ClientId {
+  uint64_t gid;
+  uint64_t handle;
+
+  ClientId() : gid(0), handle(0) {}
+  ClientId(uint64_t gid_, uint64_t handle_) : gid(gid_), handle(handle_) {}
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::iterator& it);
+
+  inline bool operator==(const ClientId &rhs) const {
+    return (gid == rhs.gid && handle == rhs.handle);
+  }
+  inline bool operator!=(const ClientId &rhs) const {
+    return !(*this == rhs);
+  }
+  inline bool operator<(const ClientId &rhs) const {
+    if (gid != rhs.gid) {
+      return gid < rhs.gid;
+    } else {
+      return handle < rhs.handle;
+    }
+  }
+};
+
+struct AsyncRequestId {
+  ClientId client_id;
+  uint64_t request_id;
+
+  AsyncRequestId() : request_id() {}
+  AsyncRequestId(const ClientId &client_id_, uint64_t request_id_)
+    : client_id(client_id_), request_id(request_id_) {}
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::iterator& it);
+
+  inline bool operator<(const AsyncRequestId &rhs) const {
+    if (client_id != rhs.client_id) {
+      return client_id < rhs.client_id;
+    } else {
+      return request_id < rhs.request_id;
+    }
+  }
+};
+
+enum NotifyOp {
+  NOTIFY_OP_ACQUIRED_LOCK  = 0,
+  NOTIFY_OP_RELEASED_LOCK  = 1,
+  NOTIFY_OP_REQUEST_LOCK   = 2,
+  NOTIFY_OP_HEADER_UPDATE  = 3,
+  NOTIFY_OP_ASYNC_PROGRESS = 4,
+  NOTIFY_OP_ASYNC_COMPLETE = 5,
+  NOTIFY_OP_FLATTEN        = 6,
+  NOTIFY_OP_RESIZE         = 7,
+  NOTIFY_OP_SNAP_CREATE    = 8
+};
+
+struct AcquiredLockPayload {
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct ReleasedLockPayload {
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct RequestLockPayload {
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct HeaderUpdatePayload {
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct AsyncProgressPayload {
+  AsyncProgressPayload() : offset(0), total(0) {}
+  AsyncProgressPayload(const AsyncRequestId &id, uint64_t offset_, uint64_t total_)
+    : async_request_id(id), offset(offset_), total(total_) {}
+
+  AsyncRequestId async_request_id;
+  uint64_t offset;
+  uint64_t total;
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct AsyncCompletePayload {
+  AsyncCompletePayload() {}
+  AsyncCompletePayload(const AsyncRequestId &id, int r)
+    : async_request_id(id), result(r) {}
+
+  AsyncRequestId async_request_id;
+  int result;
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct FlattenPayload {
+  FlattenPayload() {}
+  FlattenPayload(const AsyncRequestId &id) : async_request_id(id) {}
+
+  AsyncRequestId async_request_id;
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct ResizePayload {
+  ResizePayload() : size(0) {}
+  ResizePayload(uint64_t size_, const AsyncRequestId &id)
+    : size(size_), async_request_id(id) {}
+
+  uint64_t size;
+  AsyncRequestId async_request_id;
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct SnapCreatePayload {
+  SnapCreatePayload() {}
+  SnapCreatePayload(const std::string &name) : snap_name(name) {}
+
+  std::string snap_name;
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct UnknownPayload {
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+typedef boost::variant<AcquiredLockPayload,
+                 ReleasedLockPayload,
+                 RequestLockPayload,
+                 HeaderUpdatePayload,
+                 AsyncProgressPayload,
+                 AsyncCompletePayload,
+                 FlattenPayload,
+                 ResizePayload,
+                 SnapCreatePayload,
+                 UnknownPayload> Payload;
+
+struct NotifyMessage {
+  NotifyMessage() : payload(UnknownPayload()) {}
+  NotifyMessage(const Payload &payload_) : payload(payload_) {}
+
+  Payload payload;
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::iterator& it);
+};
+
+struct ResponseMessage {
+  ResponseMessage() : result(0) {}
+  ResponseMessage(int result_) : result(result_) {}
+
+  int result;
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::iterator& it);
+};
+
+} // namespace WatchNotify
+} // namespace librbd
+
+std::ostream &operator<<(std::ostream &out,
+                         const librbd::WatchNotify::AsyncRequestId &request);
+
+WRITE_CLASS_ENCODER(librbd::WatchNotify::ClientId);
+WRITE_CLASS_ENCODER(librbd::WatchNotify::AsyncRequestId);
+WRITE_CLASS_ENCODER(librbd::WatchNotify::NotifyMessage);
+WRITE_CLASS_ENCODER(librbd::WatchNotify::ResponseMessage);
+
+#endif // LIBRBD_WATCH_NOTIFY_TYPES_H