]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: ensure proper handling of status updates during shutdown
authorJason Dillaman <dillaman@redhat.com>
Wed, 18 May 2016 04:55:01 +0000 (00:55 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 19 May 2016 14:21:43 +0000 (10:21 -0400)
Previously, several shutdown race conditions could occur due to the
use of the async work queue for scheduling updates.

Fixes: http://tracker.ceph.com/issues/15909
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h

index 34d28423fa0aee1874abf573fe6eed935708b7e0..6a10c0e62a413433e8e8bbb44550c073847ec436 100644 (file)
@@ -226,15 +226,10 @@ void ImageReplayer<I>::BootstrapProgressContext::update_progress(
   const std::string &description, bool flush)
 {
   const std::string desc = "bootstrapping, " + description;
-
-  FunctionContext *ctx = new FunctionContext(
-    [this, desc, flush](int r) {
-      replayer->set_state_description(0, desc);
-      if (flush) {
-       replayer->update_mirror_image_status();
-      }
-    });
-  replayer->m_threads->work_queue->queue(ctx, 0);
+  replayer->set_state_description(0, desc);
+  if (flush) {
+    replayer->update_mirror_image_status(false, boost::none);
+  }
 }
 
 template <typename I>
@@ -286,7 +281,7 @@ ImageReplayer<I>::~ImageReplayer()
   assert(m_replay_handler == nullptr);
   assert(m_on_start_finish == nullptr);
   assert(m_on_stop_finish == nullptr);
-
+  assert(m_in_flight_status_updates == 0);
   delete m_asok_hook;
 }
 
@@ -355,8 +350,6 @@ void ImageReplayer<I>::start(Context *on_finish,
     return;
   }
 
-  reschedule_update_status_task(10);
-
   CephContext *cct = static_cast<CephContext *>(m_local->cct());
   double commit_interval = cct->_conf->rbd_journal_commit_age;
   m_remote_journaler = new Journaler(m_threads->work_queue,
@@ -388,8 +381,10 @@ void ImageReplayer<I>::bootstrap() {
     m_bootstrap_request = request;
   }
 
+  update_mirror_image_status(false, boost::none);
+  reschedule_update_status_task(10);
+
   request->send();
-  update_mirror_image_status();
 }
 
 template <typename I>
@@ -431,8 +426,7 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
     }
   }
 
-  update_mirror_image_status();
-
+  update_mirror_image_status(false, boost::none);
   init_remote_journaler();
 }
 
@@ -482,7 +476,8 @@ void ImageReplayer<I>::start_replay() {
 
   m_replay_status_formatter =
     ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
-  update_mirror_image_status();
+
+  update_mirror_image_status(true, boost::none);
   reschedule_update_status_task(30);
 
   dout(20) << "start succeeded" << dendl;
@@ -523,6 +518,12 @@ void ImageReplayer<I>::on_start_fail_finish(int r)
 {
   dout(20) << "r=" << r << dendl;
 
+  {
+    Mutex::Locker locker(m_lock);
+    m_state = STATE_STOPPING;
+  }
+  update_mirror_image_status(false, boost::none);
+
   if (m_remote_journaler) {
     if (m_remote_journaler->is_initialized()) {
       m_remote_journaler->shut_down();
@@ -552,7 +553,7 @@ void ImageReplayer<I>::on_start_fail_finish(int r)
   Context *on_stop_finish(nullptr);
   {
     Mutex::Locker locker(m_lock);
-    if (r < 0) {
+    if (r < 0 && r != -EINTR) {
       derr << "start failed: " << cpp_strerror(r) << dendl;
     } else {
       dout(20) << "start interrupted" << dendl;
@@ -562,6 +563,7 @@ void ImageReplayer<I>::on_start_fail_finish(int r)
     std::swap(m_on_stop_finish, on_stop_finish);
   }
 
+  update_mirror_image_status(true, STATE_STOPPED);
   handle_stop(r, on_start_finish, on_stop_finish);
 }
 
@@ -642,7 +644,8 @@ void ImageReplayer<I>::on_stop_journal_replay_shut_down_start()
     m_local_replay->shut_down(false, ctx);
   }
 
-  update_mirror_image_status();
+  set_state_description(0, "");
+  update_mirror_image_status(false, boost::none);
 }
 
 template <typename I>
@@ -704,6 +707,7 @@ void ImageReplayer<I>::on_stop_local_image_close_finish(int r)
     std::swap(m_on_stop_finish, on_finish);
   }
 
+  update_mirror_image_status(true, STATE_STOPPED);
   handle_stop(r, nullptr, on_finish);
 }
 
@@ -818,7 +822,7 @@ void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
         << cpp_strerror(r) << dendl;
   }
 
