From: Jason Dillaman Date: Wed, 25 May 2016 04:21:14 +0000 (-0400) Subject: librbd: integrate with async journaler shutdown API X-Git-Tag: v11.0.0~399^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ad297850b1be8ed38f77b86913c6821748f3368b;p=ceph.git librbd: integrate with async journaler shutdown API Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index 618ad0e36670..ffdfe1d40da7 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -161,14 +161,13 @@ public: }; template -int open_journaler(CephContext *cct, J *journaler, bool *initialized, +int open_journaler(CephContext *cct, J *journaler, cls::journal::Client *client, journal::ImageClientMeta *client_meta, journal::TagData *tag_data) { C_SaferCond init_ctx; journaler->init(&init_ctx); int r = init_ctx.wait(); - *initialized = (r >= 0); if (r < 0) { return r; } @@ -414,6 +413,9 @@ int Journal::remove(librados::IoCtx &io_ctx, const std::string &image_id) { C_SaferCond cond; journaler.init(&cond); + BOOST_SCOPE_EXIT_ALL(&journaler) { + journaler.shut_down(); + }; r = cond.wait(); if (r == -ENOENT) { @@ -441,6 +443,9 @@ int Journal::reset(librados::IoCtx &io_ctx, const std::string &image_id) { C_SaferCond cond; journaler.init(&cond); + BOOST_SCOPE_EXIT_ALL(&journaler) { + journaler.shut_down(); + }; int r = cond.wait(); if (r == -ENOENT) { @@ -510,19 +515,15 @@ int Journal::get_tag_owner(IoCtx& io_ctx, std::string& image_id, Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, cct->_conf->rbd_journal_commit_age); - bool initialized; cls::journal::Client client; journal::ImageClientMeta client_meta; journal::TagData tag_data; - int r = open_journaler(cct, &journaler, &initialized, &client, - &client_meta, &tag_data); + int r = open_journaler(cct, &journaler, &client, &client_meta, &tag_data); if (r >= 0) { *mirror_uuid = tag_data.mirror_uuid; } - if (initialized) { - journaler.shut_down(); - } + journaler.shut_down(); return r; } @@ -534,16 +535,13 @@ int Journal::request_resync(I *image_ctx) { Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, image_ctx->cct->_conf->rbd_journal_commit_age); - bool initialized; cls::journal::Client client; journal::ImageClientMeta client_meta; journal::TagData tag_data; - int r = open_journaler(image_ctx->cct, &journaler, &initialized, &client, - &client_meta, &tag_data); - BOOST_SCOPE_EXIT_ALL(&journaler, &initialized) { - if (initialized) { - journaler.shut_down(); - } + int r = open_journaler(image_ctx->cct, &journaler, &client, &client_meta, + &tag_data); + BOOST_SCOPE_EXIT_ALL(&journaler) { + journaler.shut_down(); }; if (r < 0) { @@ -575,16 +573,13 @@ int Journal::promote(I *image_ctx) { Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, image_ctx->cct->_conf->rbd_journal_commit_age); - bool initialized; cls::journal::Client client; journal::ImageClientMeta client_meta; journal::TagData tag_data; - int r = open_journaler(image_ctx->cct, &journaler, &initialized, &client, - &client_meta, &tag_data); - BOOST_SCOPE_EXIT_ALL(&journaler, &initialized) { - if (initialized) { - journaler.shut_down(); - } + int r = open_journaler(image_ctx->cct, &journaler, &client, &client_meta, + &tag_data); + BOOST_SCOPE_EXIT_ALL(&journaler) { + journaler.shut_down(); }; if (r < 0) { @@ -1129,8 +1124,9 @@ void Journal::destroy_journaler(int r) { m_journal_replay = NULL; transition_state(STATE_CLOSING, r); - m_image_ctx.op_work_queue->queue(create_context_callback< - Journal, &Journal::handle_journal_destroyed>(this), 0); + m_journaler->shut_down(create_async_context_callback( + m_image_ctx, create_context_callback< + Journal, &Journal::handle_journal_destroyed>(this))); } template @@ -1146,8 +1142,9 @@ void Journal::recreate_journaler(int r) { m_journal_replay = NULL; transition_state(STATE_RESTARTING_REPLAY, r); - m_image_ctx.op_work_queue->queue(create_context_callback< - Journal, &Journal::handle_journal_destroyed>(this), 0); + m_journaler->shut_down(create_async_context_callback( + m_image_ctx, create_context_callback< + Journal, &Journal::handle_journal_destroyed>(this))); } template @@ -1289,27 +1286,47 @@ template void Journal::handle_replay_complete(int r) { CephContext *cct = m_image_ctx.cct; - m_lock.Lock(); - if (m_state != STATE_REPLAYING) { - m_lock.Unlock(); - return; + bool cancel_ops = false; + { + Mutex::Locker locker(m_lock); + if (m_state != STATE_REPLAYING) { + return; + } + + ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; + if (r < 0) { + cancel_ops = true; + transition_state(STATE_FLUSHING_RESTART, r); + } else { + // state might change back to FLUSHING_RESTART on flush error + transition_state(STATE_FLUSHING_REPLAY, 0); + } } - ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; - m_journaler->stop_replay(); - if (r < 0) { - transition_state(STATE_FLUSHING_RESTART, r); - m_lock.Unlock(); + Context *ctx = new FunctionContext([this, cct](int r) { + ldout(cct, 20) << this << " handle_replay_complete: " + << "handle shut down replay" << dendl; - m_journal_replay->shut_down(true, create_context_callback< - Journal, &Journal::handle_flushing_restart>(this)); - } else { - transition_state(STATE_FLUSHING_REPLAY, 0); - m_lock.Unlock(); + State state; + { + Mutex::Locker locker(m_lock); + assert(m_state == STATE_FLUSHING_RESTART || + m_state == STATE_FLUSHING_REPLAY); + state = m_state; + } - m_journal_replay->shut_down(false, create_context_callback< - Journal, &Journal::handle_flushing_replay>(this)); - } + if (state == STATE_FLUSHING_RESTART) { + handle_flushing_restart(0); + } else { + handle_flushing_replay(); + } + }); + ctx = new FunctionContext([this, cct, cancel_ops, ctx](int r) { + ldout(cct, 20) << this << " handle_replay_complete: " + << "shut down replay" << dendl; + m_journal_replay->shut_down(cancel_ops, ctx); + }); + m_journaler->stop_replay(ctx); } template @@ -1329,9 +1346,9 @@ void Journal::handle_replay_process_ready(int r) { template void Journal::handle_replay_process_safe(ReplayEntry replay_entry, int r) { - Mutex::Locker locker(m_lock); - CephContext *cct = m_image_ctx.cct; + + m_lock.Lock(); ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; if (r < 0) { lderr(cct) << "failed to commit journal event to disk: " << cpp_strerror(r) @@ -1339,20 +1356,33 @@ void Journal::handle_replay_process_safe(ReplayEntry replay_entry, int r) { if (m_state == STATE_REPLAYING) { // abort the replay if we have an error - m_journaler->stop_replay(); transition_state(STATE_FLUSHING_RESTART, r); - - m_journal_replay->shut_down(true, create_context_callback< - Journal, &Journal::handle_flushing_restart>(this)); + m_lock.Unlock(); + + // stop replay, shut down, and restart + Context *ctx = new FunctionContext([this, cct](int r) { + ldout(cct, 20) << this << " handle_replay_process_safe: " + << "shut down replay" << dendl; + { + Mutex::Locker locker(m_lock); + assert(m_state == STATE_FLUSHING_RESTART); + } + + m_journal_replay->shut_down(true, create_context_callback< + Journal, &Journal::handle_flushing_restart>(this)); + }); + m_journaler->stop_replay(ctx); return; } else if (m_state == STATE_FLUSHING_REPLAY) { // end-of-replay flush in-progress -- we need to restart replay transition_state(STATE_FLUSHING_RESTART, r); + m_lock.Unlock(); return; } } else { // only commit the entry if written successfully m_journaler->committed(replay_entry); + m_lock.Unlock(); } } @@ -1374,16 +1404,15 @@ void Journal::handle_flushing_restart(int r) { } template -void Journal::handle_flushing_replay(int r) { +void Journal::handle_flushing_replay() { Mutex::Locker locker(m_lock); CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; + ldout(cct, 20) << this << " " << __func__ << dendl; - assert(r == 0); assert(m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART); if (m_close_pending) { - destroy_journaler(r); + destroy_journaler(0); return; } else if (m_state == STATE_FLUSHING_RESTART) { // failed to replay one-or-more events -- restart diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index 1840fb10986e..48fe9a2e0072 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -312,7 +312,7 @@ private: void handle_replay_process_safe(ReplayEntry replay_entry, int r); void handle_flushing_restart(int r); - void handle_flushing_replay(int r); + void handle_flushing_replay(); void handle_recording_stopped(int r); diff --git a/src/test/journal/mock/MockJournaler.h b/src/test/journal/mock/MockJournaler.h index 5bbd0ca21b94..8ca97a4c1b4c 100644 --- a/src/test/journal/mock/MockJournaler.h +++ b/src/test/journal/mock/MockJournaler.h @@ -91,6 +91,7 @@ struct MockJournaler { MOCK_METHOD1(init, void(Context *)); MOCK_METHOD0(shut_down, void()); + MOCK_METHOD1(shut_down, void(Context *)); MOCK_CONST_METHOD0(is_initialized, bool()); MOCK_METHOD3(get_metadata, void(uint8_t *order, uint8_t *splay_width, @@ -113,6 +114,7 @@ struct MockJournaler { MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *)); MOCK_METHOD2(try_pop_front, bool(MockReplayEntryProxy *, uint64_t *)); MOCK_METHOD0(stop_replay, void()); + MOCK_METHOD1(stop_replay, void(Context *on_finish)); MOCK_METHOD3(start_append, void(int flush_interval, uint64_t flush_bytes, double flush_age)); @@ -164,6 +166,9 @@ struct MockJournalerProxy { void shut_down() { MockJournaler::get_instance().shut_down(); } + void shut_down(Context *on_finish) { + MockJournaler::get_instance().shut_down(on_finish); + } bool is_initialized() const { return MockJournaler::get_instance().is_initialized(); } @@ -225,6 +230,9 @@ struct MockJournalerProxy { void stop_replay() { MockJournaler::get_instance().stop_replay(); } + void stop_replay(Context *on_finish) { + MockJournaler::get_instance().stop_replay(on_finish); + } void start_append(int flush_interval, uint64_t flush_bytes, double flush_age) { MockJournaler::get_instance().start_append(flush_interval, flush_bytes, diff --git a/src/test/librbd/journal/test_Entries.cc b/src/test/librbd/journal/test_Entries.cc index ec6c689b69d4..bd984fd16928 100644 --- a/src/test/librbd/journal/test_Entries.cc +++ b/src/test/librbd/journal/test_Entries.cc @@ -57,6 +57,7 @@ public: it != m_journalers.end(); ++it) { journal::Journaler *journaler = *it; journaler->stop_replay(); + journaler->shut_down(); delete journaler; } diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index d8c54512d677..53a6fa8632d0 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -20,6 +20,8 @@ #include #include +#define dout_subsys ceph_subsys_rbd + namespace librbd { namespace { @@ -148,6 +150,11 @@ public: .WillOnce(CompleteContext(r, NULL)); } + void expect_shut_down_journaler(::journal::MockJournaler &mock_journaler) { + EXPECT_CALL(mock_journaler, shut_down(_)) + .WillOnce(CompleteContext(0, NULL)); + } + void expect_get_max_append_size(::journal::MockJournaler &mock_journaler, uint32_t max_size) { EXPECT_CALL(mock_journaler, get_max_append_size()) @@ -193,7 +200,8 @@ public: } void expect_stop_replay(::journal::MockJournaler &mock_journaler) { - EXPECT_CALL(mock_journaler, stop_replay()); + EXPECT_CALL(mock_journaler, stop_replay(_)) + .WillOnce(CompleteContext(0, NULL)); } void expect_shut_down_replay(MockJournalImageCtx &mock_image_ctx, @@ -310,10 +318,18 @@ public: Contexts commit_contexts; std::swap(commit_contexts, m_commit_contexts); + derr << "SHUT DOWN REPLAY START" << dendl; for (auto ctx : commit_contexts) { mock_image_ctx.image_ctx->op_work_queue->queue(ctx, r); } + + on_flush = new FunctionContext([on_flush](int r) { + derr << "FLUSH START" << dendl; + on_flush->complete(r); + derr << "FLUSH FINISH" << dendl; + }); mock_image_ctx.image_ctx->op_work_queue->queue(on_flush, 0); + derr << "SHUT DOWN REPLAY FINISH" << dendl; } void open_journal(MockJournalImageCtx &mock_image_ctx, @@ -400,6 +416,7 @@ TEST_F(TestMockJournal, StateTransitions) { ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, 0); + expect_shut_down_journaler(mock_journaler); ASSERT_EQ(0, when_close(mock_journal)); } @@ -418,6 +435,7 @@ TEST_F(TestMockJournal, InitError) { ::journal::MockJournaler mock_journaler; expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, -EINVAL); + expect_shut_down_journaler(mock_journaler); ASSERT_EQ(-EINVAL, when_open(mock_journal)); } @@ -438,6 +456,7 @@ TEST_F(TestMockJournal, GetCachedClientError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, -ENOENT); + expect_shut_down_journaler(mock_journaler); ASSERT_EQ(-ENOENT, when_open(mock_journal)); } @@ -459,6 +478,7 @@ TEST_F(TestMockJournal, GetTagsError) { expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG); + expect_shut_down_journaler(mock_journaler); ASSERT_EQ(-EBADMSG, when_open(mock_journal)); } @@ -488,6 +508,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) { MockJournalReplay mock_journal_replay; expect_stop_replay(mock_journaler); expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true); + expect_shut_down_journaler(mock_journaler); // replay failure should result in replay-restart expect_construct_journaler(mock_journaler); @@ -506,6 +527,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) { ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, 0); + expect_shut_down_journaler(mock_journaler); ASSERT_EQ(0, when_close(mock_journal)); } @@ -540,6 +562,7 @@ TEST_F(TestMockJournal, FlushReplayError) { expect_try_pop_front(mock_journaler, false, mock_replay_entry); expect_stop_replay(mock_journaler); expect_shut_down_replay(mock_image_ctx, mock_journal_replay, -EINVAL); + expect_shut_down_journaler(mock_journaler); // replay flush failure should result in replay-restart expect_construct_journaler(mock_journaler); @@ -558,6 +581,7 @@ TEST_F(TestMockJournal, FlushReplayError) { ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, 0); + expect_shut_down_journaler(mock_journaler); ASSERT_EQ(0, when_close(mock_journal)); } @@ -591,6 +615,7 @@ TEST_F(TestMockJournal, StopError) { ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, -EINVAL); + expect_shut_down_journaler(mock_journaler); ASSERT_EQ(-EINVAL, when_close(mock_journal)); } @@ -631,6 +656,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { expect_try_pop_front(mock_journaler, false, mock_replay_entry); expect_stop_replay(mock_journaler); expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true); + expect_shut_down_journaler(mock_journaler); // replay write-to-disk failure should result in replay-restart expect_construct_journaler(mock_journaler); @@ -670,6 +696,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { ASSERT_EQ(0, ctx.wait()); expect_stop_append(mock_journaler, 0); + expect_shut_down_journaler(mock_journaler); ASSERT_EQ(0, when_close(mock_journal)); } @@ -710,6 +737,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { InvokeWithoutArgs(this, &TestMockJournal::wake_up))); // replay write-to-disk failure should result in replay-restart + expect_shut_down_journaler(mock_journaler); expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); @@ -750,6 +778,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { ASSERT_EQ(0, ctx.wait()); expect_stop_append(mock_journaler, 0); + expect_shut_down_journaler(mock_journaler); ASSERT_EQ(0, when_close(mock_journal)); } @@ -793,6 +822,8 @@ TEST_F(TestMockJournal, EventAndIOCommitOrder) { on_journal_safe2->complete(0); ictx->op_work_queue->drain(); ASSERT_EQ(0, event_ctx.wait()); + + expect_shut_down_journaler(mock_journaler); } TEST_F(TestMockJournal, AppendWriteEvent) { @@ -829,6 +860,8 @@ TEST_F(TestMockJournal, AppendWriteEvent) { expect_future_committed(mock_journaler); mock_journal.commit_io_event(1U, 0); ictx->op_work_queue->drain(); + + expect_shut_down_journaler(mock_journaler); } TEST_F(TestMockJournal, EventCommitError) { @@ -866,6 +899,8 @@ TEST_F(TestMockJournal, EventCommitError) { C_SaferCond flush_ctx; mock_journal.flush_event(1U, &flush_ctx); ASSERT_EQ(-EINVAL, flush_ctx.wait()); + + expect_shut_down_journaler(mock_journaler); } TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) { @@ -904,6 +939,8 @@ TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) { // cache should receive the error if waiting ASSERT_EQ(-EINVAL, flush_ctx.wait()); + + expect_shut_down_journaler(mock_journaler); } TEST_F(TestMockJournal, IOCommitError) { @@ -930,6 +967,8 @@ TEST_F(TestMockJournal, IOCommitError) { on_journal_safe->complete(0); ictx->op_work_queue->drain(); mock_journal.commit_io_event(1U, -EINVAL); + + expect_shut_down_journaler(mock_journaler); } TEST_F(TestMockJournal, FlushCommitPosition) { @@ -950,6 +989,8 @@ TEST_F(TestMockJournal, FlushCommitPosition) { C_SaferCond ctx; mock_journal.flush_commit_position(&ctx); ASSERT_EQ(0, ctx.wait()); + + expect_shut_down_journaler(mock_journaler); } } // namespace librbd diff --git a/src/tools/rbd/action/Journal.cc b/src/tools/rbd/action/Journal.cc index e82265ca471c..0c85c26a9a29 100644 --- a/src/tools/rbd/action/Journal.cc +++ b/src/tools/rbd/action/Journal.cc @@ -200,13 +200,13 @@ public: } int shut_down() { - ::journal::Journaler::shut_down(); - int r = unregister_client(); if (r < 0) { std::cerr << "rbd: failed to unregister journal client: " << cpp_strerror(r) << std::endl; } + ::journal::Journaler::shut_down(); + return r; } }; @@ -235,7 +235,6 @@ public: m_journaler.start_replay(&replay_handler); r = m_cond.wait(); - if (r < 0) { std::cerr << "rbd: failed to process journal: " << cpp_strerror(r) << std::endl; @@ -243,15 +242,13 @@ public: m_r = r; } } - - r = m_journaler.shut_down(); - if (r < 0 && m_r == 0) { - m_r = r; - } - return m_r; } + int shut_down() { + return m_journaler.shut_down(); + } + protected: struct ReplayHandler : public ::journal::ReplayHandler { JournalPlayer *journal; @@ -288,8 +285,10 @@ protected: uint64_t tag_id) = 0; void handle_replay_complete(int r) { - m_journaler.stop_replay(); - m_cond.complete(r); + if (m_r == 0 && r < 0) { + m_r = r; + } + m_journaler.stop_replay(&m_cond); } Journaler m_journaler; @@ -370,7 +369,18 @@ private: static int do_inspect_journal(librados::IoCtx& io_ctx, const std::string& journal_id, bool verbose) { - return JournalInspector(io_ctx, journal_id, verbose).exec(); + JournalInspector inspector(io_ctx, journal_id, verbose); + int r = inspector.exec(); + if (r < 0) { + inspector.shut_down(); + return r; + } + + r = inspector.shut_down(); + if (r < 0) { + return r; + } + return 0; } struct ExportEntry { @@ -504,12 +514,18 @@ static int do_export_journal(librados::IoCtx& io_ctx, posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL); } - r = JournalExporter(io_ctx, journal_id, fd, no_error, verbose).exec(); + JournalExporter exporter(io_ctx, journal_id, fd, no_error, verbose); + r = exporter.exec(); if (!to_stdout) { close(fd); } + int shut_down_r = exporter.shut_down(); + if (r == 0 && shut_down_r < 0) { + r = shut_down_r; + } + return r; } @@ -670,16 +686,16 @@ public: std::cerr << "failed to append journal: " << cpp_strerror(r) << std::endl; } - if (r1 < 0 && r == 0) { - r = r1; - } - r1 = m_journaler.shut_down(); if (r1 < 0 && r == 0) { r = r1; } return r; } + int shut_down() { + return m_journaler.shut_down(); + } + private: Journaler m_journaler; int m_fd; @@ -706,12 +722,18 @@ static int do_import_journal(librados::IoCtx& io_ctx, posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL); } - r = JournalImporter(io_ctx, journal_id, fd, no_error, verbose).exec(); + JournalImporter importer(io_ctx, journal_id, fd, no_error, verbose); + r = importer.exec(); if (!from_stdin) { close(fd); } + int shut_down_r = importer.shut_down(); + if (r == 0 && shut_down_r < 0) { + r = shut_down_r; + } + return r; }