]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: extend asynchronous shutdown to facade
authorJason Dillaman <dillaman@redhat.com>
Tue, 24 May 2016 20:12:16 +0000 (16:12 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 25 May 2016 12:19:17 +0000 (08:19 -0400)
Fixes: http://tracker.ceph.com/issues/14530
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/Journaler.cc
src/journal/Journaler.h
src/test/journal/test_Journaler.cc

index 82f844e4a9b2e1eac4cdcd85e51e3591a3588380..1db5247eb770d6b99e6e70cbe67679ceb6eba065 100644 (file)
@@ -28,18 +28,6 @@ namespace {
 static const std::string JOURNAL_HEADER_PREFIX = "journal.";
 static const std::string JOURNAL_OBJECT_PREFIX = "journal_data.";
 
-struct C_DeleteRecorder : public Context {
-  JournalRecorder *recorder;
-  Context *on_safe;
-  C_DeleteRecorder(JournalRecorder *_recorder, Context *_on_safe)
-    : recorder(_recorder), on_safe(_on_safe) {
-  }
-  virtual void finish(int r) {
-    delete recorder;
-    on_safe->complete(r);
-  }
-};
-
 } // anonymous namespace
 
 using namespace cls::journal;
@@ -113,17 +101,11 @@ void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer,
 
 Journaler::~Journaler() {
   if (m_metadata != nullptr) {
+    assert(!m_metadata->is_initialized());
     m_metadata->put();
     m_metadata = nullptr;
   }
-
-  // TODO
-  if (m_trimmer != nullptr) {
-    C_SaferCond ctx;
-    m_trimmer->shut_down(&ctx);
-    ctx.wait();
-    delete m_trimmer;
-  }
+  assert(m_trimmer == nullptr);
   assert(m_player == nullptr);
   assert(m_recorder == nullptr);
 
@@ -170,10 +152,37 @@ int Journaler::init_complete() {
 
 void Journaler::shut_down() {
   C_SaferCond ctx;
-  m_metadata->shut_down(&ctx);
+  shut_down(&ctx);
   ctx.wait();
 }
 
+void Journaler::shut_down(Context *on_finish) {
+  assert(m_player == nullptr);
+  assert(m_recorder == nullptr);
+
+  JournalMetadata *metadata = nullptr;
+  std::swap(metadata, m_metadata);
+  assert(metadata != nullptr);
+
+  on_finish = new FunctionContext([metadata, on_finish](int r) {
+      metadata->put();
+      on_finish->complete(0);
+    });
+
+  JournalTrimmer *trimmer = nullptr;
+  std::swap(trimmer, m_trimmer);
+  if (trimmer == nullptr) {
+    metadata->shut_down(on_finish);
+    return;
+  }
+
+  on_finish = new FunctionContext([trimmer, metadata, on_finish](int r) {
+      delete trimmer;
+      metadata->shut_down(on_finish);
+    });
+  trimmer->shut_down(on_finish);
+}
+
 bool Journaler::is_initialized() const {
   return m_metadata->is_initialized();
 }
@@ -327,15 +336,21 @@ bool Journaler::try_pop_front(ReplayEntry *replay_entry,
 }
 
 void Journaler::stop_replay() {
-  assert(m_player != NULL);
-
-  // TODO
   C_SaferCond ctx;
-  m_player->shut_down(&ctx);
+  stop_replay(&ctx);
   ctx.wait();
+}
 
-  delete m_player;
-  m_player = NULL;
+void Journaler::stop_replay(Context *on_finish) {
+  JournalPlayer *player = nullptr;
+  std::swap(player, m_player);
+  assert(player != nullptr);
+
+  on_finish = new FunctionContext([player, on_finish](int r) {
+      delete player;
+      on_finish->complete(r);
+    });
+  player->shut_down(on_finish);
 }
 
 void Journaler::committed(const ReplayEntry &replay_entry) {
@@ -359,10 +374,15 @@ void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
 }
 
 void Journaler::stop_append(Context *on_safe) {
-  assert(m_recorder != NULL);
-
-  flush_append(new C_DeleteRecorder(m_recorder, on_safe));
-  m_recorder = NULL;
+  JournalRecorder *recorder = nullptr;
+  std::swap(recorder, m_recorder);
+  assert(recorder != nullptr);
+
+  on_safe = new FunctionContext([recorder, on_safe](int r) {
+      delete recorder;
+      on_safe->complete(r);
+    });
+  recorder->flush(on_safe);
 }
 
 uint64_t Journaler::get_max_append_size() const {
index f74f7ab9ab35a455dba7624c4a898bbf36b65dc9..f30a3a512b54bb9931c52539959a9a3b77c4efa5 100644 (file)
@@ -62,6 +62,7 @@ public:
 
   void init(Context *on_init);
   void shut_down();
+  void shut_down(Context *on_finish);
 
   bool is_initialized() const;
 
@@ -95,6 +96,7 @@ public:
   void start_live_replay(ReplayHandler *replay_handler, double interval);
   bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr);
   void stop_replay();
+  void stop_replay(Context *on_finish);
 
   uint64_t get_max_append_size() const;
   void start_append(int flush_interval, uint64_t flush_bytes, double flush_age);
index bdc70fb855d084c27b6c0f9e4e77c936bb316330..4a4ecbae0199512db06243122ac58b3b017d5876 100644 (file)
@@ -40,8 +40,9 @@ public:
   }
 
   int shut_down_journaler() {
-    m_journaler->shut_down();
-    return 0;
+    C_SaferCond ctx;
+    m_journaler->shut_down(&ctx);
+    return ctx.wait();
   }
 
   int register_client(const std::string &client_id, const std::string &desc) {