]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rbd-mirror: utilize the mirroring watcher to receive update notifications
authorJason Dillaman <dillaman@redhat.com>
Tue, 22 Nov 2016 18:47:37 +0000 (13:47 -0500)
committerJason Dillaman <dillaman@redhat.com>
Thu, 16 Mar 2017 21:04:30 +0000 (17:04 -0400)
Fixes: http://tracker.ceph.com/issues/15029
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
21 files changed:
src/common/config_opts.h
src/librbd/MirroringWatcher.cc
src/librbd/MirroringWatcher.h
src/librbd/Watcher.cc
src/librbd/Watcher.h
src/test/librbd/test_MirroringWatcher.cc
src/test/rbd_mirror/CMakeLists.txt
src/test/rbd_mirror/mock/MockContextWQ.h [new file with mode: 0644]
src/test/rbd_mirror/mock/MockSafeTimer.h [new file with mode: 0644]
src/test/rbd_mirror/pool_watcher/test_mock_RefreshImagesRequest.cc
src/test/rbd_mirror/test_PoolWatcher.cc
src/test/rbd_mirror/test_mock_PoolWatcher.cc [new file with mode: 0644]
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/PoolWatcher.cc
src/tools/rbd_mirror/PoolWatcher.h
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h
src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.cc
src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h
src/tools/rbd_mirror/types.cc
src/tools/rbd_mirror/types.h

index 009eb2e21869f6ae263bdac9a22c0b6e21f0cb25..4593c7da69e76a94f167a1cf4a85079c555e10a6 100644 (file)
@@ -1397,7 +1397,6 @@ OPTION(rbd_mirror_sync_point_update_age, OPT_DOUBLE, 30) // number of seconds be
 OPTION(rbd_mirror_concurrent_image_syncs, OPT_U32, 5) // maximum number of image syncs in parallel
 OPTION(rbd_mirror_pool_replayers_refresh_interval, OPT_INT, 30) // interval to refresh peers in rbd-mirror daemon
 OPTION(rbd_mirror_delete_retry_interval, OPT_DOUBLE, 30) // interval to check and retry the failed requests in deleter
-OPTION(rbd_mirror_image_directory_refresh_interval, OPT_INT, 30) // interval to refresh images in pool watcher
 OPTION(rbd_mirror_image_state_check_interval, OPT_INT, 30) // interval to get images from pool watcher and set sources in replayer
 OPTION(rbd_mirror_leader_heartbeat_interval, OPT_INT, 5) // interval (in seconds) between mirror leader heartbeats
 OPTION(rbd_mirror_leader_max_missed_heartbeats, OPT_INT, 2) // number of missed heartbeats for non-lock owner to attempt to acquire lock
index b083baa5e58d81a284a0823d6d93d875401c3d0d..753ca91f44b7af4268550c9a74270acf9d2a39ff 100644 (file)
@@ -114,7 +114,7 @@ bool MirroringWatcher<I>::handle_payload(const ModeUpdatedPayload &payload,
                                          Context *on_notify_ack) {
   CephContext *cct = this->m_cct;
   ldout(cct, 20) << ": mode updated: " << payload.mirror_mode << dendl;
-  handle_mode_updated(payload.mirror_mode, on_notify_ack);
+  handle_mode_updated(payload.mirror_mode);
   return true;
 }
 
@@ -124,7 +124,7 @@ bool MirroringWatcher<I>::handle_payload(const ImageUpdatedPayload &payload,
   CephContext *cct = this->m_cct;
   ldout(cct, 20) << ": image state updated" << dendl;
   handle_image_updated(payload.mirror_image_state, payload.image_id,
-                       payload.global_image_id, on_notify_ack);
+                       payload.global_image_id);
   return true;
 }
 
index 407223bf85c47e97a1cfedd72819abd5d5c5a23b..ede06d8fe0878418a4ca91a845bddaff7ad1256e 100644 (file)
@@ -43,12 +43,10 @@ public:
                                    const std::string &global_image_id,
                                    Context *on_finish);
 
-  virtual void handle_mode_updated(cls::rbd::MirrorMode mirror_mode,
-                                   Context *on_ack) = 0;
+  virtual void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) = 0;
   virtual void handle_image_updated(cls::rbd::MirrorImageState state,
                                     const std::string &image_id,
-                                    const std::string &global_image_id,
-                                    Context *on_ack) = 0;
+                                    const std::string &global_image_id) = 0;
 
 private:
   bool handle_payload(const mirroring_watcher::ModeUpdatedPayload &payload,
index 20e1a88edfc1c58ff03b597e12d18579c1811ef3..9d986a1a5782a2d86b3ac7e5a1e5ca27d3f2a439 100644 (file)
@@ -157,6 +157,11 @@ void Watcher::flush(Context *on_finish) {
   m_notifier.flush(on_finish);
 }
 
+std::string Watcher::get_oid() const {
+  RWLock::RLocker locker(m_watch_lock);
+  return m_oid;
+}
+
 void Watcher::set_oid(const string& oid) {
   RWLock::WLocker l(m_watch_lock);
   assert(m_watch_state == WATCH_STATE_UNREGISTERED);
index 3e353bdb6452431da08816b5cf2b92176f821ef7..099b007b8f6e02ed42fe4e2b20b1056113a4db5c 100644 (file)
@@ -28,6 +28,7 @@ public:
   void unregister_watch(Context *on_finish);
   void flush(Context *on_finish);
 
+  std::string get_oid() const;
   void set_oid(const string& oid);
 
   uint64_t get_watch_handle() const {
index c1365a5e0f5a37071e34443a9db2f0a4d8830de8..f29ff933fdac11e9fb9065ba9014c1b25cba41f5 100644 (file)
@@ -23,11 +23,10 @@ struct MockMirroringWatcher : public MirroringWatcher<> {
     : MirroringWatcher<>(image_ctx.md_ctx, image_ctx.op_work_queue) {
   }
 
-  MOCK_METHOD2(handle_mode_updated, void(cls::rbd::MirrorMode, Context*));
-  MOCK_METHOD4(handle_image_updated, void(cls::rbd::MirrorImageState,
+  MOCK_METHOD1(handle_mode_updated, void(cls::rbd::MirrorMode));
+  MOCK_METHOD3(handle_image_updated, void(cls::rbd::MirrorImageState,
                                           const std::string &,
-                                          const std::string &,
-                                          Context*));
+                                          const std::string &));
 };
 
 } // anonymous namespace
@@ -73,7 +72,7 @@ public:
 };
 
 TEST_F(TestMirroringWatcher, ModeUpdated) {
-  EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED, _));
+  EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED));
 
   C_SaferCond ctx;
   MockMirroringWatcher::notify_mode_updated(m_ioctx, cls::rbd::MIRROR_MODE_DISABLED, &ctx);
