TEST_F(TestInstanceWatcher, InitShutdown)
 {
-  InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue);
+  InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue,
+                                     nullptr, m_instance_id);
   std::vector<std::string> instance_ids;
   get_instances(&instance_ids);
   ASSERT_EQ(0U, instance_ids.size());
   librados::IoCtx io_ctx;
   ASSERT_EQ("", connect_cluster_pp(cluster));
   ASSERT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx));
-  InstanceWatcher<> instance_watcher(io_ctx, m_threads->work_queue,
-                                     instance_id);
+  InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue,
+                                     nullptr, "instance_id");
   // Init
   ASSERT_EQ(0, instance_watcher.init());
 
   ASSERT_EQ(0, m_local_io_ctx.list_watchers(oid, &watchers));
   ASSERT_EQ(1U, watchers.size());
 
-  get_instances(&instance_ids);
-  ASSERT_EQ(1U, instance_ids.size());
-
   // Remove
   C_SaferCond on_remove;
   InstanceWatcher<>::remove_instance(m_local_io_ctx, m_threads->work_queue,
 
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include "librados/AioCompletionImpl.h"
 #include "librbd/ManagedLock.h"
-#include "test/librbd/mock/MockImageCtx.h"
+#include "test/librados/test.h"
 #include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librados_test_stub/MockTestMemRadosClient.h"
+#include "test/librbd/mock/MockImageCtx.h"
 #include "test/rbd_mirror/test_mock_fixture.h"
+#include "tools/rbd_mirror/InstanceReplayer.h"
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
 
 
 } // namespace librbd
 
+namespace rbd {
+namespace mirror {
+
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+  Mutex &timer_lock;
+  SafeTimer *timer;
+  ContextWQ *work_queue;
+
+  Threads(Threads<librbd::ImageCtx> *threads)
+    : timer_lock(threads->timer_lock), timer(threads->timer),
+      work_queue(threads->work_queue) {
+  }
+};
+
+template <>
+struct InstanceReplayer<librbd::MockTestImageCtx> {
+  MOCK_METHOD4(acquire_image, void(const std::string &, const std::string &,
+                                   const std::string &, Context *));
+  MOCK_METHOD5(release_image, void(const std::string &, const std::string &,
+                                   const std::string &, bool, Context *));
+};
+
+} // namespace mirror
+} // namespace rbd
+
 // template definitions
 #include "tools/rbd_mirror/InstanceWatcher.cc"
 
 class TestMockInstanceWatcher : public TestMockFixture {
 public:
   typedef librbd::ManagedLock<librbd::MockTestImageCtx> MockManagedLock;
+  typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
   typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
+  typedef Threads<librbd::MockTestImageCtx> MockThreads;
 
   std::string m_instance_id;
   std::string m_oid;
+  MockThreads *m_mock_threads;
 
   void SetUp() override {
     TestFixture::SetUp();
 
     m_instance_id = stringify(m_local_io_ctx.get_instance_id());
     m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id;
+
+    m_mock_threads = new MockThreads(m_threads);
+  }
+
+  void TearDown() override {
+    delete m_mock_threads;
+    TestMockFixture::TearDown();
   }
 
   void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
     EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _));
   }
 
+  void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx,
+                             const std::string &instance_id) {
+    std::string oid = RBD_MIRROR_INSTANCE_PREFIX + instance_id;
+    EXPECT_CALL(mock_io_ctx, aio_watch(oid, _, _, _));
+  }
+
   void expect_unregister_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
     EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _));
   }
   MockManagedLock mock_managed_lock;
   librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
 
-  auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
-                                                  m_threads->work_queue);
+  auto instance_watcher = new MockInstanceWatcher(
+    m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
   InSequence seq;
 
   // Init
   MockManagedLock mock_managed_lock;
   librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
 
-  auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
-                                                  m_threads->work_queue);
+  auto instance_watcher = new MockInstanceWatcher(
+    m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
   InSequence seq;
 
   expect_register_instance(mock_io_ctx, 0);
   MockManagedLock mock_managed_lock;
   librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
 
-  auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
-                                                  m_threads->work_queue);
+  auto instance_watcher = new MockInstanceWatcher(
+    m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
   InSequence seq;
 
   // Init
   expect_destroy_lock(mock_managed_lock, &on_destroy);
 
   C_SaferCond on_remove;
-  MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue,
+  MockInstanceWatcher::remove_instance(m_local_io_ctx,
+                                       m_mock_threads->work_queue,
                                        "instance_id", &on_remove);
   ASSERT_EQ(0, on_remove.wait());
   ASSERT_EQ(0, on_destroy.wait());
   expect_destroy_lock(mock_managed_lock, &on_destroy);
 
   C_SaferCond on_remove;
-  MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue,
+  MockInstanceWatcher::remove_instance(m_local_io_ctx,
+                                       m_mock_threads->work_queue,
                                        "instance_id", &on_remove);
   ASSERT_EQ(0, on_remove.wait());
   ASSERT_EQ(0, on_destroy.wait());
 }
 
