git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: close local image before stopping remote journaler
author Jason Dillaman <dillaman@redhat.com>
Mon, 15 May 2017 14:35:46 +0000 (10:35 -0400)
committer Jason Dillaman <dillaman@redhat.com>
Thu, 1 Jun 2017 16:22:47 +0000 (12:22 -0400)
Fixes: http://tracker.ceph.com/issues/18963
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/journal/Replay.cc
src/librbd/journal/Replay.h
src/test/librbd/journal/test_mock_Replay.cc
src/test/rbd_mirror/test_mock_ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc

index 2d408d5b8eba5a6ad0a2832d458acf29b65798c8..d0b585922980cd89d78d0eef10cd76d1ab458bcf 100644 (file)
@@ -216,6 +216,7 @@ void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
     // safely commit any remaining AIO modify operations
     if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
       flush_comp = create_aio_flush_completion(nullptr);
+      assert(flush_comp != nullptr);
     }
 
     for (auto &op_event_pair : m_op_events) {
@@ -240,6 +241,9 @@ void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
       }
     }
 
+    assert(!m_shut_down);
+    m_shut_down = true;
+
     assert(m_flush_ctx == nullptr);
     if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
       std::swap(m_flush_ctx, on_finish);
@@ -263,6 +267,9 @@ void Replay<I>::flush(Context *on_finish) {
     Mutex::Locker locker(m_lock);
     aio_comp = create_aio_flush_completion(
       util::create_async_context_callback(m_image_ctx, on_finish));
+    if (aio_comp == nullptr) {
+      return;
+    }
   }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
@@ -318,6 +325,10 @@ void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
   auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                                io::AIO_TYPE_DISCARD,
                                                &flush_required);
+  if (aio_comp == nullptr) {
+    return;
+  }
+
   io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
                                    event.length, event.skip_partial_discard,
                                   {});
@@ -326,7 +337,9 @@ void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
     auto flush_comp = create_aio_flush_completion(nullptr);
     m_lock.Unlock();
 
-    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+    if (flush_comp != nullptr) {
+      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+    }
   }
 }
 
@@ -341,6 +354,10 @@ void Replay<I>::handle_event(const journal::AioWriteEvent &event,
   auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                                io::AIO_TYPE_WRITE,
                                                &flush_required);
+  if (aio_comp == nullptr) {
+    return;
+  }
+
   io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
                                  {{event.offset, event.length}},
                                  std::move(data), 0, {});
@@ -349,7 +366,9 @@ void Replay<I>::handle_event(const journal::AioWriteEvent &event,
     auto flush_comp = create_aio_flush_completion(nullptr);
     m_lock.Unlock();
 
-    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+    if (flush_comp != nullptr) {
+      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+    }
   }
 }
 
@@ -364,8 +383,10 @@ void Replay<I>::handle_event(const journal::AioFlushEvent &event,
     Mutex::Locker locker(m_lock);
     aio_comp = create_aio_flush_completion(on_safe);
   }
-  io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
 
+  if (aio_comp != nullptr) {
+    io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
+  }
   on_ready->complete(0);
 }
 
@@ -380,6 +401,10 @@ void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
   auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                                io::AIO_TYPE_WRITESAME,
                                                &flush_required);
+  if (aio_comp == nullptr) {
+    return;
+  }
+
   io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp, event.offset,
                                      event.length, std::move(data), 0, {});
   if (flush_required) {
@@ -387,7 +412,9 @@ void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
     auto flush_comp = create_aio_flush_completion(nullptr);
     m_lock.Unlock();
 
-    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+    if (flush_comp != nullptr) {
+      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+    }
   }
 }
 
