#include "test/rbd_mirror/test_mock_fixture.h"
 #include "librbd/journal/TypeTraits.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
 class ProgressContext;
 
 template<>
-struct ImageSyncThrottler<librbd::MockTestImageCtx> {
-  MOCK_METHOD10(start_sync, void(librbd::MockTestImageCtx *local_image_ctx,
-                                 librbd::MockTestImageCtx *remote_image_ctx,
-                                 SafeTimer *timer, Mutex *timer_lock,
-                                 const std::string &mirror_uuid,
-                                 ::journal::MockJournaler *journaler,
-                                 librbd::journal::MirrorPeerClientMeta *client_meta,
-                                 ContextWQ *work_queue, Context *on_finish,
-                                 ProgressContext *progress_ctx));
-  MOCK_METHOD1(cancel_sync, void(const std::string& mirror_uuid));
+struct ImageSync<librbd::MockTestImageCtx> {
+  static ImageSync* s_instance;
+  Context *on_finish = nullptr;
+
+  static ImageSync* create(
+      librbd::MockTestImageCtx *local_image_ctx,
+      librbd::MockTestImageCtx *remote_image_ctx,
+      SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid,
+      ::journal::MockJournaler *journaler,
+      librbd::journal::MirrorPeerClientMeta *client_meta, ContextWQ *work_queue,
+      InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
+      Context *on_finish, ProgressContext *progress_ctx) {
+    assert(s_instance != nullptr);
+    s_instance->on_finish = on_finish;
+    return s_instance;
+  }
+
+  ImageSync() {
+    assert(s_instance == nullptr);
+    s_instance = this;
+  }
+  ~ImageSync() {
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD0(get, void());
+  MOCK_METHOD0(put, void());
+  MOCK_METHOD0(send, void());
+  MOCK_METHOD0(cancel, void());
+};
+
+ImageSync<librbd::MockTestImageCtx>*
+  ImageSync<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+template<>
+struct InstanceWatcher<librbd::MockTestImageCtx> {
 };
 
 namespace image_replayer {
 
 class TestMockImageReplayerBootstrapRequest : public TestMockFixture {
 public:
-  typedef ImageSyncThrottlerRef<librbd::MockTestImageCtx> MockImageSyncThrottler;
   typedef BootstrapRequest<librbd::MockTestImageCtx> MockBootstrapRequest;
   typedef CloseImageRequest<librbd::MockTestImageCtx> MockCloseImageRequest;
   typedef CreateImageRequest<librbd::MockTestImageCtx> MockCreateImageRequest;
+  typedef ImageSync<librbd::MockTestImageCtx> MockImageSync;
+  typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
   typedef IsPrimaryRequest<librbd::MockTestImageCtx> MockIsPrimaryRequest;
   typedef OpenImageRequest<librbd::MockTestImageCtx> MockOpenImageRequest;
   typedef OpenLocalImageRequest<librbd::MockTestImageCtx> MockOpenLocalImageRequest;
         }));
   }
 
-  void expect_image_sync(MockImageSyncThrottler image_sync_throttler,
-                         int r) {
-    EXPECT_CALL(*image_sync_throttler, start_sync(_, _, _, _,
-                                                  StrEq("local mirror uuid"),
-                                                  _, _, _, _, _))
-      .WillOnce(WithArg<8>(Invoke([this, r](Context *on_finish) {
-                             m_threads->work_queue->queue(on_finish, r);
-                           })));
+  void expect_image_sync(MockImageSync &mock_image_sync, int r) {
+    EXPECT_CALL(mock_image_sync, get());
+    EXPECT_CALL(mock_image_sync, send())
+      .WillOnce(Invoke([this, &mock_image_sync, r]() {
+            m_threads->work_queue->queue(mock_image_sync.on_finish, r);
+          }));
+    EXPECT_CALL(mock_image_sync, put());
   }
 
   bufferlist encode_tag_data(const librbd::journal::TagData &tag_data) {
     return bl;
   }
 
-  MockBootstrapRequest *create_request(MockImageSyncThrottler mock_image_sync_throttler,
+  MockBootstrapRequest *create_request(MockInstanceWatcher *mock_instance_watcher,
                                        ::journal::MockJournaler &mock_journaler,
                                        const std::string &local_image_id,
                                        const std::string &remote_image_id,
                                        Context *on_finish) {
     return new MockBootstrapRequest(m_local_io_ctx,
                                     m_remote_io_ctx,
-                                    mock_image_sync_throttler,
+                                    mock_instance_watcher,
                                     &m_local_test_image_ctx,
                                     local_image_id,
                                     remote_image_id,
   expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
 
   C_SaferCond ctx;
-  MockImageSyncThrottler mock_image_sync_throttler(
-    new ImageSyncThrottler<librbd::MockTestImageCtx>());
+  MockInstanceWatcher mock_instance_watcher;
   MockBootstrapRequest *request = create_request(
-    mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+    &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
     mock_remote_image_ctx.id, "global image id", "local mirror uuid",
     "remote mirror uuid", &ctx);
   request->send();
   expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
 
   C_SaferCond ctx;
-  MockImageSyncThrottler mock_image_sync_throttler(
-    new ImageSyncThrottler<librbd::MockTestImageCtx>());
+  MockInstanceWatcher mock_instance_watcher;
   MockBootstrapRequest *request = create_request(
-    mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+    &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
     mock_remote_image_ctx.id, "global image id", "local mirror uuid",
     "remote mirror uuid", &ctx);
   request->send();
   expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
 
   C_SaferCond ctx;
-  MockImageSyncThrottler mock_image_sync_throttler(
-    new ImageSyncThrottler<librbd::MockTestImageCtx>());
+  MockInstanceWatcher mock_instance_watcher;
   MockBootstrapRequest *request = create_request(
-    mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+    &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
     mock_remote_image_ctx.id, "global image id", "local mirror uuid",
     "remote mirror uuid", &ctx);
   request->send();
   expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
 
   C_SaferCond ctx;
-  MockImageSyncThrottler mock_image_sync_throttler(
-    new ImageSyncThrottler<librbd::MockTestImageCtx>());
+  MockInstanceWatcher mock_instance_watcher;
   MockBootstrapRequest *request = create_request(
-    mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+    &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
     mock_remote_image_ctx.id, "global image id", "local mirror uuid",
     "remote mirror uuid", &ctx);
   request->send();
   expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
 
   C_SaferCond ctx;
-  MockImageSyncThrottler mock_image_sync_throttler(
-    new ImageSyncThrottler<librbd::MockTestImageCtx>());
+  MockInstanceWatcher mock_instance_watcher;
   MockBootstrapRequest *request = create_request(
-    mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+    &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
     mock_remote_image_ctx.id, "global image id", "local mirror uuid",
     "remote mirror uuid", &ctx);
   request->send();
   expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
 
   C_SaferCond ctx;
-  MockImageSyncThrottler mock_image_sync_throttler(
-    new ImageSyncThrottler<librbd::MockTestImageCtx>());
+  MockInstanceWatcher mock_instance_watcher;
   MockBootstrapRequest *request = create_request(
-    mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+    &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
     mock_remote_image_ctx.id, "global image id", "local mirror uuid",
     "remote mirror uuid", &ctx);
   m_do_resync = false;
   expect_journaler_update_client(mock_journaler, client_data, 0);
 
   // sync the remote image to the local image
-  MockImageSyncThrottler mock_image_sync_throttler(
-    new ImageSyncThrottler<librbd::MockTestImageCtx>());
-  expect_image_sync(mock_image_sync_throttler, 0);
+  MockImageSync mock_image_sync;
+  expect_image_sync(mock_image_sync, 0);
 
   MockCloseImageRequest mock_close_image_request;
   expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
 
   C_SaferCond ctx;
+  MockInstanceWatcher mock_instance_watcher;
   MockBootstrapRequest *request = create_request(
-    mock_image_sync_throttler, mock_journaler, "",
+    &mock_instance_watcher, mock_journaler, "",
     mock_remote_image_ctx.id, "global image id", "local mirror uuid",
     "remote mirror uuid", &ctx);
   request->send();
   expect_journaler_update_client(mock_journaler, client_data, 0);
 
   // sync the remote image to the local image
-  MockImageSyncThrottler mock_image_sync_throttler(
-    new ImageSyncThrottler<librbd::MockTestImageCtx>());
-  expect_image_sync(mock_image_sync_throttler, 0);
+  MockImageSync mock_image_sync;
+  expect_image_sync(mock_image_sync, 0);
 
   MockCloseImageRequest mock_close_image_request;
   expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
 
   C_SaferCond ctx;
+  MockInstanceWatcher mock_instance_watcher;
   MockBootstrapRequest *request = create_request(
-    mock_image_sync_throttler, mock_journaler, "",
+    &mock_instance_watcher, mock_journaler, "",
     mock_remote_image_ctx.id, "global image id", "local mirror uuid",
     "remote mirror uuid", &ctx);
   request->send();
 
 #include "librbd/io/ReadResult.h"
 #include "tools/rbd_mirror/types.h"
 #include "tools/rbd_mirror/ImageReplayer.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
 #include "tools/rbd_mirror/ImageDeleter.h"
 
     m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_threads->work_queue,
                                                         m_threads->timer,
                                                         &m_threads->timer_lock));
-    m_image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>());
+    m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
+        m_local_ioctx, m_threads->work_queue, nullptr);
+    m_instance_watcher->handle_acquire_leader();
   }
 
-  ~TestImageReplayer() override 
+  ~TestImageReplayer() override
   {
     unwatch();
 
+    m_instance_watcher->handle_release_leader();
+
     delete m_replayer;
+    delete m_instance_watcher;
     delete m_threads;
 
     EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
 
   template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
   void create_replayer() {
-    m_replayer = new ImageReplayerT(m_threads, m_image_deleter, m_image_sync_throttler,
-      rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
-      m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id");
+    m_replayer = new ImageReplayerT(
+        m_threads, m_image_deleter, m_instance_watcher,
+        rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
+        m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id");
     m_replayer->add_remote_image(m_remote_mirror_uuid, m_remote_image_id,
                                  m_remote_ioctx);
   }
   std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
   std::shared_ptr<librados::Rados> m_local_cluster;
   librados::Rados m_remote_cluster;
-  std::shared_ptr<rbd::mirror::ImageSyncThrottler<>> m_image_sync_throttler;
+  rbd::mirror::InstanceWatcher<> *m_instance_watcher;
   std::string m_local_mirror_uuid = "local mirror uuid";
   std::string m_remote_mirror_uuid = "remote mirror uuid";
   std::string m_local_pool_name, m_remote_pool_name;
 
 #include "librbd/io/ReadResult.h"
 #include "librbd/journal/Types.h"
 #include "tools/rbd_mirror/ImageSync.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
 
 void register_test_image_sync() {
     create_and_open(m_local_io_ctx, &m_local_image_ctx);
     create_and_open(m_remote_io_ctx, &m_remote_image_ctx);
 
+    m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
+        m_local_io_ctx, m_threads->work_queue, nullptr);
+    m_instance_watcher->handle_acquire_leader();
+
     m_remote_journaler = new ::journal::Journaler(
       m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
       m_remote_io_ctx, m_remote_image_ctx->id, "mirror-uuid", {});
 
   void TearDown() override {
     TestFixture::TearDown();
+
+    m_instance_watcher->handle_release_leader();
+
     delete m_remote_journaler;
+    delete m_instance_watcher;
   }
 
   void create_and_open(librados::IoCtx &io_ctx, librbd::ImageCtx **image_ctx) {
     return new ImageSync<>(m_local_image_ctx, m_remote_image_ctx,
                            m_threads->timer, &m_threads->timer_lock,
                            "mirror-uuid", m_remote_journaler, &m_client_meta,
-                           m_threads->work_queue, ctx);
+                           m_threads->work_queue, m_instance_watcher, ctx);
   }
 
   librbd::ImageCtx *m_remote_image_ctx;
   librbd::ImageCtx *m_local_image_ctx;
+  rbd::mirror::InstanceWatcher<> *m_instance_watcher;
   ::journal::Journaler *m_remote_journaler;
   librbd::journal::MirrorPeerClientMeta m_client_meta;
 };
 
       }
     }
 