+TEST_F(TestMockInstanceWatcher, ImageAcquireRelease) {
+  MockManagedLock mock_managed_lock;
+
+  librados::IoCtx& io_ctx1 = m_local_io_ctx;
+  std::string instance_id1 = m_instance_id;
+  librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
+  MockInstanceReplayer mock_instance_replayer1;
+  auto instance_watcher1 = MockInstanceWatcher::create(
+      io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1);
+
+  librados::Rados cluster;
+  librados::IoCtx io_ctx2;
+  EXPECT_EQ("", connect_cluster_pp(cluster));
+  EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
+  std::string instance_id2 = stringify(io_ctx2.get_instance_id());
+  librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
+  MockInstanceReplayer mock_instance_replayer2;
+  auto instance_watcher2 = MockInstanceWatcher::create(
+    io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2);
+
+  InSequence seq;
+
+  // Init instance watcher 1
+  expect_register_instance(mock_io_ctx1, 0);
+  expect_register_watch(mock_io_ctx1, instance_id1);
+  expect_acquire_lock(mock_managed_lock, 0);
+  ASSERT_EQ(0, instance_watcher1->init());
+
+  // Init instance watcher 2
+  expect_register_instance(mock_io_ctx2, 0);
+  expect_register_watch(mock_io_ctx2, instance_id2);
+  expect_acquire_lock(mock_managed_lock, 0);
+  ASSERT_EQ(0, instance_watcher2->init());
+
+  // Acquire Image on the the same instance
+  EXPECT_CALL(mock_instance_replayer1, acquire_image("gid", "uuid", "id", _))
+      .WillOnce(WithArg<3>(CompleteContext(0)));
+  C_SaferCond on_acquire1;
+  instance_watcher1->notify_image_acquire(instance_id1, "gid", "uuid", "id",
+                                          &on_acquire1);
+  ASSERT_EQ(0, on_acquire1.wait());
+
+  // Acquire Image on the other instance
+  EXPECT_CALL(mock_instance_replayer2, acquire_image("gid", "uuid", "id", _))
+      .WillOnce(WithArg<3>(CompleteContext(0)));
+  C_SaferCond on_acquire2;
+  instance_watcher1->notify_image_acquire(instance_id2, "gid", "uuid", "id",
+                                          &on_acquire2);
+  ASSERT_EQ(0, on_acquire2.wait());
+
+  // Release Image on the the same instance
+  EXPECT_CALL(mock_instance_replayer1, release_image("gid", "uuid", "id", true,
+                                                     _))
+      .WillOnce(WithArg<4>(CompleteContext(0)));
+  C_SaferCond on_release1;
+  instance_watcher1->notify_image_release(instance_id1, "gid", "uuid", "id",
+                                          true, &on_release1);
+  ASSERT_EQ(0, on_release1.wait());
+
+  // Release Image on the other instance
+  EXPECT_CALL(mock_instance_replayer2, release_image("gid", "uuid", "id", true,
+                                                     _))
+      .WillOnce(WithArg<4>(CompleteContext(0)));
+  C_SaferCond on_release2;
+  instance_watcher1->notify_image_release(instance_id2, "gid", "uuid", "id",
+                                          true, &on_release2);
+  ASSERT_EQ(0, on_release2.wait());
+
+  // Shutdown instance watcher 1
+  expect_release_lock(mock_managed_lock, 0);
+  expect_unregister_watch(mock_io_ctx1);
+  expect_unregister_instance(mock_io_ctx1, 0);
+  instance_watcher1->shut_down();
+
+  expect_destroy_lock(mock_managed_lock);
+  delete instance_watcher1;
+
+  // Shutdown instance watcher 2
+  expect_release_lock(mock_managed_lock, 0);
+  expect_unregister_watch(mock_io_ctx2);
+  expect_unregister_instance(mock_io_ctx2, 0);
+  instance_watcher2->shut_down();
+
+  expect_destroy_lock(mock_managed_lock);
+  delete instance_watcher2;
+}
+
+TEST_F(TestMockInstanceWatcher, ImageAcquireReleaseCancel) {
+  MockManagedLock mock_managed_lock;
+  librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+  auto instance_watcher = new MockInstanceWatcher(
+    m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
+  InSequence seq;
+
+  // Init
+  expect_register_instance(mock_io_ctx, 0);
+  expect_register_watch(mock_io_ctx);
+  expect_acquire_lock(mock_managed_lock, 0);
+  ASSERT_EQ(0, instance_watcher->init());
+
+  // Send Acquire Image and cancel
+  EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _))
+    .WillOnce(Invoke(
+                  [this, instance_watcher, &mock_io_ctx](
+                    const std::string& o, librados::AioCompletionImpl *c,
+                    bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
+                    c->get();
+                    auto ctx = new FunctionContext(
+                      [instance_watcher, &mock_io_ctx, c, pbl](int r) {
+                        instance_watcher->cancel_notify_requests("other");
+                        ::encode(librbd::watcher::NotifyResponse(), *pbl);
+                        mock_io_ctx.get_mock_rados_client()->
+                            finish_aio_completion(c, -ETIMEDOUT);
+                      });
+                    m_threads->work_queue->queue(ctx, 0);
+                  }));
+
+  C_SaferCond on_acquire;
+  instance_watcher->notify_image_acquire("other", "gid", "uuid", "id",
+                                         &on_acquire);
+  ASSERT_EQ(-ECANCELED, on_acquire.wait());
+
+  // Send Release Image and cancel
+  EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _))
+    .WillOnce(Invoke(
+                  [this, instance_watcher, &mock_io_ctx](
+                    const std::string& o, librados::AioCompletionImpl *c,
+                    bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
+                    c->get();
+                    auto ctx = new FunctionContext(
+                      [instance_watcher, &mock_io_ctx, c, pbl](int r) {
+                        instance_watcher->cancel_notify_requests("other");
+                        ::encode(librbd::watcher::NotifyResponse(), *pbl);
+                        mock_io_ctx.get_mock_rados_client()->
+                            finish_aio_completion(c, -ETIMEDOUT);
+                      });
+                    m_threads->work_queue->queue(ctx, 0);
+                  }));
+
+  C_SaferCond on_release;
+  instance_watcher->notify_image_release("other", "gid", "uuid", "id",
+                                         true, &on_release);
+  ASSERT_EQ(-ECANCELED, on_release.wait());
+
+  // Shutdown
+  expect_release_lock(mock_managed_lock, 0);
+  expect_unregister_watch(mock_io_ctx);
+  expect_unregister_instance(mock_io_ctx, 0);
+  instance_watcher->shut_down();
+
+  expect_destroy_lock(mock_managed_lock);
+  delete instance_watcher;
+}
+
 } // namespace mirror
 } // namespace rbd
 
 add_library(rbd_mirror_types STATIC
+  instance_watcher/Types.cc
   leader_watcher/Types.cc)
 
 set(rbd_mirror_internal
 
 // vim: ts=8 sw=2 smarttab
 
 #include "InstanceWatcher.h"
+#include "include/atomic.h"
 #include "include/stringify.h"
 #include "common/debug.h"
 #include "common/errno.h"
 #include "cls/rbd/cls_rbd_client.h"
 #include "librbd/ManagedLock.h"
 #include "librbd/Utils.h"
+#include "InstanceReplayer.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
 #undef dout_prefix
-#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
-                           << this << " " << __func__ << ": "
+#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
 
 namespace rbd {
 namespace mirror {
 
+using namespace instance_watcher;
+
 using librbd::util::create_async_context_callback;
 using librbd::util::create_context_callback;
 using librbd::util::create_rados_callback;
   }
 
   void finish(int r) override {
+    dout(20) << "C_GetInstances: " << this << " " <<  __func__ << ": r=" << r
+             << dendl;
+
     if (r == 0) {
       bufferlist::iterator it = out_bl.begin();
       r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
 };
 
 template <typename I>
-struct RemoveInstanceRequest : public Context {
+struct C_RemoveInstanceRequest : public Context {
   InstanceWatcher<I> instance_watcher;
   Context *on_finish;
 
-  RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
-                        const std::string &instance_id, Context *on_finish)
-    : instance_watcher(io_ctx, work_queue, instance_id), on_finish(on_finish) {
+  C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+                          const std::string &instance_id, Context *on_finish)
+    : instance_watcher(io_ctx, work_queue, nullptr, instance_id),
+      on_finish(on_finish) {
   }
 
   void send() {
+    dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
+
     instance_watcher.remove(this);
   }
 
   void finish(int r) override {
+    dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
+             << r << dendl;
     assert(r == 0);
 
     on_finish->complete(r);
 
 } // anonymous namespace
 
+template <typename I>
+struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
+  InstanceWatcher<I> *instance_watcher;
+  librbd::watcher::Notifier notifier;
+  std::string instance_id;
+  uint64_t request_id;
+  bufferlist bl;
+  Context *on_finish;
+  librbd::watcher::NotifyResponse response;
+  atomic_t canceling;
+
+  C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
+                          const std::string &instance_id, uint64_t request_id,
+                          bufferlist &&bl, Context *on_finish)
+    : instance_watcher(instance_watcher),
+      notifier(instance_watcher->m_work_queue, instance_watcher->m_ioctx,
+               RBD_MIRROR_INSTANCE_PREFIX + instance_id),
+      instance_id(instance_id), request_id(request_id), bl(bl),
+      on_finish(on_finish) {
+    instance_watcher->m_notify_op_tracker.start_op();
+    assert(instance_watcher->m_lock.is_locked());
+    auto result = instance_watcher->m_notify_ops.insert(
+        std::make_pair(instance_id, this)).second;
+    assert(result);
+  }
+
+  void send() {
+    dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
+
+    notifier.notify(bl, &response, this);
+  }
+
+  void cancel() {
+    dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
+
+    canceling.set(1);
+  }
+
+  void finish(int r) override {
+    dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
+             << r << dendl;
+
+    if (r == 0 || r == -ETIMEDOUT) {
+      bool found = false;
+      for (auto &it : response.acks) {
+        auto &bl = it.second;
+        if (it.second.length() == 0) {
+          dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
+                   << ": no payload in ack, ignoring" << dendl;
+          continue;
+        }
+        try {
+          auto iter = bl.begin();
+          NotifyAckPayload ack;
+          ::decode(ack, iter);
+          if (ack.instance_id != instance_watcher->get_instance_id()) {
+            derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+                 << ": ack instance_id (" << ack.instance_id << ") "
+                 << "does not match, ignoring" << dendl;
+            continue;
+          }
+          if (ack.request_id != request_id) {
+            derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+                 << ": ack request_id (" << ack.request_id << ") "
+                 << "does not match, ignoring" << dendl;
+            continue;
+          }
+          r = ack.ret_val;
+          found = true;
+          break;
+        } catch (const buffer::error &err) {
+          derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+               << ": failed to decode ack: " << err.what() << dendl;
+          continue;
+        }
+      }
+
+      if (!found) {
+        if (r == -ETIMEDOUT) {
+          if (canceling.read()) {
+            r = -ECANCELED;
+          } else {
+            derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+                 << ": resending after timeout" << dendl;
+            send();
+            return;
+          }
+        } else {
+          r = -EINVAL;
+        }
+      }
+    }
+
+    instance_watcher->m_notify_op_tracker.finish_op();
+    on_finish->complete(r);
+
+    Mutex::Locker locker(instance_watcher->m_lock);
+    auto result = instance_watcher->m_notify_ops.erase(
+        std::make_pair(instance_id, this));
+    assert(result > 0);
+    delete this;
+  }
+
+  void complete(int r) override {
+    finish(r);
+  }
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
+                           << this << " " << __func__ << ": "
 template <typename I>
 void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
                                        std::vector<std::string> *instance_ids,
                                          ContextWQ *work_queue,
                                          const std::string &instance_id,
                                          Context *on_finish) {
-  auto req = new RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
-                                          on_finish);
+  auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
+                                            on_finish);
   req->send();
 }
 
