]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: replayer should only handle a single pool
authorJason Dillaman <dillaman@redhat.com>
Fri, 27 May 2016 03:26:20 +0000 (23:26 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 27 May 2016 04:40:13 +0000 (00:40 -0400)
The pool watcher now only needs to scan a single pool for
its associated replayer since a peer is inherently tied to
a single pool.

Fixes: http://tracker.ceph.com/issues/16045
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_PoolWatcher.cc
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/PoolWatcher.cc
src/tools/rbd_mirror/PoolWatcher.h
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h

index fd739f5bd8bd64257b839797ec27897be48e7508..2a2708ed686c329507cfd8c9702da0e57624989c 100644 (file)
@@ -42,7 +42,6 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
   {
     m_cluster = std::make_shared<librados::Rados>();
     EXPECT_EQ("", connect_cluster_pp(*m_cluster));
-    m_pool_watcher.reset(new PoolWatcher(m_cluster, 30, m_lock, m_cond));
   }
 
   ~TestPoolWatcher() {
@@ -59,9 +58,12 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
     int64_t pool_id = m_cluster->pool_lookup(pool_name.c_str());
     ASSERT_GE(pool_id, 0);
     m_pools.insert(pool_name);
+
+    librados::IoCtx ioctx;
+    ASSERT_EQ(0, m_cluster->ioctx_create2(pool_id, ioctx));
+
+    m_pool_watcher.reset(new PoolWatcher(ioctx, 30, m_lock, m_cond));
     if (enable_mirroring) {
-      librados::IoCtx ioctx;
-      ASSERT_EQ(0, m_cluster->ioctx_create2(pool_id, ioctx));
       ASSERT_EQ(0, librbd::mirror_mode_set(ioctx, RBD_MIRROR_MODE_POOL));
       std::string uuid;
       ASSERT_EQ(0, librbd::mirror_peer_add(ioctx, &uuid,
@@ -73,50 +75,6 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
     }
   }
 
-  void delete_pool(const string &name, const peer_t &peer) {
-    int64_t pool_id = m_cluster->pool_lookup(name.c_str());
-    ASSERT_GE(pool_id, 0);
-    m_pools.erase(name);
-    ASSERT_EQ(0, m_cluster->pool_delete(name.c_str()));
-    m_mirrored_images.erase(pool_id);
-  }
-
-  void create_cache_pool(const string &base_pool, string *cache_pool_name) {
-    bufferlist inbl;
-    *cache_pool_name = get_temp_pool_name("test-rbd-mirror-");
-    ASSERT_EQ(0, m_cluster->pool_create(cache_pool_name->c_str()));
-
-    ASSERT_EQ(0, m_cluster->mon_command(
-      "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool +
-      "\", \"tierpool\": \"" + *cache_pool_name +
-      "\", \"force_nonempty\": \"--force-nonempty\" }",
-      inbl, NULL, NULL));
-    ASSERT_EQ(0, m_cluster->mon_command(
-      "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool +
-      "\", \"overlaypool\": \"" + *cache_pool_name + "\"}",
-      inbl, NULL, NULL));
-    ASSERT_EQ(0, m_cluster->mon_command(
-      "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + *cache_pool_name +
-      "\", \"mode\": \"writeback\"}",
-      inbl, NULL, NULL));
-    m_cluster->wait_for_latest_osdmap();
-  }
-
-  void remove_cache_pool(const string &base_pool, const string &cache_pool) {
-    bufferlist inbl;
-    // tear down tiers
-    ASSERT_EQ(0, m_cluster->mon_command(
-      "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool +
-      "\"}",
-      inbl, NULL, NULL));
-    ASSERT_EQ(0, m_cluster->mon_command(
-      "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool +
-      "\", \"tierpool\": \"" + cache_pool + "\"}",
-      inbl, NULL, NULL));
-    m_cluster->wait_for_latest_osdmap();
-    m_cluster->pool_delete(cache_pool.c_str());
-  }
-
   string get_image_id(librados::IoCtx *ioctx, const string &image_name) {
     string obj = librbd::util::id_obj_name(image_name);
     string id;
@@ -148,7 +106,7 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
                                                sizeof(mirror_image_info)));
       image.close();
 
-      m_mirrored_images[ioctx.get_id()].insert(PoolWatcher::ImageIds(
+      m_mirrored_images.insert(PoolWatcher::ImageId(
         get_image_id(&ioctx, name), name, mirror_image_info.global_id));
     }
     if (image_name != nullptr)
@@ -193,7 +151,7 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
                                                sizeof(mirror_image_info)));
       image.close();
 
-      m_mirrored_images[cioctx.get_id()].insert(PoolWatcher::ImageIds(
+      m_mirrored_images.insert(PoolWatcher::ImageId(
         get_image_id(&cioctx, name), name, mirror_image_info.global_id));
     }
     if (image_name != nullptr)
@@ -212,23 +170,23 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
   unique_ptr<PoolWatcher> m_pool_watcher;
 
   set<string> m_pools;
-  PoolWatcher::PoolImageIds m_mirrored_images;
+  PoolWatcher::ImageIds m_mirrored_images;
 
   uint64_t m_image_number;
   uint64_t m_snap_number;
 };
 
-TEST_F(TestPoolWatcher, NoPools) {
+TEST_F(TestPoolWatcher, EmptyPool) {
+  string uuid1 = "00000000-0000-0000-0000-000000000001";
+  peer_t site1(uuid1, "site1", "mirror1");
+  create_pool(true, site1);
   check_images();
 }
 
 TEST_F(TestPoolWatcher, ReplicatedPools) {
   string uuid1 = "00000000-0000-0000-0000-000000000001";
-  string uuid2 = "20000000-2222-2222-2222-000000000002";
   peer_t site1(uuid1, "site1", "mirror1");
-  peer_t site2(uuid2, "site2", "mirror2");
   string first_pool, local_pool, last_pool;
-  check_images();
   create_pool(true, site1, &first_pool);
   check_images();
   create_image(first_pool);
@@ -242,50 +200,4 @@ TEST_F(TestPoolWatcher, ReplicatedPools) {
   check_images();
   create_image(first_pool, false);
   check_images();
-
-  create_pool(false, peer_t(), &local_pool);
-  check_images();
-  create_image(local_pool, false);
-  check_images();
-  clone_image(first_pool, parent_image2, local_pool, false);
-  check_images();
-  create_pool(true, site2);
-  check_images();
-
-  create_pool(true, site2, &last_pool);
-  check_images();
-  clone_image(first_pool, parent_image2, last_pool);
-  check_images();
-  create_image(last_pool);
-  check_images();
-  delete_pool(last_pool, site2);
-  check_images();
-  delete_pool(first_pool, site1);
-  check_images();
-}
-
-TEST_F(TestPoolWatcher, CachePools) {
-  peer_t site1("11111111-1111-1111-1111-111111111111", "site1", "mirror1");
-  string base1, base2, cache1, cache2;
-  create_pool(true, site1, &base1);
-  check_images();
-
-  create_cache_pool(base1, &cache1);
-  BOOST_SCOPE_EXIT( base1, cache1, this_ ) {
-    this_->remove_cache_pool(base1, cache1);
-  } BOOST_SCOPE_EXIT_END;
-  check_images();
-  create_image(base1);
-  check_images();
-  create_image(base1, false);
-  check_images();
-
-  create_pool(false, peer_t(), &base2);
-  create_cache_pool(base2, &cache2);
-  BOOST_SCOPE_EXIT( base2, cache2, this_ ) {
-    this_->remove_cache_pool(base2, cache2);
-  } BOOST_SCOPE_EXIT_END;
-  check_images();
-  create_image(base2, false);
-  check_images();
 }
index 21dac318bf064194c2b33877fed620e144f52e04..71e64edf8d001fd007c841908a94e8f85c2622ed 100644 (file)
@@ -345,18 +345,16 @@ void Mirror::update_replayers(const PoolPeers &pool_peers)
 
   // remove stale replayers before creating new replayers
   for (auto it = m_replayers.begin(); it != m_replayers.end();) {
-    auto next_it(it);
-    ++next_it;
-
     auto &peer = it->first.second;
     auto pool_peer_it = pool_peers.find(it->first.first);
     if (pool_peer_it == pool_peers.end() ||
         pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
       dout(20) << "removing replayer for " << peer << dendl;
       // TODO: make async
-      m_replayers.erase(it);
+      it = m_replayers.erase(it);
+    } else {
+      ++it;
     }
-    it = next_it;
   }
 
   for (auto &kv : pool_peers) {
@@ -365,7 +363,8 @@ void Mirror::update_replayers(const PoolPeers &pool_peers)
       if (m_replayers.find(pool_peer) == m_replayers.end()) {
         dout(20) << "starting replayer for " << peer << dendl;
         unique_ptr<Replayer> replayer(new Replayer(m_threads, m_image_deleter,
-                                                   m_local, peer, m_args));
+                                                   m_local, kv.first, peer,
+                                                   m_args));
         // TODO: make async, and retry connecting within replayer
         int r = replayer->init();
         if (r < 0) {
index 21ccc2df7aa9222beee6704f231f740ff1d00510..184e947fd4cbef5f5cf58f3ecc7ecd8eeb9224f4 100644 (file)
@@ -29,15 +29,15 @@ using librbd::cls_client::mirror_image_list;
 namespace rbd {
 namespace mirror {
 
-PoolWatcher::PoolWatcher(RadosRef cluster, double interval_seconds,
+PoolWatcher::PoolWatcher(librados::IoCtx &remote_io_ctx,
+                         double interval_seconds,
                         Mutex &lock, Cond &cond) :
   m_lock(lock),
   m_refresh_cond(cond),
-  m_stopping(false),
-  m_cluster(cluster),
   m_timer(g_ceph_context, m_lock, false),
   m_interval(interval_seconds)
 {
+  m_remote_io_ctx.dup(remote_io_ctx);
   m_timer.init();
 }
 
@@ -48,7 +48,7 @@ PoolWatcher::~PoolWatcher()
   m_timer.shutdown();
 }
 
-const PoolWatcher::PoolImageIds& PoolWatcher::get_images() const
+const PoolWatcher::ImageIds& PoolWatcher::get_images() const
 {
   assert(m_lock.is_locked());
   return m_images;
@@ -56,106 +56,74 @@ const PoolWatcher::PoolImageIds& PoolWatcher::get_images() const
 
 void PoolWatcher::refresh_images(bool reschedule)
 {
+  ImageIds image_ids;
+  refresh(&image_ids);
+
+  Mutex::Locker l(m_lock);
+  m_images = std::move(image_ids);
+
+  if (!m_stopping && reschedule) {
+    FunctionContext *ctx = new FunctionContext(
+      boost::bind(&PoolWatcher::refresh_images, this, true));
+    m_timer.add_event_after(m_interval, ctx);
+  }
+  m_refresh_cond.Signal();
+  // TODO: perhaps use a workqueue instead, once we get notifications
+  // about new/removed mirrored images
+}
+
+void PoolWatcher::refresh(ImageIds *image_ids) {
   dout(20) << "enter" << dendl;
-  PoolImageIds images;
-  list<pair<int64_t, string> > pools;
-  int r = m_cluster->pool_list2(pools);
+
+  std::string pool_name = m_remote_io_ctx.get_pool_name();
+  rbd_mirror_mode_t mirror_mode;
+  int r = librbd::mirror_mode_get(m_remote_io_ctx, &mirror_mode);
   if (r < 0) {
-    derr << "error listing pools: " << cpp_strerror(r) << dendl;
+    derr << "could not tell whether mirroring was enabled for "
+         << pool_name << ": " << cpp_strerror(r) << dendl;
+    return;
+  }
+  if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
+    dout(20) << "pool " << pool_name << " has mirroring disabled" << dendl;
     return;
   }
 
-  for (auto kv : pools) {
-    int64_t pool_id = kv.first;
-    string pool_name = kv.second;
-    int64_t base_tier;
-    r = m_cluster->pool_get_base_tier(pool_id, &base_tier);
-    if (r == -ENOENT) {
-      dout(10) << "pool " << pool_name << " no longer exists" << dendl;
-      continue;
-    } else if (r < 0) {
-      derr << "Error retrieving base tier for pool " << pool_name << dendl;
-      continue;
-    }
-    if (pool_id != base_tier) {
-      // pool is a cache; skip it
-      continue;
-    }
-
-    IoCtx ioctx;
-    r = m_cluster->ioctx_create2(pool_id, ioctx);
-    if (r == -ENOENT) {
-      dout(10) << "pool " << pool_name << " no longer exists" << dendl;
-      continue;
-    } else if (r < 0) {
-      derr << "Error accessing pool " << pool_name << cpp_strerror(r) << dendl;
-      continue;
-    }
+  std::map<std::string, std::string> images_map;
+  r = librbd::list_images_v2(m_remote_io_ctx, images_map);
+  if (r < 0) {
+    derr << "error retrieving image names from pool " << pool_name << ": "
+         << cpp_strerror(r) << dendl;
+  }
 
-    rbd_mirror_mode_t mirror_mode;
-    r = librbd::mirror_mode_get(ioctx, &mirror_mode);
-    if (r < 0) {
-      derr << "could not tell whether mirroring was enabled for " << pool_name
-          << " : " << cpp_strerror(r) << dendl;
-      continue;
-    }
-    if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
-      dout(20) << "pool " << pool_name << " has mirroring disabled" << dendl;
-      continue;
-    }
+  std::map<std::string, std::string> image_id_to_name;
+  for (const auto& img_pair : images_map) {
+    image_id_to_name.insert(std::make_pair(img_pair.second, img_pair.first));
+  }
 
-    std::map<std::string, std::string> images_map;
-    r = librbd::list_images_v2(ioctx, images_map);
+  std::string last_read = "";
+  int max_read = 1024;
+  do {
+    std::map<std::string, std::string> mirror_images;
+    r =  mirror_image_list(&m_remote_io_ctx, last_read, max_read,
+                           &mirror_images);
     if (r < 0) {
-      derr << "error retrieving image names from pool " << pool_name << ": "
+      derr << "error listing mirrored image directory: "
            << cpp_strerror(r) << dendl;
+      continue;
     }
-
-    std::map<std::string, std::string> image_id_to_name;
-    for (const auto& img_pair : images_map) {
-      image_id_to_name.insert(std::make_pair(img_pair.second, img_pair.first));
-    }
-
-    std::set<ImageIds> image_ids;
-    std::string last_read = "";
-    int max_read = 1024;
-    do {
-      std::map<std::string, std::string> mirror_images;
-      r =  mirror_image_list(&ioctx, last_read, max_read, &mirror_images);
-      if (r < 0) {
-        derr << "error listing mirrored image directory: "
-             << cpp_strerror(r) << dendl;
-        continue;
-      }
-      for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
-        boost::optional<std::string> image_name(boost::none);
-        auto it2 = image_id_to_name.find(it->first);
-        if (it2 != image_id_to_name.end()) {
-          image_name = it2->second;
-        }
-        image_ids.insert(ImageIds(it->first, image_name, it->second));
-      }
-      if (!mirror_images.empty()) {
-        last_read = mirror_images.rbegin()->first;
+    for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
+      boost::optional<std::string> image_name(boost::none);
+      auto it2 = image_id_to_name.find(it->first);
+      if (it2 != image_id_to_name.end()) {
+        image_name = it2->second;
       }
-      r = mirror_images.size();
-    } while (r == max_read);
-
-    if (!image_ids.empty()) {
-      images[pool_id] = std::move(image_ids);
+      image_ids->insert(ImageId(it->first, image_name, it->second));
     }
-  }
-
-  Mutex::Locker l(m_lock);
-  m_images = std::move(images);
-  if (!m_stopping && reschedule) {
-    FunctionContext *ctx = new FunctionContext(
-      boost::bind(&PoolWatcher::refresh_images, this, true));
-    m_timer.add_event_after(m_interval, ctx);
-  }
-  m_refresh_cond.Signal();
-  // TODO: perhaps use a workqueue instead, once we get notifications
-  // about new/removed mirrored images
+    if (!mirror_images.empty()) {
+      last_read = mirror_images.rbegin()->first;
+    }
+    r = mirror_images.size();
+  } while (r == max_read);
 }
 
 } // namespace mirror
index 9eeb1015bbeb5a0ac48d376147d2700014bd4b80..9983f9d56edfd36a3e641b383ee0fe431345ada6 100644 (file)
@@ -24,45 +24,47 @@ namespace mirror {
  */
 class PoolWatcher {
 public:
-  struct ImageIds {
+  struct ImageId {
     std::string id;
     boost::optional<std::string> name;
     std::string global_id;
 
-    ImageIds(const std::string &id,
-             const boost::optional<std::string> &name = boost::none,
-             const std::string &global_id = "")
+    ImageId(const std::string &id,
+            const boost::optional<std::string> &name = boost::none,
+            const std::string &global_id = "")
       : id(id), name(name), global_id(global_id) {
     }
 
-    inline bool operator==(const ImageIds &rhs) const {
+    inline bool operator==(const ImageId &rhs) const {
       return (id == rhs.id && name == rhs.name && global_id == rhs.global_id);
     }
-    inline bool operator<(const ImageIds &rhs) const {
+    inline bool operator<(const ImageId &rhs) const {
       return id < rhs.id;
     }
   };
-  typedef std::map<int64_t, std::set<ImageIds> > PoolImageIds;
+  typedef std::set<ImageId> ImageIds;
 
-  PoolWatcher(RadosRef cluster, double interval_seconds,
+  PoolWatcher(librados::IoCtx &remote_io_ctx, double interval_seconds,
              Mutex &lock, Cond &cond);
   ~PoolWatcher();
   PoolWatcher(const PoolWatcher&) = delete;
   PoolWatcher& operator=(const PoolWatcher&) = delete;
 
-  const PoolImageIds& get_images() const;
+  const ImageIds& get_images() const;
   void refresh_images(bool reschedule=true);
 
 private:
+  librados::IoCtx m_remote_io_ctx;
   Mutex &m_lock;
   Cond &m_refresh_cond;
-  bool m_stopping;
 
-  RadosRef m_cluster;
+  bool m_stopping = false;
   SafeTimer m_timer;
   double m_interval;
 
-  PoolImageIds m_images;
+  ImageIds m_images;
+
+  void refresh(ImageIds *image_ids);
 };
 
 } // namespace mirror
index 8dd3a0dfa7f6496bf5cfa1f2a435ca70f16e64cb..f3d4b03975fa7835753c3e741ff74dc705f8b769 100644 (file)
@@ -229,8 +229,8 @@ private:
 };
 
 Replayer::Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
-                   RadosRef local_cluster, const peer_t &peer,
-                   const std::vector<const char*> &args) :
+                   RadosRef local_cluster, int64_t local_pool_id,
+                   const peer_t &peer, const std::vector<const char*> &args) :
   m_threads(threads),
   m_image_deleter(image_deleter),
   m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)),
@@ -238,6 +238,7 @@ Replayer::Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter
   m_args(args),
   m_local(local_cluster),
   m_remote(new librados::Rados),
+  m_local_pool_id(local_pool_id),
   m_asok_hook(nullptr),
   m_replayer_thread(this)
 {
@@ -263,6 +264,13 @@ int Replayer::init()
 {
   dout(20) << "replaying for " << m_peer << dendl;
 
+  int r = m_local->ioctx_create2(m_local_pool_id, m_local_io_ctx);
+  if (r < 0) {
+    derr << "error accessing local pool " << m_local_pool_id << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
   // NOTE: manually bootstrap a CephContext here instead of via
   // the librados API to avoid mixing global singletons between
   // the librados shared library and the daemon
@@ -279,7 +287,7 @@ int Replayer::init()
   cct->_conf->cluster = m_peer.cluster_name;
 
   // librados::Rados::conf_read_file
-  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
+  r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
   if (r < 0) {
     derr << "could not read ceph conf for " << m_peer << ": "
         << cpp_strerror(r) << dendl;
@@ -326,13 +334,22 @@ int Replayer::init()
     return r;
   }
 
+  r = m_remote->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
+                             m_remote_io_ctx);
+  if (r < 0) {
+    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
+         << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+  m_remote_pool_id = m_remote_io_ctx.get_id();
+
   dout(20) << "connected to " << m_peer << dendl;
 
   // Bootstrap existing mirroring images
   init_local_mirroring_images();
 
   // TODO: make interval configurable
-  m_pool_watcher.reset(new PoolWatcher(m_remote, 30, m_lock, m_cond));
+  m_pool_watcher.reset(new PoolWatcher(m_remote_io_ctx, 30, m_lock, m_cond));
   m_pool_watcher->refresh_images();
 
   m_replayer_thread.create("replayer");
@@ -341,101 +358,49 @@ int Replayer::init()
 }
 
 void Replayer::init_local_mirroring_images() {
-  list<pair<int64_t, string> > pools;
-  int r = m_local->pool_list2(pools);
+  rbd_mirror_mode_t mirror_mode;
+  int r = librbd::mirror_mode_get(m_local_io_ctx, &mirror_mode);
   if (r < 0) {
-    derr << "error listing pools: " << cpp_strerror(r) << dendl;
+    derr << "could not tell whether mirroring was enabled for "
+         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
+    return;
+  }
+  if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
+    dout(20) << "pool " << m_local_io_ctx.get_pool_name() << " "
+             << "has mirroring disabled" << dendl;
     return;
   }
 
-  for (auto kv : pools) {
-    int64_t pool_id = kv.first;
-    string pool_name = kv.second;
-    int64_t base_tier;
-    r = m_local->pool_get_base_tier(pool_id, &base_tier);
-    if (r == -ENOENT) {
-      dout(10) << "pool " << pool_name << " no longer exists" << dendl;
-      continue;
-    } else if (r < 0) {
-      derr << "Error retrieving base tier for pool " << pool_name << dendl;
-      continue;
-    }
-    if (pool_id != base_tier) {
-      // pool is a cache; skip it
-      continue;
-    }
-
-    librados::IoCtx ioctx;
-    r = m_local->ioctx_create2(pool_id, ioctx);
-    if (r == -ENOENT) {
-      dout(10) << "pool " << pool_name << " no longer exists" << dendl;
-      continue;
-    } else if (r < 0) {
-      derr << "Error accessing pool " << pool_name << cpp_strerror(r) << dendl;
-      continue;
-    }
+  std::set<InitImageInfo> images;
 
-    rbd_mirror_mode_t mirror_mode;
-    r = librbd::mirror_mode_get(ioctx, &mirror_mode);
+  std::string last_read = "";
+  int max_read = 1024;
+  do {
+    std::map<std::string, std::string> mirror_images;
+    r = librbd::cls_client::mirror_image_list(&m_local_io_ctx, last_read,
+                                              max_read, &mirror_images);
     if (r < 0) {
-      derr << "could not tell whether mirroring was enabled for " << pool_name
-          << " : " << cpp_strerror(r) << dendl;
-      continue;
-    }
-    if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
-      dout(20) << "pool " << pool_name << " has mirroring disabled" << dendl;
-      continue;
-    }
-
-    librados::IoCtx remote_ioctx;
-    r = m_remote->ioctx_create(ioctx.get_pool_name().c_str(), remote_ioctx);
-    if (r < 0 && r != -ENOENT) {
-      dout(10) << "Error connecting to remote pool " << ioctx.get_pool_name()
-               << ": " << cpp_strerror(r) << dendl;
+      derr << "error listing mirrored image directory: "
+           << cpp_strerror(r) << dendl;
       continue;
-    } else if (r == -ENOENT) {
-      // remote pool does not exist anymore, we are going to add the images
-      // with local pool id
-      pool_id = ioctx.get_id();
-    }
-    else {
-      pool_id = remote_ioctx.get_id();
     }
-
-    std::set<InitImageInfo> images;
-
-    std::string last_read = "";
-    int max_read = 1024;
-    do {
-      std::map<std::string, std::string> mirror_images;
-      r = librbd::cls_client::mirror_image_list(&ioctx, last_read, max_read,
-                                                &mirror_images);
+    for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
+      std::string image_name;
+      r = dir_get_name(&m_local_io_ctx, RBD_DIRECTORY, it->first, &image_name);
       if (r < 0) {
-        derr << "error listing mirrored image directory: "
-             << cpp_strerror(r) << dendl;
+        derr << "error retrieving local image name: " << cpp_strerror(r)
+             << dendl;
         continue;
       }
-      for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
-        std::string image_name;
-        r = dir_get_name(&ioctx, RBD_DIRECTORY, it->first, &image_name);
-        if (r < 0) {
-          derr << "error retrieving local image name: " << cpp_strerror(r)
-               << dendl;
-          continue;
-        }
-        images.insert(InitImageInfo(it->second, ioctx.get_id(), it->first,
-                                    image_name));
-      }
-      if (!mirror_images.empty()) {
-        last_read = mirror_images.rbegin()->first;
-      }
-      r = mirror_images.size();
-    } while (r == max_read);
-
-    if (!images.empty()) {
-      m_init_images[pool_id] = std::move(images);
+      images.insert(InitImageInfo(it->second, it->first, image_name));
     }
-  }
+    if (!mirror_images.empty()) {
+      last_read = mirror_images.rbegin()->first;
+    }
+    r = mirror_images.size();
+  } while (r == max_read);
+
+  m_init_images = std::move(images);
 }
 
 void Replayer::run()
@@ -452,11 +417,11 @@ void Replayer::run()
 
   m_image_deleter.reset();
 
-  PoolImageIds empty_sources;
+  ImageIds empty_sources;
   while (true) {
     Mutex::Locker l(m_lock);
     set_sources(empty_sources);
-    if (m_images.empty()) {
+    if (m_image_replayers.empty()) {
       break;
     }
     m_cond.WaitInterval(g_ceph_context, m_lock, seconds(1));
@@ -475,12 +440,9 @@ void Replayer::print_status(Formatter *f, stringstream *ss)
     f->open_array_section("image_replayers");
   };
 
-  for (auto it = m_images.begin(); it != m_images.end(); it++) {
-    auto &pool_images = it->second;
-    for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
-      auto &image_replayer = i->second;
-      image_replayer->print_status(f, ss);
-    }
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->print_status(f, ss);
   }
 
   if (f) {
@@ -502,12 +464,9 @@ void Replayer::start()
 
   m_manual_stop = false;
 
-  for (auto it = m_images.begin(); it != m_images.end(); it++) {
-    auto &pool_images = it->second;
-    for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
-      auto &image_replayer = i->second;
-      image_replayer->start(nullptr, nullptr, true);
-    }
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->start(nullptr, nullptr, true);
   }
 }
 
@@ -523,12 +482,9 @@ void Replayer::stop()
 
   m_manual_stop = true;
 
-  for (auto it = m_images.begin(); it != m_images.end(); it++) {
-    auto &pool_images = it->second;
-    for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
-      auto &image_replayer = i->second;
-      image_replayer->stop(nullptr, true);
-    }
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->stop(nullptr, true);
   }
 }
 