+    void update_leader_handler(const std::string &leader_instance_id) override {
+    }
+
   private:
     mutable Mutex m_test_lock;
     int m_acquire_count = 0;
 
 #include "librbd/journal/Replay.h"
 #include "librbd/journal/Types.h"
 #include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
 #include "test/rbd_mirror/test_mock_fixture.h"
 #include "test/journal/mock/MockJournaler.h"
 #include "test/librbd/mock/MockImageCtx.h"
 namespace mirror {
 
 template<>
-class ImageSyncThrottler<librbd::MockTestImageCtx> {
+class InstanceWatcher<librbd::MockTestImageCtx> {
 };
 
 namespace image_replayer {
   Context *on_finish = nullptr;
   bool *do_resync = nullptr;
 
-  static BootstrapRequest* create(librados::IoCtx &local_io_ctx,
-        librados::IoCtx &remote_io_ctx,
-        rbd::mirror::ImageSyncThrottlerRef<librbd::MockTestImageCtx> image_sync_throttler,
-        librbd::MockTestImageCtx **local_image_ctx,
-        const std::string &local_image_name,
-        const std::string &remote_image_id,
-        const std::string &global_image_id,
-        ContextWQ *work_queue, SafeTimer *timer,
-        Mutex *timer_lock,
-        const std::string &local_mirror_uuid,
-        const std::string &remote_mirror_uuid,
-        ::journal::MockJournalerProxy *journaler,
-        librbd::journal::MirrorPeerClientMeta *client_meta,
-        Context *on_finish,
-        bool *do_resync,
-        rbd::mirror::ProgressContext *progress_ctx = nullptr) {
+  static BootstrapRequest* create(
+      librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx,
+      rbd::mirror::InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
+      librbd::MockTestImageCtx **local_image_ctx,
+      const std::string &local_image_name, const std::string &remote_image_id,
+      const std::string &global_image_id, ContextWQ *work_queue,
+      SafeTimer *timer, Mutex *timer_lock, const std::string &local_mirror_uuid,
+      const std::string &remote_mirror_uuid,
+      ::journal::MockJournalerProxy *journaler,
+      librbd::journal::MirrorPeerClientMeta *client_meta,
+      Context *on_finish, bool *do_resync,
+      rbd::mirror::ProgressContext *progress_ctx = nullptr) {
     assert(s_instance != nullptr);
     s_instance->image_ctx = local_image_ctx;
     s_instance->on_finish = on_finish;
   typedef ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
   typedef librbd::journal::Replay<librbd::MockTestImageCtx> MockReplay;
   typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
+  typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
 
   void SetUp() override {
     TestMockFixture::SetUp();
     m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_threads->work_queue,
                                                         m_threads->timer,
                                                         &m_threads->timer_lock));
-    m_image_sync_throttler.reset(
-      new rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>());
-
     m_image_replayer = new MockImageReplayer(
-      m_threads, m_image_deleter, m_image_sync_throttler,
+      m_threads, m_image_deleter, &m_instance_watcher,
       rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
       "local_mirror_uuid", m_local_io_ctx.get_id(), "global image id");
     m_image_replayer->add_remote_image(
   librbd::ImageCtx *m_remote_image_ctx;
   librbd::ImageCtx *m_local_image_ctx = nullptr;
   std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
-  std::shared_ptr<rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>> m_image_sync_throttler;
+  MockInstanceWatcher m_instance_watcher;
   MockImageReplayer *m_image_replayer;
 };
 
 
 namespace rbd {
 namespace mirror {
 
+template<>
+struct InstanceWatcher<librbd::MockTestImageCtx> {
+  MOCK_METHOD2(notify_sync_request, void(const std::string, Context *));
+  MOCK_METHOD1(cancel_sync_request, bool(const std::string &));
+  MOCK_METHOD1(notify_sync_complete, void(const std::string &));
+};
+
 namespace image_sync {
 
 template <>
 class TestMockImageSync : public TestMockFixture {
 public:
   typedef ImageSync<librbd::MockTestImageCtx> MockImageSync;
+  typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
   typedef image_sync::ImageCopyRequest<librbd::MockTestImageCtx> MockImageCopyRequest;
   typedef image_sync::SnapshotCopyRequest<librbd::MockTestImageCtx> MockSnapshotCopyRequest;
   typedef image_sync::SyncPointCreateRequest<librbd::MockTestImageCtx> MockSyncPointCreateRequest;
       ReturnNew<FunctionContext>([](int) {}));
   }
 
+  void expect_notify_sync_request(MockInstanceWatcher &mock_instance_watcher,
+                                  const std::string &sync_id, int r) {
+    EXPECT_CALL(mock_instance_watcher, notify_sync_request(sync_id, _))
+      .WillOnce(Invoke([this, r](const std::string &, Context *on_sync_start) {
+            m_threads->work_queue->queue(on_sync_start, r);
+          }));
+  }
+
+  void expect_cancel_sync_request(MockInstanceWatcher &mock_instance_watcher,
+                                  const std::string &sync_id, bool canceled) {
+    EXPECT_CALL(mock_instance_watcher, cancel_sync_request(sync_id))
+      .WillOnce(Return(canceled));
+  }
+
+  void expect_notify_sync_complete(MockInstanceWatcher &mock_instance_watcher,
+                                   const std::string &sync_id) {
+    EXPECT_CALL(mock_instance_watcher, notify_sync_complete(sync_id));
+  }
+
   void expect_create_sync_point(librbd::MockTestImageCtx &mock_local_image_ctx,
                                 MockSyncPointCreateRequest &mock_sync_point_create_request,
                                 int r) {
   MockImageSync *create_request(librbd::MockTestImageCtx &mock_remote_image_ctx,
                                 librbd::MockTestImageCtx &mock_local_image_ctx,
                                 journal::MockJournaler &mock_journaler,
+                                MockInstanceWatcher &mock_instance_watcher,
                                 Context *ctx) {
     return new MockImageSync(&mock_local_image_ctx, &mock_remote_image_ctx,
                              m_threads->timer, &m_threads->timer_lock,
                              "mirror-uuid", &mock_journaler, &m_client_meta,
-                             m_threads->work_queue, ctx);
+                             m_threads->work_queue, &mock_instance_watcher,
+                             ctx);
   }
 
   librbd::ImageCtx *m_remote_image_ctx;
   librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
   journal::MockJournaler mock_journaler;
+  MockInstanceWatcher mock_instance_watcher;
   MockImageCopyRequest mock_image_copy_request;
   MockSnapshotCopyRequest mock_snapshot_copy_request;
   MockSyncPointCreateRequest mock_sync_point_create_request;
   expect_test_features(mock_local_image_ctx);
 
   InSequence seq;
+  expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
   expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
   expect_copy_snapshots(mock_snapshot_copy_request, 0);
   expect_copy_image(mock_image_copy_request, 0);
   expect_create_object_map(mock_local_image_ctx, mock_object_map);
   expect_open_object_map(mock_local_image_ctx, *mock_object_map);
   expect_prune_sync_point(mock_sync_point_prune_request, true, 0);
+  expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
 
   C_SaferCond ctx;
   MockImageSync *request = create_request(mock_remote_image_ctx,
-                                          mock_local_image_ctx,
-                                          mock_journaler, &ctx);
+                                          mock_local_image_ctx, mock_journaler,
+                                          mock_instance_watcher, &ctx);
   request->send();
   ASSERT_EQ(0, ctx.wait());
 }
   librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
   journal::MockJournaler mock_journaler;
+  MockInstanceWatcher mock_instance_watcher;
   MockImageCopyRequest mock_image_copy_request;
   MockSnapshotCopyRequest mock_snapshot_copy_request;
   MockSyncPointCreateRequest mock_sync_point_create_request;
   expect_test_features(mock_local_image_ctx);
 
   InSequence seq;
+  expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
   expect_prune_sync_point(mock_sync_point_prune_request, false, 0);
   expect_copy_snapshots(mock_snapshot_copy_request, 0);
   expect_copy_image(mock_image_copy_request, 0);
   expect_create_object_map(mock_local_image_ctx, mock_object_map);
   expect_open_object_map(mock_local_image_ctx, *mock_object_map);
   expect_prune_sync_point(mock_sync_point_prune_request, true, 0);
+  expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
 
   C_SaferCond ctx;
   MockImageSync *request = create_request(mock_remote_image_ctx,
-                                          mock_local_image_ctx,
-                                          mock_journaler, &ctx);
+                                          mock_local_image_ctx, mock_journaler,
+                                          mock_instance_watcher, &ctx);
   request->send();
   ASSERT_EQ(0, ctx.wait());
 }
 
+TEST_F(TestMockImageSync, CancelNotifySyncRequest) {
+  librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
+  librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
+  journal::MockJournaler mock_journaler;
+  MockInstanceWatcher mock_instance_watcher;
+
+  InSequence seq;
+  Context *on_sync_start = nullptr;
+  C_SaferCond notify_sync_ctx;
+  EXPECT_CALL(mock_instance_watcher,
+              notify_sync_request(mock_local_image_ctx.id, _))
+    .WillOnce(Invoke([this, &on_sync_start, ¬ify_sync_ctx](
+                         const std::string &, Context *ctx) {
+                       on_sync_start = ctx;
+                       notify_sync_ctx.complete(0);
+                     }));
+  EXPECT_CALL(mock_instance_watcher,
+              cancel_sync_request(mock_local_image_ctx.id))
+    .WillOnce(Invoke([this, &on_sync_start](const std::string &) {
+          EXPECT_NE(nullptr, on_sync_start);
+          on_sync_start->complete(-ECANCELED);
+          return true;
+        }));
+
+  C_SaferCond ctx;
+  MockImageSync *request = create_request(mock_remote_image_ctx,
+                                          mock_local_image_ctx, mock_journaler,
+                                          mock_instance_watcher, &ctx);
+  request->get();
+  request->send();
+
+  // cancel the notify sync request once it starts
+  ASSERT_EQ(0, notify_sync_ctx.wait());
+  request->cancel();
+  request->put();
+
+  ASSERT_EQ(-ECANCELED, ctx.wait());
+}
+
 TEST_F(TestMockImageSync, CancelImageCopy) {
   librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
   journal::MockJournaler mock_journaler;
+  MockInstanceWatcher mock_instance_watcher;
   MockImageCopyRequest mock_image_copy_request;
   MockSnapshotCopyRequest mock_snapshot_copy_request;
   MockSyncPointCreateRequest mock_sync_point_create_request;
   m_client_meta.sync_points = {{cls::rbd::UserSnapshotNamespace(), "snap1", boost::none}};
 
   InSequence seq;
+  expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
   expect_prune_sync_point(mock_sync_point_prune_request, false, 0);
   expect_copy_snapshots(mock_snapshot_copy_request, 0);
 
     .WillOnce(Invoke([&image_copy_ctx]() {
         image_copy_ctx.complete(0);
       }));
+  expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id,
+                             false);
   EXPECT_CALL(mock_image_copy_request, cancel());
+  expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
 
   C_SaferCond ctx;
   MockImageSync *request = create_request(mock_remote_image_ctx,
-                                          mock_local_image_ctx,
-                                          mock_journaler, &ctx);
+                                          mock_local_image_ctx, mock_journaler,
+                                          mock_instance_watcher, &ctx);
   request->get();
   request->send();
 
   librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
   journal::MockJournaler mock_journaler;
+  MockInstanceWatcher mock_instance_watcher;
   MockSnapshotCopyRequest mock_snapshot_copy_request;
   MockSyncPointCreateRequest mock_sync_point_create_request;
 
 
   C_SaferCond ctx;
   MockImageSync *request = create_request(mock_remote_image_ctx,
-                                          mock_local_image_ctx,
-                                          mock_journaler, &ctx);
+                                          mock_local_image_ctx, mock_journaler,
+                                          mock_instance_watcher, &ctx);
   InSequence seq;
+  expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
   expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
   EXPECT_CALL(mock_snapshot_copy_request, send())
     .WillOnce((DoAll(InvokeWithoutArgs([request]() {
          Invoke([this, &mock_snapshot_copy_request]() {
              m_threads->work_queue->queue(mock_snapshot_copy_request.on_finish, 0);
            }))));
+  expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id,
+                             false);
   EXPECT_CALL(mock_snapshot_copy_request, cancel());
+  expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
 
   request->send();
   ASSERT_EQ(-ECANCELED, ctx.wait());
   librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
   journal::MockJournaler mock_journaler;
+  MockInstanceWatcher mock_instance_watcher;
   MockImageCopyRequest mock_image_copy_request;
   MockSnapshotCopyRequest mock_snapshot_copy_request;
   MockSyncPointCreateRequest mock_sync_point_create_request;
 
   C_SaferCond ctx;
   MockImageSync *request = create_request(mock_remote_image_ctx,
-                                          mock_local_image_ctx,
-                                          mock_journaler, &ctx);
+                                          mock_local_image_ctx, mock_journaler,
+                                          mock_instance_watcher, &ctx);
   InSequence seq;
+  expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
   expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
   expect_copy_snapshots(mock_snapshot_copy_request, 0);
   EXPECT_CALL(mock_image_copy_request, send())
          Invoke([this, &mock_image_copy_request]() {
              m_threads->work_queue->queue(mock_image_copy_request.on_finish, 0);
            }))));
+  expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id,
+                             false);
   EXPECT_CALL(mock_image_copy_request, cancel());
+  expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
 
   request->send();
   ASSERT_EQ(-ECANCELED, ctx.wait());
 
  */
 
 #include "test/rbd_mirror/test_mock_fixture.h"
-#include "librbd/journal/TypeTraits.h"
-#include "test/journal/mock/MockJournaler.h"
 #include "test/librbd/mock/MockImageCtx.h"
-#include "librbd/ImageState.h"
-#include "tools/rbd_mirror/Threads.h"
-#include "tools/rbd_mirror/ImageSync.h"
 
 namespace librbd {
 
 
 } // anonymous namespace
 
-namespace journal {
-
-template <>
-struct TypeTraits<librbd::MockTestImageCtx> {
-  typedef ::journal::MockJournaler Journaler;
-};
-
-} // namespace journal
 } // namespace librbd
 
-namespace rbd {
-namespace mirror {
-
-using ::testing::Invoke;
-
-typedef ImageSync<librbd::MockTestImageCtx> MockImageSync;
-
-template<>
-class ImageSync<librbd::MockTestImageCtx> {
-public:
-  static std::vector<MockImageSync *> instances;
-
-  Context *on_finish;
-  bool syncing = false;
-
-  static ImageSync* create(librbd::MockTestImageCtx *local_image_ctx,
-                           librbd::MockTestImageCtx *remote_image_ctx,
-                           SafeTimer *timer, Mutex *timer_lock,
-                           const std::string &mirror_uuid,
-                           journal::MockJournaler *journaler,
-                           librbd::journal::MirrorPeerClientMeta *client_meta,
-                           ContextWQ *work_queue, Context *on_finish,
-                           ProgressContext *progress_ctx = nullptr) {
-    ImageSync *sync = new ImageSync();
-    sync->on_finish = on_finish;
-
-    EXPECT_CALL(*sync, send())
-      .WillRepeatedly(Invoke([sync]() {
-            sync->syncing = true;
-          }));
-
-    return sync;
-  }
-
-  void finish(int r) {
-    on_finish->complete(r);
-  }
-
-  void get() {
-    instances.push_back(this);
-  }
-
-  void put() { delete this; }
-
-  MOCK_METHOD0(cancel, void());
-  MOCK_METHOD0(send, void());
-
-};
-
-
-std::vector<MockImageSync *> MockImageSync::instances;
-
-} // namespace mirror
-} // namespace rbd
-
-
 // template definitions
 #include "tools/rbd_mirror/ImageSyncThrottler.cc"
 
 public:
   typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
 
