]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: delayed replication support 11879/head
authorMykola Golub <mgolub@mirantis.com>
Mon, 21 Nov 2016 20:43:11 +0000 (22:43 +0200)
committerMykola Golub <mgolub@mirantis.com>
Thu, 12 Jan 2017 17:14:48 +0000 (18:14 +0100)
Fixes: http://tracker.ceph.com/issues/15371
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/common/config_opts.h
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/Journal.cc
src/librbd/journal/Types.cc
src/librbd/journal/Types.h
src/test/librbd/mock/MockImageCtx.h
src/test/rbd_mirror/test_ImageReplayer.cc
src/test/rbd_mirror/test_mock_ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h

index 85121c1180e4490328e2d1f163d03d240af37ae8..04338d2ce7ff050fd7da3c20ee1aa4e3fa212547 100644 (file)
@@ -1292,6 +1292,7 @@ OPTION(rbd_validate_pool, OPT_BOOL, true) // true if empty pools should be valid
 OPTION(rbd_validate_names, OPT_BOOL, true) // true if image specs should be validated
 OPTION(rbd_auto_exclusive_lock_until_manual_request, OPT_BOOL, true) // whether to automatically acquire/release exclusive lock until it is explicitly requested, i.e. before we know the user of librbd is properly using the lock API
 OPTION(rbd_mirroring_resync_after_disconnect, OPT_BOOL, false) // automatically start image resync after mirroring is disconnected due to being laggy