@@ -83,8 +82,8 @@ TEST_F(TestMirroringWatcher, ModeUpdated) {
 TEST_F(TestMirroringWatcher, ImageStatusUpdated) {
   EXPECT_CALL(*m_image_watcher,
               handle_image_updated(cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
-                                   StrEq("image id"), StrEq("global image id"),
-                                   _));
+                                   StrEq("image id"),
+                                   StrEq("global image id")));
 
   C_SaferCond ctx;
   MockMirroringWatcher::notify_image_updated(m_ioctx,
index 08ca7fa7d5cfc62fb2d5bb7d4499043f9711534c..88435ff7828fb9bc92f487e1a2f3aa97e729d89b 100644 (file)
@@ -9,8 +9,8 @@ set(rbd_mirror_test_srcs
   test_LeaderWatcher.cc
   test_fixture.cc
   )
-add_library(rbd_mirror STATIC ${rbd_mirror_test_srcs})
-set_target_properties(rbd_mirror PROPERTIES COMPILE_FLAGS
+add_library(rbd_mirror_test STATIC ${rbd_mirror_test_srcs})
+set_target_properties(rbd_mirror_test PROPERTIES COMPILE_FLAGS
   ${UNITTEST_CXX_FLAGS})
 
 add_executable(unittest_rbd_mirror
@@ -21,6 +21,7 @@ add_executable(unittest_rbd_mirror
   test_mock_ImageSyncThrottler.cc
   test_mock_InstanceWatcher.cc
   test_mock_LeaderWatcher.cc
+  test_mock_PoolWatcher.cc
   image_replayer/test_mock_BootstrapRequest.cc
   image_replayer/test_mock_CreateImageRequest.cc
   image_replayer/test_mock_EventPreprocessor.cc
@@ -40,7 +41,7 @@ add_dependencies(unittest_rbd_mirror
   cls_lock
   cls_rbd)
 target_link_libraries(unittest_rbd_mirror
-  rbd_mirror
+  rbd_mirror_test
   rados_test_stub
   rbd_mirror_internal
   rbd_mirror_types
@@ -65,7 +66,7 @@ add_executable(ceph_test_rbd_mirror
 set_target_properties(ceph_test_rbd_mirror PROPERTIES COMPILE_FLAGS
   ${UNITTEST_CXX_FLAGS})
 target_link_libraries(ceph_test_rbd_mirror
-  rbd_mirror
+  rbd_mirror_test
   rbd_mirror_internal
   rbd_mirror_types
   rbd_api
diff --git a/src/test/rbd_mirror/mock/MockContextWQ.h b/src/test/rbd_mirror/mock/MockContextWQ.h
new file mode 100644 (file)
index 0000000..1c0ee88
--- /dev/null
@@ -0,0 +1,18 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MOCK_CONTEXT_WQ_H
+#define CEPH_MOCK_CONTEXT_WQ_H
+
+#include <gmock/gmock.h>
+
+struct Context;
+
+struct MockContextWQ {
+  void queue(Context *ctx) {
+    queue(ctx, 0);
+  }
+  MOCK_METHOD2(queue, void(Context *, int));
+};
+
+#endif // CEPH_MOCK_CONTEXT_WQ_H
diff --git a/src/test/rbd_mirror/mock/MockSafeTimer.h b/src/test/rbd_mirror/mock/MockSafeTimer.h
new file mode 100644 (file)
index 0000000..4926660
--- /dev/null
@@ -0,0 +1,16 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MOCK_SAFE_TIMER_H
+#define CEPH_MOCK_SAFE_TIMER_H
+
+#include <gmock/gmock.h>
+
+struct Context;
+
+struct MockSafeTimer {
+  MOCK_METHOD2(add_event_after, void(double, Context*));
+  MOCK_METHOD1(cancel_event, void(Context *));
+};
+
+#endif // CEPH_MOCK_SAFE_TIMER_H
index 4e29dba97f6043b8b02f6489fbfdd340cbdbbd39..1e92670d8a57ae95dd39bb301695e03a1d1441dd 100644 (file)
@@ -81,8 +81,7 @@ TEST_F(TestMockPoolWatcherRefreshImagesRequest, Success) {
   req->send();
   ASSERT_EQ(0, ctx.wait());
 
-  ImageIds expected_image_ids = {{"global id", "local id",
-                                  boost::optional<std::string>{"image name"}}};
+  ImageIds expected_image_ids = {{"global id", "local id", "image name"}};
   ASSERT_EQ(expected_image_ids, image_ids);
 }
 
@@ -114,8 +113,7 @@ TEST_F(TestMockPoolWatcherRefreshImagesRequest, LargeDirectory) {
   req->send();
   ASSERT_EQ(0, ctx.wait());
 
-  expected_image_ids.insert({"global id", "local id",
-                             boost::optional<std::string>{"image name"}});
+  expected_image_ids.insert({"global id", "local id", "image name"});
   ASSERT_EQ(expected_image_ids, image_ids);
 }
 
index bbc9bb123e03643d62d514f28c507eeeba827a10..fbdaab9a218753d8c0c6ebcbba6b6a1875151cb0 100644 (file)
@@ -1,5 +1,6 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
+
 #include "include/rados/librados.hpp"
 #include "include/rbd/librbd.hpp"
 #include "include/stringify.h"
@@ -17,6 +18,7 @@
 #include "common/errno.h"
 #include "common/Mutex.h"
 #include "tools/rbd_mirror/PoolWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
 #include "tools/rbd_mirror/types.h"
 #include "test/librados/test.h"
 #include "gtest/gtest.h"
@@ -42,20 +44,48 @@ void register_test_pool_watcher() {
 class TestPoolWatcher : public ::rbd::mirror::TestFixture {
 public:
 
-TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
-    m_image_number(0), m_snap_number(0)
+  TestPoolWatcher()
+    : m_lock("TestPoolWatcherLock"), m_pool_watcher_listener(this),
+      m_image_number(0), m_snap_number(0)
   {
     m_cluster = std::make_shared<librados::Rados>();
     EXPECT_EQ("", connect_cluster_pp(*m_cluster));
   }
 
-  ~TestPoolWatcher() override {
+  void TearDown() override {
+    if (m_pool_watcher) {
+      C_SaferCond ctx;
+      m_pool_watcher->shut_down(&ctx);
+      EXPECT_EQ(0, ctx.wait());
+    }
+
     m_cluster->wait_for_latest_osdmap();
     for (auto& pool : m_pools) {
       EXPECT_EQ(0, m_cluster->pool_delete(pool.c_str()));
     }
+
+    TestFixture::TearDown();
   }
 
+  struct PoolWatcherListener : public PoolWatcher<>::Listener {
+    TestPoolWatcher *test;
+    Cond cond;
+    ImageIds image_ids;
+
+    PoolWatcherListener(TestPoolWatcher *test) : test(test) {
+    }
+
+    void handle_update(const ImageIds &added_image_ids,
+                       const ImageIds &removed_image_ids) override {
+      Mutex::Locker locker(test->m_lock);
+      for (auto &image_id : removed_image_ids) {
+        image_ids.erase(image_id);
+      }
+      image_ids.insert(added_image_ids.begin(), added_image_ids.end());
+      cond.Signal();
+    }
+  };
+
   void create_pool(bool enable_mirroring, const peer_t &peer, string *name=nullptr) {
     string pool_name = get_temp_pool_name("test-rbd-mirror-");
     ASSERT_EQ(0, m_cluster->pool_create(pool_name.c_str()));
@@ -67,7 +97,9 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
     librados::IoCtx ioctx;
     ASSERT_EQ(0, m_cluster->ioctx_create2(pool_id, ioctx));
 
-    m_pool_watcher.reset(new PoolWatcher(ioctx, 30, m_lock, m_cond));
+    m_pool_watcher.reset(new PoolWatcher<>(m_threads, ioctx,
+                                           m_pool_watcher_listener));
+
     if (enable_mirroring) {
       ASSERT_EQ(0, librbd::api::Mirror<>::mode_set(ioctx,
                                                    RBD_MIRROR_MODE_POOL));
@@ -79,6 +111,8 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
     if (name != nullptr) {
       *name = pool_name;
     }
+
+    m_pool_watcher->init();
   }
 
   string get_image_id(librados::IoCtx *ioctx, const string &image_name) {
@@ -166,15 +200,21 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
   }
 
   void check_images() {
-    m_pool_watcher->refresh_images(false);
     Mutex::Locker l(m_lock);
-    ASSERT_EQ(m_mirrored_images, m_pool_watcher->get_images());
+    while (m_mirrored_images != m_pool_watcher_listener.image_ids) {
+      if (m_pool_watcher_listener.cond.WaitInterval(
+            m_lock, utime_t(10, 0)) != 0) {
+        break;
+      }
+    }
+
+    ASSERT_EQ(m_mirrored_images, m_pool_watcher_listener.image_ids);
   }
 
   Mutex m_lock;
-  Cond m_cond;
   RadosRef m_cluster;
-  unique_ptr<PoolWatcher> m_pool_watcher;
+  PoolWatcherListener m_pool_watcher_listener;
+  unique_ptr<PoolWatcher<> > m_pool_watcher;
 
   set<string> m_pools;
   ImageIds m_mirrored_images;
diff --git a/src/test/rbd_mirror/test_mock_PoolWatcher.cc b/src/test/rbd_mirror/test_mock_PoolWatcher.cc
new file mode 100644 (file)
index 0000000..737aad9
--- /dev/null
@@ -0,0 +1,807 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librados_test_stub/MockTestMemRadosClient.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/rbd_mirror/mock/MockContextWQ.h"
+#include "test/rbd_mirror/mock/MockSafeTimer.h"
+#include "librbd/MirroringWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/PoolWatcher.h"
+#include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
+#include "include/stringify.h"
+
+namespace librbd {
+namespace {
+
+struct MockTestImageCtx : public librbd::MockImageCtx {
+  MockTestImageCtx(librbd::ImageCtx &image_ctx)
+    : librbd::MockImageCtx(image_ctx) {
+  }
+};
+
+} // anonymous namespace
+
+struct MockMirroringWatcher {
+  static MockMirroringWatcher *s_instance;
+  static MockMirroringWatcher &get_instance() {
+    assert(s_instance != nullptr);
+    return *s_instance;
+  }
+
+  MockMirroringWatcher() {
+    s_instance = this;
+  }
+
+  MOCK_CONST_METHOD0(is_unregistered, bool());
+  MOCK_METHOD1(register_watch, void(Context*));
+  MOCK_METHOD1(unregister_watch, void(Context*));
+
+  MOCK_CONST_METHOD0(get_oid, std::string());
+};
+
+template <>
+struct MirroringWatcher<MockTestImageCtx> {
+  static MirroringWatcher *s_instance;
+
+  MirroringWatcher(librados::IoCtx &io_ctx, ::MockContextWQ *work_queue) {
+    s_instance = this;
+  }
+  virtual ~MirroringWatcher() {
+  }
+
+  static MirroringWatcher<MockTestImageCtx> &get_instance() {
+    assert(s_instance != nullptr);
+    return *s_instance;
+  }
+
+  virtual void handle_rewatch_complete(int r) = 0;
+
+  virtual void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) = 0;
+  virtual void handle_image_updated(cls::rbd::MirrorImageState state,
+                                    const std::string &remote_image_id,
+                                    const std::string &global_image_id) = 0;
+
+  bool is_unregistered() const {
+    return MockMirroringWatcher::get_instance().is_unregistered();
+  }
+  void register_watch(Context *ctx) {
+    MockMirroringWatcher::get_instance().register_watch(ctx);
+  }
+  void unregister_watch(Context *ctx) {
+    MockMirroringWatcher::get_instance().unregister_watch(ctx);
+  }
+  std::string get_oid() const {
+    return MockMirroringWatcher::get_instance().get_oid();
+  }
+};
+
+MockMirroringWatcher *MockMirroringWatcher::s_instance = nullptr;
+MirroringWatcher<MockTestImageCtx> *MirroringWatcher<MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+  MockSafeTimer *timer;
+  Mutex &timer_lock;
+
+  MockContextWQ *work_queue;
+
+  Threads(Threads<librbd::ImageCtx> *threads)
+    : timer(new MockSafeTimer()),
+      timer_lock(threads->timer_lock),
+      work_queue(new MockContextWQ()) {
+  }
+  ~Threads() {
+    delete timer;
+    delete work_queue;
+  }
+};
+
+namespace pool_watcher {
+
+template <>
+struct RefreshImagesRequest<librbd::MockTestImageCtx> {
+  ImageIds *image_ids = nullptr;
+  Context *on_finish = nullptr;
+  static RefreshImagesRequest *s_instance;
+  static RefreshImagesRequest *create(librados::IoCtx &io_ctx,
+                                      ImageIds *image_ids,
+                                      Context *on_finish) {
+    assert(s_instance != nullptr);
+    s_instance->image_ids = image_ids;
+    s_instance->on_finish = on_finish;
+    return s_instance;
+  }
+
+  MOCK_METHOD0(send, void());
+
+  RefreshImagesRequest() {
+    s_instance = this;
+  }
+};
+
+RefreshImagesRequest<librbd::MockTestImageCtx> *RefreshImagesRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace pool_watcher
+
+} // namespace mirror
+} // namespace rbd
+
+// template definitions
+#include "tools/rbd_mirror/PoolWatcher.cc"
+
+namespace rbd {
+namespace mirror {
+
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+using ::testing::StrEq;
+using ::testing::WithArg;
+using ::testing::WithoutArgs;
+
+class TestMockPoolWatcher : public TestMockFixture {
+public:
+  typedef PoolWatcher<librbd::MockTestImageCtx> MockPoolWatcher;
+  typedef Threads<librbd::MockTestImageCtx> MockThreads;
+  typedef pool_watcher::RefreshImagesRequest<librbd::MockTestImageCtx> MockRefreshImagesRequest;
+  typedef librbd::MockMirroringWatcher MockMirroringWatcher;
+  typedef librbd::MirroringWatcher<librbd::MockTestImageCtx> MirroringWatcher;
+
+  struct MockListener : MockPoolWatcher::Listener {
+    TestMockPoolWatcher *test;
+
+    MockListener(TestMockPoolWatcher *test) : test(test) {
+    }
+
+    MOCK_METHOD2(handle_update, void(const ImageIds &, const ImageIds &));
+  };
+
+  TestMockPoolWatcher() : m_lock("TestMockPoolWatcher::m_lock") {
+  }
+
+  void expect_work_queue(MockThreads &mock_threads) {
+    EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
+      .WillRepeatedly(Invoke([this](Context *ctx, int r) {
+          m_threads->work_queue->queue(ctx, r);
+        }));
+  }
+
+  void expect_mirroring_watcher_is_unregistered(MockMirroringWatcher &mock_mirroring_watcher,
+                                                bool unregistered) {
+    EXPECT_CALL(mock_mirroring_watcher, is_unregistered())
+      .WillOnce(Return(unregistered));
+  }
+
+  void expect_mirroring_watcher_register(MockMirroringWatcher &mock_mirroring_watcher,
+                                         int r) {
+    EXPECT_CALL(mock_mirroring_watcher, register_watch(_))
+      .WillOnce(CompleteContext(r));
+  }
+
+  void expect_mirroring_watcher_unregister(MockMirroringWatcher &mock_mirroring_watcher,
+                                         int r) {
+    EXPECT_CALL(mock_mirroring_watcher, unregister_watch(_))
+      .WillOnce(CompleteContext(r));
+  }
+
+  void expect_refresh_images(MockRefreshImagesRequest &request,
+                             const ImageIds &image_ids, int r) {
+    EXPECT_CALL(request, send())
+      .WillOnce(Invoke([&request, image_ids, r]() {
+          *request.image_ids = image_ids;
+          request.on_finish->complete(r);
+        }));
+  }
+
+  void expect_listener_handle_update(MockListener &mock_listener,
+                                     const ImageIds &added_image_ids,
+                                     const ImageIds &removed_image_ids) {
+    EXPECT_CALL(mock_listener, handle_update(added_image_ids, removed_image_ids))
+      .WillOnce(WithoutArgs(Invoke([this]() {
+          Mutex::Locker locker(m_lock);
+          ++m_update_count;
+          m_cond.Signal();
+        })));
+  }
+
+  void expect_dir_list(librados::IoCtx &io_ctx,
+                       const std::string &id, const std::string &name, int r) {
+    bufferlist in_bl;
+    ::encode(id, in_bl);
+
+    bufferlist out_bl;
+    ::encode(name, out_bl);
+
+    EXPECT_CALL(get_mock_io_ctx(io_ctx),
+                exec(RBD_DIRECTORY, _, StrEq("rbd"), StrEq("dir_get_name"),
+                     ContentsEqual(in_bl), _, _))
+      .WillOnce(DoAll(WithArg<5>(Invoke([this, out_bl](bufferlist *bl) {
+                          *bl = out_bl;
+                          Mutex::Locker locker(m_lock);
+                          ++m_get_name_count;
+                          m_cond.Signal();
+                        })),
+                      Return(r)));
+  }
+
+  void expect_timer_add_event(MockThreads &mock_threads) {
+    EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
+      .WillOnce(WithArg<1>(Invoke([](Context *ctx) {
+          ctx->complete(0);
+        })));
+  }
+
+  int when_shut_down(MockPoolWatcher &mock_pool_watcher) {
+    C_SaferCond ctx;
+    mock_pool_watcher.shut_down(&ctx);
+    return ctx.wait();
+  }
+
+  bool wait_for_update(uint32_t count) {
+    Mutex::Locker locker(m_lock);
+    while (m_update_count < count) {
+      if (m_cond.WaitInterval(m_lock, utime_t(10, 0)) != 0) {
+        break;
+      }
+    }
+    if (m_update_count < count) {
+      return false;
+    }
+
+    m_update_count -= count;
+    return true;
+  }
+
+  bool wait_for_get_name(uint32_t count) {
+    Mutex::Locker locker(m_lock);
+    while (m_get_name_count < count) {
+      if (m_cond.WaitInterval(m_lock, utime_t(10, 0)) != 0) {
+        break;
+      }
+    }
+    if (m_get_name_count < count) {
+      return false;
+    }
+
+    m_get_name_count -= count;
+    return true;
+  }
+
+  Mutex m_lock;
+  Cond m_cond;
+  uint32_t m_update_count = 0;
+  uint32_t m_get_name_count = 0;
+};
+
+TEST_F(TestMockPoolWatcher, EmptyPool) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, {}, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+
+  ASSERT_TRUE(wait_for_update(1));
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, NonEmptyPool) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  ImageIds image_ids{
+    {"global id 1", "remote id 1", "image name 1"},
+    {"global id 2", "remote id 2", "image name 2"}};
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, image_ids, 0);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, image_ids, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+
+  ASSERT_TRUE(wait_for_update(1));
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, NotifyDuringRefresh) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  ImageIds image_ids{
+    {"global id 1", "remote id 1", "image name 1"},
+    {"global id 2", "remote id 2", "image name 2"}};
+  MockRefreshImagesRequest mock_refresh_images_request;
+  bool refresh_sent = false;
+  EXPECT_CALL(mock_refresh_images_request, send())
+    .WillOnce(Invoke([this, &mock_refresh_images_request, &image_ids,
+                      &refresh_sent]() {
+       *mock_refresh_images_request.image_ids = image_ids;
+
+        Mutex::Locker locker(m_lock);
+        refresh_sent = true;
+        m_cond.Signal();
+      }));
+
+  expect_dir_list(m_remote_io_ctx, "remote id 1a", "image name 1a", 0);
+  expect_dir_list(m_remote_io_ctx, "remote id 3", "image name 3", 0);
+  expect_dir_list(m_remote_io_ctx, "dummy", "", -ENOENT);
+
+  MockListener mock_listener(this);
+  image_ids = {
+    {"global id 1", "remote id 1a", "image name 1a"},
+    {"global id 3", "remote id 3", "image name 3"}};
+  expect_listener_handle_update(mock_listener, image_ids, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  mock_pool_watcher.init(nullptr);
+
+  {
+    Mutex::Locker locker(m_lock);
+    while (!refresh_sent) {
+      m_cond.Wait(m_lock);
+    }
+  }
+
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_DISABLING, "remote id 2", "global id 2");
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 1a", "global id 1");
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 3", "global id 3");
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "dummy", "dummy");
+  wait_for_get_name(3);
+
+  mock_refresh_images_request.on_finish->complete(0);
+  ASSERT_TRUE(wait_for_update(1));
+
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, Notify) {
+  MockThreads mock_threads(m_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  ImageIds image_ids{
+    {"global id 1", "remote id 1", "image name 1"},
+    {"global id 2", "remote id 2", "image name 2"}};
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, image_ids, 0);
+  EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
+    .WillOnce(Invoke([this](Context *ctx, int r) {
+        m_threads->work_queue->queue(ctx, r);
+      }));
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, image_ids, {});
+
+  Context *notify_ctx = nullptr;
+  EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
+    .WillOnce(Invoke([this, &notify_ctx](Context *ctx, int r) {
+        Mutex::Locker locker(m_lock);
+        ASSERT_EQ(nullptr, notify_ctx);
+        notify_ctx = ctx;
+        m_cond.Signal();
+      }));
+  expect_dir_list(m_remote_io_ctx, "remote id 1a", "image name 1a", 0);
+  expect_dir_list(m_remote_io_ctx, "remote id 3", "image name 3", 0);
+  expect_dir_list(m_remote_io_ctx, "dummy", "", -ENOENT);
+  expect_listener_handle_update(
+    mock_listener,
+    {{"global id 1", "remote id 1a", "image name 1a"},
+     {"global id 3", "remote id 3", "image name 3"}},
+    {{"global id 1", "remote id 1", "image name 1"},
+     {"global id 2", "remote id 2", "image name 2"}});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+  ASSERT_TRUE(wait_for_update(1));
+
+  C_SaferCond flush_ctx;
+  m_threads->work_queue->queue(&flush_ctx, 0);
+  ASSERT_EQ(0, flush_ctx.wait());
+
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_DISABLING, "remote id 2", "global id 2");
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_DISABLED, "remote id 2", "global id 2");
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 1a", "global id 1");
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 3", "global id 3");
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "dummy", "dummy");
+  ASSERT_TRUE(wait_for_get_name(3));
+  notify_ctx->complete(0);
+
+  ASSERT_TRUE(wait_for_update(1));
+
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RegisterWatcherBlacklist) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, -EBLACKLISTED);
+
+  MockListener mock_listener(this);
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(-EBLACKLISTED, ctx.wait());
+  ASSERT_TRUE(mock_pool_watcher.is_blacklisted());
+
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RegisterWatcherMissing) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, -ENOENT);
+  expect_timer_add_event(mock_threads);
+
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, {}, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+
+  ASSERT_TRUE(wait_for_update(1));
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RegisterWatcherError) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, -EINVAL);
+  expect_timer_add_event(mock_threads);
+
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, {}, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+
+  ASSERT_TRUE(wait_for_update(1));
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RefreshBlacklist) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, -EBLACKLISTED);
+
+  MockListener mock_listener(this);
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(-EBLACKLISTED, ctx.wait());
+  ASSERT_TRUE(mock_pool_watcher.is_blacklisted());
+
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RefreshMissing) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, -ENOENT);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, {}, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+
+  ASSERT_TRUE(wait_for_update(1));
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RefreshError) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, -EINVAL);
+  expect_timer_add_event(mock_threads);
+
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false);
+  expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, {}, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+
+  ASSERT_TRUE(wait_for_update(1));
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, Rewatch) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, {}, {});
+
+  expect_timer_add_event(mock_threads);
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false);
+  expect_refresh_images(mock_refresh_images_request, {{"global id", "image id", "name"}}, 0);
+  expect_listener_handle_update(mock_listener, {{"global id", "image id", "name"}}, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+  ASSERT_TRUE(wait_for_update(1));
+
+  MirroringWatcher::get_instance().handle_rewatch_complete(0);
+  ASSERT_TRUE(wait_for_update(1));
+
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RewatchBlacklist) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, {}, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+  ASSERT_TRUE(wait_for_update(1));
+
+  MirroringWatcher::get_instance().handle_rewatch_complete(-EBLACKLISTED);
+  ASSERT_TRUE(mock_pool_watcher.is_blacklisted());
+
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RewatchError) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, {}, {});
+
+  expect_timer_add_event(mock_threads);
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false);
+  expect_refresh_images(mock_refresh_images_request, {{"global id", "image id", "name"}}, 0);
+  expect_listener_handle_update(mock_listener, {{"global id", "image id", "name"}}, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+  ASSERT_TRUE(wait_for_update(1));
+
+  MirroringWatcher::get_instance().handle_rewatch_complete(-EINVAL);
+  ASSERT_TRUE(wait_for_update(1));
+
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, GetImageNameBlacklist) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, {}, {});
+
+  expect_dir_list(m_remote_io_ctx, "remote id", "image name", -EBLACKLISTED);
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+  ASSERT_TRUE(wait_for_update(1));
+
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id", "global id");
+  ASSERT_TRUE(wait_for_get_name(1));
+  while (true) {
+    if (mock_pool_watcher.is_blacklisted()) {
+      break;
+    }
+    usleep(1000);
+  }
+
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, GetImageNameError) {
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  InSequence seq;
+  MockMirroringWatcher mock_mirroring_watcher;
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+  expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+  MockRefreshImagesRequest mock_refresh_images_request;
+  expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+  MockListener mock_listener(this);
+  expect_listener_handle_update(mock_listener, {}, {});
+
+  expect_dir_list(m_remote_io_ctx, "remote id", "image name", -EINVAL);
+  expect_timer_add_event(mock_threads);
+
+  expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false);
+  expect_refresh_images(mock_refresh_images_request, {{"global id", "remote id", "name"}}, 0);
+  expect_listener_handle_update(mock_listener, {{"global id", "remote id", "name"}}, {});
+
+  MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+                                    mock_listener);
+  C_SaferCond ctx;
+  mock_pool_watcher.init(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+  ASSERT_TRUE(wait_for_update(1));
+
+  MirroringWatcher::get_instance().handle_image_updated(
+    cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id", "global id");
+  ASSERT_TRUE(wait_for_get_name(1));
+  ASSERT_TRUE(wait_for_update(1));
+
+  expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+  ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+} // namespace mirror
+} // namespace rbd
index 7de87f3d15c1fa8bc412156eba2bed23edc63f0f..0a58a6b72c6b9bdd27cce3941e5b89abfdda43cf 100644 (file)
@@ -295,8 +295,9 @@ ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
         << ": " << cpp_strerror(r) << dendl;
     pool_name = stringify(m_local_pool_id);
   }