-  void SetUp() override {
-    TestMockFixture::SetUp();
-
-    librbd::RBD rbd;
-    ASSERT_EQ(0, create_image(rbd, m_remote_io_ctx, m_image_name, m_image_size));
-    ASSERT_EQ(0, open_image(m_remote_io_ctx, m_image_name, &m_remote_image_ctx));
-
-    ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size));
-    ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx));
-
-    mock_sync_throttler = new MockImageSyncThrottler();
-
-    m_mock_local_image_ctx = new librbd::MockTestImageCtx(*m_local_image_ctx);
-    m_mock_remote_image_ctx = new librbd::MockTestImageCtx(*m_remote_image_ctx);
-    m_mock_journaler = new journal::MockJournaler();
-  }
-
-  void TearDown() override {
-    MockImageSync::instances.clear();
-    delete mock_sync_throttler;
-    delete m_mock_local_image_ctx;
-    delete m_mock_remote_image_ctx;
-    delete m_mock_journaler;
-    TestMockFixture::TearDown();
-  }
-
-  void start_sync(const std::string& image_id, Context *ctx) {
-    m_mock_local_image_ctx->id = image_id;
-    mock_sync_throttler->start_sync(m_mock_local_image_ctx,
-                                    m_mock_remote_image_ctx,
-                                    m_threads->timer,
-                                    &m_threads->timer_lock,
-                                    "mirror_uuid",
-                                    m_mock_journaler,
-                                    &m_client_meta,
-                                    m_threads->work_queue,
-                                    ctx);
-  }
-
-  void cancel(const std::string& mirror_uuid, MockImageSync *sync,
-              bool running=true) {
-    if (running) {
-      EXPECT_CALL(*sync, cancel())
-        .WillOnce(Invoke([sync]() {
-              sync->finish(-ECANCELED);
-            }));
-    } else {
-      EXPECT_CALL(*sync, cancel()).Times(0);
-    }
-    mock_sync_throttler->cancel_sync(mirror_uuid);
-  }
-
-  librbd::ImageCtx *m_remote_image_ctx;
-  librbd::ImageCtx *m_local_image_ctx;
-  librbd::MockTestImageCtx *m_mock_local_image_ctx;
-  librbd::MockTestImageCtx *m_mock_remote_image_ctx;
-  journal::MockJournaler *m_mock_journaler;
-  librbd::journal::MirrorPeerClientMeta m_client_meta;
-  MockImageSyncThrottler *mock_sync_throttler;
 };
 
 TEST_F(TestMockImageSyncThrottler, Single_Sync) {
-  C_SaferCond ctx;
-  start_sync("image_id", &ctx);
-
-  ASSERT_EQ(1u, MockImageSync::instances.size());
-  MockImageSync *sync = MockImageSync::instances[0];
-  ASSERT_EQ(true, sync->syncing);
-  sync->finish(0);
-  ASSERT_EQ(0, ctx.wait());
+  MockImageSyncThrottler throttler;
+  C_SaferCond on_start;
+  throttler.start_op("id", &on_start);
+  ASSERT_EQ(0, on_start.wait());
+  throttler.finish_op("id");
 }
 
 TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) {
-  mock_sync_throttler->set_max_concurrent_syncs(2);
-
-  C_SaferCond ctx1;
-  start_sync("image_id_1", &ctx1);
-  C_SaferCond ctx2;
-  start_sync("image_id_2", &ctx2);
-  C_SaferCond ctx3;
-  start_sync("image_id_3", &ctx3);
-  C_SaferCond ctx4;
-  start_sync("image_id_4", &ctx4);
-
-  ASSERT_EQ(4u, MockImageSync::instances.size());
-
-  MockImageSync *sync1 = MockImageSync::instances[0];
-  ASSERT_TRUE(sync1->syncing);
-
-  MockImageSync *sync2 = MockImageSync::instances[1];
-  ASSERT_TRUE(sync2->syncing);
-
-  MockImageSync *sync3 = MockImageSync::instances[2];
-  ASSERT_FALSE(sync3->syncing);
-
-  MockImageSync *sync4 = MockImageSync::instances[3];
-  ASSERT_FALSE(sync4->syncing);
-
-  sync1->finish(0);
-  ASSERT_EQ(0, ctx1.wait());
-
-  ASSERT_TRUE(sync3->syncing);
-  sync3->finish(-EINVAL);
-  ASSERT_EQ(-EINVAL, ctx3.wait());
-
-  ASSERT_TRUE(sync4->syncing);
-
-  sync2->finish(0);
-  ASSERT_EQ(0, ctx2.wait());
-
-  sync4->finish(0);
-  ASSERT_EQ(0, ctx4.wait());
+  MockImageSyncThrottler throttler;
+  throttler.set_max_concurrent_syncs(2);
+
+  C_SaferCond on_start1;
+  throttler.start_op("id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("id2", &on_start2);
+  C_SaferCond on_start3;
+  throttler.start_op("id3", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("id4", &on_start4);
+
+  ASSERT_EQ(0, on_start2.wait());
+  throttler.finish_op("id2");
+  ASSERT_EQ(0, on_start3.wait());
+  throttler.finish_op("id3");
+  ASSERT_EQ(0, on_start1.wait());
+  throttler.finish_op("id1");
+  ASSERT_EQ(0, on_start4.wait());
+  throttler.finish_op("id4");
 }
 
 TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync) {
-  C_SaferCond ctx1;
-  start_sync("image_id_1", &ctx1);
-  C_SaferCond ctx2;
-  start_sync("image_id_2", &ctx2);
-
-  ASSERT_EQ(2u, MockImageSync::instances.size());
-
-  MockImageSync *sync1 = MockImageSync::instances[0];
-  ASSERT_TRUE(sync1->syncing);
-
-  MockImageSync *sync2 = MockImageSync::instances[1];
-  ASSERT_TRUE(sync2->syncing);
-
-  cancel("image_id_2", sync2);
-  ASSERT_EQ(-ECANCELED, ctx2.wait());
-
-  sync1->finish(0);
-  ASSERT_EQ(0, ctx1.wait());
+  MockImageSyncThrottler throttler;
+  C_SaferCond on_start;
+  throttler.start_op("id", &on_start);
+  ASSERT_EQ(0, on_start.wait());
+  ASSERT_FALSE(throttler.cancel_op("id"));
+  throttler.finish_op("id");
 }
 
 TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) {
-  mock_sync_throttler->set_max_concurrent_syncs(1);
-
-  C_SaferCond ctx1;
-  start_sync("image_id_1", &ctx1);
-  C_SaferCond ctx2;
-  start_sync("image_id_2", &ctx2);
-
-  ASSERT_EQ(2u, MockImageSync::instances.size());
-
-  MockImageSync *sync1 = MockImageSync::instances[0];
-  ASSERT_TRUE(sync1->syncing);
-
-  MockImageSync *sync2 = MockImageSync::instances[1];
-  ASSERT_FALSE(sync2->syncing);
-
-  cancel("image_id_2", sync2, false);
-  ASSERT_EQ(-ECANCELED, ctx2.wait());
-
-  sync1->finish(0);
-  ASSERT_EQ(0, ctx1.wait());
+  MockImageSyncThrottler throttler;
+  throttler.set_max_concurrent_syncs(1);
+
+  C_SaferCond on_start1;
+  throttler.start_op("id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("id2", &on_start2);
+
+  ASSERT_EQ(0, on_start1.wait());
+  ASSERT_TRUE(throttler.cancel_op("id2"));
+  ASSERT_EQ(-ECANCELED, on_start2.wait());
+  throttler.finish_op("id1");
 }
 
-TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
-  mock_sync_throttler->set_max_concurrent_syncs(1);
-
-  C_SaferCond ctx1;
-  start_sync("image_id_1", &ctx1);
-  C_SaferCond ctx2;
-  start_sync("image_id_2", &ctx2);
-
-  ASSERT_EQ(2u, MockImageSync::instances.size());
 
-  MockImageSync *sync1 = MockImageSync::instances[0];
-  ASSERT_TRUE(sync1->syncing);
-
-  MockImageSync *sync2 = MockImageSync::instances[1];
-  ASSERT_FALSE(sync2->syncing);
-
-  cancel("image_id_1", sync1);
-  ASSERT_EQ(-ECANCELED, ctx1.wait());
-
-  ASSERT_TRUE(sync2->syncing);
-  sync2->finish(0);
-  ASSERT_EQ(0, ctx2.wait());
+TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
+  MockImageSyncThrottler throttler;
+  throttler.set_max_concurrent_syncs(1);
+
+  C_SaferCond on_start1;
+  throttler.start_op("id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("id2", &on_start2);
+
+  ASSERT_EQ(0, on_start1.wait());
+  ASSERT_FALSE(throttler.cancel_op("id1"));
+  throttler.finish_op("id1");
+  ASSERT_EQ(0, on_start2.wait());
+  throttler.finish_op("id2");
 }
 
 TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
-  mock_sync_throttler->set_max_concurrent_syncs(2);
-
-  C_SaferCond ctx1;
-  start_sync("image_id_1", &ctx1);
-  C_SaferCond ctx2;
-  start_sync("image_id_2", &ctx2);
-  C_SaferCond ctx3;
-  start_sync("image_id_3", &ctx3);
-  C_SaferCond ctx4;
-  start_sync("image_id_4", &ctx4);
-  C_SaferCond ctx5;
-  start_sync("image_id_5", &ctx5);
-
-  ASSERT_EQ(5u, MockImageSync::instances.size());
-
-  MockImageSync *sync1 = MockImageSync::instances[0];
-  ASSERT_TRUE(sync1->syncing);
-
-  MockImageSync *sync2 = MockImageSync::instances[1];
-  ASSERT_TRUE(sync2->syncing);
-
-  MockImageSync *sync3 = MockImageSync::instances[2];
-  ASSERT_FALSE(sync3->syncing);
-
-  MockImageSync *sync4 = MockImageSync::instances[3];
-  ASSERT_FALSE(sync4->syncing);
-
-  MockImageSync *sync5 = MockImageSync::instances[4];
-  ASSERT_FALSE(sync5->syncing);
-
-  mock_sync_throttler->set_max_concurrent_syncs(4);
-
-  ASSERT_TRUE(sync3->syncing);
-  ASSERT_TRUE(sync4->syncing);
-  ASSERT_FALSE(sync5->syncing);
-
-  sync1->finish(0);
-  ASSERT_EQ(0, ctx1.wait());
-
-  ASSERT_TRUE(sync5->syncing);
-  sync5->finish(-EINVAL);
-  ASSERT_EQ(-EINVAL, ctx5.wait());
-
-  sync2->finish(0);
-  ASSERT_EQ(0, ctx2.wait());
-
-  sync3->finish(0);
-  ASSERT_EQ(0, ctx3.wait());
-
-  sync4->finish(0);
-  ASSERT_EQ(0, ctx4.wait());
+  MockImageSyncThrottler throttler;
+  throttler.set_max_concurrent_syncs(2);
+
+  C_SaferCond on_start1;
+  throttler.start_op("id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("id2", &on_start2);
+  C_SaferCond on_start3;
+  throttler.start_op("id3", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("id4", &on_start4);
+  C_SaferCond on_start5;
+  throttler.start_op("id5", &on_start5);
+
+  ASSERT_EQ(0, on_start1.wait());
+  ASSERT_EQ(0, on_start2.wait());
+
+  throttler.set_max_concurrent_syncs(4);
+
+  ASSERT_EQ(0, on_start3.wait());
+  ASSERT_EQ(0, on_start4.wait());
+
+  throttler.finish_op("id4");
+  ASSERT_EQ(0, on_start5.wait());
+
+  throttler.finish_op("id1");
+  throttler.finish_op("id2");
+  throttler.finish_op("id3");
+  throttler.finish_op("id5");
 }
 
 TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) {
-  mock_sync_throttler->set_max_concurrent_syncs(4);
-
-  C_SaferCond ctx1;
-  start_sync("image_id_1", &ctx1);
-  C_SaferCond ctx2;
-  start_sync("image_id_2", &ctx2);
-  C_SaferCond ctx3;
-  start_sync("image_id_3", &ctx3);
-  C_SaferCond ctx4;
-  start_sync("image_id_4", &ctx4);
-  C_SaferCond ctx5;
-  start_sync("image_id_5", &ctx5);
-
-  ASSERT_EQ(5u, MockImageSync::instances.size());
-
-  MockImageSync *sync1 = MockImageSync::instances[0];
-  ASSERT_TRUE(sync1->syncing);
-
-  MockImageSync *sync2 = MockImageSync::instances[1];
-  ASSERT_TRUE(sync2->syncing);
-
-  MockImageSync *sync3 = MockImageSync::instances[2];
-  ASSERT_TRUE(sync3->syncing);
-
-  MockImageSync *sync4 = MockImageSync::instances[3];
-  ASSERT_TRUE(sync4->syncing);
-
-  MockImageSync *sync5 = MockImageSync::instances[4];
-  ASSERT_FALSE(sync5->syncing);
-
-  mock_sync_throttler->set_max_concurrent_syncs(2);
-
-  ASSERT_FALSE(sync5->syncing);
-
-  sync1->finish(0);
-  ASSERT_EQ(0, ctx1.wait());
-
-  ASSERT_FALSE(sync5->syncing);
-
-  sync2->finish(0);
-  ASSERT_EQ(0, ctx2.wait());
-
-  ASSERT_FALSE(sync5->syncing);
-
-  sync3->finish(0);
-  ASSERT_EQ(0, ctx3.wait());
-
-  ASSERT_TRUE(sync5->syncing);
-
-  sync4->finish(0);
-  ASSERT_EQ(0, ctx4.wait());
-
-  sync5->finish(0);
-  ASSERT_EQ(0, ctx5.wait());
+  MockImageSyncThrottler throttler;
+  throttler.set_max_concurrent_syncs(4);
+
+  C_SaferCond on_start1;
+  throttler.start_op("id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("id2", &on_start2);
+  C_SaferCond on_start3;
+  throttler.start_op("id3", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("id4", &on_start4);
+  C_SaferCond on_start5;
+  throttler.start_op("id5", &on_start5);
+
+  ASSERT_EQ(0, on_start1.wait());
+  ASSERT_EQ(0, on_start2.wait());
+  ASSERT_EQ(0, on_start3.wait());
+  ASSERT_EQ(0, on_start4.wait());
+
+  throttler.set_max_concurrent_syncs(2);
+
+  throttler.finish_op("id1");
+  throttler.finish_op("id2");
+  throttler.finish_op("id3");
+
+  ASSERT_EQ(0, on_start5.wait());
+
+  throttler.finish_op("id4");
+  throttler.finish_op("id5");
 }
 
-
 } // namespace mirror
 } // namespace rbd
 
 
 #include "test/librbd/mock/MockImageCtx.h"
 #include "test/rbd_mirror/test_mock_fixture.h"
 #include "tools/rbd_mirror/ImageReplayer.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/InstanceReplayer.h"
 #include "tools/rbd_mirror/Threads.h"
 
   }
 };
 
+template<>
+struct InstanceWatcher<librbd::MockTestImageCtx> {
+};
+
 template<>
 struct ImageReplayer<librbd::MockTestImageCtx> {
   static ImageReplayer* s_instance;
   static ImageReplayer *create(
     Threads<librbd::MockTestImageCtx> *threads,
     std::shared_ptr<ImageDeleter> image_deleter,
-    ImageSyncThrottlerRef<librbd::MockTestImageCtx> image_sync_throttler,
+    InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
     RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
     const std::string &global_image_id) {
     assert(s_instance != nullptr);
   MOCK_METHOD0(is_blacklisted, bool());
 };
 
-template<>
-struct ImageSyncThrottler<librbd::MockTestImageCtx> {
-  ImageSyncThrottler() {
-  }
-  virtual ~ImageSyncThrottler() {
-  }
-};
-
 ImageReplayer<librbd::MockTestImageCtx>* ImageReplayer<librbd::MockTestImageCtx>::s_instance = nullptr;
 
 } // namespace mirror
 public:
   typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
   typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
+  typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
   typedef Threads<librbd::MockTestImageCtx> MockThreads;
 
   void SetUp() override {
     m_image_deleter.reset(
       new rbd::mirror::ImageDeleter(m_threads->work_queue, m_threads->timer,
                                     &m_threads->timer_lock));
-    m_image_sync_throttler.reset(
-      new rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>());
   }
 
   void TearDown() override {
 
   MockThreads *m_mock_threads;
   std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
-  std::shared_ptr<rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>>
-    m_image_sync_throttler;
 };
 
 TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
+  MockInstanceWatcher mock_instance_watcher;
   MockImageReplayer mock_image_replayer;
   MockInstanceReplayer instance_replayer(
-    m_mock_threads, m_image_deleter, m_image_sync_throttler,
+    m_mock_threads, m_image_deleter,
     rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
     "local_mirror_uuid", m_local_io_ctx.get_id());
 
     .WillOnce(Return(true));
   EXPECT_CALL(mock_image_replayer, start(nullptr, false));
 
-  instance_replayer.acquire_image(global_image_id, "remote_mirror_uuid",
-                                  "remote_image_id", &on_acquire);
+  instance_replayer.acquire_image(&mock_instance_watcher, global_image_id,
+                                  "remote_mirror_uuid", "remote_image_id",
+                                  &on_acquire);
   ASSERT_EQ(0, on_acquire.wait());
 
   // Release
 
 #include "test/librbd/mock/MockImageCtx.h"
 #include "test/rbd_mirror/test_mock_fixture.h"
 #include "tools/rbd_mirror/InstanceReplayer.h"
