]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librbd: added object dispatch handler for journal IO
authorJason Dillaman <dillaman@redhat.com>
Wed, 21 Feb 2018 01:11:49 +0000 (20:11 -0500)
committerJason Dillaman <dillaman@redhat.com>
Wed, 7 Mar 2018 17:45:00 +0000 (12:45 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/CMakeLists.txt
src/librbd/Journal.cc
src/librbd/journal/ObjectDispatch.cc [new file with mode: 0644]
src/librbd/journal/ObjectDispatch.h [new file with mode: 0644]
src/test/librbd/test_mock_Journal.cc

index 8618a19726d346370e0bb67714e1ab1a52c5bb37..6a770ba12b406157aabcb720f79f158c015b6f23 100644 (file)
@@ -68,6 +68,7 @@ set(librbd_internal_srcs
   io/Utils.cc
   journal/CreateRequest.cc
   journal/DemoteRequest.cc
+  journal/ObjectDispatch.cc
   journal/OpenRequest.cc
   journal/PromoteRequest.cc
   journal/RemoveRequest.cc
index 848b81dbf142ffa5bac0d88f6de88babc7e451a8..575ceb255118544d93daa1f1d36166978b993b6b 100644 (file)
@@ -19,6 +19,7 @@
 #include "librbd/io/ObjectDispatcher.h"
 #include "librbd/journal/CreateRequest.h"
 #include "librbd/journal/DemoteRequest.h"
+#include "librbd/journal/ObjectDispatch.h"
 #include "librbd/journal/OpenRequest.h"
 #include "librbd/journal/RemoveRequest.h"
 #include "librbd/journal/ResetRequest.h"
@@ -565,6 +566,10 @@ void Journal<I>::open(Context *on_finish) {
 
   on_finish = create_async_context_callback(m_image_ctx, on_finish);
 
+  // inject our handler into the object dispatcher chain
+  m_image_ctx.io_object_dispatcher->register_object_dispatch(
+    journal::ObjectDispatch<I>::create(&m_image_ctx, this));
+
   Mutex::Locker locker(m_lock);
   assert(m_state == STATE_UNINITIALIZED);
   wait_for_steady_state(on_finish);
@@ -576,6 +581,14 @@ void Journal<I>::close(Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
+  on_finish = new FunctionContext([this, on_finish](int r) {
+      // remove our handler from object dispatcher chain - preserve error
+      auto ctx = new FunctionContext([on_finish, r](int _) {
+          on_finish->complete(r);
+        });
+      m_image_ctx.io_object_dispatcher->shut_down_object_dispatch(
+        io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
+    });
   on_finish = create_async_context_callback(m_image_ctx, on_finish);
 
   Mutex::Locker locker(m_lock);
diff --git a/src/librbd/journal/ObjectDispatch.cc b/src/librbd/journal/ObjectDispatch.cc
new file mode 100644 (file)
index 0000000..737f5ef
--- /dev/null
@@ -0,0 +1,213 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/ObjectDispatch.h"
+#include "common/dout.h"
+#include "common/WorkQueue.h"
+#include "osdc/Striper.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcher.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::ObjectDispatch: " << this \
+                           << " " << __func__ << ": "
+
+namespace librbd {
+namespace journal {
+
+namespace {
+
+template <typename I>
+struct C_CommitIOEvent : public Context {
+  I* image_ctx;
+  Journal<I>* journal;
+  uint64_t object_no;
+  uint64_t object_off;
+  uint64_t object_len;
+  uint64_t journal_tid;
+  int object_dispatch_flags;
+  Context* on_finish;
+
+  C_CommitIOEvent(I* image_ctx, Journal<I>* journal, uint64_t object_no,
+                  uint64_t object_off, uint64_t object_len,
+                  uint64_t journal_tid, int object_dispatch_flags,
+                  Context* on_finish)
+    : image_ctx(image_ctx), journal(journal), object_no(object_no),
+      object_off(object_off), object_len(object_len), journal_tid(journal_tid),
+      object_dispatch_flags(object_dispatch_flags), on_finish(on_finish) {
+  }
+
+  void finish(int r) override {
+    // don't commit the IO extent if a previous dispatch handler will just
+    // retry the failed IO
+    if (r >= 0 ||
+        (object_dispatch_flags &
+           io::OBJECT_DISPATCH_FLAG_WILL_RETRY_ON_ERROR) == 0) {
+      io::Extents file_extents;
+      Striper::extent_to_file(image_ctx->cct, &image_ctx->layout, object_no,
+                              object_off, object_len, file_extents);
+      for (auto& extent : file_extents) {
+        journal->commit_io_event_extent(journal_tid, extent.first,
+                                        extent.second, r);
+      }
+    }
+
+    if (on_finish != nullptr) {
+      on_finish->complete(r);
+    }
+  }
+};
+
+} // anonymous namespace
+
+template <typename I>
+ObjectDispatch<I>::ObjectDispatch(I* image_ctx, Journal<I>* journal)
+  : m_image_ctx(image_ctx), m_journal(journal) {
+}
+
+template <typename I>
+void ObjectDispatch<I>::shut_down(Context* on_finish) {
+  m_image_ctx->op_work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+bool ObjectDispatch<I>::discard(
+    const std::string &oid, uint64_t object_no, uint64_t object_off,
+    uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+    const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+    uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+    Context** on_finish, Context* on_dispatched) {
+  if (*journal_tid == 0) {
+    // non-journaled IO
+    return false;
+  }
+
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << oid << " " << object_off << "~" << object_len << dendl;
+
+  *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+                                      object_off, object_len, *journal_tid,
+                                      *object_dispatch_flags, *on_finish);
+
+  *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+  wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+  return true;
+}
+
+template <typename I>
+bool ObjectDispatch<I>::write(
+    const std::string &oid, uint64_t object_no, uint64_t object_off,
+    ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+    const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+    uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+    Context** on_finish, Context* on_dispatched) {
+  if (*journal_tid == 0) {
+    // non-journaled IO
+    return false;
+  }
+
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << oid << " " << object_off << "~" << data.length() << dendl;
+
+  *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+                                      object_off, data.length(), *journal_tid,
+                                      *object_dispatch_flags, *on_finish);
+
+  *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+  wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+  return true;
+}
+
+template <typename I>
+bool ObjectDispatch<I>::write_same(
+    const std::string &oid, uint64_t object_no, uint64_t object_off,
+    uint64_t object_len, io::Extents&& buffer_extents, ceph::bufferlist&& data,
+    const ::SnapContext &snapc, int op_flags,
+    const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+    uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+    Context** on_finish, Context* on_dispatched) {
+  if (*journal_tid == 0) {
+    // non-journaled IO
+    return false;
+  }
+
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << oid << " " << object_off << "~" << object_len << dendl;
+
+  *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+                                      object_off, object_len, *journal_tid,
+                                      *object_dispatch_flags, *on_finish);
+
+  *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+  wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+  return true;
+}
+
+template <typename I>
+bool ObjectDispatch<I>::compare_and_write(
+    const std::string &oid, uint64_t object_no, uint64_t object_off,
+    ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+    const ::SnapContext &snapc, int op_flags,
+    const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+    int* object_dispatch_flags, uint64_t* journal_tid,
+    io::DispatchResult* dispatch_result, Context** on_finish,
+    Context* on_dispatched) {
+  if (*journal_tid == 0) {
+    // non-journaled IO
+    return false;
+  }
+
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << oid << " " << object_off << "~" << write_data.length()
+                 << dendl;
+
+  *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+                                      object_off, write_data.length(),
+                                      *journal_tid, *object_dispatch_flags,
+                                      *on_finish);
+
+  *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+  wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+  return true;
+}
+
+template <typename I>
+void ObjectDispatch<I>::extent_overwritten(
+    uint64_t object_no, uint64_t object_off, uint64_t object_len,
+    uint64_t journal_tid, uint64_t new_journal_tid) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << object_no << " " << object_off << "~" << object_len
+                 << dendl;
+
+  auto ctx = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+                                    object_off, object_len, journal_tid, false,
+                                    nullptr);
+  if (new_journal_tid != 0) {
+    // ensure new journal event is safely committed to disk before
+    // committing old event
+    m_journal->flush_event(new_journal_tid, ctx);
+  } else {
+    ctx->complete(0);
+  }
+}
+
+template <typename I>
+void ObjectDispatch<I>::wait_or_flush_event(
+    uint64_t journal_tid, int object_dispatch_flags, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "journal_tid=" << journal_tid << dendl;
+
+  if ((object_dispatch_flags & io::OBJECT_DISPATCH_FLAG_FLUSH) != 0) {
+    m_journal->flush_event(journal_tid, on_dispatched);
+  } else {
+    m_journal->wait_event(journal_tid, on_dispatched);
+  }
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::ObjectDispatch<librbd::ImageCtx>;
diff --git a/src/librbd/journal/ObjectDispatch.h b/src/librbd/journal/ObjectDispatch.h
new file mode 100644 (file)
index 0000000..b5ae747
--- /dev/null
@@ -0,0 +1,113 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_OBJECT_DISPATCH_H
+#define CEPH_LIBRBD_JOURNAL_OBJECT_DISPATCH_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rados/librados.hpp"
+#include "common/snap_types.h"
+#include "common/zipkin_trace.h"
+#include "librbd/io/Types.h"
+#include "librbd/io/ObjectDispatchInterface.h"
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+template <typename> class Journal;
+
+namespace journal {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class ObjectDispatch : public io::ObjectDispatchInterface {
+public:
+  static ObjectDispatch* create(ImageCtxT* image_ctx,
+                                Journal<ImageCtxT>* journal) {
+    return new ObjectDispatch(image_ctx, journal);
+  }
+
+  ObjectDispatch(ImageCtxT* image_ctx, Journal<ImageCtxT>* journal);
+
+  io::ObjectDispatchLayer get_object_dispatch_layer() const override {
+    return io::OBJECT_DISPATCH_LAYER_JOURNAL;
+  }
+
+  void shut_down(Context* on_finish) override;
+
+  bool read(
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      uint64_t object_len, librados::snap_t snap_id, int op_flags,
+      const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+      io::ExtentMap* extent_map, int* object_dispatch_flags,
+      io::DispatchResult* dispatch_result, Context** on_finish,
+      Context* on_dispatched) {
+    return false;
+  }
+
+  bool discard(
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+      const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+      uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+      Context** on_finish, Context* on_dispatched) override;
+
+  bool write(
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+      const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+      uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+      Context** on_finish, Context* on_dispatched) override;
+
+  bool write_same(
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      uint64_t object_len, io::Extents&& buffer_extents,
+      ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+      const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+      uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+      Context** on_finish, Context* on_dispatched) override;
+
+  bool compare_and_write(
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+      const ::SnapContext &snapc, int op_flags,
+      const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+      int* object_dispatch_flags, uint64_t* journal_tid,
+      io::DispatchResult* dispatch_result, Context** on_finish,
+      Context* on_dispatched) override;
+
+  bool flush(
+      io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
+      io::DispatchResult* dispatch_result, Context** on_finish,
+      Context* on_dispatched) override {
+    return false;
+  }
+
+  bool invalidate_cache(Context* on_finish) override {
+    return false;
+  }
+  bool reset_existence_cache(Context* on_finish) override {
+    return false;
+  }
+
+  void extent_overwritten(
+      uint64_t object_no, uint64_t object_off, uint64_t object_len,
+      uint64_t journal_tid, uint64_t new_journal_tid) override;
+
+private:
+  ImageCtxT* m_image_ctx;
+  Journal<ImageCtxT>* m_journal;
+
+  void wait_or_flush_event(uint64_t journal_tid, int object_dispatch_flags,
+                           Context* on_dispatched);
+
+};
+
+} // namespace journal
+} // namespace librbd
+
+extern template class librbd::journal::ObjectDispatch<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_JOURNAL_OBJECT_DISPATCH_H
index bcc31a2706b511fb4681d8f8ed385377ea78c6bc..adfe0734b2ec37e3fbbb525de08ff0c4927a3415 100644 (file)
@@ -6,6 +6,7 @@
 #include "test/librbd/test_support.h"
 #include "test/librbd/mock/MockImageCtx.h"
 #include "test/librbd/mock/MockJournalPolicy.h"
+#include "test/librbd/mock/io/MockObjectDispatch.h"
 #include "common/Cond.h"
 #include "common/Mutex.h"
 #include "cls/journal/cls_journal_types.h"
@@ -17,6 +18,7 @@
 #include "librbd/journal/Replay.h"
 #include "librbd/journal/RemoveRequest.h"
 #include "librbd/journal/CreateRequest.h"
+#include "librbd/journal/ObjectDispatch.h"
 #include "librbd/journal/OpenRequest.h"
 #include "librbd/journal/Types.h"
 #include "librbd/journal/TypeTraits.h"
@@ -198,6 +200,23 @@ public:
 
 PromoteRequest<MockJournalImageCtx> PromoteRequest<MockJournalImageCtx>::s_instance;
 
+template <>
+struct ObjectDispatch<MockJournalImageCtx> : public io::MockObjectDispatch {
+  static ObjectDispatch* s_instance;
+
+  static ObjectDispatch* create(MockJournalImageCtx* image_ctx,
+                                Journal<MockJournalImageCtx>* journal) {
+    assert(s_instance != nullptr);
+    return s_instance;
+  }
+
+  ObjectDispatch() {
+    s_instance = this;
+  }
+};
+
+ObjectDispatch<MockJournalImageCtx>* ObjectDispatch<MockJournalImageCtx>::s_instance = nullptr;
+
 } // namespace journal
 } // namespace librbd
 