-  m_name = pool_name + "/" + m_global_image_id;
 
+  m_name = pool_name + "/" + m_global_image_id;
+  dout(20) << "registered asok hook: " << m_name << dendl;
   m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
                                                     this);
 }
@@ -515,6 +516,7 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
       }
     }
     if (!m_asok_hook) {
+      dout(20) << "registered asok hook: " << m_name << dendl;
       m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
                                                         this);
     }
index f654855faab2bb37bf618e7a1b59a94b8feadd19..270180738e0fe5be46ce9effe1a4722e12a0445b 100644 (file)
@@ -1,17 +1,21 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
-#include <boost/bind.hpp>
-
+#include "tools/rbd_mirror/PoolWatcher.h"
+#include "include/rbd_types.h"
+#include "cls/rbd/cls_rbd_client.h"
 #include "common/debug.h"
 #include "common/errno.h"
-
-#include "cls/rbd/cls_rbd_client.h"
-#include "include/rbd_types.h"
+#include "common/Timer.h"
+#include "librbd/ImageCtx.h"
 #include "librbd/internal.h"
+#include "librbd/MirroringWatcher.h"
+#include "librbd/Utils.h"
 #include "librbd/api/Image.h"
 #include "librbd/api/Mirror.h"
-#include "PoolWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
+#include <boost/bind.hpp>
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
@@ -23,123 +27,526 @@ using std::list;
 using std::string;
 using std::unique_ptr;
 using std::vector;