+#include "tools/rbd_mirror/ImageSyncThrottler.h"
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
 
 
 template <>
 struct InstanceReplayer<librbd::MockTestImageCtx> {
-  MOCK_METHOD4(acquire_image, void(const std::string &, const std::string &,
+  MOCK_METHOD5(acquire_image, void(InstanceWatcher<librbd::MockTestImageCtx> *,
+                                   const std::string &, const std::string &,
                                    const std::string &, Context *));
   MOCK_METHOD5(release_image, void(const std::string &, const std::string &,
                                    const std::string &, bool, Context *));
 };
 
+template <>
+struct ImageSyncThrottler<librbd::MockTestImageCtx> {
+  static ImageSyncThrottler* s_instance;
+
+  static ImageSyncThrottler *create() {
+    assert(s_instance != nullptr);
+    return s_instance;
+  }
+
+  ImageSyncThrottler() {
+    assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  virtual ~ImageSyncThrottler() {
+    assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD0(destroy, void());
+  MOCK_METHOD1(drain, void(int));
+  MOCK_METHOD2(start_op, void(const std::string &, Context *));
+  MOCK_METHOD1(finish_op, void(const std::string &));
+};
+
+ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
+
 } // namespace mirror
 } // namespace rbd
 
   ASSERT_EQ(0, instance_watcher2->init());
 
   // Acquire Image on the the same instance
-  EXPECT_CALL(mock_instance_replayer1, acquire_image("gid", "uuid", "id", _))
-      .WillOnce(WithArg<3>(CompleteContext(0)));
+  EXPECT_CALL(mock_instance_replayer1, acquire_image(instance_watcher1, "gid",
+                                                     "uuid", "id", _))
+      .WillOnce(WithArg<4>(CompleteContext(0)));
   C_SaferCond on_acquire1;
   instance_watcher1->notify_image_acquire(instance_id1, "gid", "uuid", "id",
                                           &on_acquire1);
   ASSERT_EQ(0, on_acquire1.wait());
 
   // Acquire Image on the other instance
-  EXPECT_CALL(mock_instance_replayer2, acquire_image("gid", "uuid", "id", _))
-      .WillOnce(WithArg<3>(CompleteContext(0)));
+  EXPECT_CALL(mock_instance_replayer2, acquire_image(instance_watcher2, "gid",
+                                                     "uuid", "id", _))
+      .WillOnce(WithArg<4>(CompleteContext(0)));
   C_SaferCond on_acquire2;
   instance_watcher1->notify_image_acquire(instance_id2, "gid", "uuid", "id",
                                           &on_acquire2);
   delete instance_watcher;
 }
 
+class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher {
+public:
+  typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
+
+  MockManagedLock mock_managed_lock;
+  MockImageSyncThrottler mock_image_sync_throttler;
+  std::string instance_id1;
+  std::string instance_id2;
+
+  librados::Rados cluster;
+  librados::IoCtx io_ctx2;
+
+  MockInstanceWatcher *instance_watcher1;
+  MockInstanceWatcher *instance_watcher2;
+
+  void SetUp() override {
+    TestMockInstanceWatcher::SetUp();
+
+    instance_id1 = m_instance_id;
+    librados::IoCtx& io_ctx1 = m_local_io_ctx;
+    librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
+    instance_watcher1 = MockInstanceWatcher::create(io_ctx1,
+                                                    m_mock_threads->work_queue,
+                                                    nullptr);
+    EXPECT_EQ("", connect_cluster_pp(cluster));
+    EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
+    instance_id2 = stringify(io_ctx2.get_instance_id());
+    librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
+    instance_watcher2 = MockInstanceWatcher::create(io_ctx2,
+                                                    m_mock_threads->work_queue,
+                                                    nullptr);
+    InSequence seq;
+
+    // Init instance watcher 1 (leader)
+    expect_register_instance(mock_io_ctx1, 0);
+    expect_register_watch(mock_io_ctx1, instance_id1);
+    expect_acquire_lock(mock_managed_lock, 0);
+    EXPECT_EQ(0, instance_watcher1->init());
+    instance_watcher1->handle_acquire_leader();
+
+    // Init instance watcher 2
+    expect_register_instance(mock_io_ctx2, 0);
+    expect_register_watch(mock_io_ctx2, instance_id2);
+    expect_acquire_lock(mock_managed_lock, 0);
+    EXPECT_EQ(0, instance_watcher2->init());
+    instance_watcher2->handle_update_leader(instance_id1);
+  }
+
+  void TearDown() override {
+    librados::IoCtx& io_ctx1 = m_local_io_ctx;
+    librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
+    librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
+
+    InSequence seq;
+
+    expect_throttler_destroy();
+    instance_watcher1->handle_release_leader();
+
+    // Shutdown instance watcher 1
+    expect_release_lock(mock_managed_lock, 0);
+    expect_unregister_watch(mock_io_ctx1);
+    expect_unregister_instance(mock_io_ctx1, 0);
+    instance_watcher1->shut_down();
+
+    expect_destroy_lock(mock_managed_lock);
+    delete instance_watcher1;
+
+    // Shutdown instance watcher 2
+    expect_release_lock(mock_managed_lock, 0);
+    expect_unregister_watch(mock_io_ctx2);
+    expect_unregister_instance(mock_io_ctx2, 0);
+    instance_watcher2->shut_down();
+
+    expect_destroy_lock(mock_managed_lock);
+    delete instance_watcher2;
+
+    TestMockInstanceWatcher::TearDown();
+  }
+
+  void expect_throttler_destroy(
+      std::vector<Context *> *throttler_queue = nullptr) {
+    EXPECT_CALL(mock_image_sync_throttler, drain(-ESTALE))
+        .WillOnce(Invoke([throttler_queue] (int r) {
+              if (throttler_queue != nullptr) {
+                for (auto ctx : *throttler_queue) {
+                  ctx->complete(r);
+                }
+              }
+            }));
+    EXPECT_CALL(mock_image_sync_throttler, destroy());
+  }
+
+  void expect_throttler_start_op(const std::string &sync_id,
+                                 Context *on_call = nullptr,
+                                 Context **on_start_ctx = nullptr) {
+    EXPECT_CALL(mock_image_sync_throttler, start_op(sync_id, _))
+        .WillOnce(Invoke([on_call, on_start_ctx] (const std::string &,
+                                                  Context *ctx) {
+                           if (on_call != nullptr) {
+                             on_call->complete(0);
+                           }
+                           if (on_start_ctx != nullptr) {
+                             *on_start_ctx = ctx;
+                           } else {
+                             ctx->complete(0);
+                           }
+                         }));
+  }
+
+  void expect_throttler_finish_op(const std::string &sync_id,
+                                  Context *on_finish) {
+    EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
+        .WillOnce(Invoke([on_finish](const std::string &) {
+              on_finish->complete(0);
+            }));
+  }
+};
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) {
+  InSequence seq;
+
+  expect_throttler_start_op("sync_id");
+  C_SaferCond on_start;
+  instance_watcher1->notify_sync_request("sync_id", &on_start);
+  ASSERT_EQ(0, on_start.wait());
+
+  C_SaferCond on_finish;
+  expect_throttler_finish_op("sync_id", &on_finish);
+  instance_watcher1->notify_sync_complete("sync_id");
+  ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnLeader) {
+  InSequence seq;
+
+  expect_throttler_start_op("sync_id");
+  C_SaferCond on_start;
+  instance_watcher1->notify_sync_request("sync_id", &on_start);
+  ASSERT_EQ(0, on_start.wait());
+
+  ASSERT_FALSE(instance_watcher1->cancel_sync_request("sync_id"));
+
+  C_SaferCond on_finish;
+  expect_throttler_finish_op("sync_id", &on_finish);
+  instance_watcher1->notify_sync_complete("sync_id");
+  ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnNonLeader) {
+  InSequence seq;
+
+  expect_throttler_start_op("sync_id");
+  C_SaferCond on_start;
+  instance_watcher2->notify_sync_request("sync_id", &on_start);
+  ASSERT_EQ(0, on_start.wait());
+
+  C_SaferCond on_finish;
+  expect_throttler_finish_op("sync_id", &on_finish);
+  instance_watcher2->notify_sync_complete("sync_id");
+  ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnNonLeader) {
+  InSequence seq;
+
+  expect_throttler_start_op("sync_id");
+  C_SaferCond on_start;
+  instance_watcher2->notify_sync_request("sync_id", &on_start);
+  ASSERT_EQ(0, on_start.wait());
+
+  ASSERT_FALSE(instance_watcher2->cancel_sync_request("sync_id"));
+
+  C_SaferCond on_finish;
+  expect_throttler_finish_op("sync_id", &on_finish);
+  instance_watcher2->notify_sync_complete("sync_id");
+  ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, CancelWaitingOnNonLeader) {
+  InSequence seq;
+
+  C_SaferCond on_start_op_called;
+  Context *on_start_ctx;
+  expect_throttler_start_op("sync_id", &on_start_op_called,
+                                          &on_start_ctx);
+  C_SaferCond on_start;
+  instance_watcher2->notify_sync_request("sync_id", &on_start);
+  ASSERT_EQ(0, on_start_op_called.wait());
+
+  ASSERT_TRUE(instance_watcher2->cancel_sync_request("sync_id"));
+  // emulate watcher timeout
+  on_start_ctx->complete(-ETIMEDOUT);
+  ASSERT_EQ(-ECANCELED, on_start.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, InFlightPrevNotification) {
+  // start sync when previous notification is still in flight
+
+  InSequence seq;
+
+  expect_throttler_start_op("sync_id");
+  C_SaferCond on_start1;
+  instance_watcher2->notify_sync_request("sync_id", &on_start1);
+  ASSERT_EQ(0, on_start1.wait());
+
+  C_SaferCond on_start2;
+  EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
+      .WillOnce(Invoke([this, &on_start2](const std::string &) {
+            instance_watcher2->notify_sync_request("sync_id", &on_start2);
+          }));
+  expect_throttler_start_op("sync_id");
+  instance_watcher2->notify_sync_complete("sync_id");
+
+  ASSERT_EQ(0, on_start2.wait());
+  C_SaferCond on_finish;
+  expect_throttler_finish_op("sync_id", &on_finish);
+  instance_watcher2->notify_sync_complete("sync_id");
+  ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) {
+  InSequence seq;
+
+  expect_throttler_destroy();
+  instance_watcher1->handle_release_leader();
+  instance_watcher1->handle_acquire_leader();
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) {
+  InSequence seq;
+
+  expect_throttler_destroy();
+  instance_watcher1->handle_release_leader();
+  instance_watcher2->handle_acquire_leader();
+
+  expect_throttler_start_op("sync_id");
+  C_SaferCond on_start;
+  instance_watcher2->notify_sync_request("sync_id", &on_start);
+  ASSERT_EQ(0, on_start.wait());
+  expect_throttler_destroy();
+  instance_watcher2->handle_release_leader();
+  instance_watcher2->notify_sync_complete("sync_id");
+
+  instance_watcher1->handle_acquire_leader();
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) {
+  InSequence seq;
+
+  C_SaferCond on_start_op_called;
+  Context *on_start_ctx;
+  expect_throttler_start_op("sync_id", &on_start_op_called,
+                                          &on_start_ctx);
+  C_SaferCond on_start;
+  instance_watcher1->notify_sync_request("sync_id", &on_start);
+  ASSERT_EQ(0, on_start_op_called.wait());
+
+  std::vector<Context *> throttler_queue = {on_start_ctx};
+  expect_throttler_destroy(&throttler_queue);
+  instance_watcher1->handle_release_leader();
+  instance_watcher2->handle_acquire_leader();
+  instance_watcher1->handle_update_leader(instance_id2);
+
+  expect_throttler_start_op("sync_id");
+  ASSERT_EQ(0, on_start.wait());
+  C_SaferCond on_finish;
+  expect_throttler_finish_op("sync_id", &on_finish);
+  instance_watcher1->notify_sync_complete("sync_id");
+  ASSERT_EQ(0, on_finish.wait());
+
+  expect_throttler_destroy();
+  instance_watcher2->handle_release_leader();
+  instance_watcher1->handle_acquire_leader();
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) {
+  InSequence seq;
+
+  expect_throttler_destroy();
+  instance_watcher1->handle_release_leader();
+  instance_watcher2->handle_acquire_leader();
+  instance_watcher1->handle_update_leader(instance_id2);
+
+  expect_throttler_start_op("sync_id");
+  C_SaferCond on_start;
+  instance_watcher1->notify_sync_request("sync_id", &on_start);
+  ASSERT_EQ(0, on_start.wait());
+
+  expect_throttler_destroy();
+  instance_watcher2->handle_release_leader();
+  instance_watcher1->handle_acquire_leader();
+  instance_watcher2->handle_update_leader(instance_id2);
+
+  instance_watcher1->notify_sync_complete("sync_id");
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnNonLeaderAcquireLeader) {
+  InSequence seq;
+
+  C_SaferCond on_start_op_called;
+  Context *on_start_ctx;
+  expect_throttler_start_op("sync_id", &on_start_op_called,
+                                          &on_start_ctx);
+  C_SaferCond on_start;
+  instance_watcher2->notify_sync_request("sync_id", &on_start);
+  ASSERT_EQ(0, on_start_op_called.wait());
+
+  std::vector<Context *> throttler_queue = {on_start_ctx};
+  expect_throttler_destroy(&throttler_queue);
+  instance_watcher1->handle_release_leader();
+
+  EXPECT_CALL(mock_image_sync_throttler, start_op("sync_id", _))
+      .WillOnce(WithArg<1>(CompleteContext(0)));
+  instance_watcher2->handle_acquire_leader();
+  instance_watcher1->handle_update_leader(instance_id2);
+
+  ASSERT_EQ(0, on_start.wait());
+
+  C_SaferCond on_finish;
+  expect_throttler_finish_op("sync_id", &on_finish);
+  instance_watcher2->notify_sync_complete("sync_id");
+  ASSERT_EQ(0, on_finish.wait());
+
+  expect_throttler_destroy();
+  instance_watcher2->handle_release_leader();
+  instance_watcher1->handle_acquire_leader();
+}
+
 } // namespace mirror
 } // namespace rbd
 
   MOCK_CONST_METHOD0(is_shutdown, bool());
 
   MOCK_CONST_METHOD0(is_state_post_acquiring, bool());
+  MOCK_CONST_METHOD0(is_state_pre_releasing, bool());
   MOCK_CONST_METHOD0(is_state_locked, bool());
 };
 
     return MockManagedLock::get_instance().is_state_post_acquiring();
   }
 
+  bool is_state_pre_releasing() const {
+    return MockManagedLock::get_instance().is_state_pre_releasing();
+  }
+
   bool is_state_locked() const {
     return MockManagedLock::get_instance().is_state_locked();
   }
 
   MOCK_METHOD1(post_acquire_handler, void(Context *));
   MOCK_METHOD1(pre_release_handler, void(Context *));
+
+  MOCK_METHOD1(update_leader_handler, void(const std::string &));
 };
 
 MockListener *MockListener::s_instance = nullptr;
       .Times(AtLeast(0)).WillRepeatedly(Return(false));
     EXPECT_CALL(mock_managed_lock, is_state_locked())
       .Times(AtLeast(0)).WillRepeatedly(Return(false));