+OPTION(rbd_mirroring_replay_delay, OPT_INT, 0) // time-delay in seconds for rbd-mirror asynchronous replication
 
 /*
  * The following options change the behavior for librbd's image creation methods that
index 2e867ab65cb4a253c5051869740193cb7b8dfde3..258ccab56f6bfca7edbd0eadd705fb91ea9e452a 100644 (file)
@@ -967,7 +967,8 @@ struct C_InvalidateCache : public Context {
         "rbd_journal_pool", false)(
         "rbd_journal_max_payload_bytes", false)(
         "rbd_journal_max_concurrent_object_sets", false)(
-        "rbd_mirroring_resync_after_disconnect", false);
+        "rbd_mirroring_resync_after_disconnect", false)(
+        "rbd_mirroring_replay_delay", false);
 
     md_config_t local_config_t;
     std::map<std::string, bufferlist> res;
@@ -1025,6 +1026,7 @@ struct C_InvalidateCache : public Context {
     ASSIGN_OPTION(journal_max_payload_bytes);
     ASSIGN_OPTION(journal_max_concurrent_object_sets);
     ASSIGN_OPTION(mirroring_resync_after_disconnect);
+    ASSIGN_OPTION(mirroring_replay_delay);
   }
 
   ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {
index de8232ddc25f6f03aeae4fdf480505dcc617ab57..dfa134c18f273b2c551f94df9a19b323da728d77 100644 (file)
@@ -190,6 +190,7 @@ namespace librbd {
     uint32_t journal_max_payload_bytes;
     int journal_max_concurrent_object_sets;
     bool mirroring_resync_after_disconnect;
+    int mirroring_replay_delay;
 
     LibrbdAdminSocketHook *asok_hook;
 
index f799f6b8d14246cf7eab7eee36d882c66a522c4c..096820d21a9cbcb164d6d275a308db2aae2e1fc3 100644 (file)
@@ -723,7 +723,7 @@ int Journal<I>::demote() {
       return r;
     }
 
-    journal::EventEntry event_entry{journal::DemoteEvent{}};
+    journal::EventEntry event_entry{journal::DemoteEvent{}, ceph_clock_now()};
     bufferlist event_entry_bl;
     ::encode(event_entry, event_entry_bl);
 
@@ -845,7 +845,8 @@ uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
     event_bl.substr_of(bl, event_offset, event_length);
     journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
                                                            event_length,
-                                                           event_bl));
+                                                           event_bl),
+                                    ceph_clock_now());
 
     bufferlists.emplace_back();
     ::encode(event_entry, bufferlists.back());
@@ -864,6 +865,7 @@ uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
                                      uint64_t offset, size_t length,
                                      bool flush_entry) {
   bufferlist bl;
+  event_entry.timestamp = ceph_clock_now();
   ::encode(event_entry, bl);
   return append_io_events(event_entry.get_event_type(), {bl}, requests, offset,
                           length, flush_entry);
@@ -974,6 +976,7 @@ void Journal<I>::append_op_event(uint64_t op_tid,
   assert(m_image_ctx.owner_lock.is_locked());
 
   bufferlist bl;
+  event_entry.timestamp = ceph_clock_now();
   ::encode(event_entry, bl);
 
   Future future;
@@ -1007,7 +1010,8 @@ void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
   ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
                  << "r=" << r << dendl;
 
-  journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)));
+  journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)),
+                                  ceph_clock_now());
 
   bufferlist bl;
   ::encode(event_entry, bl);
index 8242cffbec87184facfaa4a8538a1ff777885cd8..36010ca1a30b9ca64346bd2eaa07ad2dd83ee161 100644 (file)
@@ -83,6 +83,10 @@ void AioDiscardEvent::dump(Formatter *f) const {
   f->dump_unsigned("length", length);
 }
 
+uint32_t AioWriteEvent::get_fixed_size() {
+  return EventEntry::get_fixed_size() + 16 /* offset, length */;
+}
+
 void AioWriteEvent::encode(bufferlist& bl) const {
   ::encode(offset, bl);
   ::encode(length, bl);
@@ -314,9 +318,10 @@ EventType EventEntry::get_event_type() const {
 }
 
 void EventEntry::encode(bufferlist& bl) const {
-  ENCODE_START(3, 1, bl);
+  ENCODE_START(4, 1, bl);
   boost::apply_visitor(EncodeVisitor(bl), event);
   ENCODE_FINISH(bl);
+  encode_metadata(bl);
 }
 
 void EventEntry::decode(bufferlist::iterator& it) {
@@ -385,62 +390,80 @@ void EventEntry::decode(bufferlist::iterator& it) {
 
   boost::apply_visitor(DecodeVisitor(struct_v, it), event);
   DECODE_FINISH(it);
+  if (struct_v >= 4) {
+    decode_metadata(it);
+  }
 }
 
 void EventEntry::dump(Formatter *f) const {
   boost::apply_visitor(DumpVisitor(f, "event_type"), event);
+  f->dump_stream("timestamp") << timestamp;
+}
+
+void EventEntry::encode_metadata(bufferlist& bl) const {
+  ENCODE_START(1, 1, bl);
+  ::encode(timestamp, bl);
+  ENCODE_FINISH(bl);
+}
+
+void EventEntry::decode_metadata(bufferlist::iterator& it) {
+  DECODE_START(1, it);
+  ::decode(timestamp, it);
+  DECODE_FINISH(it);
 }
 
 void EventEntry::generate_test_instances(std::list<EventEntry *> &o) {
   o.push_back(new EventEntry(AioDiscardEvent()));
-  o.push_back(new EventEntry(AioDiscardEvent(123, 345)));
+  o.push_back(new EventEntry(AioDiscardEvent(123, 345), utime_t(1, 1)));
 
   bufferlist bl;
   bl.append(std::string(32, '1'));
   o.push_back(new EventEntry(AioWriteEvent()));
-  o.push_back(new EventEntry(AioWriteEvent(123, 456, bl)));
+  o.push_back(new EventEntry(AioWriteEvent(123, 456, bl), utime_t(1, 1)));
 
   o.push_back(new EventEntry(AioFlushEvent()));
 
-  o.push_back(new EventEntry(OpFinishEvent(123, -1)));
+  o.push_back(new EventEntry(OpFinishEvent(123, -1), utime_t(1, 1)));
 
-  o.push_back(new EventEntry(SnapCreateEvent()));
+  o.push_back(new EventEntry(SnapCreateEvent(), utime_t(1, 1)));
   o.push_back(new EventEntry(SnapCreateEvent(234, "snap",
-                                              cls::rbd::UserSnapshotNamespace())));
+                                             cls::rbd::UserSnapshotNamespace()),
+                             utime_t(1, 1)));
 
   o.push_back(new EventEntry(SnapRemoveEvent()));
