From: Jason Dillaman Date: Tue, 24 May 2016 20:12:16 +0000 (-0400) Subject: journal: extend asynchronous shutdown to facade X-Git-Tag: v11.0.0~399^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f40c595255a6828e27abfe63b4209ebae03e237b;p=ceph.git journal: extend asynchronous shutdown to facade Fixes: http://tracker.ceph.com/issues/14530 Signed-off-by: Jason Dillaman --- diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 82f844e4a9b..1db5247eb77 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -28,18 +28,6 @@ namespace { static const std::string JOURNAL_HEADER_PREFIX = "journal."; static const std::string JOURNAL_OBJECT_PREFIX = "journal_data."; -struct C_DeleteRecorder : public Context { - JournalRecorder *recorder; - Context *on_safe; - C_DeleteRecorder(JournalRecorder *_recorder, Context *_on_safe) - : recorder(_recorder), on_safe(_on_safe) { - } - virtual void finish(int r) { - delete recorder; - on_safe->complete(r); - } -}; - } // anonymous namespace using namespace cls::journal; @@ -113,17 +101,11 @@ void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer, Journaler::~Journaler() { if (m_metadata != nullptr) { + assert(!m_metadata->is_initialized()); m_metadata->put(); m_metadata = nullptr; } - - // TODO - if (m_trimmer != nullptr) { - C_SaferCond ctx; - m_trimmer->shut_down(&ctx); - ctx.wait(); - delete m_trimmer; - } + assert(m_trimmer == nullptr); assert(m_player == nullptr); assert(m_recorder == nullptr); @@ -170,10 +152,37 @@ int Journaler::init_complete() { void Journaler::shut_down() { C_SaferCond ctx; - m_metadata->shut_down(&ctx); + shut_down(&ctx); ctx.wait(); } +void Journaler::shut_down(Context *on_finish) { + assert(m_player == nullptr); + assert(m_recorder == nullptr); + + JournalMetadata *metadata = nullptr; + std::swap(metadata, m_metadata); + assert(metadata != nullptr); + + on_finish = new FunctionContext([metadata, on_finish](int r) { + metadata->put(); + on_finish->complete(0); + }); + + JournalTrimmer *trimmer = nullptr; + std::swap(trimmer, m_trimmer); + if (trimmer == nullptr) { + metadata->shut_down(on_finish); + return; + } + + on_finish = new FunctionContext([trimmer, metadata, on_finish](int r) { + delete trimmer; + metadata->shut_down(on_finish); + }); + trimmer->shut_down(on_finish); +} + bool Journaler::is_initialized() const { return m_metadata->is_initialized(); } @@ -327,15 +336,21 @@ bool Journaler::try_pop_front(ReplayEntry *replay_entry, } void Journaler::stop_replay() { - assert(m_player != NULL); - - // TODO C_SaferCond ctx; - m_player->shut_down(&ctx); + stop_replay(&ctx); ctx.wait(); +} - delete m_player; - m_player = NULL; +void Journaler::stop_replay(Context *on_finish) { + JournalPlayer *player = nullptr; + std::swap(player, m_player); + assert(player != nullptr); + + on_finish = new FunctionContext([player, on_finish](int r) { + delete player; + on_finish->complete(r); + }); + player->shut_down(on_finish); } void Journaler::committed(const ReplayEntry &replay_entry) { @@ -359,10 +374,15 @@ void Journaler::start_append(int flush_interval, uint64_t flush_bytes, } void Journaler::stop_append(Context *on_safe) { - assert(m_recorder != NULL); - - flush_append(new C_DeleteRecorder(m_recorder, on_safe)); - m_recorder = NULL; + JournalRecorder *recorder = nullptr; + std::swap(recorder, m_recorder); + assert(recorder != nullptr); + + on_safe = new FunctionContext([recorder, on_safe](int r) { + delete recorder; + on_safe->complete(r); + }); + recorder->flush(on_safe); } uint64_t Journaler::get_max_append_size() const { diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index f74f7ab9ab3..f30a3a512b5 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -62,6 +62,7 @@ public: void init(Context *on_init); void shut_down(); + void shut_down(Context *on_finish); bool is_initialized() const; @@ -95,6 +96,7 @@ public: void start_live_replay(ReplayHandler *replay_handler, double interval); bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr); void stop_replay(); + void stop_replay(Context *on_finish); uint64_t get_max_append_size() const; void start_append(int flush_interval, uint64_t flush_bytes, double flush_age); diff --git a/src/test/journal/test_Journaler.cc b/src/test/journal/test_Journaler.cc index bdc70fb855d..4a4ecbae019 100644 --- a/src/test/journal/test_Journaler.cc +++ b/src/test/journal/test_Journaler.cc @@ -40,8 +40,9 @@ public: } int shut_down_journaler() { - m_journaler->shut_down(); - return 0; + C_SaferCond ctx; + m_journaler->shut_down(&ctx); + return ctx.wait(); } int register_client(const std::string &client_id, const std::string &desc) {