+    EXPECT_CALL(mock_managed_lock, is_state_pre_releasing())
+      .Times(AtLeast(0)).WillRepeatedly(Return(false));
   }
 
   void expect_notify_heartbeat(MockManagedLock &mock_managed_lock,
   expect_is_shutdown(mock_managed_lock);
   expect_is_leader(mock_managed_lock);
   expect_destroy(mock_managed_lock);
+  EXPECT_CALL(listener, update_leader_handler(_));
 
   InSequence seq;
 
 
   virtual void cancel() {}
 
 protected:
-  void finish(int r) {
+  virtual void finish(int r) {
     if (m_cct) {
       lsubdout(m_cct, rbd_mirror, 20) << m_name << "::finish: r=" << r << dendl;
     }
 
 #include "librbd/Utils.h"
 #include "librbd/journal/Replay.h"
 #include "ImageReplayer.h"
-#include "ImageSync.h"
 #include "Threads.h"
 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
 template <typename I>
 ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
                                 shared_ptr<ImageDeleter> image_deleter,
-                                ImageSyncThrottlerRef<I> image_sync_throttler,
+                                InstanceWatcher<I> *instance_watcher,
                                 RadosRef local,
                                 const std::string &local_mirror_uuid,
                                 int64_t local_pool_id,
                                 const std::string &global_image_id) :
   m_threads(threads),
   m_image_deleter(image_deleter),
-  m_image_sync_throttler(image_sync_throttler),
+  m_instance_watcher(instance_watcher),
   m_local(local),
   m_local_mirror_uuid(local_mirror_uuid),
   m_local_pool_id(local_pool_id),
     ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
 
   BootstrapRequest<I> *request = BootstrapRequest<I>::create(
-    m_local_ioctx, m_remote_image.io_ctx, m_image_sync_throttler,
+    m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
     &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
     m_global_image_id, m_threads->work_queue, m_threads->timer,
     &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
 
 namespace rbd {
 namespace mirror {
 
+template <typename> struct InstanceWatcher;
 template <typename> struct Threads;
 
 namespace image_replayer { template <typename> class BootstrapRequest; }
   static ImageReplayer *create(
     Threads<librbd::ImageCtx> *threads,
     std::shared_ptr<ImageDeleter> image_deleter,
-    ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+    InstanceWatcher<ImageCtxT> *instance_watcher,
     RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
     const std::string &global_image_id) {
-    return new ImageReplayer(threads, image_deleter, image_sync_throttler,
+    return new ImageReplayer(threads, image_deleter, instance_watcher,
                              local, local_mirror_uuid, local_pool_id,
                              global_image_id);
   }
 
   ImageReplayer(Threads<librbd::ImageCtx> *threads,
                 std::shared_ptr<ImageDeleter> image_deleter,
-                ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+                InstanceWatcher<ImageCtxT> *instance_watcher,
                 RadosRef local, const std::string &local_mirror_uuid,
                 int64_t local_pool_id, const std::string &global_image_id);
   virtual ~ImageReplayer();
 
   Threads<librbd::ImageCtx> *m_threads;
   std::shared_ptr<ImageDeleter> m_image_deleter;
-  ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
+  InstanceWatcher<ImageCtxT> *m_instance_watcher;
 
   RemoteImages m_remote_images;
   RemoteImage m_remote_image;
 
 // vim: ts=8 sw=2 smarttab
 
 #include "ImageSync.h"
+#include "InstanceWatcher.h"
 #include "ProgressContext.h"
 #include "common/errno.h"
 #include "journal/Journaler.h"
                         SafeTimer *timer, Mutex *timer_lock,
                         const std::string &mirror_uuid, Journaler *journaler,
                         MirrorPeerClientMeta *client_meta,
-                        ContextWQ *work_queue, Context *on_finish,
-                       ProgressContext *progress_ctx)
+                        ContextWQ *work_queue,
+                        InstanceWatcher<I> *instance_watcher,
+                        Context *on_finish, ProgressContext *progress_ctx)
   : BaseRequest("rbd::mirror::ImageSync", local_image_ctx->cct, on_finish),
     m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
     m_timer(timer), m_timer_lock(timer_lock), m_mirror_uuid(mirror_uuid),
     m_journaler(journaler), m_client_meta(client_meta),
-    m_work_queue(work_queue), m_progress_ctx(progress_ctx),
+    m_work_queue(work_queue), m_instance_watcher(instance_watcher),
+    m_progress_ctx(progress_ctx),
     m_lock(unique_lock_name("ImageSync::m_lock", this)) {
 }
 
 
 template <typename I>
 void ImageSync<I>::send() {
-  send_prune_catch_up_sync_point();
+  send_notify_sync_request();
 }
 
 template <typename I>
 
   m_canceled = true;
 
+  if (m_instance_watcher->cancel_sync_request(m_local_image_ctx->id)) {
+    return;
+  }
+
   if (m_snapshot_copy_request != nullptr) {
     m_snapshot_copy_request->cancel();
   }
   }
 }
 
+template <typename I>
+void ImageSync<I>::send_notify_sync_request() {
+  update_progress("NOTIFY_SYNC_REQUEST");
+
+  dout(20) << dendl;
+
+  Context *ctx = create_context_callback<
+    ImageSync<I>, &ImageSync<I>::handle_notify_sync_request>(this);
+  m_instance_watcher->notify_sync_request(m_local_image_ctx->id, ctx);
+}
+
+template <typename I>
+void ImageSync<I>::handle_notify_sync_request(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  if (r < 0) {
+    BaseRequest::finish(r);
+    return;
+  }
+
+  send_prune_catch_up_sync_point();
+}
+
 template <typename I>
 void ImageSync<I>::send_prune_catch_up_sync_point() {
   update_progress("PRUNE_CATCH_UP_SYNC_POINT");
   }
 }
 
+template <typename I>
+void ImageSync<I>::finish(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  m_instance_watcher->notify_sync_complete(m_local_image_ctx->id);
+  BaseRequest::finish(r);
+}
+
 } // namespace mirror
 } // namespace rbd
 
 
 
 class ProgressContext;
 
+template <typename> class InstanceWatcher;
+
 namespace image_sync { template <typename> class ImageCopyRequest; }
 namespace image_sync { template <typename> class SnapshotCopyRequest; }
 
                            Mutex *timer_lock, const std::string &mirror_uuid,
                            Journaler *journaler,
                            MirrorPeerClientMeta *client_meta,
-                           ContextWQ *work_queue, Context *on_finish,
-                          ProgressContext *progress_ctx = nullptr) {
+                           ContextWQ *work_queue,
+                           InstanceWatcher<ImageCtxT> *instance_watcher,
+                           Context *on_finish,
+                           ProgressContext *progress_ctx = nullptr) {
     return new ImageSync(local_image_ctx, remote_image_ctx, timer, timer_lock,
                          mirror_uuid, journaler, client_meta, work_queue,
-                         on_finish, progress_ctx);
+                         instance_watcher, on_finish, progress_ctx);
   }
 
   ImageSync(ImageCtxT *local_image_ctx, ImageCtxT *remote_image_ctx,
             SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid,
             Journaler *journaler, MirrorPeerClientMeta *client_meta,
-            ContextWQ *work_queue, Context *on_finish,
-            ProgressContext *progress_ctx = nullptr);
+            ContextWQ *work_queue, InstanceWatcher<ImageCtxT> *instance_watcher,
+            Context *on_finish, ProgressContext *progress_ctx = nullptr);
   ~ImageSync() override;
 
   void send() override;
   void cancel() override;
 
+protected:
+  void finish(int r) override;
+
 private:
   /**
    * @verbatim
    * <start>
    *    |
    *    v
+   * NOTIFY_SYNC_REQUEST
+   *    |
+   *    v
    * PRUNE_CATCH_UP_SYNC_POINT
    *    |
    *    v
   Journaler *m_journaler;
   MirrorPeerClientMeta *m_client_meta;
   ContextWQ *m_work_queue;
+  InstanceWatcher<ImageCtxT> *m_instance_watcher;
   ProgressContext *m_progress_ctx;
 
   SnapMap m_snap_map;
   image_sync::ImageCopyRequest<ImageCtxT> *m_image_copy_request = nullptr;
   decltype(ImageCtxT::object_map) m_object_map = nullptr;
 
+  void send_notify_sync_request();
+  void handle_notify_sync_request(int r);
+
   void send_prune_catch_up_sync_point();
   void handle_prune_catch_up_sync_point(int r);
 
 
  */
 
 #include "ImageSyncThrottler.h"
-#include "ImageSync.h"
-#include "common/ceph_context.h"
+#include "common/Formatter.h"
+#include "common/debug.h"
+#include "common/errno.h"
 #include "librbd/Utils.h"
 
 #define dout_context g_ceph_context
 #undef dout_prefix
 #define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
                            << " " << __func__ << ": "
-using std::unique_ptr;
-using std::string;
-using std::set;
 
 namespace rbd {
 namespace mirror {
 
-template <typename ImageCtxT>
-struct ImageSyncThrottler<ImageCtxT>::C_SyncHolder : public Context {
-  ImageSyncThrottler<ImageCtxT> *m_sync_throttler;
-  std::string m_local_image_id;
-  ImageSync<ImageCtxT> *m_sync = nullptr;
-  Context *m_on_finish;
-
-  C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
-               const std::string &local_image_id, Context *on_finish)
-    : m_sync_throttler(sync_throttler),
-      m_local_image_id(local_image_id), m_on_finish(on_finish) {
-  }
-
-  void finish(int r) override {
-    m_sync->put();
-    m_sync_throttler->handle_sync_finished(this);
-
-    m_on_finish->complete(r);
-  }
-};
-
 template <typename I>
 ImageSyncThrottler<I>::ImageSyncThrottler()
-  : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs),
-    m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", this))
-{
-  dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
-           << dendl;
+  : m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
+                                          this)),
+    m_max_concurrent_syncs(
+      g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs) {
+  dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl;
   g_ceph_context->_conf->add_observer(this);
 }
 
 template <typename I>
 ImageSyncThrottler<I>::~ImageSyncThrottler() {
-  {
-    Mutex::Locker l(m_lock);
-    assert(m_sync_queue.empty());
-    assert(m_inflight_syncs.empty());
-  }
-
   g_ceph_context->_conf->remove_observer(this);
+
+  Mutex::Locker locker(m_lock);
+  assert(m_inflight_ops.empty());
+  assert(m_queue.empty());
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
-                                       SafeTimer *timer, Mutex *timer_lock,
-                                       const std::string &mirror_uuid,
-                                       Journaler *journaler,
-                                       MirrorPeerClientMeta *client_meta,
-                                       ContextWQ *work_queue,
-                                       Context *on_finish,
-                                       ProgressContext *progress_ctx) {
-  dout(20) << dendl;
+void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
+  dout(20) << "id=" << id << dendl;
 
-  C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, local_image_ctx->id,
-                                                   on_finish);
-  sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
-                                                 remote_image_ctx, timer,
-                                                 timer_lock, mirror_uuid,
-                                                 journaler, client_meta,
-                                                 work_queue, sync_holder_ctx,
-                                                 progress_ctx);
-  sync_holder_ctx->m_sync->get();
-
-  bool start = false;
   {
-    Mutex::Locker l(m_lock);
-
-    if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
-      assert(m_inflight_syncs.count(local_image_ctx->id) == 0);
-      m_inflight_syncs[local_image_ctx->id] = sync_holder_ctx;
-      start = true;
-      dout(10) << "ready to start image sync for local_image_id "
-               << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
-               << m_max_concurrent_syncs << "]" << dendl;
+    Mutex::Locker locker(m_lock);
+
+    if (m_inflight_ops.count(id) > 0) {
+      dout(20) << "duplicate for already started op " << id << dendl;
+    } else if (m_max_concurrent_syncs == 0 ||
+               m_inflight_ops.size() < m_max_concurrent_syncs) {
+      assert(m_queue.empty());
+      m_inflight_ops.insert(id);
+      dout(20) << "ready to start sync for " << id << " ["
+               << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+               << dendl;
     } else {
-      m_sync_queue.push_front(sync_holder_ctx);
-      dout(10) << "image sync for local_image_id " << local_image_ctx->id
-               << " has been queued" << dendl;
+      m_queue.push_back(std::make_pair(id, on_start));
+      on_start = nullptr;
+      dout(20) << "image sync for " << id << " has been queued" << dendl;
     }
   }
 