@@ -861,6 +888,12 @@ Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
                                                Context *on_safe,
                                                OpEvent **op_event) {
   CephContext *cct = m_image_ctx.cct;
+  if (m_shut_down) {
+    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
+    on_ready->complete(0);
+    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
+    return nullptr;
+  }
 
   assert(m_lock.is_locked());
   if (m_op_events.count(op_tid) != 0) {
@@ -898,7 +931,10 @@ void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
     op_event = std::move(op_it->second);
     m_op_events.erase(op_it);
 
-    shutting_down = (m_flush_ctx != nullptr);
+    if (m_shut_down) {
+      assert(m_flush_ctx != nullptr);
+      shutting_down = true;
+    }
   }
 
   assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
@@ -958,6 +994,13 @@ Replay<I>::create_aio_modify_completion(Context *on_ready, Context *on_safe,
   CephContext *cct = m_image_ctx.cct;
   assert(m_on_aio_ready == nullptr);
 
+  if (m_shut_down) {
+    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
+    on_ready->complete(0);
+    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
+    return nullptr;
+  }
+
   ++m_in_flight_aio_modify;
   m_aio_modify_unsafe_contexts.push_back(on_safe);
 
@@ -997,6 +1040,15 @@ template <typename I>
 io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
   assert(m_lock.is_locked());
 
+  CephContext *cct = m_image_ctx.cct;
+  if (m_shut_down) {
+    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
+    if (on_safe != nullptr) {
+      m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
+    }
+    return nullptr;
+  }
+
   ++m_in_flight_aio_flush;
 
   // associate all prior write/discard ops to this flush request
index d4a65e089bb2dbc25cec7fbb288370b7d91b8a52..5b2219851371f70ee341be40a8bbef2c1f48dc3f 100644 (file)
@@ -126,6 +126,7 @@ private:
   OpEvents m_op_events;
   uint64_t m_in_flight_op_events = 0;
 
+  bool m_shut_down = false;
   Context *m_flush_ctx = nullptr;
   Context *m_on_aio_ready = nullptr;
 
index e13a156ee05677a3423a96df972ef672797c680e..14d2a6a4b79efefeeb3d097f3f5aba6a4c3f21d6 100644 (file)
@@ -1592,5 +1592,69 @@ TEST_F(TestMockJournalReplay, RefreshImageBeforeOpStart) {
   ASSERT_EQ(0, on_finish_safe.wait());
 }
 
+TEST_F(TestMockJournalReplay, FlushEventAfterShutDown) {
+  REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockReplayImageCtx mock_image_ctx(*ictx);
+  MockJournalReplay mock_journal_replay(mock_image_ctx);
+  MockIoImageRequest mock_io_image_request;
+  expect_op_work_queue(mock_image_ctx);
+
+  ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
+
+  C_SaferCond on_ready;
+  C_SaferCond on_safe;
+  when_process(mock_journal_replay, EventEntry{AioFlushEvent()},
+               &on_ready, &on_safe);
+  ASSERT_EQ(0, on_ready.wait());
+  ASSERT_EQ(-ESHUTDOWN, on_safe.wait());
+}
+
+TEST_F(TestMockJournalReplay, ModifyEventAfterShutDown) {
+  REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockReplayImageCtx mock_image_ctx(*ictx);
+  MockJournalReplay mock_journal_replay(mock_image_ctx);
+  MockIoImageRequest mock_io_image_request;
+  expect_op_work_queue(mock_image_ctx);
+
+  ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
+
+  C_SaferCond on_ready;
+  C_SaferCond on_safe;
+  when_process(mock_journal_replay,
+               EventEntry{AioWriteEvent(123, 456, to_bl("test"))},
+               &on_ready, &on_safe);
+  ASSERT_EQ(0, on_ready.wait());
+  ASSERT_EQ(-ESHUTDOWN, on_safe.wait());
+}
+
+TEST_F(TestMockJournalReplay, OpEventAfterShutDown) {
+  REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockReplayImageCtx mock_image_ctx(*ictx);
+  MockJournalReplay mock_journal_replay(mock_image_ctx);
+  MockIoImageRequest mock_io_image_request;
+  expect_op_work_queue(mock_image_ctx);
+
+  ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
+
+  C_SaferCond on_ready;
+  C_SaferCond on_safe;
+  when_process(mock_journal_replay, EventEntry{RenameEvent(123, "image")},
+               &on_ready, &on_safe);
+  ASSERT_EQ(0, on_ready.wait());
+  ASSERT_EQ(-ESHUTDOWN, on_safe.wait());
+}
+
 } // namespace journal
 } // namespace librbd
index 430edaa16d31e576ba6f9ed477d247feb2fbfd67..c845faacaf548ef0144d566b9b27be3614aec7ad 100644 (file)
@@ -385,6 +385,10 @@ public:
       .WillOnce(CompleteContext(r));
   }
 