-  update_mirror_image_status();
+  update_mirror_image_status(false, boost::none);
 
   dout(20) << "flush complete, r=" << r << dendl;
   on_flush->complete(r);
@@ -1027,206 +1031,260 @@ void ImageReplayer<I>::shut_down_journal_replay(bool cancel_ops)
 }
 
 template <typename I>
-void ImageReplayer<I>::update_mirror_image_status(bool final,
-                                                 State expected_state)
-{
-  dout(20) << "final=" << final << ", expected_state=" << expected_state
-          << dendl;
-
-  cls::rbd::MirrorImageStatus status;
-
+bool ImageReplayer<I>::update_mirror_image_status(bool force,
+                                                  const OptionalState &state) {
+  dout(20) << dendl;
   {
     Mutex::Locker locker(m_lock);
+    if (!start_mirror_image_status_update(force, false)) {
+      return false;
+    }
+  }
 
-    assert(!final || !is_running_());
-
-    if (!final) {
-      if (expected_state != STATE_UNKNOWN && expected_state != m_state) {
-       dout(20) << "state changed" << dendl;
-       return;
-      }
-      if (m_update_status_comp) {
-       dout(20) << "already sending update" << dendl;
-       m_update_status_pending = true;
-       return;
-      }
+  queue_mirror_image_status_update(state);
+  return true;
+}
 
-      Context *ctx = new FunctionContext(
-       [this](int r) {
-         if (r < 0) {
-           derr << "error updating mirror image status: " << cpp_strerror(r)
-           << dendl;
-         }
-         bool pending = false;
-         librados::AioCompletion *comp = nullptr;
-         {
-           Mutex::Locker locker(m_lock);
-           std::swap(m_update_status_comp, comp);
-           std::swap(m_update_status_pending, pending);
-         }
-         if (comp) {
-           comp->release();
-         }
-         if (pending && r == 0 && is_running_()) {
-           update_mirror_image_status();
-         }
-       });
-      m_update_status_comp = create_rados_ack_callback(ctx);
-      m_update_status_pending = false;
-    }
+template <typename I>
+bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
+                                                        bool restarting) {
+  assert(m_lock.is_locked());
 
-    switch (m_state) {
-    case STATE_STARTING:
-      if (m_bootstrap_request != nullptr) {
-       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
-       status.description = m_state_desc.empty() ? "syncing" : m_state_desc;
-      } else {
-       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
-       status.description = "starting replay";
-      }
-      break;
-    case STATE_REPLAYING:
-      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
-      break;
-    case STATE_STOPPING:
-      if (m_local_image_ctx) {
-       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
-       status.description = "stopping replay";
-       break;
-      }
-      /* FALLTHROUGH */
-    case STATE_STOPPED:
-      if (m_last_r < 0) {
-       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
-       status.description = m_state_desc;
-      } else {
-       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
-       status.description = m_state_desc.empty() ? "stopped" : m_state_desc;
-      }
-      break;
-    default:
-      assert(!"invalid state");
+  if (!force && !is_stopped_()) {
+    if (!is_running_()) {
+      dout(20) << "shut down in-progress: ignoring update" << dendl;
+      return false;
+    } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
+      dout(20) << "already sending update" << dendl;
+      m_update_status_requested = true;
+      return false;
     }
   }
 
-  if (status.state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING) {
-    Context *on_req_finish = new FunctionContext(
-      [this](int r) {
-       if (r == 0) {
-         librados::AioCompletion *comp = nullptr;
-         {
-           Mutex::Locker locker(m_lock);
-           std::swap(m_update_status_comp, comp);
-         }
-         if (comp) {
-           comp->release();
-         }
-         update_mirror_image_status(false, STATE_REPLAYING);
-       }
-      });
-    std::string desc;
-    if (!m_replay_status_formatter->get_or_send_update(&desc, on_req_finish)) {
+  dout(20) << dendl;
+  ++m_in_flight_status_updates;
+  return true;
+}
+
+template <typename I>
+void ImageReplayer<I>::finish_mirror_image_status_update() {
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_in_flight_status_updates > 0);
+    if (--m_in_flight_status_updates > 0) {
+      dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
+               << "updates" << dendl;
       return;
     }
-    status.description = "replaying, " + desc;
+
+    std::swap(on_finish, m_on_update_status_finish);
   }
 
-  dout(20) << "status=" << status << dendl;
+  dout(20) << dendl;
+  if (on_finish != nullptr) {
+    on_finish->complete(0);
+  }
+}
 
-  librados::ObjectWriteOperation op;
-  librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
+template <typename I>
+void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
+  dout(20) << dendl;
+  FunctionContext *ctx = new FunctionContext(
+    [this, state](int r) {
+      send_mirror_status_update(state);
+    });
+  m_threads->work_queue->queue(ctx, 0);
+}
 