-  o.push_back(new EventEntry(SnapRemoveEvent(345, "snap")));
+  o.push_back(new EventEntry(SnapRemoveEvent(345, "snap"), utime_t(1, 1)));
 
   o.push_back(new EventEntry(SnapRenameEvent()));
-  o.push_back(new EventEntry(SnapRenameEvent(456, 1, "src snap", "dest snap")));
+  o.push_back(new EventEntry(SnapRenameEvent(456, 1, "src snap", "dest snap"),
+                             utime_t(1, 1)));
 
   o.push_back(new EventEntry(SnapProtectEvent()));
-  o.push_back(new EventEntry(SnapProtectEvent(567, "snap")));
+  o.push_back(new EventEntry(SnapProtectEvent(567, "snap"), utime_t(1, 1)));
 
   o.push_back(new EventEntry(SnapUnprotectEvent()));
-  o.push_back(new EventEntry(SnapUnprotectEvent(678, "snap")));
+  o.push_back(new EventEntry(SnapUnprotectEvent(678, "snap"), utime_t(1, 1)));
 
   o.push_back(new EventEntry(SnapRollbackEvent()));
-  o.push_back(new EventEntry(SnapRollbackEvent(789, "snap")));
+  o.push_back(new EventEntry(SnapRollbackEvent(789, "snap"), utime_t(1, 1)));
 
   o.push_back(new EventEntry(RenameEvent()));
-  o.push_back(new EventEntry(RenameEvent(890, "image name")));
+  o.push_back(new EventEntry(RenameEvent(890, "image name"), utime_t(1, 1)));
 
   o.push_back(new EventEntry(ResizeEvent()));
-  o.push_back(new EventEntry(ResizeEvent(901, 1234)));
+  o.push_back(new EventEntry(ResizeEvent(901, 1234), utime_t(1, 1)));
 
-  o.push_back(new EventEntry(FlattenEvent(123)));
+  o.push_back(new EventEntry(FlattenEvent(123), utime_t(1, 1)));
 
   o.push_back(new EventEntry(DemoteEvent()));
 
   o.push_back(new EventEntry(UpdateFeaturesEvent()));
-  o.push_back(new EventEntry(UpdateFeaturesEvent(123, 127, true)));
+  o.push_back(new EventEntry(UpdateFeaturesEvent(123, 127, true), utime_t(1, 1)));
 
   o.push_back(new EventEntry(MetadataSetEvent()));
-  o.push_back(new EventEntry(MetadataSetEvent(123, "key", "value")));
+  o.push_back(new EventEntry(MetadataSetEvent(123, "key", "value"), utime_t(1, 1)));
 
   o.push_back(new EventEntry(MetadataRemoveEvent()));
-  o.push_back(new EventEntry(MetadataRemoveEvent(123, "key")));
+  o.push_back(new EventEntry(MetadataRemoveEvent(123, "key"), utime_t(1, 1)));
 }
 
 // Journal Client
index 505822d9ffc2a9a1cee4b4edffdcec5dccbdaa92..53c1310e89afefff80a6d06f0e9b7ef9bcb86b9e 100644 (file)
@@ -9,6 +9,7 @@
 #include "include/buffer.h"
 #include "include/encoding.h"
 #include "include/types.h"
+#include "include/utime.h"
 #include <iosfwd>
 #include <list>
 #include <boost/none.hpp>
