]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: fixes multiple issues in the group replayer
authorN Balachandran <nithya.balachandran@ibm.com>
Fri, 21 Feb 2025 06:17:19 +0000 (11:47 +0530)
committerPrasanna Kumar Kalever <prasanna.kalever@redhat.com>
Thu, 24 Apr 2025 15:56:31 +0000 (21:26 +0530)
The commit includes the following:
- Fixed crashes in the start/stop in GroupReplayer
- Fixed crashes in the shut_down sequence in group_replayer::Replayer
- ImageMap will now send release_group notifications for non-empty
  groups.
- InstanceReplayer no longer checks if the GroupReplayer needs to be
  restarted. The GroupReplayer will stop itself if it determines that it
  needs to be restarted.

Signed-off-by: N Balachandran <nithya.balachandran@ibm.com>
qa/workunits/rbd/rbd_mirror_group.sh
src/tools/rbd_mirror/GroupReplayer.cc
src/tools/rbd_mirror/GroupReplayer.h
src/tools/rbd_mirror/ImageMap.cc
src/tools/rbd_mirror/InstanceReplayer.cc
src/tools/rbd_mirror/group_replayer/Replayer.cc
src/tools/rbd_mirror/group_replayer/Replayer.h

index c55e1a64e87ddb792bb6570415c3770230d99980..251ac56ddb60b6e18a37016e48f5dfef505a6703 100755 (executable)
@@ -303,7 +303,7 @@ test_fields_in_group_info ${CLUSTER2} ${POOL}/${group} 'snapshot' 'enabled' 'tru
 wait_for_group_replay_started ${CLUSTER1} ${POOL}/${group} 1
 mirror_group_snapshot_and_wait_for_sync_complete ${CLUSTER1} ${CLUSTER2} ${POOL}/${group}
 wait_for_group_status_in_pool_dir ${CLUSTER1} ${POOL}/${group} 'up+replaying' 1
-wait_for_group_status_in_pool_dir ${CLUSTER2} ${POOL}/${group} 'up+stopped' 1
+wait_for_group_status_in_pool_dir ${CLUSTER2} ${POOL}/${group} 'up+stopped' 0
 compare_images ${CLUSTER1} ${CLUSTER2} ${POOL} ${POOL} ${image}
 
 testlog " - failover"
@@ -317,7 +317,7 @@ test_fields_in_group_info ${CLUSTER1} ${POOL}/${group1} 'snapshot' 'enabled' 'tr
 wait_for_group_replay_started ${CLUSTER2} ${POOL}/${group1} 1
 write_image ${CLUSTER1} ${POOL} ${image1} 100
 mirror_group_snapshot_and_wait_for_sync_complete ${CLUSTER2} ${CLUSTER1} ${POOL}/${group1}
-wait_for_group_status_in_pool_dir ${CLUSTER1} ${POOL}/${group1} 'up+stopped' 1
+wait_for_group_status_in_pool_dir ${CLUSTER1} ${POOL}/${group1} 'up+stopped' 0
 wait_for_group_status_in_pool_dir ${CLUSTER2} ${POOL}/${group1} 'up+replaying' 1
 compare_images ${CLUSTER1} ${CLUSTER2} ${POOL} ${POOL} ${image1}
 
@@ -333,7 +333,7 @@ wait_for_group_replay_started ${CLUSTER1} ${POOL}/${group1} 1
 write_image ${CLUSTER2} ${POOL} ${image1} 100
 mirror_group_snapshot_and_wait_for_sync_complete ${CLUSTER1} ${CLUSTER2} ${POOL}/${group1}
 wait_for_group_status_in_pool_dir ${CLUSTER1} ${POOL}/${group1} 'up+replaying' 1
-wait_for_group_status_in_pool_dir ${CLUSTER2} ${POOL}/${group1} 'up+stopped' 1
+wait_for_group_status_in_pool_dir ${CLUSTER2} ${POOL}/${group1} 'up+stopped' 0
 compare_images ${CLUSTER1} ${CLUSTER2} ${POOL} ${POOL} ${image1}
 
 testlog " - force promote cluster1"
index ee24ae7a110522cf5105472c7a0fa5bfb00fe45e..5072bbf3ca344ea8efc8a7fd4776676d025a166e 100644 (file)
@@ -217,38 +217,12 @@ GroupReplayer<I>::GroupReplayer(
 template <typename I>
 GroupReplayer<I>::~GroupReplayer() {
   unregister_admin_socket_hook();
-  //ceph_assert(m_on_start_finish == nullptr);
-  ceph_assert(m_on_stop_finish == nullptr);
+  ceph_assert(m_on_start_finish == nullptr);
   ceph_assert(m_bootstrap_request == nullptr);
-}
-
-template <typename I>
-bool GroupReplayer<I>::needs_restart() const {
-  dout(10) << dendl;
-
-  std::lock_guard locker{m_lock};
-  if (!m_replayer) {
-    return true;
-  }
-
-  if (!m_local_group_ctx.primary) {
-    if (m_state != STATE_REPLAYING) {
-      return true;
-    }
-    for (auto &[_, image_replayer] : m_image_replayers) {
-      if (image_replayer->is_stopped()) {
-        dout(10) << "image replayer is in stopped state, needs restart" << dendl;
-        return true;
-      }
-    }
-  } else {
-    // this is how we determine if the remote state has changed,
-    // if we never restart the group replayer we never get to see updated
-    // snapshot information on remote and see if its demoted at all.
-    return true;
-  }
-
-  return false;
+  ceph_assert(m_bootstrap_request == nullptr);
+  ceph_assert(m_replayer == nullptr);
+  ceph_assert(m_image_replayers.empty());
+  ceph_assert(m_on_stop_contexts.empty());
 }
 
 template <typename I>
@@ -347,12 +321,13 @@ void GroupReplayer<I>::start(Context *on_finish, bool manual, bool restart) {
       m_last_r = 0;
       m_state_desc.clear();
       m_local_group_snaps.clear();
+      ceph_assert(m_replayer == nullptr);
+      ceph_assert(m_image_replayers.empty());
       m_image_replayers.clear();
       m_image_replayer_index.clear();
-      m_get_remote_group_snap_ret_vals.clear();
       m_manual_stop = false;
       m_finished = false;
-      //ceph_assert(m_on_start_finish == nullptr);
+      ceph_assert(m_on_start_finish == nullptr);
       std::swap(m_on_start_finish, on_finish);
     }
   }