+template <typename I>
+InstanceWatcher<I> *InstanceWatcher<I>::create(
+    librados::IoCtx &io_ctx, ContextWQ *work_queue,
+    InstanceReplayer<I> *instance_replayer) {
+  return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
+                                stringify(io_ctx.get_instance_id()));
+}
+
 template <typename I>
 InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
                                     ContextWQ *work_queue,
-                                    const boost::optional<std::string> &id)
-  : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX +
-            (id ? *id : stringify(io_ctx.get_instance_id()))),
-    m_instance_id(id ? *id : stringify(io_ctx.get_instance_id())),
+                                    InstanceReplayer<I> *instance_replayer,
+                                    const std::string &instance_id)
+  : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
+    m_instance_replayer(instance_replayer), m_instance_id(instance_id),
     m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
     m_instance_lock(librbd::ManagedLock<I>::create(
       m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
 }
 
 template <typename I>
-void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
-                                       uint64_t notifier_id, bufferlist &bl) {
-  dout(20) << dendl;
+void InstanceWatcher<I>::notify_image_acquire(
+    const std::string &instance_id, const std::string &global_image_id,
+    const std::string &peer_mirror_uuid, const std::string &peer_image_id,
+  Context *on_notify_ack) {
+  dout(20) << "instance_id=" << instance_id << ", global_image_id="
+           << global_image_id << dendl;
+
+  Mutex::Locker locker(m_lock);
 
-  bufferlist out;
-  acknowledge_notify(notify_id, handle, out);
+  assert(m_on_finish == nullptr);
+
+  if (instance_id == m_instance_id) {
+    handle_image_acquire(global_image_id, peer_mirror_uuid, peer_image_id,
+                         on_notify_ack);
+  } else {
+    uint64_t request_id = ++m_request_seq;
+    bufferlist bl;
+    ::encode(NotifyMessage{ImageAcquirePayload{
+        request_id, global_image_id, peer_mirror_uuid, peer_image_id}}, bl);
+    auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+                                           std::move(bl), on_notify_ack);
+    req->send();
+  }
 }
 