@@ -67,9 +68,7 @@ struct AioWriteEvent {
   uint64_t length;
   bufferlist data;
 
-  static uint32_t get_fixed_size() {
-    return 30; /// version encoding, type, offset, length
-  }
+  static uint32_t get_fixed_size();
 
   AioWriteEvent() : offset(0), length(0) {
   }
@@ -372,12 +371,18 @@ typedef boost::variant<AioDiscardEvent,
                        UnknownEvent> Event;
 
 struct EventEntry {
+  static uint32_t get_fixed_size() {
+    return EVENT_FIXED_SIZE + METADATA_FIXED_SIZE;
+  }
+
   EventEntry() : event(UnknownEvent()) {
   }
-  EventEntry(const Event &_event) : event(_event) {
+  EventEntry(const Event &_event, const utime_t &_timestamp = utime_t())
+    : event(_event), timestamp(_timestamp) {
   }
 
   Event event;
+  utime_t timestamp;
 
   EventType get_event_type() const;
 
@@ -386,6 +391,13 @@ struct EventEntry {
   void dump(Formatter *f) const;
 
   static void generate_test_instances(std::list<EventEntry *> &o);
+
+private:
+  static const uint32_t EVENT_FIXED_SIZE = 14; /// version encoding, type
+  static const uint32_t METADATA_FIXED_SIZE = 14; /// version encoding, timestamp
+
+  void encode_metadata(bufferlist& bl) const;
+  void decode_metadata(bufferlist::iterator& it);
 };
 
 // Journal Client data structures
index 943f0e874882a36d66e93911be099fe0671c955b..4ea509dc6bdb5136f436fce9abe778aac4b72b07 100644 (file)
@@ -96,7 +96,8 @@ struct MockImageCtx {
       journal_max_concurrent_object_sets(
           image_ctx.journal_max_concurrent_object_sets),
       mirroring_resync_after_disconnect(
-          image_ctx.mirroring_resync_after_disconnect)
+          image_ctx.mirroring_resync_after_disconnect),
+      mirroring_replay_delay(image_ctx.mirroring_replay_delay)
   {
     md_ctx.dup(image_ctx.md_ctx);
     data_ctx.dup(image_ctx.data_ctx);
@@ -273,6 +274,7 @@ struct MockImageCtx {
   uint32_t journal_max_payload_bytes;
   int journal_max_concurrent_object_sets;
   bool mirroring_resync_after_disconnect;
+  int mirroring_replay_delay;
 };
 
 } // namespace librbd
index 5e38271c65d61a7c30ceb93a0bfb451d5c46210d..363ff6d23c8a77e396af0a48c995bc16fe2854ae 100644 (file)
@@ -1107,3 +1107,55 @@ TEST_F(TestImageReplayer, MetadataSetRemove)
 
   stop();
 }
+
+TEST_F(TestImageReplayer, MirroringDelay)
+{
+  const double DELAY = 10; // set less than wait_for_replay_complete timeout
+
+  librbd::ImageCtx *ictx;
+  utime_t start_time;
+  double delay;
+
+  bootstrap();
+
+  ASSERT_EQ(0, m_local_cluster->conf_set("rbd_mirroring_replay_delay",
+                                         stringify(DELAY).c_str()));
+  open_local_image(&ictx);
+  ASSERT_EQ(DELAY, ictx->mirroring_replay_delay);
+  close_image(ictx);
+
+  start();
+
+  // Test delay
+
+  generate_test_data();
+  open_remote_image(&ictx);
+  start_time = ceph_clock_now();
+  for (int i = 0; i < TEST_IO_COUNT; ++i) {
+    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
+  }
+  flush(ictx);
+  close_image(ictx);
+
+  wait_for_replay_complete();
+  delay = ceph_clock_now() - start_time;
+  ASSERT_GE(delay, DELAY);
+
+  // Test stop when delaying replay
+
+  open_remote_image(&ictx);
+  start_time = ceph_clock_now();
+  for (int i = 0; i < TEST_IO_COUNT; ++i) {
+    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
+  }
+
+  sleep(DELAY / 2);
+  stop();
+  start();
+
+  wait_for_replay_complete();
+  delay = ceph_clock_now() - start_time;
+  ASSERT_GE(delay, DELAY);
+
+  stop();
+}
index cc1013a023d2b466e248a293ad7428675fccc3e5..3503b76be0ef447603a1b4a11d06ed90120a2ba9 100644 (file)
@@ -788,5 +788,115 @@ TEST_F(TestMockImageReplayer, DecodeError) {
   ASSERT_EQ(0, close_ctx.wait());
 }
 
+TEST_F(TestMockImageReplayer, DelayedReplay) {
+
+  // START
+
+  create_local_image();
+  librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
+
+  librbd::MockTestJournal mock_local_journal;
+  mock_local_image_ctx.journal = &mock_local_journal;
+
+  journal::MockJournaler mock_remote_journaler;
+  MockBootstrapRequest mock_bootstrap_request;
+  MockReplay mock_local_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  ::journal::MockReplayEntry mock_replay_entry;
+
+  expect_get_or_send_update(mock_replay_status_formatter);
+  expect_get_commit_tid_in_debug(mock_replay_entry);
+  expect_committed(mock_remote_journaler, 1);
+
+  InSequence seq;
+  EXPECT_CALL(mock_remote_journaler, construct());
+  expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
+
+  EXPECT_CALL(mock_local_journal, add_listener(_));
+
+  expect_init(mock_remote_journaler, 0);
+
+  EXPECT_CALL(mock_remote_journaler, add_listener(_));
+  expect_get_cached_client(mock_remote_journaler, 0);
+
+  expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
+
+  EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
+
+  C_SaferCond start_ctx;
+  m_image_replayer->start(&start_ctx);
+  ASSERT_EQ(0, start_ctx.wait());
+
+  // REPLAY
+
+  cls::journal::Tag tag =
+    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            true, 0, 0})};
+
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+
+  // replay_flush
+  expect_shut_down(mock_local_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
+  expect_get_tag(mock_remote_journaler, tag, 0);
+  expect_allocate_tag(mock_local_journal, 0);
+
+  // process with delay
+  EXPECT_CALL(mock_replay_entry, get_data());
+  librbd::journal::EventEntry event_entry(
+    librbd::journal::AioDiscardEvent(123, 345), ceph_clock_now());
+  EXPECT_CALL(mock_local_replay, decode(_, _))
+    .WillOnce(DoAll(SetArgPointee<1>(event_entry),
+                    Return(0)));
+  expect_preprocess(mock_event_preprocessor, false, 0);
+  expect_process(mock_local_replay, 0, 0);
+
+  // attempt to process the next event
+  C_SaferCond replay_ctx;
+  expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
+
+  // fire
+  mock_local_image_ctx.mirroring_replay_delay = 2;
+  m_image_replayer->handle_replay_ready();
+  ASSERT_EQ(0, replay_ctx.wait());
+
+  // add a pending (delayed) entry before stop
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+  EXPECT_CALL(mock_replay_entry, get_data());
+  C_SaferCond decode_ctx;
+  EXPECT_CALL(mock_local_replay, decode(_, _))
+    .WillOnce(DoAll(Invoke([&decode_ctx](bufferlist::iterator* it,
+                                         librbd::journal::EventEntry *e) {
+                             decode_ctx.complete(0);
+                           }),
+                    Return(0)));
+
+  mock_local_image_ctx.mirroring_replay_delay = 10;
+  m_image_replayer->handle_replay_ready();
+  ASSERT_EQ(0, decode_ctx.wait());
+
+  // STOP
+
+  MockCloseImageRequest mock_close_local_image_request;
+
+  expect_stop_replay(mock_remote_journaler, 0);
+  expect_shut_down(mock_local_replay, true, 0);
+
+  EXPECT_CALL(mock_local_journal, remove_listener(_));
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  expect_send(mock_close_local_image_request, 0);
+
+  C_SaferCond stop_ctx;
+  m_image_replayer->stop(&stop_ctx);
+  ASSERT_EQ(0, stop_ctx.wait());
+}
+
 } // namespace mirror
 } // namespace rbd