@@ -227,7 +246,7 @@ public:
   typedef journal::MockReplay MockJournalReplay;
   typedef Journal<MockJournalImageCtx> MockJournal;
   typedef journal::OpenRequest<MockJournalImageCtx> MockJournalOpenRequest;
-
+  typedef journal::ObjectDispatch<MockJournalImageCtx> MockObjectDispatch;
   typedef std::function<void(::journal::ReplayHandler*)> ReplayAction;
   typedef std::list<Context *> Contexts;
 
@@ -283,6 +302,18 @@ public:
                   .WillOnce(CompleteContext(0, static_cast<ContextWQ*>(NULL)));
   }
 
+  void expect_register_object_dispatch(MockImageCtx& mock_image_ctx,
+                                       MockObjectDispatch& mock_object_dispatch) {
+    EXPECT_CALL(*mock_image_ctx.io_object_dispatcher,
+                register_object_dispatch(&mock_object_dispatch));
+  }
+
+  void expect_shut_down_object_dispatch(MockImageCtx& mock_image_ctx) {
+    EXPECT_CALL(*mock_image_ctx.io_object_dispatcher,
+                shut_down_object_dispatch(io::OBJECT_DISPATCH_LAYER_JOURNAL, _))
+      .WillOnce(WithArg<1>(CompleteContext(0, mock_image_ctx.image_ctx->op_work_queue)));
+  }
+
   void expect_get_max_append_size(::journal::MockJournaler &mock_journaler,
                                   uint32_t max_size) {
     EXPECT_CALL(mock_journaler, get_max_append_size())
@@ -470,12 +501,14 @@ public:
 
   void open_journal(MockJournalImageCtx &mock_image_ctx,
                     MockJournal &mock_journal,
+                    MockObjectDispatch& mock_object_dispatch,
                     ::journal::MockJournaler &mock_journaler,
                     MockJournalOpenRequest &mock_open_request,
                     bool primary = true) {
     expect_op_work_queue(mock_image_ctx);
 
     InSequence seq;
+    expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
     expect_construct_journaler(mock_journaler);
     expect_open_journaler(mock_image_ctx, mock_journaler, mock_open_request,
                           primary, 0);
@@ -493,9 +526,11 @@ public:
     ASSERT_EQ(0, when_open(mock_journal));
   }
 
-  void close_journal(MockJournal &mock_journal,
+  void close_journal(MockJournalImageCtx& mock_image_ctx,
+                     MockJournal &mock_journal,
                      ::journal::MockJournaler &mock_journaler) {
     expect_stop_append(mock_journaler, 0);
+    expect_shut_down_object_dispatch(mock_image_ctx);
     ASSERT_EQ(0, when_close(mock_journal));
   }
 
@@ -523,6 +558,9 @@ TEST_F(TestMockJournal, StateTransitions) {
 
   InSequence seq;
 
+  MockObjectDispatch mock_object_dispatch;
+  expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
   expect_construct_journaler(mock_journaler);
@@ -557,6 +595,7 @@ TEST_F(TestMockJournal, StateTransitions) {
 
   expect_stop_append(mock_journaler, 0);
   expect_shut_down_journaler(mock_journaler);
+  expect_shut_down_object_dispatch(mock_image_ctx);
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
@@ -572,6 +611,9 @@ TEST_F(TestMockJournal, InitError) {
 
   InSequence seq;
 
+  MockObjectDispatch mock_object_dispatch;
+  expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
   expect_construct_journaler(mock_journaler);
@@ -593,6 +635,9 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
 
   InSequence seq;
 
+  MockObjectDispatch mock_object_dispatch;
+  expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
   expect_construct_journaler(mock_journaler);
@@ -626,6 +671,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
 
   expect_stop_append(mock_journaler, 0);
   expect_shut_down_journaler(mock_journaler);
+  expect_shut_down_object_dispatch(mock_image_ctx);
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
@@ -641,6 +687,9 @@ TEST_F(TestMockJournal, FlushReplayError) {
 
   InSequence seq;
 
+  MockObjectDispatch mock_object_dispatch;
+  expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
   expect_construct_journaler(mock_journaler);
@@ -679,6 +728,7 @@ TEST_F(TestMockJournal, FlushReplayError) {
 
   expect_stop_append(mock_journaler, 0);
   expect_shut_down_journaler(mock_journaler);
+  expect_shut_down_object_dispatch(mock_image_ctx);
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
@@ -694,6 +744,9 @@ TEST_F(TestMockJournal, CorruptEntry) {
 
   InSequence seq;
 
+  MockObjectDispatch mock_object_dispatch;
+  expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
   expect_construct_journaler(mock_journaler);
@@ -729,6 +782,7 @@ TEST_F(TestMockJournal, CorruptEntry) {
 
   expect_stop_append(mock_journaler, -EINVAL);
   expect_shut_down_journaler(mock_journaler);
+  expect_shut_down_object_dispatch(mock_image_ctx);
   ASSERT_EQ(-EINVAL, when_close(mock_journal));
 }
 
@@ -744,6 +798,9 @@ TEST_F(TestMockJournal, StopError) {
 
   InSequence seq;
 
+  MockObjectDispatch mock_object_dispatch;
+  expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
   expect_construct_journaler(mock_journaler);
@@ -763,6 +820,7 @@ TEST_F(TestMockJournal, StopError) {
 
   expect_stop_append(mock_journaler, -EINVAL);
   expect_shut_down_journaler(mock_journaler);
+  expect_shut_down_object_dispatch(mock_image_ctx);
   ASSERT_EQ(-EINVAL, when_close(mock_journal));
 }
 
@@ -777,6 +835,9 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
   expect_op_work_queue(mock_image_ctx);
 
   InSequence seq;
+  MockObjectDispatch mock_object_dispatch;
+  expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
   expect_construct_journaler(mock_journaler);
@@ -845,6 +906,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
 
   expect_stop_append(mock_journaler, 0);
   expect_shut_down_journaler(mock_journaler);
+  expect_shut_down_object_dispatch(mock_image_ctx);
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
@@ -860,6 +922,9 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
 
   InSequence seq;
 
+  MockObjectDispatch mock_object_dispatch;
+  expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
   expect_construct_journaler(mock_journaler);
@@ -926,6 +991,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
 
   expect_stop_append(mock_journaler, 0);
   expect_shut_down_journaler(mock_journaler);
+  expect_shut_down_object_dispatch(mock_image_ctx);
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
@@ -937,11 +1003,13 @@ TEST_F(TestMockJournal, EventAndIOCommitOrder) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
   BOOST_SCOPE_EXIT_ALL(&) {
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   ::journal::MockFuture mock_future;
@@ -984,11 +1052,13 @@ TEST_F(TestMockJournal, AppendWriteEvent) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
   BOOST_SCOPE_EXIT_ALL(&) {
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   InSequence seq;
@@ -1024,11 +1094,13 @@ TEST_F(TestMockJournal, EventCommitError) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
   BOOST_SCOPE_EXIT_ALL(&) {
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   C_SaferCond object_request_ctx;
@@ -1066,11 +1138,13 @@ TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
   BOOST_SCOPE_EXIT_ALL(&) {
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   C_SaferCond object_request_ctx;
@@ -1109,11 +1183,13 @@ TEST_F(TestMockJournal, IOCommitError) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
   BOOST_SCOPE_EXIT_ALL(&) {
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   ::journal::MockFuture mock_future;
@@ -1139,11 +1215,13 @@ TEST_F(TestMockJournal, IOCommitErrorFiltered) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
   BOOST_SCOPE_EXIT_ALL(&) {
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   ::journal::MockFuture mock_future;
@@ -1171,11 +1249,13 @@ TEST_F(TestMockJournal, FlushCommitPosition) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
   BOOST_SCOPE_EXIT_ALL(&) {
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   expect_flush_commit_position(mock_journaler);
@@ -1194,11 +1274,13 @@ TEST_F(TestMockJournal, ExternalReplay) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
   BOOST_SCOPE_EXIT_ALL(&) {
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   InSequence seq;
@@ -1223,11 +1305,13 @@ TEST_F(TestMockJournal, ExternalReplayFailure) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
   BOOST_SCOPE_EXIT_ALL(&) {
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   InSequence seq;
@@ -1250,13 +1334,15 @@ TEST_F(TestMockJournal, AppendDisabled) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   MockJournalPolicy mock_journal_policy;
 
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
   BOOST_SCOPE_EXIT_ALL(&) {
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   InSequence seq;
@@ -1281,9 +1367,11 @@ TEST_F(TestMockJournal, CloseListenerEvent) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request);
 
   struct Listener : public journal::Listener {
     C_SaferCond ctx;
@@ -1300,7 +1388,7 @@ TEST_F(TestMockJournal, CloseListenerEvent) {
   mock_journal.add_listener(&listener);
 
   expect_shut_down_journaler(mock_journaler);
-  close_journal(mock_journal, mock_journaler);
+  close_journal(mock_image_ctx, mock_journal, mock_journaler);
 
   ASSERT_EQ(0, listener.ctx.wait());
   mock_journal.remove_listener(&listener);
@@ -1314,9 +1402,11 @@ TEST_F(TestMockJournal, ResyncRequested) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request,
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request,
                false);
 
   struct Listener : public journal::Listener {
@@ -1335,7 +1425,7 @@ TEST_F(TestMockJournal, ResyncRequested) {
 
   BOOST_SCOPE_EXIT_ALL(&) {
     mock_journal.remove_listener(&listener);
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   InSequence seq;
@@ -1366,10 +1456,11 @@ TEST_F(TestMockJournal, ForcePromoted) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockObjectDispatch mock_object_dispatch;
   ::journal::MockJournaler mock_journaler;
   MockJournalOpenRequest mock_open_request;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request,
-               false);
+  open_journal(mock_image_ctx, mock_journal, mock_object_dispatch,
+               mock_journaler, mock_open_request, false);
 
   struct Listener : public journal::Listener {
     C_SaferCond ctx;
@@ -1387,7 +1478,7 @@ TEST_F(TestMockJournal, ForcePromoted) {
 
   BOOST_SCOPE_EXIT_ALL(&) {
     mock_journal.remove_listener(&listener);
-    close_journal(mock_journal, mock_journaler);
+    close_journal(mock_image_ctx, mock_journal, mock_journaler);
   };
 
   InSequence seq;