+template <typename I>
+void InstanceWatcher<I>::notify_image_release(
+  const std::string &instance_id, const std::string &global_image_id,
+  const std::string &peer_mirror_uuid, const std::string &peer_image_id,
+  bool schedule_delete, Context *on_notify_ack) {
+  dout(20) << "instance_id=" << instance_id << ", global_image_id="
+           << global_image_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_on_finish == nullptr);
+
+  if (instance_id == m_instance_id) {
+    handle_image_release(global_image_id, peer_mirror_uuid, peer_image_id,
+                         schedule_delete, on_notify_ack);
+  } else {
+    uint64_t request_id = ++m_request_seq;
+    bufferlist bl;
+    ::encode(NotifyMessage{ImageReleasePayload{
+        request_id, global_image_id, peer_mirror_uuid, peer_image_id,
+        schedule_delete}}, bl);
+    auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+                                           std::move(bl), on_notify_ack);
+    req->send();
+  }
+}
+
+template <typename I>
+void InstanceWatcher<I>::cancel_notify_requests(
+    const std::string &instance_id) {
+  dout(20) << "instance_id=" << instance_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  for (auto op : m_notify_ops) {
+    if (op.first == instance_id) {
+      op.second->cancel();
+    }
+  }
+}
+
+
 template <typename I>
 void InstanceWatcher<I>::register_instance() {
   assert(m_lock.is_locked());
 
     std::swap(on_finish, m_on_finish);
   }
+
   on_finish->complete(r);
 }
 
     derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
   }
 
+  Mutex::Locker locker(m_lock);
+  wait_for_notify_ops();
+}
+
+template <typename I>
+void InstanceWatcher<I>::wait_for_notify_ops() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  for (auto op : m_notify_ops) {
+    op.second->cancel();
+  }
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+    InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
+
+  m_notify_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  assert(r == 0);
+
   Context *on_finish = nullptr;
   {
     Mutex::Locker locker(m_lock);
 
+    assert(m_notify_ops.empty());
+
     std::swap(on_finish, m_on_finish);
     r = m_ret_val;
 
   remove_instance_object();
 }
 
+template <typename I>
+Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
+                                             uint64_t request_id,
+                                             C_NotifyAck *on_notify_ack) {
+  dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
+           << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  Context *ctx = nullptr;
+  Request request(instance_id, request_id);
+  auto it = m_requests.find(request);
+
+  if (it != m_requests.end()) {
+    dout(20) << "duplicate for in-progress request" << dendl;
+    delete it->on_notify_ack;
+    m_requests.erase(it);
+  } else {
+    ctx = new FunctionContext(
+      [this, instance_id, request_id] (int r) {
+        C_NotifyAck *on_notify_ack = nullptr;
+        {
+          // update request state in the requests list
+          Mutex::Locker locker(m_lock);
+          Request request(instance_id, request_id);
+          auto it = m_requests.find(request);
+          assert(it != m_requests.end());
+          on_notify_ack = it->on_notify_ack;
+          m_requests.erase(it);
+        }
+
+        ::encode(NotifyAckPayload(instance_id, request_id, r),
+                 on_notify_ack->out);
+        on_notify_ack->complete(0);
+      });
+  }
+
+  request.on_notify_ack = on_notify_ack;
+  m_requests.insert(request);
+  return ctx;
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
+                                       uint64_t notifier_id, bufferlist &bl) {
+  dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
+           << "notifier_id=" << notifier_id << dendl;
+
+  auto ctx = new C_NotifyAck(this, notify_id, handle);
+
+  NotifyMessage notify_message;
+  try {
+    bufferlist::iterator iter = bl.begin();
+    ::decode(notify_message, iter);
+  } catch (const buffer::error &err) {
+    derr << "error decoding image notification: " << err.what() << dendl;
+    ctx->complete(0);
+    return;
+  }
+
+  apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
+                notify_message.payload);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_image_acquire(
+  const std::string &global_image_id, const std::string &peer_mirror_uuid,
+  const std::string &peer_image_id, Context *on_finish) {
+  dout(20) << "global_image_id=" << global_image_id << dendl;
+
+  m_instance_replayer->acquire_image(global_image_id, peer_mirror_uuid,
+                                     peer_image_id, on_finish);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_image_release(
+  const std::string &global_image_id,  const std::string &peer_mirror_uuid,
+  const std::string &peer_image_id, bool schedule_delete, Context *on_finish) {
+  dout(20) << "global_image_id=" << global_image_id << dendl;
+
+  m_instance_replayer->release_image(global_image_id, peer_mirror_uuid,
+                                     peer_image_id, schedule_delete, on_finish);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+                                        const ImageAcquirePayload &payload,
+                                        C_NotifyAck *on_notify_ack) {
+  dout(20) << "image_acquire: instance_id=" << instance_id << ", "
+           << "request_id=" << payload.request_id << dendl;
+
+  auto on_finish = prepare_request(instance_id, payload.request_id,
+                                   on_notify_ack);
+  if (on_finish != nullptr) {
+    handle_image_acquire(payload.global_image_id, payload.peer_mirror_uuid,
+                         payload.peer_image_id, on_finish);
+  }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+                                        const ImageReleasePayload &payload,
+                                        C_NotifyAck *on_notify_ack) {
+  dout(20) << "image_release: instance_id=" << instance_id << ", "
+           << "request_id=" << payload.request_id << dendl;
+
+  auto on_finish = prepare_request(instance_id, payload.request_id,
+                                   on_notify_ack);
+  if (on_finish != nullptr) {
+    handle_image_release(payload.global_image_id, payload.peer_mirror_uuid,
+                         payload.peer_image_id, payload.schedule_delete,
+                         on_finish);
+  }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+                                        const UnknownPayload &payload,
+                                        C_NotifyAck *on_notify_ack) {
+  dout(20) << "unknown: instance_id=" << instance_id << dendl;
+
+  on_notify_ack->complete(0);
+}
+
 } // namespace mirror
 } // namespace rbd
 
 
 #ifndef CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
 #define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
 