index 71e5a479b0f7d3f602d8cbff36d5e503ae05b39e..132db547da95dbbe2d966c600322e58e0ea6e524 100644 (file)
@@ -226,6 +226,21 @@ private:
   Commands commands;
 };
 
+uint32_t calculate_replay_delay(const utime_t &event_time,
+                                int mirroring_replay_delay) {
+    if (mirroring_replay_delay <= 0) {
+      return 0;
+    }
+
+    utime_t now = ceph_clock_now();
+    if (event_time + mirroring_replay_delay <= now) {
+      return 0;
+    }
+
+    // ensure it is rounded up when converting to integer
+    return (event_time + mirroring_replay_delay - now) + 1;
+}
+
 } // anonymous namespace
 
 template <typename I>
@@ -645,6 +660,7 @@ void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
   image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
   bool shut_down_replay = false;
   bool running = true;
+  bool canceled_task = false;
   {
     Mutex::Locker locker(m_lock);
 
@@ -667,6 +683,14 @@ void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
         std::swap(m_on_stop_finish, on_finish);
         m_stop_requested = true;
         m_manual_stop = manual;
+
+       Mutex::Locker timer_locker(m_threads->timer_lock);
+        if (m_delayed_preprocess_task != nullptr) {
+          canceled_task = m_threads->timer->cancel_event(
+            m_delayed_preprocess_task);
+          assert(canceled_task);
+          m_delayed_preprocess_task = nullptr;
+        }
       }
     }
   }