-
-using librados::Rados;
-using librados::IoCtx;
-using librbd::cls_client::mirror_image_list;
+using librbd::util::create_context_callback;
+using librbd::util::create_rados_callback;
 
 namespace rbd {
 namespace mirror {
 
-PoolWatcher::PoolWatcher(librados::IoCtx &remote_io_ctx,
-                         double interval_seconds,
-                        Mutex &lock, Cond &cond) :
-  m_lock(lock),
-  m_refresh_cond(cond),
-  m_timer(g_ceph_context, m_lock, false),
-  m_interval(interval_seconds)
-{
-  m_remote_io_ctx.dup(remote_io_ctx);
-  m_timer.init();
+template <typename I>
+class PoolWatcher<I>::MirroringWatcher : public librbd::MirroringWatcher<I> {
+public:
+  using ContextWQ = typename std::decay<
+    typename std::remove_pointer<
+      decltype(Threads<I>::work_queue)>::type>::type;
+
+  MirroringWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+                   PoolWatcher *pool_watcher)
+    : librbd::MirroringWatcher<I>(io_ctx, work_queue),
+      m_pool_watcher(pool_watcher) {
+  }
+
+  void handle_rewatch_complete(int r) override {
+    m_pool_watcher->handle_rewatch_complete(r);
+  }
+
+  void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) override {
+    // do nothing
+  }
+
+  void handle_image_updated(cls::rbd::MirrorImageState state,
+                            const std::string &remote_image_id,
+                            const std::string &global_image_id) override {
+    bool enabled = (state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
+    m_pool_watcher->handle_image_updated(remote_image_id, global_image_id,
+                                         enabled);
+  }
+
+private:
+  PoolWatcher *m_pool_watcher;
+};
+
+template <typename I>
+PoolWatcher<I>::PoolWatcher(Threads<I> *threads, librados::IoCtx &remote_io_ctx,
+                            Listener &listener)
+  : m_threads(threads), m_remote_io_ctx(remote_io_ctx), m_listener(listener),
+    m_lock(librbd::util::unique_lock_name("rbd::mirror::PoolWatcher", this)) {
+  m_mirroring_watcher = new MirroringWatcher(m_remote_io_ctx,
+                                             m_threads->work_queue, this);
 }
 
-PoolWatcher::~PoolWatcher()
-{
-  Mutex::Locker l(m_lock);
-  m_stopping = true;
-  m_timer.shutdown();
+template <typename I>
+PoolWatcher<I>::~PoolWatcher() {
+  delete m_mirroring_watcher;
 }
 
-bool PoolWatcher::is_blacklisted() const {
-  assert(m_lock.is_locked());
+template <typename I>
+bool PoolWatcher<I>::is_blacklisted() const {
+  Mutex::Locker locker(m_lock);
   return m_blacklisted;
 }
 
-const ImageIds& PoolWatcher::get_images() const
-{
-  assert(m_lock.is_locked());
-  return m_images;
+template <typename I>
+void PoolWatcher<I>::init(Context *on_finish) {
+  dout(5) << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    m_on_init_finish = on_finish;
+  }
+
+  // start async updates for mirror image directory
+  register_watcher();
+}
+
+template <typename I>
+void PoolWatcher<I>::shut_down(Context *on_finish) {
+  dout(5) << dendl;
+
+  {
+    Mutex::Locker timer_locker(m_threads->timer_lock);
+    Mutex::Locker locker(m_lock);
+
+    assert(!m_shutting_down);
+    m_shutting_down = true;
+    if (m_timer_ctx != nullptr) {
+      m_threads->timer->cancel_event(m_timer_ctx);
+      m_timer_ctx = nullptr;
+    }
+  }
+
+  // in-progress unregister tracked as async op
+  unregister_watcher();
+
+  m_async_op_tracker.wait_for_ops(on_finish);
 }
 
-void PoolWatcher::refresh_images(bool reschedule)
-{
-  ImageIds image_ids;
-  int r = refresh(&image_ids);
+template <typename I>
+void PoolWatcher<I>::register_watcher() {
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_image_ids_invalid);
+    assert(!m_refresh_in_progress);
+    m_refresh_in_progress = true;
+  }
+
+  // if the watch registration is in-flight, let the watcher
+  // handle the transition -- only (re-)register if it's not registered
+  if (!m_mirroring_watcher->is_unregistered()) {
+    refresh_images();
+    return;
+  }
+
+  // first time registering or the watch failed
+  dout(5) << dendl;
+  m_async_op_tracker.start_op();
+
+  Context *ctx = create_context_callback<
+    PoolWatcher, &PoolWatcher<I>::handle_register_watcher>(this);
+  m_mirroring_watcher->register_watch(ctx);
+}
+
+template <typename I>
+void PoolWatcher<I>::handle_register_watcher(int r) {
+  dout(5) << "r=" << r << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_image_ids_invalid);
+    assert(m_refresh_in_progress);
+    if (r < 0) {
+      m_refresh_in_progress = false;
+    }
+  }
 
-  Mutex::Locker l(m_lock);
+  Context *on_init_finish = nullptr;
   if (r >= 0) {
-    m_images = std::move(image_ids);
+    refresh_images();
   } else if (r == -EBLACKLISTED) {
-    derr << "blacklisted during image refresh" << dendl;
+    dout(0) << "detected client is blacklisted" << dendl;
+
+    Mutex::Locker locker(m_lock);
     m_blacklisted = true;
+    std::swap(on_init_finish, m_on_init_finish);
+  } else if (r == -ENOENT) {
+    dout(5) << "mirroring directory does not exist" << dendl;
+    schedule_refresh_images(30);
+  } else {
+    derr << "unexpected error registering mirroring directory watch: "
+         << cpp_strerror(r) << dendl;
+    schedule_refresh_images(10);
+  }
+
+  m_async_op_tracker.finish_op();
+  if (on_init_finish != nullptr) {
+    on_init_finish->complete(r);
   }
+}
 
-  if (!m_stopping && reschedule) {
-    FunctionContext *ctx = new FunctionContext(
-      boost::bind(&PoolWatcher::refresh_images, this, true));
-    m_timer.add_event_after(m_interval, ctx);
+template <typename I>
+void PoolWatcher<I>::unregister_watcher() {
+  dout(5) << dendl;
+
+  m_async_op_tracker.start_op();
+  Context *ctx = new FunctionContext([this](int r) {
+      dout(5) << "unregister_watcher: r=" << r << dendl;
+      if (r < 0) {
+        derr << "error unregistering watcher for "
+             << m_mirroring_watcher->get_oid() << " object: " << cpp_strerror(r)
+             << dendl;
+      }
+      m_async_op_tracker.finish_op();
+    });
+
+  m_mirroring_watcher->unregister_watch(ctx);
+}
+
+template <typename I>
+void PoolWatcher<I>::refresh_images() {
+  dout(5) << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_image_ids_invalid);
+    assert(m_refresh_in_progress);
+
+    // clear all pending notification events since we need to perform
+    // a full image list refresh
+    m_pending_added_image_ids.clear();
+    m_pending_removed_image_ids.clear();
+    if (!m_updated_images.empty()) {
+      auto it = m_updated_images.begin();
+      it->invalid = true;
+
+      // only have a single in-flight request -- remove the rest
+      ++it;
+      while (it != m_updated_images.end()) {
+        m_id_to_updated_images.erase({it->global_image_id,
+                                      it->remote_image_id});
+        it = m_updated_images.erase(it);
+      }
+    }
   }
-  m_refresh_cond.Signal();
-  // TODO: perhaps use a workqueue instead, once we get notifications
-  // about new/removed mirrored images
+
+  m_async_op_tracker.start_op();
+  m_refresh_image_ids.clear();
+  Context *ctx = create_context_callback<
+    PoolWatcher, &PoolWatcher<I>::handle_refresh_images>(this);
+  auto req = pool_watcher::RefreshImagesRequest<I>::create(m_remote_io_ctx,
+                                                           &m_refresh_image_ids,
+                                                           ctx);
+  req->send();
 }
 