+#include <map>
+#include <set>
 #include <string>
 #include <vector>
-#include <boost/optional.hpp>
 
+#include "common/AsyncOpTracker.h"
 #include "librbd/Watcher.h"
 #include "librbd/managed_lock/Types.h"
+#include "tools/rbd_mirror/instance_watcher/Types.h"
 
 namespace librbd {
-  class ImageCtx;
-  template <typename> class ManagedLock;
+
+class ImageCtx;
+template <typename> class ManagedLock;
+
 }
 
 namespace rbd {
 namespace mirror {
 
+template <typename> class InstanceReplayer;
+template <typename> struct Threads;
+
 template <typename ImageCtxT = librbd::ImageCtx>
 class InstanceWatcher : protected librbd::Watcher {
 public:
   static void get_instances(librados::IoCtx &io_ctx,
                             std::vector<std::string> *instance_ids,
                             Context *on_finish);
-  static void remove_instance(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+  static void remove_instance(librados::IoCtx &io_ctx,
+                              ContextWQ *work_queue,
                               const std::string &instance_id,
                               Context *on_finish);
 
   static InstanceWatcher *create(
     librados::IoCtx &io_ctx, ContextWQ *work_queue,
-    const boost::optional<std::string> &id = boost::none) {
-    return new InstanceWatcher(io_ctx, work_queue, id);
-  }
+    InstanceReplayer<ImageCtxT> *instance_replayer);
   void destroy() {
     delete this;
   }
 
   InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
-                  const boost::optional<std::string> &id = boost::none);
+                  InstanceReplayer<ImageCtxT> *instance_replayer,
+                  const std::string &instance_id);
   ~InstanceWatcher() override;
 
+  inline std::string &get_instance_id() {
+    return m_instance_id;
+  }
+
   int init();
   void shut_down();
 
   void shut_down(Context *on_finish);
   void remove(Context *on_finish);
 
-protected:
-  void handle_notify(uint64_t notify_id, uint64_t handle, uint64_t notifier_id,
-                     bufferlist &bl) override;
+  void notify_image_acquire(const std::string &instance_id,
+                            const std::string &global_image_id,
+                            const std::string &peer_mirror_uuid,
+                            const std::string &peer_image_id,
+                            Context *on_notify_ack);
+  void notify_image_release(const std::string &instance_id,
+                            const std::string &global_image_id,
+                            const std::string &peer_mirror_uuid,
+                            const std::string &peer_image_id,
+                           bool schedule_delete, Context *on_notify_ack);
+
+  void cancel_notify_requests(const std::string &instance_id);
 
 private:
   /**
    *       GET_INSTANCE_LOCKER  * * *>|
    *          ^ (remove)              |
    *          |                       |
-   * <uninitialized> <----------------+--------\
-   *    | (init)         ^            |        |
+   * <uninitialized> <----------------+---- WAIT_FOR_NOTIFY_OPS
+   *    | (init)         ^            |        ^
    *    v        (error) *            |        |
    * REGISTER_INSTANCE * *     * * * *|* *> UNREGISTER_INSTANCE
    *    |                      *      |        ^
    * @endverbatim
    */
 
-  bool m_owner;
+  struct C_NotifyInstanceRequest;
+
+  struct HandlePayloadVisitor : public boost::static_visitor<void> {
+    InstanceWatcher *instance_watcher;
+    std::string instance_id;
+    C_NotifyAck *on_notify_ack;
+
+    HandlePayloadVisitor(InstanceWatcher *instance_watcher,
+                         const std::string &instance_id,
+                         C_NotifyAck *on_notify_ack)
+      : instance_watcher(instance_watcher), instance_id(instance_id),
+        on_notify_ack(on_notify_ack) {
+    }
+
+    template <typename Payload>
+    inline void operator()(const Payload &payload) const {
+      instance_watcher->handle_payload(instance_id, payload, on_notify_ack);
+    }
+  };
+
+  struct Request {
+    std::string instance_id;
+    uint64_t request_id;
+    C_NotifyAck *on_notify_ack = nullptr;
+
+    Request(const std::string &instance_id, uint64_t request_id)
+      : instance_id(instance_id), request_id(request_id) {
+    }
+
+    inline bool operator<(const Request &rhs) const {
+      return instance_id < rhs.instance_id ||
+        (instance_id == rhs.instance_id && request_id < rhs.request_id);
+    }
+  };
+
+  Threads<ImageCtxT> *m_threads;
+  InstanceReplayer<ImageCtxT> *m_instance_replayer;
   std::string m_instance_id;
 
   mutable Mutex m_lock;
   int m_ret_val = 0;
   bool m_removing = false;
   librbd::managed_lock::Locker m_instance_locker;
+  std::set<std::pair<std::string, C_NotifyInstanceRequest *>> m_notify_ops;
+  AsyncOpTracker m_notify_op_tracker;
+  uint64_t m_request_seq = 0;
+  std::set<Request> m_requests;
 
   void register_instance();
   void handle_register_instance(int r);
   void unregister_instance();
   void handle_unregister_instance(int r);
 
+  void wait_for_notify_ops();
+  void handle_wait_for_notify_ops(int r);
+
   void get_instance_locker();
   void handle_get_instance_locker(int r);
 
   void break_instance_lock();
   void handle_break_instance_lock(int r);
+
+  Context *prepare_request(const std::string &instance_id, uint64_t request_id,
+                           C_NotifyAck *on_notify_ack);
+
+  void handle_notify(uint64_t notify_id, uint64_t handle,
+                     uint64_t notifier_id, bufferlist &bl) override;
+
+  void handle_image_acquire(const std::string &global_image_id,
+                            const std::string &peer_mirror_uuid,
+                            const std::string &peer_image_id,
+                            Context *on_finish);
+  void handle_image_release(const std::string &global_image_id,
+                            const std::string &peer_mirror_uuid,
+                            const std::string &peer_image_id,
+                            bool schedule_delete, Context *on_finish);
+
+  void handle_payload(const std::string &instance_id,
+                      const instance_watcher::ImageAcquirePayload &payload,
+                      C_NotifyAck *on_notify_ack);
+  void handle_payload(const std::string &instance_id,
+                      const instance_watcher::ImageReleasePayload &payload,
+                      C_NotifyAck *on_notify_ack);
+  void handle_payload(const std::string &instance_id,
+                      const instance_watcher::UnknownPayload &payload,
+                      C_NotifyAck *on_notify_ack);
 };
 
 } // namespace mirror
 
 #include "common/debug.h"
 #include "common/errno.h"
 #include "cls/rbd/cls_rbd_client.h"