-  if (final) {
-    reschedule_update_status_task(-1);
-    m_local_ioctx.aio_flush();
-    librados::AioCompletion *comp = nullptr;
+template <typename I>
+void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
+  State state;
+  std::string state_desc;
+  int last_r;
+  bool bootstrapping;
+  bool stopping_replay;
+  {
+    Mutex::Locker locker(m_lock);
+    state = m_state;
+    state_desc = m_state_desc;
+    last_r = m_last_r;
+    bootstrapping = (m_bootstrap_request != nullptr);
+    stopping_replay = (m_local_image_ctx != nullptr);
+  }
+
+  if (opt_state) {
+    state = *opt_state;
+  }
+
+  cls::rbd::MirrorImageStatus status;
+  status.up = true;
+  switch (state) {
+  case STATE_STARTING:
+    if (bootstrapping) {
+      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
+      status.description = state_desc.empty() ? "syncing" : state_desc;
+    } else {
+      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
+      status.description = "starting replay";
+    }
+    break;
+  case STATE_REPLAYING:
+    status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
     {
-      Mutex::Locker locker(m_lock);
-      std::swap(m_update_status_comp, comp);
+      Context *on_req_finish = new FunctionContext(
+        [this](int r) {
+          if (r >= 0) {
+            dout(20) << "replay status ready" << dendl;
+            send_mirror_status_update(boost::none);
+          }
+        });
+
+      std::string desc;
+      if (!m_replay_status_formatter->get_or_send_update(&desc,
+                                                         on_req_finish)) {
+        dout(20) << "waiting for replay status" << dendl;
+        return;
+      }
+      status.description = "replaying, " + desc;
     }
-    if (comp) {
-      comp->wait_for_complete();
+    break;
+  case STATE_STOPPING:
+    if (stopping_replay) {
+      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
+      status.description = "stopping replay";
+      break;
     }
-    int r = m_local_ioctx.operate(RBD_MIRRORING, &op);
-    if (r < 0) {
-      derr << "error updating mirror image status: " << cpp_strerror(r)
-          << dendl;
+    // FALLTHROUGH
+  case STATE_STOPPED:
+    if (last_r < 0) {
+      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
+      status.description = state_desc;
+    } else {
+      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
+      status.description = state_desc.empty() ? "stopped" : state_desc;
     }
-    return;
+    break;
+  default:
+    assert(!"invalid state");
   }
 
-  int r = m_local_ioctx.aio_operate(RBD_MIRRORING, m_update_status_comp, &op);
-  assert(r == 0);
+  dout(20) << "status=" << status << dendl;
+  librados::ObjectWriteOperation op;
+  librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
 
-  reschedule_update_status_task();
+  librados::AioCompletion *aio_comp = create_rados_ack_callback<
+    ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
+  int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
+  assert(r == 0);
+  aio_comp->release();
 }
 
 template <typename I>
-void ImageReplayer<I>::reschedule_update_status_task(int new_interval)
-{
-  Mutex::Locker locker(m_threads->timer_lock);
+void ImageReplayer<I>::handle_mirror_status_update(int r) {
+  dout(20) << "r=" << r << dendl;
 
-  if (m_update_status_task) {
-    m_threads->timer->cancel_event(m_update_status_task);
-    m_update_status_task = nullptr;
-  }
+  bool running = false;
+  bool started = false;
+  {
+    Mutex::Locker locker(m_lock);
+    bool update_status_requested = false;
+    std::swap(update_status_requested, m_update_status_requested);
 
-  if (new_interval > 0) {
-    m_update_status_interval = new_interval;
+    running = is_running_();
+    if (running && update_status_requested) {
+      started = start_mirror_image_status_update(false, true);
+    }
   }
 
-  if (new_interval < 0) {
-    return;
+  // if a deferred update is available, send it -- otherwise reschedule
+  // the timer task
+  if (started) {
+    queue_mirror_image_status_update(boost::none);
+  } else if (running) {
+    reschedule_update_status_task();
   }
 
-  m_update_status_task = new FunctionContext(
-    [this](int r) {
-      start_update_status_task();
-    });
-
-  m_threads->timer->add_event_after(m_update_status_interval,
-                                   m_update_status_task);
+  // mark committed status update as no longer in-flight
+  finish_mirror_image_status_update();
 }
 
 template <typename I>
-void ImageReplayer<I>::start_update_status_task()
-{
-  FunctionContext *ctx = new FunctionContext(
-    [this](int r) {
-      {
-       Mutex::Locker locker(m_threads->timer_lock);
-       m_update_status_task = nullptr;
-      }
-      update_mirror_image_status();
-    });
-  m_threads->work_queue->queue(ctx, 0);
+void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
+  dout(20) << dendl;
+
+  bool canceled_task = false;
+  {
+    Mutex::Locker locker(m_lock);
+    Mutex::Locker timer_locker(m_threads->timer_lock);
+
+    if (m_update_status_task) {
+      canceled_task = m_threads->timer->cancel_event(m_update_status_task);
+      m_update_status_task = nullptr;
+    }
+
+    if (new_interval > 0) {
+      m_update_status_interval = new_interval;
+    }
+
+    bool restarting = (new_interval == 0 || canceled_task);
+    if (new_interval >= 0 && is_running_() &&
+        start_mirror_image_status_update(false, restarting)) {
+      m_update_status_task = new FunctionContext(
+        [this](int r) {
+          assert(m_threads->timer_lock.is_locked());
+          m_update_status_task = nullptr;
+
+          queue_mirror_image_status_update(boost::none);
+        });
+      m_threads->timer->add_event_after(m_update_status_interval,
+                                        m_update_status_task);
+    }
+  }
+
+  if (canceled_task) {
+    dout(20) << "canceled task" << dendl;
+    finish_mirror_image_status_update();
+  }
 }
 
 template <typename I>
 void ImageReplayer<I>::handle_stop(int r, Context *on_start, Context *on_stop) {
+  reschedule_update_status_task(-1);
+
   {
     Mutex::Locker locker(m_lock);
+
+    // if status updates are in-flight, wait for them to complete
+    // before proceeding
+    if (m_in_flight_status_updates > 0) {
+      dout(20) << "waiting for in-flight status update" << dendl;
+      assert(m_on_update_status_finish == nullptr);
+      m_on_update_status_finish = new FunctionContext(
+        [this, r, on_start, on_stop](int r) {
+          handle_stop(r, on_start, on_stop);
+        });
+      return;
+    }
+
     m_stop_requested = false;
     m_state = STATE_STOPPED;
-    m_last_r = r;
-    if (r >= 0) {
-      m_state_desc.clear();
-    }
+    m_state_desc.clear();
+    m_last_r = 0;
   }
   dout(20) << "stop complete" << dendl;
 
-  update_mirror_image_status(true);
-
   m_local_ioctx.close();
   m_remote_ioctx.close();
 
   if (on_start != nullptr) {
     dout(20) << "on start finish complete, r=" << r << dendl;
     on_start->complete(r);
+    r = 0;
   }
   if (on_stop != nullptr) {
     dout(20) << "on stop finish complete, r=" << r << dendl;
-    on_stop->complete(0);
+    on_stop->complete(r);
   }
 }
 
index 42ad9f92a25e416455a910f997162d05097be313..f832741e9ea113e424cff51c0b39dfbeda207232 100644 (file)
@@ -8,6 +8,7 @@
 #include <string>
 #include <vector>
 
+#include "include/atomic.h"
 #include "common/Mutex.h"
 #include "common/WorkQueue.h"
 #include "include/rados/librados.hpp"
@@ -18,6 +19,7 @@
 #include "librbd/journal/TypeTraits.h"
 #include "ProgressContext.h"
 #include "types.h"
+#include <boost/optional.hpp>
 
 class AdminSocketHook;
 
@@ -187,6 +189,7 @@ protected:
 
 private:
   typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
+  typedef boost::optional<State> OptionalState;
 
   class BootstrapProgressContext : public ProgressContext {
   public:
@@ -226,7 +229,6 @@ private:
   Context *m_update_status_task = nullptr;
   int m_update_status_interval = 0;
   librados::AioCompletion *m_update_status_comp = nullptr;
-  bool m_update_status_pending = false;
   bool m_stop_requested = false;
   bool m_manual_stop = false;
 
@@ -234,6 +236,10 @@ private:
 
   image_replayer::BootstrapRequest<ImageCtxT> *m_bootstrap_request = nullptr;
 
+  uint32_t m_in_flight_status_updates = 0;
+  bool m_update_status_requested = false;
+  Context *m_on_update_status_finish = nullptr;
+
   librbd::journal::MirrorPeerClientMeta m_client_meta;
 
   ReplayEntry m_replay_entry;
@@ -269,10 +275,13 @@ private:
 
   void shut_down_journal_replay(bool cancel_ops);
 
-  void update_mirror_image_status(bool final = false,
-                                 State expected_state = STATE_UNKNOWN);
+  bool update_mirror_image_status(bool force, const OptionalState &state);
+  bool start_mirror_image_status_update(bool force, bool restarting);
+  void finish_mirror_image_status_update();
+  void queue_mirror_image_status_update(const OptionalState &state);
+  void send_mirror_status_update(const OptionalState &state);
+  void handle_mirror_status_update(int r);
   void reschedule_update_status_task(int new_interval = 0);
-  void start_update_status_task();
 
   void handle_stop(int r, Context *on_start, Context *on_stop);