@@ -544,12 +500,9 @@ void Replayer::restart()
 
   m_manual_stop = false;
 
-  for (auto it = m_images.begin(); it != m_images.end(); it++) {
-    auto &pool_images = it->second;
-    for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
-      auto &image_replayer = i->second;
-      image_replayer->restart();
-    }
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->restart();
   }
 }
 
@@ -563,184 +516,122 @@ void Replayer::flush()
     return;
   }
 
-  for (auto it = m_images.begin(); it != m_images.end(); it++) {
-    auto &pool_images = it->second;
-    for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
-      auto &image_replayer = i->second;
-      image_replayer->flush();
-    }
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->flush();
   }
 }
 
-void Replayer::set_sources(const PoolImageIds &pool_image_ids)
+void Replayer::set_sources(const ImageIds &image_ids)
 {
   dout(20) << "enter" << dendl;
 
   assert(m_lock.is_locked());
 
   if (!m_init_images.empty()) {
-    dout(20) << "m_init_images has images!" << dendl;
-    for (auto it = m_init_images.begin(); it != m_init_images.end(); ++it) {
-      int64_t pool_id = it->first;
-      std::set<InitImageInfo>& images = it->second;
-      auto remote_pool_it = pool_image_ids.find(pool_id);
-      if (remote_pool_it != pool_image_ids.end()) {
-        const std::set<ImageIds>& remote_images = remote_pool_it->second;
-        for (const auto& remote_image : remote_images) {
-          auto image = images.find(InitImageInfo(remote_image.global_id));
-          if (image != images.end()) {
-            images.erase(image);
-          }
-        }
+    dout(20) << "scanning initial local image set" << dendl;
+    for (auto &remote_image : image_ids) {
+      auto it = m_init_images.find(InitImageInfo(remote_image.global_id));
+      if (it != m_init_images.end()) {
+        m_init_images.erase(it);
       }
     }
+
     // the remaining images in m_init_images must be deleted
-    for (auto it = m_init_images.begin(); it != m_init_images.end(); ++it) {
-      for (const auto& image : it->second) {
-        dout(20) << "scheduling the deletion of init image: "
-                 << image.name << dendl;
-        m_image_deleter->schedule_image_delete(image.pool_id, image.id,
-                                               image.name, image.global_id);
-      }
+    for (auto &image : m_init_images) {
+      dout(20) << "scheduling the deletion of init image: "
+               << image.name << dendl;
+      m_image_deleter->schedule_image_delete(m_local_pool_id, image.id,
+                                             image.name, image.global_id);
     }
     m_init_images.clear();
-  } else {
-    dout(20) << "m_init_images is empty!" << dendl;
   }
 
-  for (auto it = m_images.begin(); it != m_images.end();) {
-    int64_t pool_id = it->first;
-    auto &pool_images = it->second;
-
-    // pool has no mirrored images
-    if (pool_image_ids.find(pool_id) == pool_image_ids.end()) {
-      dout(20) << "pool " << pool_id << " has no mirrored images" << dendl;
-      for (auto images_it = pool_images.begin();
-          images_it != pool_images.end();) {
-        if (images_it->second->is_running()) {
-          dout(20) << "stop image replayer for "
-                   << images_it->second->get_global_image_id() << dendl;
-        }
-       if (stop_image_replayer(images_it->second)) {
-         images_it = pool_images.erase(images_it);
-       } else {
-          ++images_it;
-        }
+  // shut down replayers for non-mirrored images
+  bool existing_image_replayers = !m_image_replayers.empty();
+  for (auto image_it = m_image_replayers.begin();
+       image_it != m_image_replayers.end();) {
+    if (image_ids.find(ImageId(image_it->first)) == image_ids.end()) {
+      if (image_it->second->is_running()) {
+        dout(20) << "stop image replayer for "
+                 << image_it->second->get_global_image_id() << dendl;
       }
-      if (pool_images.empty()) {
-       mirror_image_status_shut_down(pool_id);
-       it = m_images.erase(it);
-      } else {
-        ++it;
-      }
-      continue;
-    }
-
-    // shut down replayers for non-mirrored images
-    for (auto images_it = pool_images.begin();
-        images_it != pool_images.end();) {
-      auto &image_ids = pool_image_ids.at(pool_id);
-      if (image_ids.find(ImageIds(images_it->first)) == image_ids.end()) {
-        if (images_it->second->is_running()) {
-          dout(20) << "stop image replayer for "
-                   << images_it->second->get_global_image_id() << dendl;
-        }
-       if (stop_image_replayer(images_it->second)) {
-         images_it = pool_images.erase(images_it);
-       } else {
-         ++images_it;
-       }
-      } else {
-       ++images_it;
+      if (stop_image_replayer(image_it->second)) {
+        image_it = m_image_replayers.erase(image_it);
+        continue;
       }
     }
-    ++it;
+    ++image_it;
   }
 
-  // (re)start new image replayers
-  for (const auto &kv : pool_image_ids) {
-    int64_t pool_id = kv.first;
-
-    // TODO: clean up once remote peer -> image replayer refactored
-    librados::IoCtx remote_ioctx;
-    int r = m_remote->ioctx_create2(pool_id, remote_ioctx);
-    if (r < 0) {
-      derr << "failed to lookup remote pool " << pool_id << ": "
-           << cpp_strerror(r) << dendl;
-      continue;
+  if (image_ids.empty()) {
+    if (existing_image_replayers && m_image_replayers.empty()) {
+      mirror_image_status_shut_down();
     }
+    return;
+  }
 
-    librados::IoCtx local_ioctx;
-    r = m_local->ioctx_create(remote_ioctx.get_pool_name().c_str(), local_ioctx);
-    if (r < 0) {
-      derr << "failed to lookup local pool " << remote_ioctx.get_pool_name()
-           << ": " << cpp_strerror(r) << dendl;
-      continue;
-    }
+  std::string local_mirror_uuid;
+  int r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
+                                              &local_mirror_uuid);
+  if (r < 0) {
+    derr << "failed to retrieve local mirror uuid from pool "
+         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
+    return;
+  }
 
-    std::string local_mirror_uuid;
-    r = librbd::cls_client::mirror_uuid_get(&local_ioctx, &local_mirror_uuid);
-    if (r < 0) {
-      derr << "failed to retrieve local mirror uuid from pool "
-        << local_ioctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
-      continue;
-    }
+  std::string remote_mirror_uuid;
+  r = librbd::cls_client::mirror_uuid_get(&m_remote_io_ctx,
+                                          &remote_mirror_uuid);
+  if (r < 0) {
+    derr << "failed to retrieve remote mirror uuid from pool "
+         << m_remote_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
+    return;
+  }
 
-    std::string remote_mirror_uuid;
-    r = librbd::cls_client::mirror_uuid_get(&remote_ioctx, &remote_mirror_uuid);
+  if (m_image_replayers.empty()) {
+    // create entry for pool if it doesn't exist
+    r = mirror_image_status_init();
     if (r < 0) {
-      derr << "failed to retrieve remote mirror uuid from pool "
-        << remote_ioctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
-      continue;
+      return;
     }
+  }
 
-    // create entry for pool if it doesn't exist
-    auto &pool_replayers = m_images[pool_id];
-
-    if (pool_replayers.empty()) {
-      r = mirror_image_status_init(pool_id, local_ioctx);
-      if (r < 0) {
-       continue;
-      }
+  for (auto &image_id : image_ids) {
+    auto it = m_image_replayers.find(image_id.id);
+    if (it == m_image_replayers.end()) {
+      unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
+        m_threads, m_local, m_remote, local_mirror_uuid, remote_mirror_uuid,
+        m_local_pool_id, m_remote_pool_id, image_id.id, image_id.global_id));
+      it = m_image_replayers.insert(
+        std::make_pair(image_id.id, std::move(image_replayer))).first;
     }
-
-    for (const auto &image_id : kv.second) {
-      auto it = pool_replayers.find(image_id.id);
-      if (it == pool_replayers.end()) {
-       unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
-          m_threads, m_local, m_remote, local_mirror_uuid, remote_mirror_uuid,
-          local_ioctx.get_id(), pool_id, image_id.id, image_id.global_id));
-       it = pool_replayers.insert(
-         std::make_pair(image_id.id, std::move(image_replayer))).first;
-      }
-      if (!it->second->is_running()) {
-        dout(20) << "starting image replayer for "
-                 << it->second->get_global_image_id() << dendl;
-      }
-      start_image_replayer(it->second, image_id.name);
+    if (!it->second->is_running()) {
+      dout(20) << "starting image replayer for "
+               << it->second->get_global_image_id() << dendl;
     }
+    start_image_replayer(it->second, image_id.name);
   }
 }
 
-int Replayer::mirror_image_status_init(int64_t pool_id,
-                                      librados::IoCtx& ioctx) {
-  assert(m_status_watchers.find(pool_id) == m_status_watchers.end());
-
-  uint64_t instance_id = librados::Rados(ioctx).get_instance_id();
+int Replayer::mirror_image_status_init() {
+  assert(!m_status_watcher);
 
-  dout(20) << "pool_id=" << pool_id << ", instance_id=" << instance_id << dendl;
+  uint64_t instance_id = librados::Rados(m_local_io_ctx).get_instance_id();
+  dout(20) << "pool_id=" << m_local_pool_id << ", "
+           << "instance_id=" << instance_id << dendl;
 
   librados::ObjectWriteOperation op;
   librbd::cls_client::mirror_image_status_remove_down(&op);
-  int r = ioctx.operate(RBD_MIRRORING, &op);
+  int r = m_local_io_ctx.operate(RBD_MIRRORING, &op);
   if (r < 0) {
     derr << "error initializing " << RBD_MIRRORING << "object: "
         << cpp_strerror(r) << dendl;
     return r;
   }
 
-  unique_ptr<MirrorStatusWatchCtx>
-    watch_ctx(new MirrorStatusWatchCtx(ioctx, m_threads->work_queue));
+  unique_ptr<MirrorStatusWatchCtx> watch_ctx(
+    new MirrorStatusWatchCtx(m_local_io_ctx, m_threads->work_queue));
 
   r = watch_ctx->register_watch();
   if (r < 0) {
@@ -749,22 +640,19 @@ int Replayer::mirror_image_status_init(int64_t pool_id,
     return r;
   }
 
-  m_status_watchers.insert(std::make_pair(pool_id, std::move(watch_ctx)));
-
+  m_status_watcher = std::move(watch_ctx);
   return 0;
 }
 
-void Replayer::mirror_image_status_shut_down(int64_t pool_id) {
-  auto watcher_it = m_status_watchers.find(pool_id);
-  assert(watcher_it != m_status_watchers.end());
+void Replayer::mirror_image_status_shut_down() {
+  assert(m_status_watcher);
 
-  int r = watcher_it->second->unregister_watch();
+  int r = m_status_watcher->unregister_watch();
   if (r < 0) {
-    derr << "error unregistering watcher for " << watcher_it->second->get_oid()
+    derr << "error unregistering watcher for " << m_status_watcher->get_oid()
         << " object: " << cpp_strerror(r) << dendl;
   }
-
-  m_status_watchers.erase(watcher_it);
+  m_status_watcher.reset();
 }
 
 void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer,
index ff01fca5269c21f2c030c527c6b32cfef28d6218..44b49e763f756f66e2a14b8c3835b9151cfec89f 100644 (file)
@@ -34,7 +34,7 @@ class MirrorStatusWatchCtx;
 class Replayer {
 public:
   Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
-           RadosRef local_cluster, const peer_t &peer,
+           RadosRef local_cluster, int64_t local_pool_id, const peer_t &peer,
            const std::vector<const char*> &args);
   ~Replayer();
   Replayer(const Replayer&) = delete;
@@ -50,18 +50,18 @@ public:
   void flush();
 
 private:
+  typedef PoolWatcher::ImageId ImageId;
   typedef PoolWatcher::ImageIds ImageIds;
-  typedef PoolWatcher::PoolImageIds PoolImageIds;
 
   void init_local_mirroring_images();
-  void set_sources(const PoolImageIds &pool_image_ids);
+  void set_sources(const ImageIds &image_ids);
 
   void start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer,
                             const boost::optional<std::string>& image_name);
   bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
 
-  int mirror_image_status_init(int64_t pool_id, librados::IoCtx& ioctx);
-  void mirror_image_status_shut_down(int64_t pool_id);
+  int mirror_image_status_init();
+  void mirror_image_status_shut_down();
 
   Threads *m_threads;
   std::shared_ptr<ImageDeleter> m_image_deleter;
@@ -72,35 +72,39 @@ private:
 
   peer_t m_peer;
   std::vector<const char*> m_args;
-  RadosRef m_local, m_remote;
+  RadosRef m_local;
+  RadosRef m_remote;
+
+  librados::IoCtx m_local_io_ctx;
+  librados::IoCtx m_remote_io_ctx;
+
+  int64_t m_local_pool_id = -1;
+  int64_t m_remote_pool_id = -1;
+
   std::unique_ptr<PoolWatcher> m_pool_watcher;
-  // index by pool so it's easy to tell what is affected
-  // when a pool's configuration changes
-  std::map<int64_t, std::map<std::string,
-                            std::unique_ptr<ImageReplayer<> > > > m_images;
-  std::map<int64_t, std::unique_ptr<MirrorStatusWatchCtx> > m_status_watchers;
+  std::map<std::string, std::unique_ptr<ImageReplayer<> > > m_image_replayers;
+  std::unique_ptr<MirrorStatusWatchCtx> m_status_watcher;
+
   ReplayerAdminSocketHook *m_asok_hook;
   struct InitImageInfo {
     std::string global_id;
-    int64_t pool_id;
     std::string id;
     std::string name;
 
-    InitImageInfo(const std::string& global_id, int64_t pool_id = 0,
-             const std::string &id = "", const std::string &name = "")
-      : global_id(global_id), pool_id(pool_id), id(id), name(name) {
+    InitImageInfo(const std::string& global_id, const std::string &id = "",
+                  const std::string &name = "")
+      : global_id(global_id), id(id), name(name) {
     }
 
     inline bool operator==(const InitImageInfo &rhs) const {
-      return (global_id == rhs.global_id && pool_id == rhs.pool_id &&
-              id == rhs.id && name == rhs.name);
+      return (global_id == rhs.global_id && id == rhs.id && name == rhs.name);
     }
     inline bool operator<(const InitImageInfo &rhs) const {
       return global_id < rhs.global_id;
     }
   };
 
-  std::map<int64_t, std::set<InitImageInfo> > m_init_images;
+  std::set<InitImageInfo> m_init_images;
 
   class ReplayerThread : public Thread {
     Replayer *m_replayer;