-int PoolWatcher::refresh(ImageIds *image_ids) {
-  dout(20) << "enter" << dendl;
+template <typename I>
+void PoolWatcher<I>::handle_refresh_images(int r) {
+  dout(5) << "r=" << r << dendl;
+
+  bool retry_refresh = false;
+  Context *on_init_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_image_ids_invalid);
+    assert(m_refresh_in_progress);
+    m_refresh_in_progress = false;
+
+    if (r >= 0) {
+      m_image_ids_invalid = false;
+      m_pending_image_ids = m_refresh_image_ids;
+      std::swap(on_init_finish, m_on_init_finish);
+      schedule_listener();
+    } else if (r == -EBLACKLISTED) {
+      dout(0) << "detected client is blacklisted during image refresh" << dendl;
+
+      m_blacklisted = true;
+      std::swap(on_init_finish, m_on_init_finish);
+    } else if (r == -ENOENT) {
+      dout(5) << "mirroring directory not found" << dendl;
+      m_image_ids_invalid = false;
+      m_pending_image_ids.clear();
+      std::swap(on_init_finish, m_on_init_finish);
+      r = 0;
+      schedule_listener();
+    } else {
+      retry_refresh = true;
+    }
+  }
+
+  if (retry_refresh) {
+    derr << "failed to retrieve mirroring directory: " << cpp_strerror(r)
+         << dendl;
+    schedule_refresh_images(10);
+  }
 