+#include "include/stringify.h"
 #include "librbd/Utils.h"
 #include "librbd/watcher/Types.h"
 #include "Threads.h"
 }
 
 template <typename I>
-bool LeaderWatcher<I>::is_leader() {
+bool LeaderWatcher<I>::is_leader() const {
   Mutex::Locker locker(m_lock);
 
   return is_leader(m_lock);
 }
 
 template <typename I>
-bool LeaderWatcher<I>::is_leader(Mutex &lock) {
+bool LeaderWatcher<I>::is_leader(Mutex &lock) const {
   assert(m_lock.is_locked());
 
   bool leader = m_leader_lock->is_leader();
   return leader;
 }
 
+template <typename I>
+bool LeaderWatcher<I>::is_releasing_leader() const {
+  Mutex::Locker locker(m_lock);
+
+  return is_releasing_leader(m_lock);
+}
+
+template <typename I>
+bool LeaderWatcher<I>::is_releasing_leader(Mutex &lock) const {
+  assert(m_lock.is_locked());
+
+  bool releasing = m_leader_lock->is_releasing_leader();
+  dout(20) << releasing << dendl;
+  return releasing;
+}
+
+template <typename I>
+bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
+    *instance_id = stringify(m_notifier_id);
+    return true;
+  }
+
+  if (!m_locker.cookie.empty()) {
+    *instance_id = stringify(m_locker.entity.num());
+    return true;
+  }
+
+  return false;
+}
+
 template <typename I>
 void LeaderWatcher<I>::release_leader() {
   dout(20) << dendl;
   }
 }
 
-
 template <typename I>
 void LeaderWatcher<I>::cancel_timer_task() {
   assert(m_threads->timer_lock.is_locked());
 
   void init(Context *on_finish);
   void shut_down(Context *on_finish);
 
-  bool is_leader();
+  bool is_leader() const;
+  bool is_releasing_leader() const;
+  bool get_leader_instance_id(std::string *instance_id) const;
   void release_leader();
   void list_instances(std::vector<std::string> *instance_ids);
 
       return Parent::is_state_post_acquiring() || Parent::is_state_locked();
     }
 
+    bool is_releasing_leader() const {
+      Mutex::Locker locker(Parent::m_lock);
+      return Parent::is_state_pre_releasing();
+    }
+
   protected:
     void post_acquire_lock_handler(int r, Context *on_finish) {
       if (r == 0) {
   Threads<ImageCtxT> *m_threads;
   Listener *m_listener;
 
-  Mutex m_lock;
+  mutable Mutex m_lock;
   uint64_t m_notifier_id;
   LeaderLock *m_leader_lock;
   Context *m_on_finish = nullptr;
 
   librbd::watcher::NotifyResponse m_heartbeat_response;
 
-  bool is_leader(Mutex &m_lock);
+  bool is_leader(Mutex &m_lock) const;
+  bool is_releasing_leader(Mutex &m_lock) const;
 
   void cancel_timer_task();
   void schedule_timer_task(const std::string &name,
 
   m_instance_replayer->init();
   m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
 
-  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
-                                             &m_leader_listener));
-  r = m_leader_watcher->init();
+  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
+                                                     m_threads->work_queue,
+                                                     m_instance_replayer.get()));
+  r = m_instance_watcher->init();
   if (r < 0) {
-    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
+    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
     return r;
   }
 