@@ -373,96 +348,69 @@ void GroupReplayer<I>::stop(Context *on_finish, bool manual, bool restart) {
            << ", restart=" << restart << dendl;
 
   group_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
-  std::map<std::string, std::map<ImageReplayer<I> *, Context *>> create_snap_requests;
   bool shut_down_replay = false;
-  bool running = true;
+  bool is_stopped = false;
   {
     std::lock_guard locker{m_lock};
 
-    if (restart) {
-      m_restart_requested = true;
-    }
+    dout(10) << "state: " << m_state << ", m_stop_requested: "
+             << m_stop_requested << dendl;
 
-    dout(10) << "state: " << m_state << ", m_stop_requested: " << m_stop_requested << dendl;
     if (!is_running_()) {
       dout(10) << "replayers not running" << dendl;
-      running = false;
+      if (manual && !m_manual_stop) {
+        dout(10) << "marking manual" << dendl;
+        m_manual_stop = true;
+      }
       if (!restart && m_restart_requested) {
         dout(10) << "canceling restart" << dendl;
         m_restart_requested = false;
       }
+      if (is_stopped_()) {
+        dout(10) << "already stopped" << dendl;
+        is_stopped = true;
+      } else {
+        dout(10) << "joining in-flight stop" << dendl;
+        if (on_finish != nullptr) {
+          m_on_stop_contexts.push_back(on_finish);
+        }
+      }
     } else {
       dout(10) << "replayers still running" << dendl;
-      if (!is_stopped_() || m_state == STATE_STOPPING) {
-       if (m_state == STATE_STARTING) {
-         dout(10) << "canceling start" << dendl;
-         if (m_bootstrap_request != nullptr) {
-            bootstrap_request = m_bootstrap_request;
-            bootstrap_request->get();
-         }
-         shut_down_replay = true;
-       } else {
-          dout(10) << "interrupting replay" << dendl;
-          shut_down_replay = true;
-          for (auto it = m_create_snap_requests.begin();
-               it != m_create_snap_requests.end(); ) {
-            auto &remote_group_snap_id = it->first;
-            auto &requests = it->second;
-            if (m_get_remote_group_snap_ret_vals.count(remote_group_snap_id) == 0) {
-              dout(20) << "getting remote group snap for "
-                       << remote_group_snap_id << " is still in-progress"
-                       << dendl;
-              shut_down_replay = false;
-            } else if (m_pending_snap_create.count(remote_group_snap_id) > 0) {
-              dout(20) << "group snap create for " << remote_group_snap_id
-                       << " is still in-progress" << dendl;
-              shut_down_replay = false;
-            } else {
-              create_snap_requests[remote_group_snap_id] = requests;
-              it = m_create_snap_requests.erase(it);
-              continue;
-            }
-            it++;
-          }
+      if (m_state == STATE_STARTING) {
+       dout(10) << "canceling start" << dendl;
+       if (m_bootstrap_request != nullptr) {
+          bootstrap_request = m_bootstrap_request;
+          bootstrap_request->get();
        }
-        m_state = STATE_STOPPING;
+      } else {
+        dout(10) << "interrupting replay" << dendl;
+        shut_down_replay = true;
+      }
 
-        ceph_assert(m_on_stop_finish == nullptr);
-        std::swap(m_on_stop_finish, on_finish);
-        m_stop_requested = true;
-        m_manual_stop = manual;
+      m_stop_requested = true;
+      m_manual_stop = manual;
+      if (on_finish != nullptr) {
+       m_on_stop_contexts.push_back(on_finish);
       }
     }
   }
 
+  if (is_stopped) {
+    if (on_finish) {
+      on_finish->complete(-EINVAL);
+    }
+    return;
+  }
+
   if (bootstrap_request != nullptr) {
     dout(10) << "canceling bootstrap" << dendl;
     bootstrap_request->cancel();
     bootstrap_request->put();
   }
 
-  for (auto &[_, requests] : create_snap_requests) {
-    for (auto &[_, on_finish] : requests) {
-      on_finish->complete(-ESTALE);
-    }
-  }
-
   if (shut_down_replay) {
-    stop_group_replayer();
-  } else if (on_finish != nullptr) {
-    // XXXMG: clean up
-    {
-      std::lock_guard locker{m_lock};
-      m_stop_requested = false;
-    }
-    on_finish->complete(0);
-  }
-
-  if (!running && shut_down_replay) {
-    dout(20) << "not running" << dendl;
-    if (on_finish) {
-      on_finish->complete(-EINVAL);
-    }
+    on_stop_replay(0);
   }
 }
 
@@ -501,7 +449,7 @@ void GroupReplayer<I>::flush() {
 
 template <typename I>
 void GroupReplayer<I>::print_status(Formatter *f) {
-  dout(10) << dendl;
+  dout(10) <<  m_state << dendl;
 
   std::lock_guard l{m_lock};
 
@@ -509,9 +457,11 @@ void GroupReplayer<I>::print_status(Formatter *f) {
   f->dump_string("name", m_group_spec);
   auto state = m_state;
   if (m_local_group_ctx.primary && state == STATE_REPLAYING) { // XXXMG
+    dout(10) <<  "setting state to stopped" << dendl;
     state = STATE_STOPPED;
   }
   f->dump_string("state", state_to_string(state));
+// TODO: remove the image_replayers section
   f->open_array_section("image_replayers");
   for (auto &[_, image_replayer] : m_image_replayers) {
     image_replayer->print_status(f);
@@ -520,6 +470,26 @@ void GroupReplayer<I>::print_status(Formatter *f) {
   f->close_section(); // group_replayer
 }
 
+template <typename I>
+void GroupReplayer<I>::on_stop_replay(int r, const std::string &desc)
+{
+  dout(10) << "r=" << r << ", desc=" << desc << dendl;
+  {
+    std::lock_guard locker{m_lock};
+    if (m_state != STATE_REPLAYING) {
+      // might be invoked multiple times while stopping
+      return;
+    }
+
+    m_stop_requested = true;
+    m_state = STATE_STOPPING;
+    m_status_state = cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED;
+    m_state_desc = "";
+  }
+
+  shut_down(r);
+}
+
 template <typename I>
 void GroupReplayer<I>::bootstrap_group() {
   dout(10) << dendl;
@@ -529,7 +499,7 @@ void GroupReplayer<I>::bootstrap_group() {
     locker.unlock();
 
     dout(5) << "no peer clusters" << dendl;
-    finish_start(-ENOENT, "no peer clusters");
+    finish_start_fail(-ENOENT, "no peer clusters");
     return;
   }
 
@@ -541,6 +511,7 @@ void GroupReplayer<I>::bootstrap_group() {
     return;
   }
 
+  ceph_assert(m_replayer == nullptr);
   ceph_assert(m_image_replayers.empty());
 
   auto ctx = create_context_callback<
@@ -568,6 +539,7 @@ void GroupReplayer<I>::handle_bootstrap_group(int r) {
   dout(10) << "r=" << r << dendl;
   {
     std::lock_guard locker{m_lock};
+// Should never happen
     if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
       dout(10) << "stop prevailed" <<dendl;
       return;
@@ -576,12 +548,11 @@ void GroupReplayer<I>::handle_bootstrap_group(int r) {
       m_bootstrap_request->put();
       m_bootstrap_request = nullptr;
     }
-    m_local_group_ctx.listener = &m_listener;
-    if (!m_local_group_ctx.name.empty()) {
-      m_local_group_name = m_local_group_ctx.name;
-    }
   }
 
+  if (!m_local_group_ctx.name.empty()) {
+    m_local_group_name = m_local_group_ctx.name;
+  }
   if (r == -EINVAL) {
     sync_group_names();
   } else {
@@ -591,38 +562,48 @@ void GroupReplayer<I>::handle_bootstrap_group(int r) {
   if (finish_start_if_interrupted()) {
     return;
   } else if (r == -ENOENT) {
-    finish_start(r, "group removed");
+    finish_start_fail(r, "group removed");
     return;
   } else if (r == -EREMOTEIO) {
-    finish_start(r, "remote group is non-primary");
+    finish_start_fail(r, "remote group is non-primary");
     return;
   } else if (r == -EEXIST) {
-    finish_start(r, "split-brain detected");
+    finish_start_fail(r, "split-brain detected");
+    return;
+  } else if (m_remote_group_id.empty()){
+    r = -EINVAL;
+ //FIXME: The primary should not care if the remote is ready.
+ // Bootstrap again when the replayer is restarted
+    finish_start_fail(r, "remote is not ready yet");
     return;
+  } else if (r == -EINVAL) {
+    sync_group_names();
   } else if (r < 0) {
-    finish_start(r, "bootstrap failed");
+    finish_start_fail(r, "bootstrap failed");
     return;
   }
 
-  if (!m_remote_group_id.empty()) {
-    C_SaferCond ctx;
-    create_group_replayer(&ctx);
-    ctx.wait();
-  } else {
-    r = -EINVAL;
-    finish_start(r, "remote is not ready yet"); // bootstrap again
+  if (m_local_group_ctx.primary) { // XXXMG
+    set_mirror_group_status_update(
+       cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED,
+       "local group is primary");
+    finish_start_fail(0, "local group is primary");
     return;
   }
+
+  m_local_group_ctx.listener = &m_listener;
+
+  create_group_replayer();
 }
 
 template <typename I>
-void GroupReplayer<I>::create_group_replayer(Context *on_finish) {
+void GroupReplayer<I>::create_group_replayer() {
   dout(10) << dendl;
 
-  auto ctx = new LambdaContext(
-    [this, on_finish](int r) {
-      handle_create_group_replayer(r, on_finish);
-    });
+  ceph_assert(m_replayer == nullptr);
+
+  auto ctx = create_context_callback<
+    GroupReplayer, &GroupReplayer<I>::handle_create_group_replayer>(this);
 
   m_replayer = group_replayer::Replayer<I>::create(
     m_threads, m_local_io_ctx, m_remote_group_peer.io_ctx, m_global_group_id,
@@ -633,18 +614,21 @@ void GroupReplayer<I>::create_group_replayer(Context *on_finish) {
 }
 
 template <typename I>
-void GroupReplayer<I>::handle_create_group_replayer(int r, Context *on_finish) {
+void GroupReplayer<I>::handle_create_group_replayer(int r) {
   dout(10) << "r=" << r << dendl;
 
-  if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
-    dout(10) << "stop prevailed" <<dendl;
-    on_finish->complete(r);
+  if (finish_start_if_interrupted()) {
+    return;
+  } else if (r < 0) {
+
+    finish_start_fail(r, "failed to create group replayer");
     return;
   }
-  on_finish->complete(0);
+
   start_image_replayers();
 }
 
+// TODO: Move this to group_replayer::Replayer?
 template <typename I>
 void GroupReplayer<I>::start_image_replayers() {
   dout(10) << m_image_replayers.size() << dendl;
@@ -678,197 +662,154 @@ void GroupReplayer<I>::handle_start_image_replayers(int r) {
   if (finish_start_if_interrupted()) {
     return;
   } else if (r < 0) {
-    finish_start(r, "");
+    finish_start_fail(r, "failed to start image replayers");
     return;
   }
 
-  finish_start(0, "");
-}
+  Context *on_finish = nullptr;
+  {
+    std::unique_lock locker{m_lock};
+    ceph_assert(m_state == STATE_STARTING);
+    m_state = STATE_REPLAYING;
+    std::swap(m_on_start_finish, on_finish);
+  }
 
-template <typename I>
-void GroupReplayer<I>::stop_group_replayer() {
-  dout(10) << dendl;
   set_mirror_group_status_update(
-      cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPING_REPLAY, "stopping");
+     cls::rbd::MIRROR_GROUP_STATUS_STATE_REPLAYING, "replaying");
 
-  if (m_replayer != nullptr) {
-    C_SaferCond ctx;
-    m_replayer->shut_down(&ctx);
-    ctx.wait();
+  if(on_finish) {
+    on_finish->complete(0);
   }
-
-  handle_stop_group_replayer(0);
 }
 
 template <typename I>
-void GroupReplayer<I>::handle_stop_group_replayer(int r) {
-  dout(10) << "r=" << r << dendl;
-
+bool GroupReplayer<I>::finish_start_if_interrupted() {
   std::lock_guard locker{m_lock};
-  stop_image_replayers();
+
+  return finish_start_if_interrupted(m_lock);
 }
 
 template <typename I>
-void GroupReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
-                                              Context *on_finish) {
-  dout(10) << image_replayer << " global_image_id="
-           << image_replayer->get_global_image_id() << ", on_finish="
-           << on_finish << dendl;
-
-  if (image_replayer->is_stopped()) {
-    m_threads->work_queue->queue(on_finish, 0);
-    return;
+bool GroupReplayer<I>::finish_start_if_interrupted(ceph::mutex &lock) {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(m_state == STATE_STARTING);
+  if (!m_stop_requested) {
+    return false;
   }
 
-  m_async_op_tracker.start_op();
-  Context *ctx = create_async_context_callback(
-    m_threads->work_queue, new LambdaContext(
-      [this, image_replayer, on_finish] (int r) {
-        stop_image_replayer(image_replayer, on_finish);
-        m_async_op_tracker.finish_op();
-      }));
-
-  if (image_replayer->is_running()) {
-    image_replayer->stop(ctx, false);
-  } else {
-    int after = 1;
-    dout(10) << "scheduling image replayer " << image_replayer << " stop after "
-             << after << " sec (task " << ctx << ")" << dendl;
-    ctx = new LambdaContext(
-      [this, after, ctx] (int r) {
-        std::lock_guard timer_locker{m_threads->timer_lock};
-        m_threads->timer->add_event_after(after, ctx);
-      });
-    m_threads->work_queue->queue(ctx, 0);
-  }
+  finish_start_fail(-ECANCELED, "");
+  return true;
 }
 
 template <typename I>
-void GroupReplayer<I>::stop_image_replayers() {
-  dout(10) << dendl;
-
-  ceph_assert(ceph_mutex_is_locked(m_lock));
-
-  Context *ctx = create_async_context_callback(
-    m_threads->work_queue, create_context_callback<GroupReplayer<I>,
-    &GroupReplayer<I>::handle_stop_image_replayers>(this));
+void GroupReplayer<I>::finish_start_fail(int r, const std::string &desc) {
+  dout(10) << "r=" << r << ", desc=" << desc << dendl;
+  Context *ctx = new LambdaContext([this, r, desc](int _r) {
+    m_status_state = cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED;
+    m_state_desc = desc;
+    {
+      std::lock_guard locker{m_lock};
+      ceph_assert(m_state == STATE_STARTING);
+      m_state = STATE_STOPPING;
+      if (r < 0) {
+       if (r == -ECANCELED) {
+         dout(10) << "start canceled" << dendl;
+       } else if (r == -ENOENT) {
+         dout(10) << "mirroring group removed" << dendl;
+       } else if (r == -EREMOTEIO) {
+         dout(10) << "mirroring group demoted" << dendl;
+       } else {
+         derr << "start failed: " << cpp_strerror(r) << dendl;
+         m_status_state = cls::rbd::MIRROR_GROUP_STATUS_STATE_ERROR;
+       }
+      }
+    }
+    shut_down(r);
+  });
 
-  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
-  for (auto &it : m_image_replayers) {
-    stop_image_replayer(it.second, gather_ctx->new_sub());
-  }
-  gather_ctx->activate();
+  m_threads->work_queue->queue(ctx, 0);
 }
 
 template <typename I>
-void GroupReplayer<I>::handle_stop_image_replayers(int r) {
-  dout(10) << "r=" << r << dendl;
-
-  ceph_assert(r == 0);
+void GroupReplayer<I>::shut_down(int r) {
+  dout(10) << "r=" << r << ", state=" << m_state << dendl;
 
-  Context *on_finish = nullptr;
   {
     std::lock_guard locker{m_lock};
-
-    for (auto &it : m_image_replayers) {
-      ceph_assert(it.second->is_stopped());
-      it.second->destroy();
-    }
     ceph_assert(m_state == STATE_STOPPING);
-    m_image_replayers.clear();
-
-    m_stop_requested = false;
-    m_state = STATE_STOPPED;
-    std::swap(on_finish, m_on_stop_finish);
   }
 
-  dout(15) << "waiting for in-flight operations to complete" << dendl;
-  m_async_op_tracker.wait_for_ops(new LambdaContext([this](int r) {
-        set_mirror_group_status_update(
-            cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED, "stopped");
-        }));
+  // chain the shut down sequence (reverse order)
+  Context *ctx = new LambdaContext(
+    [this, r](int _r) {
+      set_mirror_group_status_update(m_status_state, m_state_desc);
+      handle_shut_down(r);
+    });
 
-  if (on_finish) {
-    on_finish->complete(r);
-  }
-}
+  // stop and destroy the replayers
+  ctx = new LambdaContext([this, ctx](int r) {
+    {
+      std::lock_guard locker{m_lock};
 
-template <typename I>
-bool GroupReplayer<I>::finish_start_if_interrupted() {
-  std::lock_guard locker{m_lock};
+      for (auto &it : m_image_replayers) {
+       ceph_assert(it.second->is_stopped());
+       it.second->destroy();
+      }
+      m_image_replayers.clear();
+    }
+    ctx->complete(0);
+  });
 
-  return finish_start_if_interrupted(m_lock);
-}
+  ctx = new LambdaContext([this, ctx](int r) {
+    C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
+    {
+      std::lock_guard locker{m_lock};
+      for (auto &it : m_image_replayers) {
+        it.second->stop(gather_ctx->new_sub());
+      }
+    }
+    gather_ctx->activate();
+  });
 
-template <typename I>
-bool GroupReplayer<I>::finish_start_if_interrupted(ceph::mutex &lock) {
-  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
-  ceph_assert(m_state == STATE_STARTING);
-  if (!m_stop_requested) {
-    return false;
+  if (m_replayer != nullptr) {
+    ctx = new LambdaContext([this, ctx](int r) {
+      m_replayer->destroy();
+      m_replayer = nullptr;
+      ctx->complete(0);
+    });
+
+    ctx = new LambdaContext([this, ctx](int r) {
+      m_replayer->shut_down(ctx);
+    });
   }
 
-  finish_start(-ECANCELED, "");
-  return true;
+  m_threads->work_queue->queue(ctx, 0);
 }
 
 template <typename I>
-void GroupReplayer<I>::finish_start(int r, const std::string &desc) {
-  dout(10) << "r=" << r << ", desc=" << desc << dendl;
-  Context *ctx = new LambdaContext(
-    [this, r, desc](int _r) {
-      Context *on_finish = nullptr;
-      {
-       std::lock_guard locker{m_lock};
-        ceph_assert(m_state == STATE_STARTING);
-        m_state = STATE_REPLAYING;
-        std::swap(m_on_start_finish, on_finish);
-        m_state_desc = desc;
-        if (r < 0) {
-          auto state = cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED;
-          if (r == -ECANCELED) {
-            dout(10) << "start canceled" << dendl;
-          } else if (r == -ENOENT) {
-            dout(10) << "mirroring group removed" << dendl;
-          } else if (r == -EREMOTEIO) {
-            dout(10) << "mirroring group demoted" << dendl;
-          } else {
-            derr << "start failed: " << cpp_strerror(r) << dendl;
-            state = cls::rbd::MIRROR_GROUP_STATUS_STATE_ERROR;
-          }
-          on_finish = new LambdaContext(
-              [this, r, state, desc, on_finish](int) {
-                set_mirror_group_status_update(state, desc);
-
-                if (r == -ENOENT && !m_resync_requested) {
-                  set_finished(true);
-                }
-                if (on_finish != nullptr) {
-                  on_finish->complete(r);
-                }
-              });
-        }
-      }
+void GroupReplayer<I>::handle_shut_down(int r) {
 
-      if (r < 0) {
-        stop(on_finish, false, false);
-        return;
-      }
-
-      if (m_local_group_ctx.primary) { // XXXMG
-        set_mirror_group_status_update(
-            cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED,
-            "local group is primary");
-      } else {
-        set_mirror_group_status_update(
-            cls::rbd::MIRROR_GROUP_STATUS_STATE_REPLAYING, "replaying");
-      }
+  dout(10) << "stop complete" << dendl;
+  Context *on_start = nullptr;
+  std::list<Context *> on_stop_contexts;
+  {
+    std::lock_guard locker{m_lock};
+    std::swap(on_start, m_on_start_finish);
+    on_stop_contexts = std::move(m_on_stop_contexts);
+    m_stop_requested = false;
+    ceph_assert(m_state == STATE_STOPPING);
+    m_state = STATE_STOPPED;
+  }
 
-      if (on_finish != nullptr) {
-        on_finish->complete(0);
-      }
-    });
-  m_threads->work_queue->queue(ctx, 0);
+  if (on_start != nullptr) {
+    dout(10) << "on start finish complete, r=" << r << dendl;
+    on_start->complete(r);
+    r = 0;
+  }
+  for (auto ctx : on_stop_contexts) {
+    dout(10) << "on stop finish " << ctx << " complete, r=" << r << dendl;
+    ctx->complete(r);
+  }
 }
 
 
@@ -881,12 +822,12 @@ void GroupReplayer<I>::register_admin_socket_hook() {
       return;
     }
 
-    dout(15) << "registered asok hook: " << m_group_spec << dendl;
     asok_hook = new GroupReplayerAdminSocketHook<I>(
       g_ceph_context, m_group_spec, this);
     int r = asok_hook->register_commands();
     if (r == 0) {
       m_asok_hook = asok_hook;
+      dout(15) << "registered asok hook: " << m_group_spec << dendl;
       return;
     }
     derr << "error registering admin socket commands" << dendl;
index 27dd214dc6c5208184c8df44c02dd5000c146346..ee36e7192b51c30af3b83178e053010f09b2ee2c 100644 (file)
@@ -100,7 +100,6 @@ public:
     return (m_last_r == -EBLOCKLISTED);
   }
 
-  bool needs_restart() const;
   void sync_group_names();
 
   image_replayer::HealthState get_health_state() const;
@@ -206,9 +205,11 @@ private:
   AsyncOpTracker m_async_op_tracker;
   State m_state = STATE_STOPPED;
   std::string m_state_desc;
+  cls::rbd::MirrorGroupStatusState m_status_state;
   int m_last_r = 0;
 
   Context *m_on_start_finish = nullptr;
+  std::list<Context *> m_on_stop_contexts;
   Context *m_on_stop_finish = nullptr;
   bool m_stop_requested = false;
   bool m_resync_requested = false;
@@ -216,6 +217,7 @@ private:
   bool m_manual_stop = false;
   bool m_finished = false;
 
+
   AdminSocketHook *m_asok_hook = nullptr;
 
   group_replayer::BootstrapRequest<ImageCtxT> *m_bootstrap_request = nullptr;
@@ -225,7 +227,6 @@ private:
   Listener m_listener = {this};
   std::map<std::pair<int64_t, std::string>, ImageReplayer<ImageCtxT> *> m_image_replayer_index;
   std::map<std::string, cls::rbd::GroupSnapshot> m_local_group_snaps;
-  std::map<std::string, int> m_get_remote_group_snap_ret_vals;
   std::map<std::string, std::map<ImageReplayer<ImageCtxT> *, Context *>> m_create_snap_requests;
   std::set<std::string> m_pending_snap_create;
 
@@ -254,26 +255,19 @@ private:
     return (m_state == STATE_REPLAYING);
   }
 
+  void on_stop_replay(int r = 0, const std::string &desc = "");
   void bootstrap_group();
   void handle_bootstrap_group(int r);
 
-  void create_group_replayer(Context *on_finish);
-  void handle_create_group_replayer(int r, Context *on_finish);
+  void create_group_replayer();
+  void handle_create_group_replayer(int r);
 
   void start_image_replayers();
   void handle_start_image_replayers(int r);
 
   bool finish_start_if_interrupted();
   bool finish_start_if_interrupted(ceph::mutex &lock);
-  void finish_start(int r, const std::string &desc);
-
-  void stop_group_replayer();
-  void handle_stop_group_replayer(int r);
-
-  void stop_image_replayer(ImageReplayer<ImageCtxT> *image_replayer,
-                           Context *on_finish);
-  void stop_image_replayers();
-  void handle_stop_image_replayers(int r);
+  void finish_start_fail(int r, const std::string &desc);
 
   void register_admin_socket_hook();
   void unregister_admin_socket_hook();
@@ -283,6 +277,9 @@ private:
                                       const std::string &desc);
   void wait_for_ops();
   void handle_wait_for_ops(int r);
+
+  void shut_down(int r);
+  void handle_shut_down(int r);
 };
 
 } // namespace mirror
index 9ec991a6d9f67fadb8beb59c707f0b3514fa23b1..db76bceeab0f7c556f7f5e9a9d5f02b00cdb7d47 100644 (file)
@@ -340,7 +340,7 @@ void ImageMap<I>::notify_listener_acquire_release_images(
 
   for (auto const &update : release) {
     auto global_id = GlobalId(update.entity.type, update.entity.global_id);
-    if (update.entity.type == MIRROR_ENTITY_TYPE_IMAGE || update.entity.count != 0) {
+    if (update.entity.type == MIRROR_ENTITY_TYPE_IMAGE) {
       m_listener.release_image(
         update.entity.global_id, update.instance_id,
         create_async_context_callback(
index 01e61a09e43a2ab80278050ac808f36cb474b489..9c573c6debae7d65da460f38f879d5030df1ba03 100644 (file)
@@ -658,10 +658,6 @@ void InstanceReplayer<I>::start_group_replayer(
 
   std::string global_group_id = group_replayer->get_global_group_id();
   if (!group_replayer->is_stopped()) {
-    if (group_replayer->needs_restart()) {
-      stop_group_replayer(group_replayer, new C_TrackedOp(m_async_op_tracker,
-                                                          nullptr));
-    }
     return;
   } else if (group_replayer->is_blocklisted()) {
     derr << "global_group_id=" << global_group_id << ": blocklisted detected "
index 3c45a83d8cf9775af275882ce1b8f1a558c8c22c..9f278dccf2b9359965a56a2d7829a8337a7d91cd 100644 (file)
@@ -77,9 +77,9 @@ bool Replayer<I>::is_replay_interrupted() {
 
 template <typename I>
 bool Replayer<I>::is_replay_interrupted(std::unique_lock<ceph::mutex>* locker) {
+
   if (m_state == STATE_COMPLETE) {
     locker->unlock();
-
     return true;
   }
 
@@ -88,14 +88,58 @@ bool Replayer<I>::is_replay_interrupted(std::unique_lock<ceph::mutex>* locker) {
 
 template <typename I>
 void Replayer<I>::schedule_load_group_snapshots() {
+
+  std::lock_guard timer_locker{m_threads->timer_lock};
+  std::lock_guard locker{m_lock};
+
+  if (m_state != STATE_REPLAYING) {
+    return;
+  }
+
   dout(10) << dendl;
 
+  ceph_assert(m_load_snapshots_task == nullptr);
+  m_load_snapshots_task = create_context_callback<
+    Replayer<I>,
+    &Replayer<I>::handle_schedule_load_group_snapshots>(this);
+
+  m_threads->timer->add_event_after(1, m_load_snapshots_task);
+}
+
+template <typename I>
+void Replayer<I>::handle_schedule_load_group_snapshots(int r) {
+  dout(10) << dendl;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
+
+  {
+    std::unique_lock locker{m_lock};
+    if (m_state != STATE_REPLAYING) {
+      return;
+    }
+  }
+
+  ceph_assert(m_load_snapshots_task != nullptr);
+  m_load_snapshots_task = nullptr;
+
   auto ctx = new LambdaContext(
     [this](int r) {
       load_local_group_snapshots();
     });
-  std::lock_guard timer_locker{m_threads->timer_lock};
-  m_threads->timer->add_event_after(1, ctx);
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void Replayer<I>::cancel_load_group_snapshots() {
+  dout(10) << dendl;
+
+  std::unique_lock timer_locker{m_threads->timer_lock};
+  if (m_load_snapshots_task != nullptr) {
+    dout(10) << dendl;
+
+    if (m_threads->timer->cancel_event(m_load_snapshots_task)) {
+      m_load_snapshots_task = nullptr;
+    }
+  }
 }
 
 template <typename I>
@@ -123,6 +167,7 @@ int Replayer<I>::local_group_image_list_by_id(
   do {
     std::vector<cls::rbd::GroupImageStatus> image_ids_page;
 
+//TODO: Make this async
     r = librbd::cls_client::group_image_list(&m_local_io_ctx, group_header_oid,
                                              start_last, max_read,
                                              &image_ids_page);
@@ -152,6 +197,7 @@ bool Replayer<I>::is_resync_requested() {
   std::string group_header_oid = librbd::util::group_header_name(
       m_local_group_id);
   std::string value;
+// TODO: make this async
   int r = librbd::cls_client::metadata_get(&m_local_io_ctx, group_header_oid,
                                            RBD_GROUP_RESYNC, &value);
   if (r < 0 && r != -ENOENT) {
@@ -212,7 +258,8 @@ template <typename I>
 void Replayer<I>::load_local_group_snapshots() {
   dout(10) << "m_local_group_id=" << m_local_group_id << dendl;
 
-  if (is_replay_interrupted()) {
+  std::unique_lock locker{m_lock};
+  if (is_replay_interrupted(&locker)) {
     return;
   }
 
@@ -227,14 +274,16 @@ void Replayer<I>::load_local_group_snapshots() {
     dout(10) << "local group resync requested" << dendl;
     // send stop for Group Replayer
     notify_group_listener_stop();
+    return;
   } else if (is_rename_requested()) {
     m_stop_requested = true;
     dout(10) << "remote group rename requested" << dendl;
     // send stop for Group Replayer
     notify_group_listener_stop();
+    return;
   }
 
-  std::unique_lock locker{m_lock};
+  m_in_flight_op_tracker.start_op();
   m_local_group_snaps.clear();
   auto ctx = create_context_callback<
       Replayer<I>,
@@ -249,6 +298,12 @@ template <typename I>
 void Replayer<I>::handle_load_local_group_snapshots(int r) {
   dout(10) << "r=" << r << dendl;
 
+  if (is_replay_interrupted()) {
+    m_in_flight_op_tracker.finish_op();
+    return;
+  }
+  m_in_flight_op_tracker.finish_op();
+
   if (r < 0) {
     derr << "error listing local mirror group snapshots: " << cpp_strerror(r)
          << dendl;
@@ -268,6 +323,7 @@ void Replayer<I>::handle_load_local_group_snapshots(int r) {
     }
     // this is primary, IDLE the group replayer
     m_state = STATE_IDLE;
+    notify_group_listener_stop();
     return;
   }
 
@@ -288,6 +344,7 @@ void Replayer<I>::load_remote_group_snapshots() {
       handle_load_remote_group_snapshots(r);
   });
 
+  m_in_flight_op_tracker.start_op();
   auto req = librbd::group::ListSnapshotsRequest<I>::create(m_remote_io_ctx,
       m_remote_group_id, true, true, &m_remote_group_snaps, ctx);
   req->send();
@@ -297,6 +354,12 @@ template <typename I>
 void Replayer<I>::handle_load_remote_group_snapshots(int r) {
   dout(10) << "r=" << r << dendl;
 
+  if (is_replay_interrupted()) {
+    m_in_flight_op_tracker.finish_op();
+    return;
+  }
+  m_in_flight_op_tracker.finish_op();
+
   if (r < 0) {
     derr << "error listing remote mirror group snapshots: " << cpp_strerror(r)
          << dendl;
@@ -1134,17 +1197,44 @@ template <typename I>
 void Replayer<I>::shut_down(Context* on_finish) {
   dout(10) << dendl;
 
-  std::unique_lock locker{m_lock};
-  m_stop_requested = true;
-  auto state = STATE_COMPLETE;
-  std::swap(m_state, state);
-  locker.unlock();
-  if (on_finish) {
-    on_finish->complete(0);
+  {
+    std::unique_lock locker{m_lock};
+    m_stop_requested = true;
+    ceph_assert(m_on_shutdown == nullptr);
+    std::swap(m_on_shutdown, on_finish);
+
+    auto state = STATE_COMPLETE;
+    std::swap(m_state, state);
   }
+
+  cancel_load_group_snapshots();
+
+  if (!m_in_flight_op_tracker.empty()) {
+    m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this](int) {
+        finish_shut_down();
+      }));
+    return;
+  }
+
+  finish_shut_down();
   return;
 }
 
+template <typename I>
+void Replayer<I>::finish_shut_down() {
+  dout(10) << dendl;
+
+  Context *on_finish = nullptr;
+
+  {
+    std::unique_lock locker{m_lock};
+    ceph_assert(m_on_shutdown != nullptr);
+    std::swap(m_on_shutdown, on_finish);
+  }
+  if (on_finish) {
+    on_finish->complete(0);
+  }
+}
 
 } // namespace group_replayer
 } // namespace mirror
index e977b1469a0dffccc20963726e53ebbe9eeb4bb9..445ac67ae2b7c0dd6813fceaba5fba56f4428a24 100644 (file)
@@ -4,12 +4,13 @@
 #ifndef RBD_MIRROR_GROUP_REPLAYER_REPLAYER_H
 #define RBD_MIRROR_GROUP_REPLAYER_REPLAYER_H
 
-#include "tools/rbd_mirror/image_replayer/Replayer.h"
+#include "common/AsyncOpTracker.h"
 #include "common/ceph_mutex.h"
 #include "cls/rbd/cls_rbd_types.h"
 #include "include/rados/librados.hpp"
 #include "librbd/mirror/snapshot/Types.h"
 #include "tools/rbd_mirror/Types.h"
+#include "tools/rbd_mirror/image_replayer/Replayer.h"
 #include "tools/rbd_mirror/image_replayer/Types.h"
 #include <string>
 
@@ -64,6 +65,7 @@ public:
   }
   void init(Context* on_finish);
   void shut_down(Context* on_finish);
+  void finish_shut_down();
 
   bool is_replaying() const {
     std::unique_lock locker{m_lock};
@@ -98,6 +100,11 @@ private:
   std::vector<cls::rbd::GroupSnapshot> m_local_group_snaps;
   std::vector<cls::rbd::GroupSnapshot> m_remote_group_snaps;
 
+  Context* m_load_snapshots_task = nullptr;
+  Context* m_on_shutdown = nullptr;
+
+  AsyncOpTracker m_in_flight_op_tracker;
+
   bool m_stop_requested = false;
 
   // map of <group_snap_id, pair<GroupSnapshot, on_finish>>
@@ -112,6 +119,9 @@ private:
       std::vector<cls::rbd::GroupImageStatus> *image_ids);
 
   void schedule_load_group_snapshots();
+  void handle_schedule_load_group_snapshots(int r);
+  void cancel_load_group_snapshots();
+
   void notify_group_listener_stop();
   bool is_resync_requested();
   bool is_rename_requested();