-  if (start) {
-    sync_holder_ctx->m_sync->send();
+  if (on_start != nullptr) {
+    on_start->complete(0);
   }
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::cancel_sync(const std::string &local_image_id) {
-  dout(20) << dendl;
-
-  C_SyncHolder *sync_holder = nullptr;
-  bool running_sync = true;
+bool ImageSyncThrottler<I>::cancel_op(const std::string &id) {
+  dout(20) << "id=" << id << dendl;
 
+  Context *on_start = nullptr;
   {
-    Mutex::Locker l(m_lock);
-    if (m_inflight_syncs.empty()) {
-      // no image sync currently running and neither waiting
-      return;
-    }
-
-    auto it = m_inflight_syncs.find(local_image_id);
-    if (it != m_inflight_syncs.end()) {
-      sync_holder = it->second;
-    }
-
-    if (!sync_holder) {
-      for (auto it = m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
-        if ((*it)->m_local_image_id == local_image_id) {
-          sync_holder = (*it);
-          m_sync_queue.erase(it);
-          running_sync = false;
-          break;
-        }
+    Mutex::Locker locker(m_lock);
+    for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
+      if (it->first == id) {
+        on_start = it->second;
+        dout(20) << "canceled queued sync for " << id << dendl;
+        m_queue.erase(it);
+        break;
       }
     }
   }
 
-  if (sync_holder) {
-    if (running_sync) {
-      dout(10) << "canceled running image sync for local_image_id "
-               << sync_holder->m_local_image_id << dendl;
-      sync_holder->m_sync->cancel();
-    } else {
-      dout(10) << "canceled waiting image sync for local_image_id "
-               << sync_holder->m_local_image_id << dendl;
-      sync_holder->m_on_finish->complete(-ECANCELED);
-      sync_holder->m_sync->put();
-      delete sync_holder;
-    }
+  if (on_start == nullptr) {
+    return false;
   }
+
+  on_start->complete(-ECANCELED);
+  return true;
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
-  dout(20) << dendl;
+void ImageSyncThrottler<I>::finish_op(const std::string &id) {
+  dout(20) << "id=" << id << dendl;
 
-  C_SyncHolder *next_sync_holder = nullptr;
+  if (cancel_op(id)) {
+    return;
+  }
 
+  Context *on_start = nullptr;
   {
-    Mutex::Locker l(m_lock);
-    m_inflight_syncs.erase(sync_holder->m_local_image_id);
-
-    if (m_inflight_syncs.size() < m_max_concurrent_syncs &&
-        !m_sync_queue.empty()) {
-      next_sync_holder = m_sync_queue.back();
-      m_sync_queue.pop_back();
-
-      assert(
-        m_inflight_syncs.count(next_sync_holder->m_local_image_id) == 0);
-      m_inflight_syncs[next_sync_holder->m_local_image_id] =
-        next_sync_holder;
-      dout(10) << "ready to start image sync for local_image_id "
-               << next_sync_holder->m_local_image_id << " ["
-               << m_inflight_syncs.size() << "/" << m_max_concurrent_syncs
-               << "]" << dendl;
+    Mutex::Locker locker(m_lock);
+
+    m_inflight_ops.erase(id);
+
+    if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
+      auto pair = m_queue.front();
+      m_inflight_ops.insert(pair.first);
+      dout(20) << "ready to start sync for " << pair.first << " ["
+               << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+               << dendl;
+      on_start= pair.second;
+      m_queue.pop_front();
     }
+  }
+
+  if (on_start != nullptr) {
+    on_start->complete(0);
+  }
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::drain(int r) {
+  dout(20) << dendl;
 
-    dout(10) << "currently running image syncs [" << m_inflight_syncs.size()
-             << "/" << m_max_concurrent_syncs << "]" << dendl;
+  std::list<std::pair<std::string, Context *>> queue;
+  {
+    Mutex::Locker locker(m_lock);
+    std::swap(m_queue, queue);
+    m_inflight_ops.clear();
   }
 
-  if (next_sync_holder) {
-    next_sync_holder->m_sync->send();
+  for (auto &pair : queue) {
+    pair.second->complete(r);
   }
 }
 
 template <typename I>
 void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
-  dout(20) << " max=" << max << dendl;
-
-  assert(max > 0);
+  dout(20) << "max=" << max << dendl;
 
-  std::list<C_SyncHolder *> next_sync_holders;
+  std::list<Context *> ops;
   {
-    Mutex::Locker l(m_lock);
-    this->m_max_concurrent_syncs = max;
-
-    // Start waiting syncs in the case of available free slots
-    while(m_inflight_syncs.size() < m_max_concurrent_syncs
-          && !m_sync_queue.empty()) {
-        C_SyncHolder *next_sync_holder = m_sync_queue.back();
-        next_sync_holders.push_back(next_sync_holder);
-        m_sync_queue.pop_back();
-
-        assert(
-          m_inflight_syncs.count(next_sync_holder->m_local_image_id) == 0);
-        m_inflight_syncs[next_sync_holder->m_local_image_id] = next_sync_holder;
-
-        dout(10) << "ready to start image sync for local_image_id "
-                 << next_sync_holder->m_local_image_id << " ["
-                 << m_inflight_syncs.size() << "/" << m_max_concurrent_syncs
-                 << "]" << dendl;
+    Mutex::Locker locker(m_lock);
+    m_max_concurrent_syncs = max;
+
+    // Start waiting ops in the case of available free slots
+    while ((m_max_concurrent_syncs == 0 ||
+            m_inflight_ops.size() < m_max_concurrent_syncs) &&
+           !m_queue.empty()) {
+      auto pair = m_queue.front();
+      m_inflight_ops.insert(pair.first);
+      dout(20) << "ready to start sync for " << pair.first << " ["
+               << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+               << dendl;
+      ops.push_back(pair.second);
+      m_queue.pop_front();
     }
   }
 
-  for (const auto& sync_holder : next_sync_holders) {
-    sync_holder->m_sync->send();
+  for (const auto& ctx : ops) {
+    ctx->complete(0);
   }
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::print_status(Formatter *f, stringstream *ss) {
-  Mutex::Locker l(m_lock);
+void ImageSyncThrottler<I>::print_status(Formatter *f, std::stringstream *ss) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
 
   if (f) {
     f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
-    f->dump_int("running_syncs", m_inflight_syncs.size());
-    f->dump_int("waiting_syncs", m_sync_queue.size());
+    f->dump_int("running_syncs", m_inflight_ops.size());
+    f->dump_int("waiting_syncs", m_queue.size());
     f->flush(*ss);
   } else {
     *ss << "[ ";
     *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
-    *ss << "running_syncs=" << m_inflight_syncs.size() << ", ";
-    *ss << "waiting_syncs=" << m_sync_queue.size() << " ]";
+    *ss << "running_syncs=" << m_inflight_ops.size() << ", ";
+    *ss << "waiting_syncs=" << m_queue.size() << " ]";
   }
 }
 
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::handle_conf_change(
-                                              const struct md_config_t *conf,
-                                              const set<string> &changed) {
+void ImageSyncThrottler<I>::handle_conf_change(const struct md_config_t *conf,
+                                      const set<string> &changed) {
   if (changed.count("rbd_mirror_concurrent_image_syncs")) {
     set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs);
   }
 
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 SUSE LINUX GmbH
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
 
-#ifndef CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
-#define CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
+#ifndef RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
+#define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
 
 #include <list>
-#include <map>
+#include <set>
+#include <sstream>
+#include <string>
 #include <utility>
+
 #include "common/Mutex.h"
-#include "librbd/ImageCtx.h"
-#include "include/Context.h"
-#include "librbd/journal/TypeTraits.h"
+#include "common/config_obs.h"
 
-class CephContext;
 class Context;
-class ContextWQ;
-class SafeTimer;
-namespace journal { class Journaler; }
-namespace librbd { namespace journal { struct MirrorPeerClientMeta; } }
+
+namespace ceph { class Formatter; }
+namespace librbd { class ImageCtx; }
 
 namespace rbd {
 namespace mirror {
 
-template <typename> class ImageSync;
-
-class ProgressContext;
-
-/**
- * Manage concurrent image-syncs
- */
 template <typename ImageCtxT = librbd::ImageCtx>
 class ImageSyncThrottler : public md_config_obs_t {
 public:
-
-  typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
-  typedef typename TypeTraits::Journaler Journaler;
-  typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta;
+  static ImageSyncThrottler *create() {
+    return new ImageSyncThrottler();
+  }
+  void destroy() {
+    delete this;
+  }
 
   ImageSyncThrottler();
   ~ImageSyncThrottler() override;
-  ImageSyncThrottler(const ImageSyncThrottler&) = delete;
-  ImageSyncThrottler& operator=(const ImageSyncThrottler&) = delete;
-
-  void start_sync(ImageCtxT *local_image_ctx,
-                  ImageCtxT *remote_image_ctx, SafeTimer *timer,
-                  Mutex *timer_lock, const std::string &mirror_uuid,
-                  Journaler *journaler, MirrorPeerClientMeta *client_meta,
-                  ContextWQ *work_queue, Context *on_finish,
-                  ProgressContext *progress_ctx = nullptr);
-
-  void cancel_sync(const std::string &local_image_id);
 
   void set_max_concurrent_syncs(uint32_t max);
+  void start_op(const std::string &id, Context *on_start);
+  bool cancel_op(const std::string &id);
+  void finish_op(const std::string &id);
+  void drain(int r);
 
   void print_status(Formatter *f, std::stringstream *ss);
 
 private:
-  struct C_SyncHolder;
-
-  void handle_sync_finished(C_SyncHolder *sync_holder);
+  Mutex m_lock;
+  uint32_t m_max_concurrent_syncs;
+  std::list<std::pair<std::string, Context *>> m_queue;
+  std::set<std::string> m_inflight_ops;
 
   const char **get_tracked_conf_keys() const override;
   void handle_conf_change(const struct md_config_t *conf,
                           const std::set<std::string> &changed) override;
-
-  uint32_t m_max_concurrent_syncs;
-  Mutex m_lock;
-  std::list<C_SyncHolder *> m_sync_queue;
-  std::map<std::string, C_SyncHolder *> m_inflight_syncs;
-
 };
 
 } // namespace mirror
 } // namespace rbd
 
-#endif // CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
+extern template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
 
 template <typename I>
 InstanceReplayer<I>::InstanceReplayer(
     Threads<I> *threads, std::shared_ptr<ImageDeleter> image_deleter,
-    ImageSyncThrottlerRef<I> image_sync_throttler, RadosRef local_rados,
-    const std::string &local_mirror_uuid, int64_t local_pool_id)
-    : m_threads(threads), m_image_deleter(image_deleter),
-      m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados),
-      m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
-      m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
+    RadosRef local_rados, const std::string &local_mirror_uuid,
+    int64_t local_pool_id)
+  : m_threads(threads), m_image_deleter(image_deleter),
+    m_local_rados(local_rados), m_local_mirror_uuid(local_mirror_uuid),
+    m_local_pool_id(local_pool_id),
+    m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
 }
 
 template <typename I>
 }
 
 template <typename I>
-void InstanceReplayer<I>::acquire_image(const std::string &global_image_id,
+void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
+                                        const std::string &global_image_id,
                                         const std::string &peer_mirror_uuid,
                                         const std::string &peer_image_id,
                                         Context *on_finish) {
 
   if (it == m_image_replayers.end()) {
     auto image_replayer = ImageReplayer<I>::create(
-      m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
-      m_local_mirror_uuid, m_local_pool_id, global_image_id);
+        m_threads, m_image_deleter, instance_watcher, m_local_rados,
+        m_local_mirror_uuid, m_local_pool_id, global_image_id);
 
     dout(20) << global_image_id << ": creating replayer " << image_replayer
              << dendl;
 
 class ImageDeleter;
 
 template <typename> class ImageReplayer;
+template <typename> class InstanceWatcher;
 template <typename> struct Threads;
 
 template <typename ImageCtxT = librbd::ImageCtx>
 public:
   static InstanceReplayer* create(
       Threads<ImageCtxT> *threads, std::shared_ptr<ImageDeleter> image_deleter,
-      ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler, RadosRef local_rados,
-      const std::string &local_mirror_uuid, int64_t local_pool_id) {
-      return new InstanceReplayer(threads, image_deleter, image_sync_throttler,
-                                 local_rados, local_mirror_uuid, local_pool_id);
+      RadosRef local_rados, const std::string &local_mirror_uuid,
+      int64_t local_pool_id) {
+    return new InstanceReplayer(threads, image_deleter, local_rados,
+                                local_mirror_uuid, local_pool_id);
   }
   void destroy() {
     delete this;
 
   InstanceReplayer(Threads<ImageCtxT> *threads,
                   std::shared_ptr<ImageDeleter> image_deleter,
-                   ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
                   RadosRef local_rados, const std::string &local_mirror_uuid,
                   int64_t local_pool_id);
   ~InstanceReplayer();
   void add_peer(std::string mirror_uuid, librados::IoCtx io_ctx);
   void remove_peer(std::string mirror_uuid);
 
-  void acquire_image(const std::string &global_image_id,
+  void acquire_image(InstanceWatcher<ImageCtxT> *instance_watcher,
+                     const std::string &global_image_id,
                      const std::string &peer_mirror_uuid,
                      const std::string &peer_image_id,
                      Context *on_finish);
 
   Threads<ImageCtxT> *m_threads;
   std::shared_ptr<ImageDeleter> m_image_deleter;
-  ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
   RadosRef m_local_rados;
   std::string m_local_mirror_uuid;
   int64_t m_local_pool_id;
 
 #include "librbd/ManagedLock.h"
 #include "librbd/Utils.h"
 #include "InstanceReplayer.h"
+#include "ImageSyncThrottler.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
 template <typename I>
 struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
   InstanceWatcher<I> *instance_watcher;
-  librbd::watcher::Notifier notifier;
   std::string instance_id;
   uint64_t request_id;
   bufferlist bl;
   Context *on_finish;
+  bool send_to_leader;
+  std::unique_ptr<librbd::watcher::Notifier> notifier;
   librbd::watcher::NotifyResponse response;
-  atomic_t canceling;
+  bool canceling = false;
 
   C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
                           const std::string &instance_id, uint64_t request_id,
                           bufferlist &&bl, Context *on_finish)
-    : instance_watcher(instance_watcher),
-      notifier(instance_watcher->m_work_queue, instance_watcher->m_ioctx,
-               RBD_MIRROR_INSTANCE_PREFIX + instance_id),
-      instance_id(instance_id), request_id(request_id), bl(bl),
-      on_finish(on_finish) {
-    instance_watcher->m_notify_op_tracker.start_op();
+    : instance_watcher(instance_watcher), instance_id(instance_id),
+      request_id(request_id), bl(bl), on_finish(on_finish),
+      send_to_leader(instance_id.empty()) {
+    dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
+             << ": instance_watcher=" << instance_watcher << ", instance_id="
+             << instance_id << ", request_id=" << request_id << dendl;
+
     assert(instance_watcher->m_lock.is_locked());
+
+    if (!send_to_leader) {
+      assert((!instance_id.empty()));
+      notifier.reset(new librbd::watcher::Notifier(
+                         instance_watcher->m_work_queue,
+                         instance_watcher->m_ioctx,
+                         RBD_MIRROR_INSTANCE_PREFIX + instance_id));
+    }
+
+    instance_watcher->m_notify_op_tracker.start_op();
     auto result = instance_watcher->m_notify_ops.insert(
         std::make_pair(instance_id, this)).second;
     assert(result);
   void send() {
     dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
 
-    notifier.notify(bl, &response, this);
+    assert(instance_watcher->m_lock.is_locked());
+
+    if (canceling) {
+      dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
+               << ": canceling" << dendl;
+      instance_watcher->m_work_queue->queue(this, -ECANCELED);
+      return;
+    }
+
+    if (send_to_leader) {
+      if (instance_watcher->m_leader_instance_id.empty()) {
+        dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
+                 << ": suspending" << dendl;
+        instance_watcher->suspend_notify_request(this);
+        return;
+      }
+
+      if (instance_watcher->m_leader_instance_id != instance_id) {
+        auto count = instance_watcher->m_notify_ops.erase(
+            std::make_pair(instance_id, this));
+        assert(count > 0);
+
+        instance_id = instance_watcher->m_leader_instance_id;
+
+        auto result = instance_watcher->m_notify_ops.insert(
+            std::make_pair(instance_id, this)).second;
+        assert(result);
+
+        notifier.reset(new librbd::watcher::Notifier(
+                           instance_watcher->m_work_queue,
+                           instance_watcher->m_ioctx,
+                           RBD_MIRROR_INSTANCE_PREFIX + instance_id));
+      }
+    }
+
+    dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
+             << ": sendding to " << instance_id << dendl;
+    notifier->notify(bl, &response, this);
   }
 
   void cancel() {
     dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
 
-    canceling.set(1);
+    assert(instance_watcher->m_lock.is_locked());
+
+    canceling = true;
+    instance_watcher->unsuspend_notify_request(this);
   }
 
   void finish(int r) override {
 
       if (!found) {
         if (r == -ETIMEDOUT) {
-          if (canceling.read()) {
-            r = -ECANCELED;
-          } else {
-            derr << "C_NotifyInstanceRequest: " << this << " " << __func__
-                 << ": resending after timeout" << dendl;
-            send();
-            return;
-          }
+          derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+               << ": resending after timeout" << dendl;
+          Mutex::Locker locker(instance_watcher->m_lock);
+          send();
+          return;
         } else {
           r = -EINVAL;
         }
+      } else {
+        if (r == -ESTALE && send_to_leader) {
+          derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+               << ": resending due to leader change" << dendl;
+          Mutex::Locker locker(instance_watcher->m_lock);
+          send();
+          return;
+        }
       }
     }
 
-    instance_watcher->m_notify_op_tracker.finish_op();
     on_finish->complete(r);
 
-    Mutex::Locker locker(instance_watcher->m_lock);
-    auto result = instance_watcher->m_notify_ops.erase(
+    {
+      Mutex::Locker locker(instance_watcher->m_lock);
+      auto result = instance_watcher->m_notify_ops.erase(
         std::make_pair(instance_id, this));
-    assert(result > 0);
+      assert(result > 0);
+      instance_watcher->m_notify_op_tracker.finish_op();
+    }
+
     delete this;
   }
 
   }
 };
 
+template <typename I>
+struct InstanceWatcher<I>::C_SyncRequest : public Context {
+  InstanceWatcher<I> *instance_watcher;
+  std::string sync_id;
+  Context *on_start;
+  Context *on_complete = nullptr;
+  C_NotifyInstanceRequest *req = nullptr;
+
+  C_SyncRequest(InstanceWatcher<I> *instance_watcher,
+                const std::string &sync_id, Context *on_start)
+    : instance_watcher(instance_watcher), sync_id(sync_id),
+      on_start(on_start) {
+    dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
+             << sync_id << dendl;
+  }
+
+  void finish(int r) override {
+    dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r="
+             << r << dendl;
+
+    if (on_start != nullptr) {
+      instance_watcher->handle_notify_sync_request(this, r);
+    } else {
+      instance_watcher->handle_notify_sync_complete(this, r);
+      delete this;
+    }
+  }
+
+  // called twice
+  void complete(int r) override {
+    finish(r);
+  }
+};
+
 #undef dout_prefix
 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
                            << this << " " << __func__ << ": "
 
 template <typename I>
 InstanceWatcher<I>::~InstanceWatcher() {
+  assert(m_notify_ops.empty());
+  assert(m_notify_op_tracker.empty());
+  assert(m_suspended_ops.empty());
+  assert(m_inflight_sync_reqs.empty());
+  assert(m_image_sync_throttler == nullptr);
   m_instance_lock->destroy();
 }
 
   }
 }
 
+template <typename I>
+void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
+                                             Context *on_sync_start) {
+  dout(20) << "sync_id=" << sync_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_inflight_sync_reqs.count(sync_id) == 0);
+
+  uint64_t request_id = ++m_request_seq;
+
+  bufferlist bl;
+  ::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
+
+  auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
+  sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
+                                              std::move(bl), sync_ctx);
+
+  m_inflight_sync_reqs[sync_id] = sync_ctx;
+  sync_ctx->req->send();
+}
+
+template <typename I>
+bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
+  dout(20) << "sync_id=" << sync_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  auto it = m_inflight_sync_reqs.find(sync_id);
+  if (it == m_inflight_sync_reqs.end()) {
+    return false;
+  }
+
+  auto sync_ctx = it->second;
+
+  if (sync_ctx->on_start == nullptr) {
+    return false;
+  }
+
+  assert(sync_ctx->req != nullptr);
+  sync_ctx->req->cancel();
+  return true;
+}
+
+template <typename I>
+void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
+                                           const std::string &sync_id) {
+  dout(20) << "sync_id=" << sync_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  uint64_t request_id = ++m_request_seq;
+
+  bufferlist bl;
+  ::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
+
+  auto ctx = new FunctionContext(
+    [this, sync_id] (int r) {
+      dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
+      Mutex::Locker locker(m_lock);
+      if (r != -ESTALE && m_image_sync_throttler != nullptr) {
+        m_image_sync_throttler->finish_op(sync_id);
+      }
+    });
+  auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+                                         std::move(bl), ctx);
+  req->send();
+}
+
+template <typename I>
+void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
+  dout(20) << "sync_id=" << sync_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  auto it = m_inflight_sync_reqs.find(sync_id);
+  assert(it != m_inflight_sync_reqs.end());
+
+  auto sync_ctx = it->second;
+  assert(sync_ctx->req == nullptr);
+
+  m_inflight_sync_reqs.erase(it);
+  m_work_queue->queue(sync_ctx, 0);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
+                                                    int r) {
+  dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
+
+  Context *on_start = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    assert(sync_ctx->req != nullptr);
+    assert(sync_ctx->on_start != nullptr);
+
+    if (sync_ctx->req->canceling) {
+      r = -ECANCELED;
+    }
+
+    std::swap(sync_ctx->on_start, on_start);
+    sync_ctx->req = nullptr;
+  }
+
+  on_start->complete(r == -ECANCELED ? r : 0);
+
+  if (r == -ECANCELED) {
+    notify_sync_complete(sync_ctx->sync_id);
+  }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
+                                                     int r) {
+  dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
+
+  if (sync_ctx->on_complete != nullptr) {
+    sync_ctx->on_complete->complete(r);
+  }
+}
+
+template <typename I>
+void InstanceWatcher<I>::print_sync_status(Formatter *f, stringstream *ss) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+  if (m_image_sync_throttler != nullptr) {
+    m_image_sync_throttler->print_status(f, ss);
+  }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_acquire_leader() {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_image_sync_throttler == nullptr);
+  m_image_sync_throttler = ImageSyncThrottler<I>::create();
+
+  m_leader_instance_id = m_instance_id;
+  unsuspend_notify_requests();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_release_leader() {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_image_sync_throttler != nullptr);
+
+  m_leader_instance_id.clear();
+
+  m_image_sync_throttler->drain(-ESTALE);
+  m_image_sync_throttler->destroy();
+  m_image_sync_throttler = nullptr;
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_update_leader(
+  const std::string &leader_instance_id) {
+  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  m_leader_instance_id = leader_instance_id;
+
+  if (!m_leader_instance_id.empty()) {
+    unsuspend_notify_requests();
+  }
+}
+
 template <typename I>
 void InstanceWatcher<I>::cancel_notify_requests(
     const std::string &instance_id) {
   Mutex::Locker locker(m_lock);
 
   for (auto op : m_notify_ops) {
-    if (op.first == instance_id) {
+    if (op.first == instance_id && !op.second->send_to_leader) {
       op.second->cancel();
     }
   }
 }
 
-
 template <typename I>
 void InstanceWatcher<I>::register_instance() {
   assert(m_lock.is_locked());
   remove_instance_object();
 }
 
+template <typename I>
+void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
+  dout(20) << req << dendl;
+
+  assert(m_lock.is_locked());
+
+  auto result = m_suspended_ops.insert(req).second;
+  assert(result);
+}
+
+template <typename I>
+bool InstanceWatcher<I>::unsuspend_notify_request(
+  C_NotifyInstanceRequest *req) {
+  dout(20) << req << dendl;
+
+  assert(m_lock.is_locked());
+
+  auto result = m_suspended_ops.erase(req);
+  if (result == 0) {
+    return false;
+  }
+
+  req->send();
+  return true;
+}
+
+template <typename I>
+void InstanceWatcher<I>::unsuspend_notify_requests() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  std::set<C_NotifyInstanceRequest *> suspended_ops;
+  std::swap(m_suspended_ops, suspended_ops);
+
+  for (auto op : suspended_ops) {
+    op->send();
+  }
+}
+
 template <typename I>
 Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
                                              uint64_t request_id,
     delete it->on_notify_ack;
     m_requests.erase(it);
   } else {
-    ctx = new FunctionContext(
-      [this, instance_id, request_id] (int r) {
-        C_NotifyAck *on_notify_ack = nullptr;
-        {
-          // update request state in the requests list
-          Mutex::Locker locker(m_lock);
-          Request request(instance_id, request_id);
-          auto it = m_requests.find(request);
-          assert(it != m_requests.end());
-          on_notify_ack = it->on_notify_ack;
-          m_requests.erase(it);
-        }
-
-        ::encode(NotifyAckPayload(instance_id, request_id, r),
-                 on_notify_ack->out);
-        on_notify_ack->complete(0);
-      });
+    ctx = create_async_context_callback(
+        m_work_queue, new FunctionContext(
+            [this, instance_id, request_id] (int r) {
+              complete_request(instance_id, request_id, r);
+            }));
   }
 
   request.on_notify_ack = on_notify_ack;
   return ctx;
 }
 
