]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: integrate with async journaler shutdown API
authorJason Dillaman <dillaman@redhat.com>
Wed, 25 May 2016 04:21:14 +0000 (00:21 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 31 May 2016 15:49:38 +0000 (11:49 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit ad297850b1be8ed38f77b86913c6821748f3368b)

src/librbd/Journal.cc
src/librbd/Journal.h
src/test/journal/mock/MockJournaler.h
src/test/librbd/journal/test_Entries.cc
src/test/librbd/test_mock_Journal.cc
src/tools/rbd/action/Journal.cc

index 618ad0e36670193eaf019d6eb4d927a698622ddd..ffdfe1d40da7509676ae750605ecdef1fd3aece6 100644 (file)
@@ -161,14 +161,13 @@ public:
 };
 
 template <typename J>
-int open_journaler(CephContext *cct, J *journaler, bool *initialized,
+int open_journaler(CephContext *cct, J *journaler,
                    cls::journal::Client *client,
                    journal::ImageClientMeta *client_meta,
                    journal::TagData *tag_data) {
   C_SaferCond init_ctx;
   journaler->init(&init_ctx);
   int r = init_ctx.wait();
-  *initialized = (r >= 0);
   if (r < 0) {
     return r;
   }
@@ -414,6 +413,9 @@ int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
 
   C_SaferCond cond;
   journaler.init(&cond);
+  BOOST_SCOPE_EXIT_ALL(&journaler) {
+    journaler.shut_down();
+  };
 
   r = cond.wait();
   if (r == -ENOENT) {
@@ -441,6 +443,9 @@ int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
 
   C_SaferCond cond;
   journaler.init(&cond);
+  BOOST_SCOPE_EXIT_ALL(&journaler) {
+    journaler.shut_down();
+  };
 
   int r = cond.wait();
   if (r == -ENOENT) {
@@ -510,19 +515,15 @@ int Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
   Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
                       cct->_conf->rbd_journal_commit_age);
 
-  bool initialized;
   cls::journal::Client client;
   journal::ImageClientMeta client_meta;
   journal::TagData tag_data;
-  int r = open_journaler(cct, &journaler, &initialized, &client,
-                         &client_meta, &tag_data);
+  int r = open_journaler(cct, &journaler, &client, &client_meta, &tag_data);
   if (r >= 0) {
     *mirror_uuid = tag_data.mirror_uuid;
   }
 
-  if (initialized) {
-    journaler.shut_down();
-  }
+  journaler.shut_down();
   return r;
 }
 
@@ -534,16 +535,13 @@ int Journal<I>::request_resync(I *image_ctx) {
   Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID,
                       image_ctx->cct->_conf->rbd_journal_commit_age);
 
-  bool initialized;
   cls::journal::Client client;
   journal::ImageClientMeta client_meta;
   journal::TagData tag_data;
-  int r = open_journaler(image_ctx->cct, &journaler, &initialized, &client,
-                         &client_meta, &tag_data);
-  BOOST_SCOPE_EXIT_ALL(&journaler, &initialized) {
-    if (initialized) {
-      journaler.shut_down();
-    }
+  int r = open_journaler(image_ctx->cct, &journaler, &client, &client_meta,
+                         &tag_data);
+  BOOST_SCOPE_EXIT_ALL(&journaler) {
+    journaler.shut_down();
   };
 
   if (r < 0) {
@@ -575,16 +573,13 @@ int Journal<I>::promote(I *image_ctx) {
   Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID,
                       image_ctx->cct->_conf->rbd_journal_commit_age);
 
-  bool initialized;
   cls::journal::Client client;
   journal::ImageClientMeta client_meta;
   journal::TagData tag_data;
-  int r = open_journaler(image_ctx->cct, &journaler, &initialized, &client,
-                         &client_meta, &tag_data);
-  BOOST_SCOPE_EXIT_ALL(&journaler, &initialized) {
-    if (initialized) {
-      journaler.shut_down();
-    }
+  int r = open_journaler(image_ctx->cct, &journaler, &client, &client_meta,
+                         &tag_data);
+  BOOST_SCOPE_EXIT_ALL(&journaler) {
+    journaler.shut_down();
   };
 
   if (r < 0) {
@@ -1129,8 +1124,9 @@ void Journal<I>::destroy_journaler(int r) {
   m_journal_replay = NULL;
 
   transition_state(STATE_CLOSING, r);
-  m_image_ctx.op_work_queue->queue(create_context_callback<
-    Journal<I>, &Journal<I>::handle_journal_destroyed>(this), 0);
+  m_journaler->shut_down(create_async_context_callback(
+    m_image_ctx, create_context_callback<
+      Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
 }
 
 template <typename I>
@@ -1146,8 +1142,9 @@ void Journal<I>::recreate_journaler(int r) {
   m_journal_replay = NULL;
 
   transition_state(STATE_RESTARTING_REPLAY, r);
-  m_image_ctx.op_work_queue->queue(create_context_callback<
-    Journal<I>, &Journal<I>::handle_journal_destroyed>(this), 0);
+  m_journaler->shut_down(create_async_context_callback(
+    m_image_ctx, create_context_callback<
+      Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
 }
 
 template <typename I>
@@ -1289,27 +1286,47 @@ template <typename I>
 void Journal<I>::handle_replay_complete(int r) {
   CephContext *cct = m_image_ctx.cct;
 
-  m_lock.Lock();
-  if (m_state != STATE_REPLAYING) {
-    m_lock.Unlock();
-    return;
+  bool cancel_ops = false;
+  {
+    Mutex::Locker locker(m_lock);
+    if (m_state != STATE_REPLAYING) {
+      return;
+    }
+
+    ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+    if (r < 0) {
+      cancel_ops = true;
+      transition_state(STATE_FLUSHING_RESTART, r);
+    } else {
+      // state might change back to FLUSHING_RESTART on flush error
+      transition_state(STATE_FLUSHING_REPLAY, 0);
+    }
   }
 
-  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
-  m_journaler->stop_replay();
-  if (r < 0) {
-    transition_state(STATE_FLUSHING_RESTART, r);
-    m_lock.Unlock();
+  Context *ctx = new FunctionContext([this, cct](int r) {
+      ldout(cct, 20) << this << " handle_replay_complete: "
+                     << "handle shut down replay" << dendl;
 
-    m_journal_replay->shut_down(true, create_context_callback<
-      Journal<I>, &Journal<I>::handle_flushing_restart>(this));
-  } else {
-    transition_state(STATE_FLUSHING_REPLAY, 0);
-    m_lock.Unlock();
+      State state;
+      {
+        Mutex::Locker locker(m_lock);
+        assert(m_state == STATE_FLUSHING_RESTART ||
+               m_state == STATE_FLUSHING_REPLAY);
+        state = m_state;
+      }
 
-    m_journal_replay->shut_down(false, create_context_callback<
-      Journal<I>, &Journal<I>::handle_flushing_replay>(this));
-  }
+      if (state == STATE_FLUSHING_RESTART) {
+        handle_flushing_restart(0);
+      } else {
+        handle_flushing_replay();
+      }
+    });
+  ctx = new FunctionContext([this, cct, cancel_ops, ctx](int r) {
+      ldout(cct, 20) << this << " handle_replay_complete: "
+                     << "shut down replay" << dendl;
+      m_journal_replay->shut_down(cancel_ops, ctx);
+    });
+  m_journaler->stop_replay(ctx);
 }
 
 template <typename I>
@@ -1329,9 +1346,9 @@ void Journal<I>::handle_replay_process_ready(int r) {
 
 template <typename I>
 void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
-  Mutex::Locker locker(m_lock);
-
   CephContext *cct = m_image_ctx.cct;
+
+  m_lock.Lock();
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
   if (r < 0) {
     lderr(cct) << "failed to commit journal event to disk: " << cpp_strerror(r)
@@ -1339,20 +1356,33 @@ void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
 
     if (m_state == STATE_REPLAYING) {
       // abort the replay if we have an error
-      m_journaler->stop_replay();
       transition_state(STATE_FLUSHING_RESTART, r);
-
-      m_journal_replay->shut_down(true, create_context_callback<
-        Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+      m_lock.Unlock();
+
+      // stop replay, shut down, and restart
+      Context *ctx = new FunctionContext([this, cct](int r) {
+          ldout(cct, 20) << this << " handle_replay_process_safe: "
+                         << "shut down replay" << dendl;
+          {
+            Mutex::Locker locker(m_lock);
+            assert(m_state == STATE_FLUSHING_RESTART);
+          }
+
+          m_journal_replay->shut_down(true, create_context_callback<
+            Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+        });
+      m_journaler->stop_replay(ctx);
       return;
     } else if (m_state == STATE_FLUSHING_REPLAY) {
       // end-of-replay flush in-progress -- we need to restart replay
       transition_state(STATE_FLUSHING_RESTART, r);
+      m_lock.Unlock();
       return;
     }
   } else {
     // only commit the entry if written successfully
     m_journaler->committed(replay_entry);
+    m_lock.Unlock();
   }
 }
 
@@ -1374,16 +1404,15 @@ void Journal<I>::handle_flushing_restart(int r) {
 }
 
 template <typename I>
-void Journal<I>::handle_flushing_replay(int r) {
+void Journal<I>::handle_flushing_replay() {
   Mutex::Locker locker(m_lock);
 
   CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  assert(r == 0);
   assert(m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART);
   if (m_close_pending) {
-    destroy_journaler(r);
+    destroy_journaler(0);
     return;
   } else if (m_state == STATE_FLUSHING_RESTART) {
     // failed to replay one-or-more events -- restart
index 1840fb10986e4f3b3b430a706e40449037e3babd..48fe9a2e00723a91b6e53c300848f2d1428b3c9b 100644 (file)
@@ -312,7 +312,7 @@ private:
   void handle_replay_process_safe(ReplayEntry replay_entry, int r);
 
   void handle_flushing_restart(int r);
-  void handle_flushing_replay(int r);
+  void handle_flushing_replay();
 
   void handle_recording_stopped(int r);
 
index 4d9053f527461d36766db211d8614d14d4ea4e3f..05efb42ab93c4f56c169c4892aef4ba43c6d3186 100644 (file)
@@ -91,6 +91,7 @@ struct MockJournaler {
 
   MOCK_METHOD1(init, void(Context *));
   MOCK_METHOD0(shut_down, void());
+  MOCK_METHOD1(shut_down, void(Context *));
   MOCK_CONST_METHOD0(is_initialized, bool());
 
   MOCK_METHOD3(get_metadata, void(uint8_t *order, uint8_t *splay_width,
@@ -113,6 +114,7 @@ struct MockJournaler {
   MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *));
   MOCK_METHOD2(try_pop_front, bool(MockReplayEntryProxy *, uint64_t *));
   MOCK_METHOD0(stop_replay, void());
+  MOCK_METHOD1(stop_replay, void(Context *on_finish));
 
   MOCK_METHOD3(start_append, void(int flush_interval, uint64_t flush_bytes,
                                   double flush_age));
@@ -164,6 +166,9 @@ struct MockJournalerProxy {
   void shut_down() {
     MockJournaler::get_instance().shut_down();
   }
+  void shut_down(Context *on_finish) {
+    MockJournaler::get_instance().shut_down(on_finish);
+  }
   bool is_initialized() const {
     return MockJournaler::get_instance().is_initialized();
   }
@@ -225,6 +230,9 @@ struct MockJournalerProxy {
   void stop_replay() {
     MockJournaler::get_instance().stop_replay();
   }
+  void stop_replay(Context *on_finish) {
+    MockJournaler::get_instance().stop_replay(on_finish);
+  }
 
   void start_append(int flush_interval, uint64_t flush_bytes, double flush_age) { 
     MockJournaler::get_instance().start_append(flush_interval, flush_bytes,
index ec6c689b69d42fe57866035a5dc4ec4341a1b83f..bd984fd16928e40f87bf29c37d190720be44f0ff 100644 (file)
@@ -57,6 +57,7 @@ public:
          it != m_journalers.end(); ++it) {
       journal::Journaler *journaler = *it;
       journaler->stop_replay();
+      journaler->shut_down();
       delete journaler;
     }
 
index d8c54512d677da24fe823316ba524d2152a6372f..53a6fa8632d05eff550bf00605227e7d04a58aa3 100644 (file)
@@ -20,6 +20,8 @@
 #include <list>
 #include <boost/scope_exit.hpp>
 
+#define dout_subsys ceph_subsys_rbd
+
 namespace librbd {
 
 namespace {
@@ -148,6 +150,11 @@ public:
                   .WillOnce(CompleteContext(r, NULL));
   }
 
+  void expect_shut_down_journaler(::journal::MockJournaler &mock_journaler) {
+    EXPECT_CALL(mock_journaler, shut_down(_))
+                  .WillOnce(CompleteContext(0, NULL));
+  }
+
   void expect_get_max_append_size(::journal::MockJournaler &mock_journaler,
                                   uint32_t max_size) {
     EXPECT_CALL(mock_journaler, get_max_append_size())
@@ -193,7 +200,8 @@ public:
   }
 
   void expect_stop_replay(::journal::MockJournaler &mock_journaler) {
-    EXPECT_CALL(mock_journaler, stop_replay());
+    EXPECT_CALL(mock_journaler, stop_replay(_))
+                  .WillOnce(CompleteContext(0, NULL));
   }
 
   void expect_shut_down_replay(MockJournalImageCtx &mock_image_ctx,
@@ -310,10 +318,18 @@ public:
     Contexts commit_contexts;
     std::swap(commit_contexts, m_commit_contexts);
 
+    derr << "SHUT DOWN REPLAY START" << dendl;
     for (auto ctx : commit_contexts) {
       mock_image_ctx.image_ctx->op_work_queue->queue(ctx, r);
     }
+
+    on_flush = new FunctionContext([on_flush](int r) {
+        derr << "FLUSH START" << dendl;
+        on_flush->complete(r);
+        derr << "FLUSH FINISH" << dendl;
+      });
     mock_image_ctx.image_ctx->op_work_queue->queue(on_flush, 0);
+    derr << "SHUT DOWN REPLAY FINISH" << dendl;
   }
 
   void open_journal(MockJournalImageCtx &mock_image_ctx,
@@ -400,6 +416,7 @@ TEST_F(TestMockJournal, StateTransitions) {
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, 0);
+  expect_shut_down_journaler(mock_journaler);
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
@@ -418,6 +435,7 @@ TEST_F(TestMockJournal, InitError) {
   ::journal::MockJournaler mock_journaler;
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, -EINVAL);
+  expect_shut_down_journaler(mock_journaler);
   ASSERT_EQ(-EINVAL, when_open(mock_journal));
 }
 
@@ -438,6 +456,7 @@ TEST_F(TestMockJournal, GetCachedClientError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, -ENOENT);
+  expect_shut_down_journaler(mock_journaler);
   ASSERT_EQ(-ENOENT, when_open(mock_journal));
 }
 
@@ -459,6 +478,7 @@ TEST_F(TestMockJournal, GetTagsError) {
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG);
+  expect_shut_down_journaler(mock_journaler);
   ASSERT_EQ(-EBADMSG, when_open(mock_journal));
 }
 
@@ -488,6 +508,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
   MockJournalReplay mock_journal_replay;
   expect_stop_replay(mock_journaler);
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true);
+  expect_shut_down_journaler(mock_journaler);
 
   // replay failure should result in replay-restart
   expect_construct_journaler(mock_journaler);
@@ -506,6 +527,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, 0);
+  expect_shut_down_journaler(mock_journaler);
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
@@ -540,6 +562,7 @@ TEST_F(TestMockJournal, FlushReplayError) {
   expect_try_pop_front(mock_journaler, false, mock_replay_entry);
   expect_stop_replay(mock_journaler);
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, -EINVAL);
+  expect_shut_down_journaler(mock_journaler);
 
   // replay flush failure should result in replay-restart
   expect_construct_journaler(mock_journaler);
@@ -558,6 +581,7 @@ TEST_F(TestMockJournal, FlushReplayError) {
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, 0);
+  expect_shut_down_journaler(mock_journaler);
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
@@ -591,6 +615,7 @@ TEST_F(TestMockJournal, StopError) {
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, -EINVAL);
+  expect_shut_down_journaler(mock_journaler);
   ASSERT_EQ(-EINVAL, when_close(mock_journal));
 }
 
@@ -631,6 +656,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
   expect_try_pop_front(mock_journaler, false, mock_replay_entry);
   expect_stop_replay(mock_journaler);
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true);
+  expect_shut_down_journaler(mock_journaler);
 
   // replay write-to-disk failure should result in replay-restart
   expect_construct_journaler(mock_journaler);
@@ -670,6 +696,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
   ASSERT_EQ(0, ctx.wait());
 
   expect_stop_append(mock_journaler, 0);
+  expect_shut_down_journaler(mock_journaler);
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
@@ -710,6 +737,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
                     InvokeWithoutArgs(this, &TestMockJournal::wake_up)));
 
   // replay write-to-disk failure should result in replay-restart
+  expect_shut_down_journaler(mock_journaler);
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
@@ -750,6 +778,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
   ASSERT_EQ(0, ctx.wait());
 
   expect_stop_append(mock_journaler, 0);
+  expect_shut_down_journaler(mock_journaler);
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
@@ -793,6 +822,8 @@ TEST_F(TestMockJournal, EventAndIOCommitOrder) {
   on_journal_safe2->complete(0);
   ictx->op_work_queue->drain();
   ASSERT_EQ(0, event_ctx.wait());
+
+  expect_shut_down_journaler(mock_journaler);
 }
 
 TEST_F(TestMockJournal, AppendWriteEvent) {
@@ -829,6 +860,8 @@ TEST_F(TestMockJournal, AppendWriteEvent) {
   expect_future_committed(mock_journaler);
   mock_journal.commit_io_event(1U, 0);
   ictx->op_work_queue->drain();
+
+  expect_shut_down_journaler(mock_journaler);
 }
 
 TEST_F(TestMockJournal, EventCommitError) {
@@ -866,6 +899,8 @@ TEST_F(TestMockJournal, EventCommitError) {
   C_SaferCond flush_ctx;
   mock_journal.flush_event(1U, &flush_ctx);
   ASSERT_EQ(-EINVAL, flush_ctx.wait());
+
+  expect_shut_down_journaler(mock_journaler);
 }
 
 TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) {
@@ -904,6 +939,8 @@ TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) {
 
   // cache should receive the error if waiting
   ASSERT_EQ(-EINVAL, flush_ctx.wait());
+
+  expect_shut_down_journaler(mock_journaler);
 }
 
 TEST_F(TestMockJournal, IOCommitError) {
@@ -930,6 +967,8 @@ TEST_F(TestMockJournal, IOCommitError) {
   on_journal_safe->complete(0);
   ictx->op_work_queue->drain();
   mock_journal.commit_io_event(1U, -EINVAL);
+
+  expect_shut_down_journaler(mock_journaler);
 }
 
 TEST_F(TestMockJournal, FlushCommitPosition) {
@@ -950,6 +989,8 @@ TEST_F(TestMockJournal, FlushCommitPosition) {
   C_SaferCond ctx;
   mock_journal.flush_commit_position(&ctx);
   ASSERT_EQ(0, ctx.wait());
+
+  expect_shut_down_journaler(mock_journaler);
 }
 
 } // namespace librbd
index e82265ca471cbb03ef764e30fec78aff22fd5aa8..0c85c26a9a291d7d252dc423bbd253196d109c1c 100644 (file)
@@ -200,13 +200,13 @@ public:
   }
 
   int shut_down() {
-    ::journal::Journaler::shut_down();
-
     int r = unregister_client();
     if (r < 0) {
       std::cerr << "rbd: failed to unregister journal client: "
                << cpp_strerror(r) << std::endl;
     }
+    ::journal::Journaler::shut_down();
+
     return r;
   }
 };
@@ -235,7 +235,6 @@ public:
     m_journaler.start_replay(&replay_handler);
 
     r = m_cond.wait();
-
     if (r < 0) {
       std::cerr << "rbd: failed to process journal: " << cpp_strerror(r)
                << std::endl;
@@ -243,15 +242,13 @@ public:
        m_r = r;
       }
     }
-
-    r = m_journaler.shut_down();
-    if (r < 0 && m_r == 0) {
-      m_r = r;
-    }
-
     return m_r;
   }
 
+  int shut_down() {
+    return m_journaler.shut_down();
+  }
+
 protected:
   struct ReplayHandler : public ::journal::ReplayHandler {
     JournalPlayer *journal;
@@ -288,8 +285,10 @@ protected:
                            uint64_t tag_id) = 0;
 
   void handle_replay_complete(int r) {
-    m_journaler.stop_replay();
-    m_cond.complete(r);
+    if (m_r == 0 && r < 0) {
+      m_r = r;
+    }
+    m_journaler.stop_replay(&m_cond);
   }
 
   Journaler m_journaler;
@@ -370,7 +369,18 @@ private:
 static int do_inspect_journal(librados::IoCtx& io_ctx,
                              const std::string& journal_id,
                              bool verbose) {
-  return JournalInspector(io_ctx, journal_id, verbose).exec();
+  JournalInspector inspector(io_ctx, journal_id, verbose);
+  int r = inspector.exec();
+  if (r < 0) {
+    inspector.shut_down();
+    return r;
+  }
+
+  r = inspector.shut_down();
+  if (r < 0) {
+    return r;
+  }
+  return 0;
 }
 
 struct ExportEntry {
@@ -504,12 +514,18 @@ static int do_export_journal(librados::IoCtx& io_ctx,
     posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
   }
 
-  r = JournalExporter(io_ctx, journal_id, fd, no_error, verbose).exec();
+  JournalExporter exporter(io_ctx, journal_id, fd, no_error, verbose);
+  r = exporter.exec();
 
   if (!to_stdout) {
     close(fd);
   }
 
+  int shut_down_r = exporter.shut_down();
+  if (r == 0 && shut_down_r < 0) {
+    r = shut_down_r;
+  }
+
   return r;
 }
 
@@ -670,16 +686,16 @@ public:
       std::cerr << "failed to append journal: " << cpp_strerror(r) << std::endl;
     }
 
-    if (r1 < 0 && r == 0) {
-      r = r1;
-    }
-    r1 = m_journaler.shut_down();
     if (r1 < 0 && r == 0) {
       r = r1;
     }
     return r;
   }
 
+  int shut_down() {
+    return m_journaler.shut_down();
+  }
+
 private:
   Journaler m_journaler;
   int m_fd;
@@ -706,12 +722,18 @@ static int do_import_journal(librados::IoCtx& io_ctx,
     posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
   }
 
-  r = JournalImporter(io_ctx, journal_id, fd, no_error, verbose).exec();
+  JournalImporter importer(io_ctx, journal_id, fd, no_error, verbose);
+  r = importer.exec();
 
   if (!from_stdin) {
     close(fd);
   }
 
+  int shut_down_r = importer.shut_down();
+  if (r == 0 && shut_down_r < 0) {
+    r = shut_down_r;
+  }
+
   return r;
 }