-  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
-                                                     m_threads->work_queue));
-  r = m_instance_watcher->init();
+  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
+                                             &m_leader_listener));
+  r = m_leader_watcher->init();
   if (r < 0) {
-    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
+    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
     return r;
   }
 
   f->open_object_section("replayer_status");
   f->dump_string("pool", m_local_io_ctx.get_pool_name());
   f->dump_stream("peer") << m_peer;
+  f->dump_string("instance_id", m_instance_watcher->get_instance_id());
+
+  std::string leader_instance_id;
+  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
+  f->dump_string("leader_instance_id", leader_instance_id);
 
   bool leader = m_leader_watcher->is_leader();
   f->dump_bool("leader", leader);
   C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
 
   for (auto &image_id : removed_image_ids) {
-    m_instance_replayer->release_image(image_id.global_id, mirror_uuid,
-                                       image_id.id, true,
-                                       gather_ctx->new_sub());
+    // for now always send to myself (the leader)
+    std::string &instance_id = m_instance_watcher->get_instance_id();
+    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
+                                             mirror_uuid, image_id.id, true,
+                                             gather_ctx->new_sub());
   }
 
   for (auto &image_id : added_image_ids) {
-    m_instance_replayer->acquire_image(image_id.global_id, mirror_uuid,
-                                       image_id.id, gather_ctx->new_sub());
+    // for now always send to myself (the leader)
+    std::string &instance_id = m_instance_watcher->get_instance_id();
+    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
+                                             mirror_uuid, image_id.id,
+                                             gather_ctx->new_sub());
   }
 
   gather_ctx->activate();
 
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Types.h"
+#include "include/assert.h"
+#include "include/stringify.h"
+#include "common/Formatter.h"
+
+namespace rbd {
+namespace mirror {
+namespace instance_watcher {
+
+namespace {
+
+class EncodePayloadVisitor : public boost::static_visitor<void> {
+public:
+  explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
+
+  template <typename Payload>
+  inline void operator()(const Payload &payload) const {
+    ::encode(static_cast<uint32_t>(Payload::NOTIFY_OP), m_bl);
+    payload.encode(m_bl);
+  }
+
+private:
+  bufferlist &m_bl;
+};
+
+class DecodePayloadVisitor : public boost::static_visitor<void> {
+public:
+  DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
+    : m_version(version), m_iter(iter) {}
+
+  template <typename Payload>
+  inline void operator()(Payload &payload) const {
+    payload.decode(m_version, m_iter);
+  }
+
+private:
+  __u8 m_version;
+  bufferlist::iterator &m_iter;
+};
+
+class DumpPayloadVisitor : public boost::static_visitor<void> {
+public:
+  explicit DumpPayloadVisitor(Formatter *formatter) : m_formatter(formatter) {}
+
+  template <typename Payload>
+  inline void operator()(const Payload &payload) const {
+    NotifyOp notify_op = Payload::NOTIFY_OP;
+    m_formatter->dump_string("notify_op", stringify(notify_op));
+    payload.dump(m_formatter);
+  }
+
+private:
+  ceph::Formatter *m_formatter;
+};
+
+} // anonymous namespace
+
+void ImagePayloadBase::encode(bufferlist &bl) const {
+  ::encode(request_id, bl);
+  ::encode(global_image_id, bl);
+  ::encode(peer_mirror_uuid, bl);
+  ::encode(peer_image_id, bl);
+}
+
+void ImagePayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
+  ::decode(request_id, iter);
+  ::decode(global_image_id, iter);
+  ::decode(peer_mirror_uuid, iter);
+  ::decode(peer_image_id, iter);
+}
+
+void ImagePayloadBase::dump(Formatter *f) const {
+  f->dump_unsigned("request_id", request_id);
+  f->dump_string("global_image_id", global_image_id);
+  f->dump_string("peer_mirror_uuid", peer_mirror_uuid);
+  f->dump_string("peer_image_id", peer_image_id);
+}
+
+void ImageReleasePayload::encode(bufferlist &bl) const {
+  ImagePayloadBase::encode(bl);
+  ::encode(schedule_delete, bl);
+}
+
+void ImageReleasePayload::decode(__u8 version, bufferlist::iterator &iter) {
+  ImagePayloadBase::decode(version, iter);
+  ::decode(schedule_delete, iter);
+}
+
+void ImageReleasePayload::dump(Formatter *f) const {
+  ImagePayloadBase::dump(f);
+  f->dump_bool("schedule_delete", schedule_delete);
+}
+
+void UnknownPayload::encode(bufferlist &bl) const {
+  assert(false);
+}
+
+void UnknownPayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void UnknownPayload::dump(Formatter *f) const {
+}
+
+void NotifyMessage::encode(bufferlist& bl) const {
+  ENCODE_START(1, 1, bl);
+  boost::apply_visitor(EncodePayloadVisitor(bl), payload);
+  ENCODE_FINISH(bl);
+}
+
+void NotifyMessage::decode(bufferlist::iterator& iter) {
+  DECODE_START(1, iter);
+
+  uint32_t notify_op;
+  ::decode(notify_op, iter);
+
+  // select the correct payload variant based upon the encoded op
+  switch (notify_op) {
+  case NOTIFY_OP_IMAGE_ACQUIRE:
+    payload = ImageAcquirePayload();
+    break;
+  case NOTIFY_OP_IMAGE_RELEASE:
+    payload = ImageReleasePayload();
+    break;
+  default:
+    payload = UnknownPayload();
+    break;
+  }
+
+  apply_visitor(DecodePayloadVisitor(struct_v, iter), payload);
+  DECODE_FINISH(iter);
+}
+
+void NotifyMessage::dump(Formatter *f) const {
+  apply_visitor(DumpPayloadVisitor(f), payload);
+}
+
+void NotifyMessage::generate_test_instances(std::list<NotifyMessage *> &o) {
+  o.push_back(new NotifyMessage(ImageAcquirePayload()));
+  o.push_back(new NotifyMessage(ImageAcquirePayload(1, "gid", "uuid", "id")));
+
+  o.push_back(new NotifyMessage(ImageReleasePayload()));
+  o.push_back(new NotifyMessage(ImageReleasePayload(1, "gid", "uuid", "id",
+                                                    true)));
+}
+
+std::ostream &operator<<(std::ostream &out, const NotifyOp &op) {
+  switch (op) {
+  case NOTIFY_OP_IMAGE_ACQUIRE:
+    out << "ImageAcquire";
+    break;
+  case NOTIFY_OP_IMAGE_RELEASE:
+    out << "ImageRelease";
+    break;
+  default:
+    out << "Unknown (" << static_cast<uint32_t>(op) << ")";
+    break;
+  }
+  return out;
+}
+
+void NotifyAckPayload::encode(bufferlist &bl) const {
+  ::encode(instance_id, bl);
+  ::encode(request_id, bl);
+  ::encode(ret_val, bl);
+}
+
+void NotifyAckPayload::decode(bufferlist::iterator &iter) {
+  ::decode(instance_id, iter);
+  ::decode(request_id, iter);
+  ::decode(ret_val, iter);
+}
+
+void NotifyAckPayload::dump(Formatter *f) const {
+  f->dump_string("instance_id", instance_id);
+  f->dump_unsigned("request_id", request_id);
+  f->dump_int("request_id", ret_val);
+}
+
+} // namespace instance_watcher
+} // namespace mirror
+} // namespace rbd
 
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_INSTANCE_WATCHER_TYPES_H
+#define RBD_MIRROR_INSTANCE_WATCHER_TYPES_H
+
+#include <string>
+#include <set>
+#include <boost/variant.hpp>
+
+#include "include/buffer_fwd.h"
+#include "include/encoding.h"
+#include "include/int_types.h"
+
+namespace ceph { class Formatter; }
+
+namespace rbd {
+namespace mirror {
+namespace instance_watcher {
+
+enum NotifyOp {
+  NOTIFY_OP_IMAGE_ACQUIRE  = 0,
+  NOTIFY_OP_IMAGE_RELEASE  = 1,
+};
+
+struct ImagePayloadBase {
+  uint64_t request_id;
+  std::string global_image_id;
+  std::string peer_mirror_uuid;
+  std::string peer_image_id;
+
+  ImagePayloadBase() : request_id(0) {
+  }
+
+  ImagePayloadBase(uint64_t request_id, const std::string &global_image_id,
+                   const std::string &peer_mirror_uuid,
+                   const std::string &peer_image_id)
+    : request_id(request_id), global_image_id(global_image_id),
+      peer_mirror_uuid(peer_mirror_uuid), peer_image_id(peer_image_id) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+  void dump(Formatter *f) const;
+};
+
+struct ImageAcquirePayload : public ImagePayloadBase {
+  static const NotifyOp NOTIFY_OP = NOTIFY_OP_IMAGE_ACQUIRE;
+
+  ImageAcquirePayload() : ImagePayloadBase() {
+  }
+
+  ImageAcquirePayload(uint64_t request_id, const std::string &global_image_id,
+                      const std::string &peer_mirror_uuid,
+                      const std::string &peer_image_id)
+    : ImagePayloadBase(request_id, global_image_id, peer_mirror_uuid,
+                       peer_image_id) {
+  }
+};
+
+struct ImageReleasePayload : public ImagePayloadBase {
+  static const NotifyOp NOTIFY_OP = NOTIFY_OP_IMAGE_RELEASE;
+
+  bool schedule_delete;
+
+  ImageReleasePayload() : ImagePayloadBase(), schedule_delete(false) {
+  }
+
+  ImageReleasePayload(uint64_t request_id, const std::string &global_image_id,
+                      const std::string &peer_mirror_uuid,
+                      const std::string &peer_image_id, bool schedule_delete)
+    : ImagePayloadBase(request_id, global_image_id, peer_mirror_uuid,
+                       peer_image_id),
+      schedule_delete(schedule_delete) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+  void dump(Formatter *f) const;
+};
+
+struct UnknownPayload {
+  static const NotifyOp NOTIFY_OP = static_cast<NotifyOp>(-1);
+
+  UnknownPayload() {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+  void dump(Formatter *f) const;
+};
+
+typedef boost::variant<ImageAcquirePayload,
+                       ImageReleasePayload,
+                       UnknownPayload> Payload;
+
+struct NotifyMessage {
+  NotifyMessage(const Payload &payload = UnknownPayload()) : payload(payload) {
+  }
+
+  Payload payload;
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::iterator& it);
+  void dump(Formatter *f) const;
+
+  static void generate_test_instances(std::list<NotifyMessage *> &o);
+};
+
+WRITE_CLASS_ENCODER(NotifyMessage);
+
+std::ostream &operator<<(std::ostream &out, const NotifyOp &op);
+
+struct NotifyAckPayload {
+  std::string instance_id;
+  uint64_t request_id;
+  int ret_val;
+
+  NotifyAckPayload() : request_id(0), ret_val(0) {
+  }
+
+  NotifyAckPayload(const std::string &instance_id, uint64_t request_id,
+                   int ret_val)
+    : instance_id(instance_id), request_id(request_id), ret_val(ret_val) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator& it);
+  void dump(Formatter *f) const;
+};
+
+WRITE_CLASS_ENCODER(NotifyAckPayload);
+
+} // namespace instance_watcher
+} // namespace mirror
+} // namespace librbd
+
+using rbd::mirror::instance_watcher::encode;
+using rbd::mirror::instance_watcher::decode;
+
+#endif // RBD_MIRROR_INSTANCE_WATCHER_TYPES_H