+template <typename I>
+void InstanceWatcher<I>::complete_request(const std::string &instance_id,
+                                          uint64_t request_id, int r) {
+  dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
+           << dendl;
+
+  C_NotifyAck *on_notify_ack;
+  {
+    Mutex::Locker locker(m_lock);
+    Request request(instance_id, request_id);
+    auto it = m_requests.find(request);
+    assert(it != m_requests.end());
+    on_notify_ack = it->on_notify_ack;
+    m_requests.erase(it);
+  }
+
+  ::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
+  on_notify_ack->complete(0);
+}
+
 template <typename I>
 void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
                                        uint64_t notifier_id, bufferlist &bl) {
   auto ctx = new FunctionContext(
       [this, global_image_id, peer_mirror_uuid, peer_image_id,
        on_finish] (int r) {
-        m_instance_replayer->acquire_image(global_image_id, peer_mirror_uuid,
-                                           peer_image_id, on_finish);
+        m_instance_replayer->acquire_image(this, global_image_id,
+                                           peer_mirror_uuid, peer_image_id,
+                                           on_finish);
         m_notify_op_tracker.finish_op();
       });
 
   m_work_queue->queue(ctx, 0);
 }
 
+template <typename I>
+void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
+                                             const std::string &sync_id,
+                                             Context *on_finish) {
+  dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (m_image_sync_throttler == nullptr) {
+    dout(20) << "sync request for non-leader" << dendl;
+    m_work_queue->queue(on_finish, -ESTALE);
+    return;
+  }
+
+  Context *on_start = create_async_context_callback(
+    m_work_queue, new FunctionContext(
+      [this, instance_id, sync_id, on_finish] (int r) {
+        dout(20) << "handle_sync_request: finish: instance_id=" << instance_id
+                 << ", sync_id=" << sync_id << ", r=" << r << dendl;
+        if (r == 0) {
+          notify_sync_start(instance_id, sync_id);
+        }
+        on_finish->complete(r);
+      }));
+  m_image_sync_throttler->start_op(sync_id, on_start);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
+                                           const std::string &sync_id,
+                                           Context *on_finish) {
+  dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  auto it = m_inflight_sync_reqs.find(sync_id);
+  if (it == m_inflight_sync_reqs.end()) {
+    dout(20) << "not found" << dendl;
+    m_work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  auto sync_ctx = it->second;
+
+  if (sync_ctx->on_complete != nullptr) {
+    dout(20) << "duplicate request" << dendl;
+    m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
+  }
+
+  sync_ctx->on_complete = on_finish;
+}
+
 template <typename I>
 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
                                         const ImageAcquirePayload &payload,
   }
 }
 
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+                                        const SyncRequestPayload &payload,
+                                        C_NotifyAck *on_notify_ack) {
+  dout(20) << "sync_request: instance_id=" << instance_id << ", "
+           << "request_id=" << payload.request_id << dendl;
+
+  auto on_finish = prepare_request(instance_id, payload.request_id,
+                                   on_notify_ack);
+  if (on_finish == nullptr) {
+    return;
+  }
+
+  handle_sync_request(instance_id, payload.sync_id, on_finish);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+                                        const SyncStartPayload &payload,
+                                        C_NotifyAck *on_notify_ack) {
+  dout(20) << "sync_start: instance_id=" << instance_id << ", "
+           << "request_id=" << payload.request_id << dendl;
+
+  auto on_finish = prepare_request(instance_id, payload.request_id,
+                                   on_notify_ack);
+  if (on_finish == nullptr) {
+    return;
+  }
+
+  handle_sync_start(instance_id, payload.sync_id, on_finish);
+}
+
 template <typename I>
 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
                                         const UnknownPayload &payload,
 
 #define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
 
 #include <map>
+#include <memory>
 #include <set>
 #include <string>
 #include <vector>
 namespace rbd {
 namespace mirror {
 
+template <typename> class ImageSyncThrottler;
 template <typename> class InstanceReplayer;
 template <typename> struct Threads;
 
                             const std::string &peer_image_id,
                            bool schedule_delete, Context *on_notify_ack);
 
+  void notify_sync_request(const std::string &sync_id, Context *on_sync_start);
+  bool cancel_sync_request(const std::string &sync_id);
+  void notify_sync_complete(const std::string &sync_id);
+
+  void print_sync_status(Formatter *f, stringstream *ss);
+
   void cancel_notify_requests(const std::string &instance_id);
 
+  void handle_acquire_leader();
+  void handle_release_leader();
+  void handle_update_leader(const std::string &leader_instance_id);
+
 private:
   /**
    * @verbatim
    */
 
   struct C_NotifyInstanceRequest;
+  struct C_SyncRequest;
+
+  typedef std::pair<std::string, std::string> Id;
 
   struct HandlePayloadVisitor : public boost::static_visitor<void> {
     InstanceWatcher *instance_watcher;
   Context *m_on_finish = nullptr;
   int m_ret_val = 0;
   bool m_removing = false;
+  std::string m_leader_instance_id;
   librbd::managed_lock::Locker m_instance_locker;
   std::set<std::pair<std::string, C_NotifyInstanceRequest *>> m_notify_ops;
   AsyncOpTracker m_notify_op_tracker;
   uint64_t m_request_seq = 0;
   std::set<Request> m_requests;
+  std::set<C_NotifyInstanceRequest *> m_suspended_ops;
+  std::map<std::string, C_SyncRequest *> m_inflight_sync_reqs;
+  ImageSyncThrottler<ImageCtxT> *m_image_sync_throttler = nullptr;
 
   void register_instance();
   void handle_register_instance(int r);
   void break_instance_lock();
   void handle_break_instance_lock(int r);
 
+  void suspend_notify_request(C_NotifyInstanceRequest *req);
+  bool unsuspend_notify_request(C_NotifyInstanceRequest *req);
+  void unsuspend_notify_requests();
+
+  void handle_notify_sync_request(C_SyncRequest *sync_ctx, int r);
+  void handle_notify_sync_complete(C_SyncRequest *sync_ctx, int r);
+
+  void notify_sync_start(const std::string &instance_id,
+                         const std::string &sync_id);
+
   Context *prepare_request(const std::string &instance_id, uint64_t request_id,
                            C_NotifyAck *on_notify_ack);
+  void complete_request(const std::string &instance_id, uint64_t request_id,
+                        int r);
 
   void handle_notify(uint64_t notify_id, uint64_t handle,
                      uint64_t notifier_id, bufferlist &bl) override;
                             const std::string &peer_image_id,
                             bool schedule_delete, Context *on_finish);
 
+  void handle_sync_request(const std::string &instance_id,
+                           const std::string &sync_id, Context *on_finish);
+  void handle_sync_start(const std::string &instance_id,
+                         const std::string &sync_id, Context *on_finish);
+
   void handle_payload(const std::string &instance_id,
                       const instance_watcher::ImageAcquirePayload &payload,
                       C_NotifyAck *on_notify_ack);
   void handle_payload(const std::string &instance_id,
                       const instance_watcher::ImageReleasePayload &payload,
                       C_NotifyAck *on_notify_ack);
+  void handle_payload(const std::string &instance_id,
+                      const instance_watcher::SyncRequestPayload &payload,
+                      C_NotifyAck *on_notify_ack);
+  void handle_payload(const std::string &instance_id,
+                      const instance_watcher::SyncStartPayload &payload,
+                      C_NotifyAck *on_notify_ack);
   void handle_payload(const std::string &instance_id,
                       const instance_watcher::UnknownPayload &payload,
                       C_NotifyAck *on_notify_ack);
 
   delete m_leader_lock;
 }
 
+template <typename I>
+std::string LeaderWatcher<I>::get_instance_id() {
+  return stringify(m_notifier_id);
+}
+
 template <typename I>
 int LeaderWatcher<I>::init() {
   C_SaferCond init_ctx;
     return;
   }
 
+  bool notify_listener = false;
   if (m_locker != locker) {
     m_locker = locker;
+    notify_listener = true;
     if (m_acquire_attempts > 1) {
       dout(10) << "new lock owner detected -- resetting heartbeat counter"
                << dendl;
     dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
             << "failed attempts to acquire" << dendl;
     break_leader_lock();
-  } else {
-    schedule_acquire_leader_lock(1);
+    return;
+  }
+
+  schedule_acquire_leader_lock(1);
+
+  if (!notify_listener) {
     m_timer_op_tracker.finish_op();
+    return;
   }
+
+  auto ctx = new FunctionContext(
+    [this](int r) {
+      std::string instance_id;
+      if (get_leader_instance_id(&instance_id)) {
+        m_listener->update_leader_handler(instance_id);
+      }
+      Mutex::Locker timer_locker(m_threads->timer_lock);
+      Mutex::Locker locker(m_lock);
+      m_timer_op_tracker.finish_op();
+    });
+  m_work_queue->queue(ctx, 0);
 }
 
 template <typename I>
 
 
     virtual void post_acquire_handler(Context *on_finish) = 0;
     virtual void pre_release_handler(Context *on_finish) = 0;