@@ -677,6 +701,11 @@ void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
     bootstrap_request->put();
   }
 
+  if (canceled_task) {
+    m_event_replay_tracker.finish_op();
+    on_replay_interrupted();
+  }
+
   if (!running) {
     dout(20) << "not running" << dendl;
     if (on_finish) {
@@ -1031,18 +1060,45 @@ void ImageReplayer<I>::preprocess_entry() {
     return;
   }
 
+  uint32_t delay = calculate_replay_delay(
+    m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
+  if (delay == 0) {
+    handle_preprocess_entry_ready(0);
+    return;
+  }
+
+  dout(20) << "delaying replay by " << delay << " sec" << dendl;
+
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  assert(m_delayed_preprocess_task == nullptr);
+  m_delayed_preprocess_task = new FunctionContext(
+    [this](int r) {
+      assert(m_threads->timer_lock.is_locked());
+      m_delayed_preprocess_task = nullptr;
+      m_threads->work_queue->queue(
+        create_context_callback<ImageReplayer,
+        &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
+    });
+  m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
+  dout(20) << "r=" << r << dendl;
+  assert(r == 0);
+
   if (!m_event_preprocessor->is_required(m_event_entry)) {
     process_entry();
     return;
   }
 
   Context *ctx = create_context_callback<
-    ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry>(this);
+    ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
   m_event_preprocessor->preprocess(&m_event_entry, ctx);
 }
 
 template <typename I>
-void ImageReplayer<I>::handle_preprocess_entry(int r) {
+void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
   dout(20) << "r=" << r << dendl;
 
   if (r < 0) {
@@ -1459,6 +1515,7 @@ void ImageReplayer<I>::handle_shut_down(int r) {
     std::swap(on_start, m_on_start_finish);
     std::swap(on_stop, m_on_stop_finish);
     m_stop_requested = false;
+    assert(m_delayed_preprocess_task == nullptr);
     assert(m_state == STATE_STOPPING);
     m_state = STATE_STOPPED;
   }
index ee080f1c900e86cf5159e49b0e18cb0fabf415fd..0964a86e465bd19b7ab5b2f94c0352c78561cf27 100644 (file)
@@ -292,6 +292,7 @@ private:
   librbd::journal::TagData m_replay_tag_data;
   librbd::journal::EventEntry m_event_entry;
   AsyncOpTracker m_event_replay_tracker;
+  Context *m_delayed_preprocess_task = nullptr;
 
   struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
     ImageReplayer *replayer;
@@ -361,7 +362,8 @@ private:
   void handle_allocate_local_tag(int r);
 
   void preprocess_entry();
-  void handle_preprocess_entry(int r);
+  void handle_preprocess_entry_ready(int r);
+  void handle_preprocess_entry_safe(int r);
 
   void process_entry();
   void handle_process_entry_ready(int r);