+  void expect_flush(MockReplay &mock_replay, int r) {
+    EXPECT_CALL(mock_replay, flush(_)).WillOnce(CompleteContext(r));
+  }
+
   void expect_shut_down(MockReplay &mock_replay, bool cancel_ops, int r) {
     EXPECT_CALL(mock_replay, shut_down(cancel_ops, _))
       .WillOnce(WithArg<1>(CompleteContext(r)));
@@ -514,17 +518,15 @@ TEST_F(TestMockImageReplayer, StartStop) {
 
   MockCloseImageRequest mock_close_local_image_request;
 
-  expect_stop_replay(mock_remote_journaler, 0);
   expect_shut_down(mock_local_replay, true, 0);
-
   EXPECT_CALL(mock_local_journal, remove_listener(_));
   EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_send(mock_close_local_image_request, 0);
 
+  expect_stop_replay(mock_remote_journaler, 0);
   EXPECT_CALL(mock_remote_journaler, remove_listener(_));
   expect_shut_down(mock_remote_journaler, 0);
 
-  expect_send(mock_close_local_image_request, 0);
-
   C_SaferCond stop_ctx;
   m_image_replayer->stop(&stop_ctx);
   ASSERT_EQ(0, stop_ctx.wait());
@@ -650,14 +652,12 @@ TEST_F(TestMockImageReplayer, StartExternalReplayError) {
   expect_start_external_replay(mock_local_journal, nullptr, -EINVAL);
 
   MockCloseImageRequest mock_close_local_image_request;
-
   EXPECT_CALL(mock_local_journal, remove_listener(_));
+  expect_send(mock_close_local_image_request, 0);
 
   EXPECT_CALL(mock_remote_journaler, remove_listener(_));
   expect_shut_down(mock_remote_journaler, 0);
 
-  expect_send(mock_close_local_image_request, 0);
-
   C_SaferCond start_ctx;
   m_image_replayer->start(&start_ctx);
   ASSERT_EQ(-EINVAL, start_ctx.wait());
@@ -706,17 +706,15 @@ TEST_F(TestMockImageReplayer, StopError) {
 
   MockCloseImageRequest mock_close_local_image_request;
 
-  expect_stop_replay(mock_remote_journaler, -EINVAL);
   expect_shut_down(mock_local_replay, true, -EINVAL);
-
   EXPECT_CALL(mock_local_journal, remove_listener(_));
   EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_send(mock_close_local_image_request, -EINVAL);
 
+  expect_stop_replay(mock_remote_journaler, -EINVAL);
   EXPECT_CALL(mock_remote_journaler, remove_listener(_));
   expect_shut_down(mock_remote_journaler, -EINVAL);
 
-  expect_send(mock_close_local_image_request, -EINVAL);
-
   C_SaferCond stop_ctx;
   m_image_replayer->stop(&stop_ctx);
   ASSERT_EQ(0, stop_ctx.wait());
@@ -806,18 +804,15 @@ TEST_F(TestMockImageReplayer, Replay) {
   // STOP
 
   MockCloseImageRequest mock_close_local_image_request;
-
-  expect_stop_replay(mock_remote_journaler, 0);
   expect_shut_down(mock_local_replay, true, 0);
-
   EXPECT_CALL(mock_local_journal, remove_listener(_));
   EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_send(mock_close_local_image_request, 0);
 
+  expect_stop_replay(mock_remote_journaler, 0);
   EXPECT_CALL(mock_remote_journaler, remove_listener(_));
   expect_shut_down(mock_remote_journaler, 0);
 
-  expect_send(mock_close_local_image_request, 0);
-
   C_SaferCond stop_ctx;
   m_image_replayer->stop(&stop_ctx);
   ASSERT_EQ(0, stop_ctx.wait());
@@ -886,15 +881,10 @@ TEST_F(TestMockImageReplayer, DecodeError) {
     .WillOnce(Return(-EINVAL));
 
   // stop on error
-  expect_stop_replay(mock_remote_journaler, 0);
   expect_shut_down(mock_local_replay, true, 0);
-
   EXPECT_CALL(mock_local_journal, remove_listener(_));
   EXPECT_CALL(mock_local_journal, stop_external_replay());
 
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, 0);
-
   MockCloseImageRequest mock_close_local_image_request;
   C_SaferCond close_ctx;
   EXPECT_CALL(mock_close_local_image_request, send())
@@ -904,6 +894,10 @@ TEST_F(TestMockImageReplayer, DecodeError) {
          close_ctx.complete(0);
        }));
 
+  expect_stop_replay(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
   // fire
   m_image_replayer->handle_replay_ready();
   ASSERT_EQ(0, close_ctx.wait());
@@ -1010,17 +1004,15 @@ TEST_F(TestMockImageReplayer, DelayedReplay) {
 
   MockCloseImageRequest mock_close_local_image_request;
 
-  expect_stop_replay(mock_remote_journaler, 0);
   expect_shut_down(mock_local_replay, true, 0);
-
   EXPECT_CALL(mock_local_journal, remove_listener(_));
   EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_send(mock_close_local_image_request, 0);
 
+  expect_stop_replay(mock_remote_journaler, 0);
   EXPECT_CALL(mock_remote_journaler, remove_listener(_));
   expect_shut_down(mock_remote_journaler, 0);
 
-  expect_send(mock_close_local_image_request, 0);
-
   C_SaferCond stop_ctx;
   m_image_replayer->stop(&stop_ctx);
   ASSERT_EQ(0, stop_ctx.wait());
index 3276dba3b8f63835ff08c99347a37ac8de30d468..7e4211b02ee6a7c6f6d4e6b004259a52f73909ab 100644 (file)
@@ -810,6 +810,17 @@ void ImageReplayer<I>::handle_replay_ready()
   }
 
   m_event_replay_tracker.start_op();
+
+  m_lock.Lock();
+  bool stopping = (m_state == STATE_STOPPING);
+  m_lock.Unlock();
+
+  if (stopping) {
+    dout(10) << "stopping event replay" << dendl;
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
   if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
     preprocess_entry();
     return;
@@ -1452,19 +1463,18 @@ void ImageReplayer<I>::shut_down(int r) {
     }
   }
 
+  // NOTE: it's important to ensure that the local image is fully
+  // closed before attempting to close the remote journal in
+  // case the remote cluster is unreachable
+
   // chain the shut down sequence (reverse order)
   Context *ctx = new FunctionContext(
     [this, r](int _r) {
       update_mirror_image_status(true, STATE_STOPPED);
       handle_shut_down(r);
     });
-  if (m_local_image_ctx) {
-    ctx = new FunctionContext([this, ctx](int r) {
-      CloseImageRequest<I> *request = CloseImageRequest<I>::create(
-        &m_local_image_ctx, ctx);
-      request->send();
-    });
-  }
+
+  // close the remote journal
   if (m_remote_journaler != nullptr) {
     ctx = new FunctionContext([this, ctx](int r) {
         delete m_remote_journaler;
@@ -1481,6 +1491,30 @@ void ImageReplayer<I>::shut_down(int r) {
         });
     }
   }
+
+  // stop the replay of remote journal events
+  if (m_replay_handler != nullptr) {
+    ctx = new FunctionContext([this, ctx](int r) {
+        delete m_replay_handler;
+        m_replay_handler = nullptr;
+
+        m_event_replay_tracker.wait_for_ops(ctx);
+      });
+    ctx = new FunctionContext([this, ctx](int r) {
+        m_remote_journaler->stop_replay(ctx);
+      });
+  }
+
+  // close the local image (release exclusive lock)
+  if (m_local_image_ctx) {
+    ctx = new FunctionContext([this, ctx](int r) {
+      CloseImageRequest<I> *request = CloseImageRequest<I>::create(
+        &m_local_image_ctx, ctx);
+      request->send();
+    });
+  }
+
+  // shut down event replay into the local image
   if (m_local_journal != nullptr) {
     ctx = new FunctionContext([this, ctx](int r) {
         m_local_journal = nullptr;
@@ -1497,32 +1531,29 @@ void ImageReplayer<I>::shut_down(int r) {
         });
     }
     ctx = new FunctionContext([this, ctx](int r) {
-        if (r < 0) {
-          derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
-        }
-
         // blocks if listener notification is in-progress
         m_local_journal->remove_listener(m_journal_listener);
-
-        // wait for all in-flight replayed events to complete
-        m_event_replay_tracker.wait_for_ops(ctx);
-      });
-    if (m_local_replay != nullptr) {
-      ctx = new FunctionContext([this, ctx](int r) {
-          m_local_replay->shut_down(true, ctx);
-        });
-    }
-  }
-  if (m_replay_handler != nullptr) {
-    ctx = new FunctionContext([this, ctx](int r) {
-        delete m_replay_handler;
-        m_replay_handler = nullptr;
         ctx->complete(0);
       });
+  }
+
+  // wait for all local in-flight replay events to complete
+  ctx = new FunctionContext([this, ctx](int r) {
+      if (r < 0) {
+        derr << "error shutting down journal replay: " << cpp_strerror(r)
+             << dendl;
+      }
+
+      m_event_replay_tracker.wait_for_ops(ctx);
+    });
+
+  // flush any local in-flight replay events
+  if (m_local_replay != nullptr) {
     ctx = new FunctionContext([this, ctx](int r) {
-        m_remote_journaler->stop_replay(ctx);
+        m_local_replay->shut_down(true, ctx);
       });
   }
+
   m_threads->work_queue->queue(ctx, 0);
 }