+
+    virtual void update_leader_handler(
+      const std::string &leader_instance_id) = 0;
   };
 
   LeaderWatcher(Threads<ImageCtxT> *threads, librados::IoCtx &io_ctx,
   void release_leader();
   void list_instances(std::vector<std::string> *instance_ids);
 
+  std::string get_instance_id();
+
 private:
   /**
    * @verbatim
 
 
   dout(20) << "connected to " << m_peer << dendl;
 
-  m_image_sync_throttler.reset(new ImageSyncThrottler<>());
-
   m_instance_replayer.reset(
-    InstanceReplayer<>::create(m_threads, m_image_deleter,
-                               m_image_sync_throttler, m_local_rados,
+    InstanceReplayer<>::create(m_threads, m_image_deleter, m_local_rados,
                                local_mirror_uuid, m_local_pool_id));
   m_instance_replayer->init();
   m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
 
   m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                              &m_leader_listener));
+
   r = m_leader_watcher->init();
   if (r < 0) {
     derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
                      admin_socket);
 
   f->open_object_section("sync_throttler");
-  m_image_sync_throttler->print_status(f, ss);
+  m_instance_watcher->print_sync_status(f, ss);
   f->close_section();
 
   m_instance_replayer->print_status(f, ss);
 
 void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
   dout(20) << dendl;
+
+  m_instance_watcher->handle_acquire_leader();
   init_local_pool_watcher(on_finish);
 }
 
 void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
   dout(20) << dendl;
+
+  m_instance_watcher->handle_release_leader();
   shut_down_pool_watchers(on_finish);
 }
 
   m_instance_replayer->release_all(on_finish);
 }
 
+void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
+  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
+
+  m_instance_watcher->handle_update_leader(leader_instance_id);
+}
+
 } // namespace mirror
 } // namespace rbd
 
   void wait_for_update_ops(Context *on_finish);
   void handle_wait_for_update_ops(int r, Context *on_finish);
 
+  void handle_update_leader(const std::string &leader_instance_id);
+
   Threads<librbd::ImageCtx> *m_threads;
   std::shared_ptr<ImageDeleter> m_image_deleter;
-  ImageSyncThrottlerRef<> m_image_sync_throttler;
   mutable Mutex m_lock;
   Cond m_cond;
   std::atomic<bool> m_stopping = { false };
       m_pool_replayer->handle_pre_release_leader(on_finish);
     }
 
+    void update_leader_handler(
+      const std::string &leader_instance_id) override {
+      m_pool_replayer->handle_update_leader(leader_instance_id);
+    }
+
   private:
     PoolReplayer *m_pool_replayer;
   } m_leader_listener;
 
 #include "librbd/Utils.h"
 #include "librbd/journal/Types.h"
 #include "tools/rbd_mirror/ProgressContext.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/ImageSync.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
 BootstrapRequest<I>::BootstrapRequest(
         librados::IoCtx &local_io_ctx,
         librados::IoCtx &remote_io_ctx,
-        std::shared_ptr<ImageSyncThrottler<I>> image_sync_throttler,
+        InstanceWatcher<I> *instance_watcher,
         I **local_image_ctx,
         const std::string &local_image_id,
         const std::string &remote_image_id,
   : BaseRequest("rbd::mirror::image_replayer::BootstrapRequest",
                reinterpret_cast<CephContext*>(local_io_ctx.cct()), on_finish),
     m_local_io_ctx(local_io_ctx), m_remote_io_ctx(remote_io_ctx),
-    m_image_sync_throttler(image_sync_throttler),
-    m_local_image_ctx(local_image_ctx), m_local_image_id(local_image_id),
-    m_remote_image_id(remote_image_id), m_global_image_id(global_image_id),
-    m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
+    m_instance_watcher(instance_watcher), m_local_image_ctx(local_image_ctx),
+    m_local_image_id(local_image_id), m_remote_image_id(remote_image_id),
+    m_global_image_id(global_image_id), m_work_queue(work_queue),
+    m_timer(timer), m_timer_lock(timer_lock),
     m_local_mirror_uuid(local_mirror_uuid),
     m_remote_mirror_uuid(remote_mirror_uuid), m_journaler(journaler),
     m_client_meta(client_meta), m_progress_ctx(progress_ctx),
   Mutex::Locker locker(m_lock);
   m_canceled = true;
 
-  m_image_sync_throttler->cancel_sync(m_local_image_id);
+  if (m_image_sync != nullptr) {
+    m_image_sync->cancel();
+  }
 }
 
 template <typename I>
 
   {
     Mutex::Locker locker(m_lock);
-    if (!m_canceled) {
-      m_image_sync_throttler->start_sync(*m_local_image_ctx,
-                                         m_remote_image_ctx, m_timer,
-                                         m_timer_lock,
-                                         m_local_mirror_uuid, m_journaler,
-                                         m_client_meta, m_work_queue, ctx,
-                                         m_progress_ctx);
+    if (m_canceled) {
+      m_ret_val = -ECANCELED;
+    } else {
+      assert(m_image_sync == nullptr);
+      m_image_sync = ImageSync<I>::create(
+          *m_local_image_ctx, m_remote_image_ctx, m_timer, m_timer_lock,
+          m_local_mirror_uuid, m_journaler, m_client_meta, m_work_queue,
+          m_instance_watcher, ctx, m_progress_ctx);
+
+      m_image_sync->get();
+      m_image_sync->send();
       return;
     }
   }
 
   dout(10) << ": request canceled" << dendl;
-  m_ret_val = -ECANCELED;
   close_remote_image();
 }
 
 void BootstrapRequest<I>::handle_image_sync(int r) {
   dout(20) << ": r=" << r << dendl;
 
-  if (m_canceled) {
-    dout(10) << ": request canceled" << dendl;
-    m_ret_val = -ECANCELED;
-  }
+  {
+    Mutex::Locker locker(m_lock);
 
-  if (r < 0) {
-    derr << ": failed to sync remote image: " << cpp_strerror(r) << dendl;
-    m_ret_val = r;
+    m_image_sync->put();
+    m_image_sync = nullptr;
+
+    if (m_canceled) {
+      dout(10) << ": request canceled" << dendl;
+      m_ret_val = -ECANCELED;
+    }
+
+    if (r < 0) {
+      derr << ": failed to sync remote image: " << cpp_strerror(r) << dendl;
+      m_ret_val = r;
+    }
   }
 
   close_remote_image();
 
 
 class ProgressContext;
 
+template <typename> class ImageSync;
+template <typename> class InstanceWatcher;
+
 namespace image_replayer {
 
 template <typename ImageCtxT = librbd::ImageCtx>
   static BootstrapRequest* create(
         librados::IoCtx &local_io_ctx,
         librados::IoCtx &remote_io_ctx,
-        ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+        InstanceWatcher<ImageCtxT> *instance_watcher,
         ImageCtxT **local_image_ctx,
         const std::string &local_image_id,
         const std::string &remote_image_id,
         bool *do_resync,
         ProgressContext *progress_ctx = nullptr) {
     return new BootstrapRequest(local_io_ctx, remote_io_ctx,
-                                image_sync_throttler, local_image_ctx,
+                                instance_watcher, local_image_ctx,
                                 local_image_id, remote_image_id,
                                 global_image_id, work_queue, timer, timer_lock,
                                 local_mirror_uuid, remote_mirror_uuid,
 
   BootstrapRequest(librados::IoCtx &local_io_ctx,
                    librados::IoCtx &remote_io_ctx,
-                   ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+                   InstanceWatcher<ImageCtxT> *instance_watcher,
                    ImageCtxT **local_image_ctx,
                    const std::string &local_image_id,
                    const std::string &remote_image_id,
 
   librados::IoCtx &m_local_io_ctx;
   librados::IoCtx &m_remote_io_ctx;
-  ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
+  InstanceWatcher<ImageCtxT> *m_instance_watcher;
   ImageCtxT **m_local_image_ctx;
   std::string m_local_image_id;
   std::string m_remote_image_id;
   MirrorPeerClientMeta *m_client_meta;
   ProgressContext *m_progress_ctx;
   bool *m_do_resync;
+
   Mutex m_lock;
   bool m_canceled = false;
 
   ImageCtxT *m_remote_image_ctx = nullptr;
   bool m_primary = false;
   int m_ret_val = 0;
+  ImageSync<ImageCtxT> *m_image_sync = nullptr;
 
   bufferlist m_out_bl;
 
 
 
 } // anonymous namespace
 
-void ImagePayloadBase::encode(bufferlist &bl) const {
+void PayloadBase::encode(bufferlist &bl) const {
   ::encode(request_id, bl);
+}
+
+void PayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
+  ::decode(request_id, iter);
+}
+
+void PayloadBase::dump(Formatter *f) const {
+  f->dump_unsigned("request_id", request_id);
+}
+
+void ImagePayloadBase::encode(bufferlist &bl) const {
+  PayloadBase::encode(bl);
   ::encode(global_image_id, bl);
   ::encode(peer_mirror_uuid, bl);
   ::encode(peer_image_id, bl);
 }
 
 void ImagePayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
-  ::decode(request_id, iter);
+  PayloadBase::decode(version, iter);
   ::decode(global_image_id, iter);
   ::decode(peer_mirror_uuid, iter);
   ::decode(peer_image_id, iter);
 }
 
 void ImagePayloadBase::dump(Formatter *f) const {
-  f->dump_unsigned("request_id", request_id);
+  PayloadBase::dump(f);
   f->dump_string("global_image_id", global_image_id);
   f->dump_string("peer_mirror_uuid", peer_mirror_uuid);
   f->dump_string("peer_image_id", peer_image_id);
   f->dump_bool("schedule_delete", schedule_delete);
 }
 
+void SyncPayloadBase::encode(bufferlist &bl) const {
+  PayloadBase::encode(bl);
+  ::encode(sync_id, bl);
+}
+
+void SyncPayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
+  PayloadBase::decode(version, iter);
+  ::decode(sync_id, iter);
+}
+
+void SyncPayloadBase::dump(Formatter *f) const {
+  PayloadBase::dump(f);
+  f->dump_string("sync_id", sync_id);
+}
+
 void UnknownPayload::encode(bufferlist &bl) const {
   assert(false);
 }
   case NOTIFY_OP_IMAGE_RELEASE:
     payload = ImageReleasePayload();
     break;
+  case NOTIFY_OP_SYNC_REQUEST:
+    payload = SyncRequestPayload();
+    break;
+  case NOTIFY_OP_SYNC_START:
+    payload = SyncStartPayload();
+    break;
   default:
     payload = UnknownPayload();
     break;
   o.push_back(new NotifyMessage(ImageReleasePayload()));
   o.push_back(new NotifyMessage(ImageReleasePayload(1, "gid", "uuid", "id",
                                                     true)));
+
+  o.push_back(new NotifyMessage(SyncRequestPayload()));
+  o.push_back(new NotifyMessage(SyncRequestPayload(1, "sync_id")));
+
+  o.push_back(new NotifyMessage(SyncStartPayload()));
+  o.push_back(new NotifyMessage(SyncStartPayload(1, "sync_id")));
 }
 
 std::ostream &operator<<(std::ostream &out, const NotifyOp &op) {
   case NOTIFY_OP_IMAGE_RELEASE:
     out << "ImageRelease";
     break;
+  case NOTIFY_OP_SYNC_REQUEST:
+    out << "SyncRequest";
+    break;
+  case NOTIFY_OP_SYNC_START:
+    out << "SyncStart";
+    break;
   default:
     out << "Unknown (" << static_cast<uint32_t>(op) << ")";
     break;
 
 enum NotifyOp {
   NOTIFY_OP_IMAGE_ACQUIRE  = 0,
   NOTIFY_OP_IMAGE_RELEASE  = 1,
+  NOTIFY_OP_SYNC_REQUEST   = 2,
+  NOTIFY_OP_SYNC_START     = 3,
 };
 
-struct ImagePayloadBase {
+struct PayloadBase {
   uint64_t request_id;
+
+  PayloadBase() : request_id(0) {
+  }
+
+  PayloadBase(uint64_t request_id) : request_id(request_id) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+  void dump(Formatter *f) const;
+};
+
+struct ImagePayloadBase : public PayloadBase {
   std::string global_image_id;
   std::string peer_mirror_uuid;
   std::string peer_image_id;
 
-  ImagePayloadBase() : request_id(0) {
+  ImagePayloadBase() : PayloadBase() {
   }
 
   ImagePayloadBase(uint64_t request_id, const std::string &global_image_id,
                    const std::string &peer_mirror_uuid,
                    const std::string &peer_image_id)
-    : request_id(request_id), global_image_id(global_image_id),
+    : PayloadBase(request_id), global_image_id(global_image_id),
       peer_mirror_uuid(peer_mirror_uuid), peer_image_id(peer_image_id) {
   }
 
   void dump(Formatter *f) const;
 };
 
+struct SyncPayloadBase : public PayloadBase {
+  std::string sync_id;
+
+  SyncPayloadBase() : PayloadBase() {
+  }
+
+  SyncPayloadBase(uint64_t request_id, const std::string &sync_id)
+    : PayloadBase(request_id), sync_id(sync_id) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &iter);
+  void dump(Formatter *f) const;
+};
+
+struct SyncRequestPayload : public SyncPayloadBase {
+  static const NotifyOp NOTIFY_OP = NOTIFY_OP_SYNC_REQUEST;
+
+  SyncRequestPayload() : SyncPayloadBase() {
+  }
+
+  SyncRequestPayload(uint64_t request_id, const std::string &sync_id)
+    : SyncPayloadBase(request_id, sync_id) {
+  }
+};
+
+struct SyncStartPayload : public SyncPayloadBase {
+  static const NotifyOp NOTIFY_OP = NOTIFY_OP_SYNC_START;
+
+  SyncStartPayload() : SyncPayloadBase() {
+  }
+
+  SyncStartPayload(uint64_t request_id, const std::string &sync_id)
+    : SyncPayloadBase(request_id, sync_id) {
+  }
+};
+
 struct UnknownPayload {
   static const NotifyOp NOTIFY_OP = static_cast<NotifyOp>(-1);
 
 
 typedef boost::variant<ImageAcquirePayload,
                        ImageReleasePayload,
+                       SyncRequestPayload,
+                       SyncStartPayload,
                        UnknownPayload> Payload;
 
 struct NotifyMessage {
 
 namespace leader_watcher {
 
 enum NotifyOp {
-  NOTIFY_OP_HEARTBEAT     = 0,
-  NOTIFY_OP_LOCK_ACQUIRED = 1,
-  NOTIFY_OP_LOCK_RELEASED = 2,
+  NOTIFY_OP_HEARTBEAT        = 0,
+  NOTIFY_OP_LOCK_ACQUIRED    = 1,
+  NOTIFY_OP_LOCK_RELEASED    = 2,
 };
 
 struct HeartbeatPayload {
 
 // vim: ts=8 sw=2 smarttab
 
 #include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
-#include "common/dout.h"
+#include "common/debug.h"
 #include "common/errno.h"
 #include "cls/rbd/cls_rbd_client.h"
 #include "librbd/Utils.h"
 
 #include <vector>
 
 #include "include/rbd/librbd.hpp"
-#include "ImageSyncThrottler.h"
 
 namespace rbd {
 namespace mirror {
 typedef shared_ptr<librados::IoCtx> IoCtxRef;
 typedef shared_ptr<librbd::Image> ImageRef;
 
-template <typename I = librbd::ImageCtx>
-using ImageSyncThrottlerRef = std::shared_ptr<ImageSyncThrottler<I>>;
-
 struct ImageId {
   std::string global_id;
   std::string id;