-  std::string pool_name = m_remote_io_ctx.get_pool_name();
-  rbd_mirror_mode_t mirror_mode;
-  int r = librbd::api::Mirror<>::mode_get(m_remote_io_ctx, &mirror_mode);
-  if (r < 0) {
-    derr << "could not tell whether mirroring was enabled for "
-         << pool_name << ": " << cpp_strerror(r) << dendl;
-    return r;
+  m_async_op_tracker.finish_op();
+  if (on_init_finish != nullptr) {
+    on_init_finish->complete(r);
   }
-  if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
-    dout(20) << "pool " << pool_name << " has mirroring disabled" << dendl;
-    return 0;
+}
+
+template <typename I>
+void PoolWatcher<I>::schedule_refresh_images(double interval) {
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  Mutex::Locker locker(m_lock);
+  if (m_shutting_down || m_refresh_in_progress || m_timer_ctx != nullptr) {
+    return;
   }
 
-  std::map<std::string, std::string> images_map;
-  r = librbd::api::Image<>::list_images(m_remote_io_ctx, &images_map);
-  if (r < 0) {
-    derr << "error retrieving image names from pool " << pool_name << ": "
+  m_image_ids_invalid = true;
+  m_timer_ctx = new FunctionContext([this](int r) {
+      processs_refresh_images();
+    });
+  m_threads->timer->add_event_after(interval, m_timer_ctx);
+}
+
+template <typename I>
+void PoolWatcher<I>::handle_rewatch_complete(int r) {
+  dout(5) << "r=" << r << dendl;
+
+  if (r == -EBLACKLISTED) {
+    dout(0) << "detected client is blacklisted" << dendl;
+
+    Mutex::Locker locker(m_lock);
+    m_blacklisted = true;
+    return;
+  } else if (r == -ENOENT) {
+    dout(5) << "mirroring directory deleted" << dendl;
+  } else if (r < 0) {
+    derr << "unexpected error re-registering mirroring directory watch: "
          << cpp_strerror(r) << dendl;
-    return r;
   }
 
-  std::map<std::string, std::string> image_id_to_name;
-  for (const auto& img_pair : images_map) {
-    image_id_to_name.insert(std::make_pair(img_pair.second, img_pair.first));
+  schedule_refresh_images(5);
+}
+
+template <typename I>
+void PoolWatcher<I>::handle_image_updated(const std::string &remote_image_id,
+                                       const std::string &global_image_id,
+                                       bool enabled) {
+  dout(10) << "remote_image_id=" << remote_image_id << ", "
+           << "global_image_id=" << global_image_id << ", "
+           << "enabled=" << enabled << dendl;
+
+  Mutex::Locker locker(m_lock);
+  ImageId image_id(global_image_id, remote_image_id);
+  m_pending_added_image_ids.erase(image_id);
+  m_pending_removed_image_ids.erase(image_id);
+
+  auto id = std::make_pair(global_image_id, remote_image_id);
+  auto id_it = m_id_to_updated_images.find(id);
+  if (id_it != m_id_to_updated_images.end()) {
+    id_it->second->enabled = enabled;
+    id_it->second->invalid = false;
+  } else if (enabled) {
+    // need to resolve the image name before notifying listener
+    auto it = m_updated_images.emplace(m_updated_images.end(),
+                                       global_image_id, remote_image_id);
+    m_id_to_updated_images[id] = it;
+    schedule_get_image_name();
+  } else {
+    // delete image w/ if no resolve name in-flight
+    m_pending_removed_image_ids.insert(image_id);
+    schedule_listener();
   }
+}
 
-  std::string last_read = "";
-  int max_read = 1024;
-  do {
-    std::map<std::string, std::string> mirror_images;
-    r =  mirror_image_list(&m_remote_io_ctx, last_read, max_read,
-                           &mirror_images);
-    if (r < 0) {
-      derr << "error listing mirrored image directory: "
-           << cpp_strerror(r) << dendl;
-      return r;
+template <typename I>
+void PoolWatcher<I>::schedule_get_image_name() {
+  assert(m_lock.is_locked());
+  if (m_shutting_down || m_blacklisted || m_updated_images.empty() ||
+      m_get_name_in_progress) {
+    return;
+  }
+  m_get_name_in_progress = true;
+
+  auto &updated_image = m_updated_images.front();
+  dout(10) << "global_image_id=" << updated_image.global_image_id << ", "
+           << "remote_image_id=" << updated_image.remote_image_id << dendl;
+
+  librados::ObjectReadOperation op;
+  librbd::cls_client::dir_get_name_start(&op, updated_image.remote_image_id);
+
+  m_async_op_tracker.start_op();
+
+  m_out_bl.clear();
+  librados::AioCompletion *aio_comp = create_rados_callback<
+    PoolWatcher, &PoolWatcher<I>::handle_get_image_name>(this);
+  int r = m_remote_io_ctx.aio_operate(RBD_DIRECTORY, aio_comp, &op, &m_out_bl);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void PoolWatcher<I>::handle_get_image_name(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::string name;
+  if (r == 0) {
+    bufferlist::iterator it = m_out_bl.begin();
+    r = librbd::cls_client::dir_get_name_finish(&it, &name);
+  }
+
+  bool image_ids_invalid = false;
+  {
+    Mutex::Locker locker(m_lock);
+    assert(!m_updated_images.empty());
+    m_get_name_in_progress = false;
+
+    auto updated_image = m_updated_images.front();
+    m_updated_images.pop_front();
+    m_id_to_updated_images.erase(std::make_pair(updated_image.global_image_id,
+                                                updated_image.remote_image_id));
+
+    if (r == 0) {
+      // since names are resolved in event order -- the current update is
+      // the latest state
+      ImageId image_id(updated_image.global_image_id,
+                       updated_image.remote_image_id, name);
+      m_pending_added_image_ids.erase(image_id);
+      m_pending_removed_image_ids.erase(image_id);
+      if (!updated_image.invalid) {
+        if (updated_image.enabled) {
+          m_pending_added_image_ids.insert(image_id);
+        } else {
+          m_pending_removed_image_ids.insert(image_id);
+        }
+        schedule_listener();
+      }
+    } else if (r == -EBLACKLISTED) {
+      dout(0) << "detected client is blacklisted" << dendl;
+
+      m_blacklisted = true;
+    } else if (r == -ENOENT) {
+      dout(10) << "image removed after add notification" << dendl;
+    } else {
+      derr << "error resolving image name " << updated_image.remote_image_id
+           << " (" << updated_image.global_image_id << "): " << cpp_strerror(r)
+           << dendl;
+      image_ids_invalid = true;
+    }
+
+    if (!image_ids_invalid) {
+      schedule_get_image_name();
     }
-    for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
-      std::string image_name;
-      auto it2 = image_id_to_name.find(it->first);
-      if (it2 != image_id_to_name.end()) {
-        image_name = it2->second;
+  }
+
+  if (image_ids_invalid) {
+    schedule_refresh_images(5);
+  }
+  m_async_op_tracker.finish_op();
+}
+
+template <typename I>
+void PoolWatcher<I>::processs_refresh_images() {
+  assert(m_threads->timer_lock.is_locked());
+  assert(m_timer_ctx != nullptr);
+  m_timer_ctx = nullptr;
+
+  // execute outside of the timer's lock
+  m_async_op_tracker.start_op();
+  Context *ctx = new FunctionContext([this](int r) {
+      register_watcher();
+      m_async_op_tracker.finish_op();
+    });
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void PoolWatcher<I>::schedule_listener() {
+  assert(m_lock.is_locked());
+  m_pending_updates = true;
+  if (m_shutting_down || m_image_ids_invalid || m_notify_listener_in_progress) {
+    return;
+  }
+
+  dout(20) << dendl;
+
+  m_async_op_tracker.start_op();
+  Context *ctx = new FunctionContext([this](int r) {
+      notify_listener();
+      m_async_op_tracker.finish_op();
+    });
+
+  m_notify_listener_in_progress = true;
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void PoolWatcher<I>::notify_listener() {
+  dout(10) << dendl;
+
+  ImageIds added_image_ids;
+  ImageIds removed_image_ids;
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_notify_listener_in_progress);
+
+    // if the watch failed while we didn't own the lock, we are going
+    // to need to perform a full refresh
+    if (m_image_ids_invalid) {
+      m_notify_listener_in_progress = false;
+      return;
+    }
+
+    // merge add/remove notifications into pending set (a given image
+    // can only be in one set or another)
+    for (auto it = m_pending_removed_image_ids.begin();
+         it != m_pending_removed_image_ids.end(); ) {
+      if (std::find_if(m_updated_images.begin(), m_updated_images.end(),
+                       [&it](const UpdatedImage &updated_image) {
+              return (it->id == updated_image.remote_image_id);
+            }) != m_updated_images.end()) {
+        // still resolving the name -- so keep it in the pending set
+        auto image_id_it = m_image_ids.find(*it);
+        if (image_id_it != m_image_ids.end()) {
+          m_pending_image_ids.insert(*image_id_it);
+        }
+        ++it;
+        continue;
+      }
+
+      // merge the remove event into the pending set
+      m_pending_image_ids.erase(*it);
+      it = m_pending_removed_image_ids.erase(it);
+    }
+
+    for (auto &image_id : m_pending_added_image_ids) {
+      dout(20) << "image_id=" << image_id << dendl;
+      m_pending_image_ids.erase(image_id);
+      m_pending_image_ids.insert(image_id);
+    }
+    m_pending_added_image_ids.clear();
+
+    // compute added/removed images
+    for (auto &image_id : m_image_ids) {
+      auto it = m_pending_image_ids.find(image_id);
+      if (it == m_pending_image_ids.end() || it->id != image_id.id) {
+        removed_image_ids.insert(image_id);
       }
-      image_ids->insert(ImageId(it->second, it->first, image_name));
     }
-    if (!mirror_images.empty()) {
-      last_read = mirror_images.rbegin()->first;
+    for (auto &image_id : m_pending_image_ids) {
+      auto it = m_image_ids.find(image_id);
+      if (it == m_image_ids.end() || it->id != image_id.id) {
+        added_image_ids.insert(image_id);
+      }
     }
-    r = mirror_images.size();
-  } while (r == max_read);
 
-  return 0;
+    m_pending_updates = false;
+    m_image_ids = m_pending_image_ids;
+  }
+
+  m_listener.handle_update(added_image_ids, removed_image_ids);
+
+  {
+    Mutex::Locker locker(m_lock);
+    m_notify_listener_in_progress = false;
+    if (m_pending_updates) {
+      schedule_listener();
+    }
+  }
 }
 
 } // namespace mirror
 } // namespace rbd
+
+template class rbd::mirror::PoolWatcher<librbd::ImageCtx>;
index 721106f761edf8e7bc7f55dc4d307ce32e5a5766..1cd2ede2d12aafe4848f2ba771a7428575f72da1 100644 (file)
 #include <set>
 #include <string>
 
+#include "common/AsyncOpTracker.h"
 #include "common/ceph_context.h"
 #include "common/Mutex.h"
-#include "common/Timer.h"
 #include "include/rados/librados.hpp"
 #include "types.h"
+#include <list>
+#include <unordered_map>
+#include <boost/functional/hash.hpp>
 #include <boost/optional.hpp>
+#include "include/assert.h"
+
+namespace librbd { struct ImageCtx; }
 
 namespace rbd {
 namespace mirror {
 
+template <typename> struct Threads;
+
 /**
  * Keeps track of images that have mirroring enabled within all
  * pools.
  */
+template <typename ImageCtxT = librbd::ImageCtx>
 class PoolWatcher {
 public:
-  PoolWatcher(librados::IoCtx &remote_io_ctx, double interval_seconds,
-             Mutex &lock, Cond &cond);
+  struct Listener {
+    virtual ~Listener() {
+    }
+
+    virtual void handle_update(const ImageIds &added_image_ids,
+                               const ImageIds &removed_image_ids) = 0;
+  };
+
+  PoolWatcher(Threads<ImageCtxT> *threads, librados::IoCtx &remote_io_ctx,
+              Listener &listener);
   ~PoolWatcher();
   PoolWatcher(const PoolWatcher&) = delete;
   PoolWatcher& operator=(const PoolWatcher&) = delete;
 
   bool is_blacklisted() const;
 
-  const ImageIds& get_images() const;
-  void refresh_images(bool reschedule=true);
+  void init(Context *on_finish = nullptr);
+  void shut_down(Context *on_finish);
 
 private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v
+   *  INIT
+   *    |
+   *    v
+   * REGISTER_WATCHER
+   *    |
+   *    |/--------------------------------\
+   *    |                                 |
+   *    v                                 |
+   * REFRESH_IMAGES                       |
+   *    |                                 |
+   *    |/----------------------------\   |
+   *    |                             |   |
+   *    v                             |   |
+   * NOTIFY_LISTENER                  |   |
+   *    |                             |   |
+   *    v                             |   |
+   *  IDLE ---\                       |   |
+   *    |     |                       |   |
+   *    |     |\---> IMAGE_UPDATED    |   |
+   *    |     |         |             |   |
+   *    |     |         v             |   |
+   *    |     |      GET_IMAGE_NAME --/   |
+   *    |     |                           |
+   *    |     \----> WATCH_ERROR ---------/
+   *    v
+   * SHUT_DOWN
+   *    |
+   *    v
+   * UNREGISTER_WATCHER
+   *    |
+   *    v
+   * <finish>
+   *
+   * @endverbatim
+   */
+  class MirroringWatcher;
+
+  struct UpdatedImage {
+    std::string global_image_id;
+    std::string remote_image_id;
+    bool enabled = true;
+    bool invalid = false;
+
+    UpdatedImage(const std::string &global_image_id,
+                 const std::string &remote_image_id)
+      : global_image_id(global_image_id), remote_image_id(remote_image_id) {
+    }
+  };
+
+  typedef std::pair<std::string, std::string> GlobalRemoteIds;
+  typedef std::list<UpdatedImage> UpdatedImages;
+  typedef std::unordered_map<GlobalRemoteIds, typename UpdatedImages::iterator,
+                             boost::hash<GlobalRemoteIds> > IdToUpdatedImages;
+
+  struct StrictImageIdCompare {
+    bool operator()(const ImageId &lhs, const ImageId &rhs) const {
+      if (lhs.global_id != rhs.global_id) {
+        return lhs.global_id < rhs.global_id;
+      }
+      return lhs.id < rhs.id;
+    }
+  };
+  Threads<ImageCtxT> *m_threads;
   librados::IoCtx m_remote_io_ctx;
-  Mutex &m_lock;
-  Cond &m_refresh_cond;
+  Listener &m_listener;
+
+  ImageIds m_refresh_image_ids;
+  bufferlist m_out_bl;
+
+  mutable Mutex m_lock;
+
+  Context *m_on_init_finish = nullptr;
 
-  bool m_stopping = false;
+  ImageIds m_image_ids;
+
+  bool m_pending_updates = false;
+  bool m_notify_listener_in_progress = false;
+  ImageIds m_pending_image_ids;
+  ImageIds m_pending_added_image_ids;
+  ImageIds m_pending_removed_image_ids;
+
+  MirroringWatcher *m_mirroring_watcher;
+
+  Context *m_timer_ctx = nullptr;
+
+  AsyncOpTracker m_async_op_tracker;
   bool m_blacklisted = false;
-  SafeTimer m_timer;
-  double m_interval;
+  bool m_shutting_down = false;
+  bool m_image_ids_invalid = true;
+  bool m_refresh_in_progress = false;
+
+  UpdatedImages m_updated_images;
+  IdToUpdatedImages m_id_to_updated_images;
+  bool m_get_name_in_progress = false;
+
+  void register_watcher();
+  void handle_register_watcher(int r);
+  void unregister_watcher();
 
-  ImageIds m_images;
+  void refresh_images();
+  void handle_refresh_images(int r);
+
+  void schedule_refresh_images(double interval);
+  void processs_refresh_images();
+
+  void handle_rewatch_complete(int r);
+  void handle_image_updated(const std::string &remote_image_id,
+                            const std::string &global_image_id,
+                            bool enabled);
+
+  void schedule_get_image_name();
+  void handle_get_image_name(int r);
+
+  void schedule_listener();
+  void notify_listener();
 
-  int refresh(ImageIds *image_ids);
 };
 
 } // namespace mirror
 } // namespace rbd
 
+extern template class rbd::mirror::PoolWatcher<librbd::ImageCtx>;
+
 #endif // CEPH_RBD_MIRROR_POOL_WATCHER_H
index 4e6af299cc2d7be587bb94403488c0aaea645c49..558df3bec7b270c1dd3eebf19c766bd7092f2784 100644 (file)
@@ -218,6 +218,7 @@ Replayer::Replayer(Threads<librbd::ImageCtx> *threads,
   m_peer(peer),
   m_args(args),
   m_local_pool_id(local_pool_id),
+  m_pool_watcher_listener(this),
   m_asok_hook(nullptr),
   m_replayer_thread(this),
   m_leader_listener(this)
@@ -242,6 +243,8 @@ Replayer::~Replayer()
   if (m_instance_watcher) {
     m_instance_watcher->shut_down();
   }
+
+  assert(!m_pool_watcher);
 }
 
 bool Replayer::is_blacklisted() const {
@@ -290,6 +293,11 @@ int Replayer::init()
 
   dout(20) << "connected to " << m_peer << dendl;
 
+  r = init_local_mirroring_images();
+  if (r < 0) {
+    return r;
+  }
+
   m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                              &m_leader_listener));
   r = m_leader_watcher->init();
@@ -306,14 +314,6 @@ int Replayer::init()
     return r;
   }
 
-  // Bootstrap existing mirroring images
-  init_local_mirroring_images();
-
-  m_pool_watcher.reset(new PoolWatcher(m_remote_io_ctx,
-                      g_ceph_context->_conf->rbd_mirror_image_directory_refresh_interval,
-                      m_lock, m_cond));
-  m_pool_watcher->refresh_images();
-
   m_replayer_thread.create("replayer");
 
   return 0;
@@ -390,18 +390,20 @@ int Replayer::init_rados(const std::string &cluster_name,
   return 0;
 }
 
-void Replayer::init_local_mirroring_images() {
+int Replayer::init_local_mirroring_images() {
+  dout(20) << dendl;
+
   rbd_mirror_mode_t mirror_mode;
   int r = librbd::api::Mirror<>::mode_get(m_local_io_ctx, &mirror_mode);
   if (r < 0) {
     derr << "could not tell whether mirroring was enabled for "
          << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
-    return;
+    return r;
   }
   if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
     dout(20) << "pool " << m_local_io_ctx.get_pool_name() << " "
              << "has mirroring disabled" << dendl;
-    return;
+    return -ENOENT;
   }
 
   ImageIds image_ids;
@@ -415,16 +417,22 @@ void Replayer::init_local_mirroring_images() {
     if (r < 0) {
       derr << "error listing mirrored image directory: "
            << cpp_strerror(r) << dendl;
-      continue;
+      return r;
     }
     for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
       std::string image_name;
       r = dir_get_name(&m_local_io_ctx, RBD_DIRECTORY, it->first, &image_name);
-      if (r < 0) {
+      if (r == -ENOENT) {
+        dout(20) << "orphaned mirror image: " << it->first << dendl;
+        continue;
+      } else if (r < 0) {
         derr << "error retrieving local image name: " << cpp_strerror(r)
              << dendl;
-        continue;
+        return r;
       }
+
+      dout(20) << "local image: " << it->second << " (" << it->first << ")"
+               << dendl;
       image_ids.insert(ImageId(it->second, it->first, image_name));
     }
     if (!mirror_images.empty()) {
@@ -434,6 +442,7 @@ void Replayer::init_local_mirroring_images() {
   } while (r == max_read);
 
   m_init_image_ids = std::move(image_ids);
+  return 0;
 }
 
 void Replayer::run()
@@ -441,7 +450,6 @@ void Replayer::run()
   dout(20) << "enter" << dendl;
 
   while (!m_stopping.read()) {
-
     std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                  m_peer.cluster_name;
     if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
@@ -453,29 +461,33 @@ void Replayer::run()
     }
 
     Mutex::Locker locker(m_lock);
-    if (m_pool_watcher->is_blacklisted()) {
+    if (m_pool_watcher && m_pool_watcher->is_blacklisted()) {
       m_blacklisted = true;
       m_stopping.set(1);
-    } else if (!m_manual_stop && m_leader_watcher->is_leader()) {
-      set_sources(m_pool_watcher->get_images());
+      break;
     }
 
-    if (m_blacklisted) {
-      break;
+    for (auto image_it = m_image_replayers.begin();
+         image_it != m_image_replayers.end(); ) {
+      if (image_it->second->remote_images_empty()) {
+        if (stop_image_replayer(image_it->second)) {
+          image_it = m_image_replayers.erase(image_it);
+          continue;
+        }
+      } else {
+        start_image_replayer(image_it->second);
+      }
+      ++image_it;
     }
+
     m_cond.WaitInterval(m_lock,
-                       utime_t(g_ceph_context->_conf
-                               ->rbd_mirror_image_state_check_interval, 0));
+                       utime_t(g_ceph_context->_conf->
+                                  rbd_mirror_image_state_check_interval, 0));
   }
 
-  ImageIds empty_sources;
-  while (true) {
-    Mutex::Locker locker(m_lock);
-    set_sources(empty_sources);
-    if (m_image_replayers.empty()) {
-      break;
-    }
-    m_cond.WaitInterval(m_lock, seconds(1));
+  Mutex::Locker locker(m_lock);
+  while (!m_image_replayers.empty()) {
+    stop_image_replayers();
   }
 }
 
@@ -601,17 +613,24 @@ void Replayer::release_leader()
   m_leader_watcher->release_leader();
 }
 
-void Replayer::set_sources(const ImageIds &image_ids)
-{
-  dout(20) << "enter" << dendl;
+void Replayer::handle_update(const ImageIds &added_image_ids,
+                             const ImageIds &removed_image_ids) {
+  if (m_stopping.read()) {
+    return;
+  }
 
-  assert(m_lock.is_locked());
+  dout(10) << dendl;
+  Mutex::Locker locker(m_lock);
+  if (!m_leader_watcher->is_leader()) {
+    return;
+  }
 
-  if (!m_init_image_ids.empty() && !m_stopping.read() &&
-      m_leader_watcher->is_leader()) {
+  // first callback will be a full directory -- so see if we need to remove
+  // any local images that no longer exist on the remote side
+  if (!m_init_image_ids.empty()) {
     dout(20) << "scanning initial local image set" << dendl;
-    for (auto &remote_image : image_ids) {
-      auto it = m_init_image_ids.find(ImageId(remote_image.global_id));
+    for (auto &image_id : added_image_ids) {
+      auto it = m_init_image_ids.find(image_id);
       if (it != m_init_image_ids.end()) {
         m_init_image_ids.erase(it);
       }
@@ -628,30 +647,46 @@ void Replayer::set_sources(const ImageIds &image_ids)
   }
 
   // shut down replayers for non-mirrored images
-  for (auto image_it = m_image_replayers.begin();
-       image_it != m_image_replayers.end();) {
-    auto image_id_it = image_ids.find(ImageId(image_it->first));
-    if (image_id_it == image_ids.end()) {
+  for (auto &image_id : removed_image_ids) {
+    auto image_it = m_image_replayers.find(image_id.global_id);
+    if (image_it != m_image_replayers.end()) {
+      assert(!m_remote_mirror_uuid.empty());
+      image_it->second->remove_remote_image(m_remote_mirror_uuid,
+                                            image_id.id);
+
       if (image_it->second->is_running()) {
         dout(20) << "stop image replayer for remote image "
-                 << image_it->second->get_global_image_id() << dendl;
+                 << image_id.id << " (" << image_id.global_id << ")"
+                 << dendl;
       }
-      if (stop_image_replayer(image_it->second)) {
-        image_it = m_image_replayers.erase(image_it);
-        continue;
+
+      if (image_it->second->remote_images_empty() &&
+          stop_image_replayer(image_it->second)) {
+        // no additional remotes registered for this image
+        m_image_replayers.erase(image_it);
       }
     }
-    ++image_it;
   }
 
-  if (image_ids.empty()) {
+  // prune previously stopped image replayers
+  for (auto image_it = m_image_replayers.begin();
+       image_it != m_image_replayers.end(); ) {
+    if (image_it->second->remote_images_empty() &&
+        stop_image_replayer(image_it->second)) {
+      image_it = m_image_replayers.erase(image_it);
+    } else {
+      ++image_it;
+    }
+  }
+
+  if (added_image_ids.empty()) {
     return;
   }
 
   std::string local_mirror_uuid;
   int r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                               &local_mirror_uuid);
-  if (r < 0) {
+  if (r < 0 || local_mirror_uuid.empty()) {
     derr << "failed to retrieve local mirror uuid from pool "
          << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
     return;
@@ -660,18 +695,24 @@ void Replayer::set_sources(const ImageIds &image_ids)
   std::string remote_mirror_uuid;
   r = librbd::cls_client::mirror_uuid_get(&m_remote_io_ctx,
                                           &remote_mirror_uuid);
-  if (r < 0) {
+  if (r < 0 || remote_mirror_uuid.empty()) {
     derr << "failed to retrieve remote mirror uuid from pool "
          << m_remote_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
     return;
   }
+  m_remote_mirror_uuid = remote_mirror_uuid;
 
-  for (auto &image_id : image_ids) {
+  // start replayers for newly added remote image sources
+  for (auto &image_id : added_image_ids) {
     auto it = m_image_replayers.find(image_id.global_id);
     if (it == m_image_replayers.end()) {
       unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
         m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
         local_mirror_uuid, m_local_pool_id, image_id.global_id));
+      if (m_manual_stop) {
+        image_replayer->stop(nullptr, true);
+      }
+
       it = m_image_replayers.insert(
         std::make_pair(image_id.global_id, std::move(image_replayer))).first;
     }
@@ -689,11 +730,7 @@ void Replayer::set_sources(const ImageIds &image_ids)
 void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
 {
   assert(m_lock.is_locked());
-
-  std::string global_image_id = image_replayer->get_global_image_id();
-  dout(20) << "global_image_id=" << global_image_id << dendl;
-
-  if (!image_replayer->is_stopped()) {
+  if (!image_replayer->is_stopped() || image_replayer->remote_images_empty()) {
     return;
   } else if (image_replayer->is_blacklisted()) {
     derr << "blacklisted detected during image replay" << dendl;
@@ -702,6 +739,9 @@ void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer
     return;
   }
 
+  std::string global_image_id = image_replayer->get_global_image_id();
+  dout(20) << "global_image_id=" << global_image_id << dendl;
+
   FunctionContext *ctx = new FunctionContext(
       [this, global_image_id] (int r) {
         dout(20) << "image deleter result: r=" << r << ", "
@@ -770,31 +810,36 @@ bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
   return false;
 }
 
-void Replayer::handle_post_acquire_leader(Context *on_finish) {
+void Replayer::stop_image_replayers() {
   dout(20) << dendl;
 
-  {
-    Mutex::Locker locker(m_lock);
-    m_cond.Signal();
+  assert(m_lock.is_locked());
+  for (auto image_it = m_image_replayers.begin();
+       image_it != m_image_replayers.end();) {
+    if (stop_image_replayer(image_it->second)) {
+      image_it = m_image_replayers.erase(image_it);
+      continue;
+    }
+    ++image_it;
   }
-
-  on_finish->complete(0);
 }
 
-void Replayer::handle_pre_release_leader(Context *on_finish) {
+void Replayer::stop_image_replayers(Context *on_finish) {
   dout(20) << dendl;
 
   {
     Mutex::Locker locker(m_lock);
-    set_sources(ImageIds());
+    stop_image_replayers();
+
     if (!m_image_replayers.empty()) {
+      Context *ctx = new FunctionContext([this, on_finish](int r) {
+          assert(r == 0);
+          stop_image_replayers(on_finish);
+        });
+      ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
       Mutex::Locker timer_locker(m_threads->timer_lock);
-      Context *task = create_async_context_callback(
-        m_threads->work_queue, new FunctionContext(
-          [this, on_finish](int r) {
-            handle_pre_release_leader(on_finish);
-          }));
-      m_threads->timer->add_event_after(1, task);
+      m_threads->timer->add_event_after(1, ctx);
       return;
     }
   }
@@ -802,5 +847,48 @@ void Replayer::handle_pre_release_leader(Context *on_finish) {
   on_finish->complete(0);
 }
 
+void Replayer::handle_post_acquire_leader(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(!m_pool_watcher);
+  m_pool_watcher.reset(new PoolWatcher<>(
+    m_threads, m_remote_io_ctx, m_pool_watcher_listener));
+  m_pool_watcher->init(create_async_context_callback(
+    m_threads->work_queue, on_finish));
+
+  m_cond.Signal();
+}
+
+void Replayer::handle_pre_release_leader(Context *on_finish) {
+  dout(20) << dendl;
+  shut_down_pool_watcher(on_finish);
+}
+
+void Replayer::shut_down_pool_watcher(Context *on_finish) {
+  dout(20) << dendl;
+
+  Context *ctx = new FunctionContext([this, on_finish](int r) {
+      handle_shut_down_pool_watcher(r, on_finish);
+    });
+  ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+  Mutex::Locker locker(m_lock);
+  assert(m_pool_watcher);
+  m_pool_watcher->shut_down(ctx);
+}
+
+void Replayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {
+  dout(20) << "r=" << r << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_pool_watcher);
+    m_pool_watcher.reset();
+  }
+
+  stop_image_replayers(on_finish);
+}
+
 } // namespace mirror
 } // namespace rbd
index abcc35694af0c3c6c7a3a674c8f11db619b3ad4b..aff667381a9574ebe23e17eb70e5dd7e6e02faa9 100644 (file)
@@ -59,18 +59,38 @@ public:
   void release_leader();
 
 private:
-  void init_local_mirroring_images();
-  void set_sources(const ImageIds &image_ids);
+  struct PoolWatcherListener : public PoolWatcher<>::Listener {
+    Replayer *replayer;
+
+    PoolWatcherListener(Replayer *replayer) : replayer(replayer) {
+    }
+
+    void handle_update(const ImageIds &added_image_ids,
+                       const ImageIds &removed_image_ids) override {
+      replayer->handle_update(added_image_ids, removed_image_ids);
+    }
+  };
+
+  int init_local_mirroring_images();
+
+  void handle_update(const ImageIds &added_image_ids,
+                     const ImageIds &removed_image_ids);
 
   void start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
   bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
+  void stop_image_replayers();
+  void stop_image_replayers(Context *on_finish);
 
-  int init_rados(const std::string &cluster_name, const std::string &client_name,
+  int init_rados(const std::string &cluster_name,
+                 const std::string &client_name,
                  const std::string &description, RadosRef *rados_ref);
 
   void handle_post_acquire_leader(Context *on_finish);
   void handle_pre_release_leader(Context *on_finish);
 
+  void shut_down_pool_watcher(Context *on_finish);
+  void handle_shut_down_pool_watcher(int r, Context *on_finish);
+
   Threads<librbd::ImageCtx> *m_threads;
   std::shared_ptr<ImageDeleter> m_image_deleter;
   ImageSyncThrottlerRef<> m_image_sync_throttler;
@@ -91,7 +111,11 @@ private:
   int64_t m_local_pool_id = -1;
   int64_t m_remote_pool_id = -1;
 
-  std::unique_ptr<PoolWatcher> m_pool_watcher;
+  std::string m_remote_mirror_uuid;
+
+  PoolWatcherListener m_pool_watcher_listener;
+  std::unique_ptr<PoolWatcher<> > m_pool_watcher;
+
   std::map<std::string, std::unique_ptr<ImageReplayer<> > > m_image_replayers;
 
   std::string m_asok_hook_name;
index e134c13ace5b6187751c0049b17ec2deba308af0..c387ecea789a1ccdffc5653d6fcf11cb5fd470f0 100644 (file)
@@ -52,7 +52,7 @@ void RefreshImagesRequest<I>::handle_mirror_image_list(int r) {
     r = librbd::cls_client::mirror_image_list_finish(&it, &ids);
   }
 
-  if (r < 0) {
+  if (r < 0 && r != -ENOENT) {
     derr << "failed to list mirrored images: " << cpp_strerror(r) << dendl;
     finish(r);
     return;
@@ -98,7 +98,7 @@ void RefreshImagesRequest<I>::handle_dir_list(int r) {
     r = librbd::cls_client::dir_list_finish(&it, &name_to_ids);
   }
 
-  if (r < 0) {
+  if (r < 0 && r != -ENOENT) {
     derr << "failed to list images: " << cpp_strerror(r) << dendl;
     finish(r);
     return;
index 38dd1f5d53bb9c08d0b8e996e971cee837119815..79a327cd6c7dea82a636b10bee592004de06c99c 100644 (file)
@@ -21,8 +21,8 @@ namespace pool_watcher {
 template <typename ImageCtxT = librbd::ImageCtx>
 class RefreshImagesRequest {
 public:
-  RefreshImagesRequest *create(librados::IoCtx &remote_io_ctx,
-                               ImageIds *image_ids, Context *on_finish) {
+  static RefreshImagesRequest *create(librados::IoCtx &remote_io_ctx,
+                                      ImageIds *image_ids, Context *on_finish) {
     return new RefreshImagesRequest(remote_io_ctx, image_ids, on_finish);
   }
 
index 9040f210c70a9e4f12ebac620ea64fd661e4cb62..52bf9de20fcdc46ade5c92fcf7c8ec54ca00d91c 100644 (file)
@@ -6,8 +6,13 @@
 namespace rbd {
 namespace mirror {
 
-std::ostream& operator<<(std::ostream& lhs, const peer_t &peer)
-{
+std::ostream &operator<<(std::ostream &os, const ImageId &image_id) {
+  return os << "global id=" << image_id.global_id << ", "
+            << "id=" << image_id.id << ", "
+            << "name=" << image_id.name;
+}
+
+std::ostream& operator<<(std::ostream& lhs, const peer_t &peer) {
   return lhs << "uuid: " << peer.uuid
             << " cluster: " << peer.cluster_name
             << " client: " << peer.client_name;
index 0a13049f5813c2f3984e27f72aa4597ee8b35880..617effd21cf0df0052864a7f8ea965b80219e9be 100644 (file)
@@ -31,8 +31,11 @@ struct ImageId {
 
   explicit ImageId(const std::string &global_id) : global_id(global_id) {
   }
+  ImageId(const std::string &global_id, const std::string &id)
+    : global_id(global_id), id(id) {
+  }
   ImageId(const std::string &global_id, const std::string &id,
-          const boost::optional<std::string> &name = boost::none)
+          const std::string &name)
     : global_id(global_id), id(id), name(name) {
   }
 
@@ -44,6 +47,8 @@ struct ImageId {
   }
 };
 
+std::ostream &operator<<(std::ostream &, const ImageId &image_id);
+
 typedef std::set<ImageId> ImageIds;
 
 struct peer_t {