From: Jason Dillaman Date: Thu, 13 Jun 2019 00:06:11 +0000 (-0400) Subject: journal: support dynamically updating recorder flush options X-Git-Tag: v14.2.3~12^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=68ac2cc019d715d5648e71b545e1b616735acdb2;p=ceph.git journal: support dynamically updating recorder flush options Default to disabling writeback-style append flushes unless overridden by a call to 'set_append_batch_options'. Signed-off-by: Jason Dillaman (cherry picked from commit c0322a13c83f590067a120212620ebba15fc8661) --- diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index bf795e0e6dd9..aa90660a01fa 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -50,12 +50,9 @@ struct C_Flush : public Context { JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, const JournalMetadataPtr& journal_metadata, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age, uint64_t max_in_flight_appends) : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), - m_journal_metadata(journal_metadata), m_flush_interval(flush_interval), - m_flush_bytes(flush_bytes), m_flush_age(flush_age), + m_journal_metadata(journal_metadata), m_max_in_flight_appends(max_in_flight_appends), m_listener(this), m_object_handler(this), m_lock("JournalerRecorder::m_lock"), m_current_set(m_journal_metadata->get_active_set()) { @@ -66,13 +63,14 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, uint8_t splay_width = m_journal_metadata->get_splay_width(); for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { - m_object_locks.push_back(shared_ptr( - new Mutex("ObjectRecorder::m_lock::"+ - std::to_string(splay_offset)))); + shared_ptr object_lock(new Mutex( + "ObjectRecorder::m_lock::" + std::to_string(splay_offset))); + m_object_locks.push_back(object_lock); + uint64_t object_number = splay_offset + (m_current_set * splay_width); + Mutex::Locker locker(*object_lock); m_object_ptrs[splay_offset] = create_object_recorder( - object_number, - m_object_locks[splay_offset]); + object_number, m_object_locks[splay_offset]); } m_journal_metadata->add_listener(&m_listener); @@ -109,6 +107,27 @@ void JournalRecorder::shut_down(Context *on_safe) { flush(on_safe); } +void JournalRecorder::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", " + << "flush_bytes=" << flush_bytes << ", " + << "flush_age=" << flush_age << dendl; + + Mutex::Locker locker(m_lock); + m_flush_interval = flush_interval; + m_flush_bytes = flush_bytes; + m_flush_age = flush_age; + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + Mutex::Locker object_locker(*m_object_locks[splay_offset]); + auto object_recorder = get_object(splay_offset); + object_recorder->set_append_batch_options(flush_interval, flush_bytes, + flush_age); + } +} + Future JournalRecorder::append(uint64_t tag_tid, const bufferlist &payload_bl) { ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl; @@ -286,8 +305,10 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder( ObjectRecorderPtr object_recorder(new ObjectRecorder( m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), object_number, lock, m_journal_metadata->get_work_queue(), - &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, - m_flush_bytes, m_flush_age, m_max_in_flight_appends)); + &m_object_handler, m_journal_metadata->get_order(), + m_max_in_flight_appends)); + object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes, + m_flush_age); return object_recorder; } diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index c27520a75895..382f75acef9c 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -23,11 +23,14 @@ class JournalRecorder { public: JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, const JournalMetadataPtr &journal_metadata, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age, uint64_t max_in_flight_appends); + uint64_t max_in_flight_appends); ~JournalRecorder(); void shut_down(Context *on_safe); + + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); + Future append(uint64_t tag_tid, const bufferlist &bl); void flush(Context *on_safe); @@ -79,9 +82,9 @@ private: JournalMetadataPtr m_journal_metadata; - uint32_t m_flush_interval; - uint64_t m_flush_bytes; - double m_flush_age; + uint32_t m_flush_interval = 0; + uint64_t m_flush_bytes = 0; + double m_flush_age = 0; uint64_t m_max_in_flight_appends; Listener m_listener; diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index cc3bb1d60ca9..65435ae900f2 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -391,15 +391,20 @@ void Journaler::committed(const Future &future) { m_trimmer->committed(future_impl->get_commit_tid()); } -void Journaler::start_append(int flush_interval, uint64_t flush_bytes, - double flush_age, uint64_t max_in_flight_appends) { +void Journaler::start_append(uint64_t max_in_flight_appends) { ceph_assert(m_recorder == nullptr); // TODO verify active object set >= current replay object set m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix, - m_metadata, flush_interval, flush_bytes, - flush_age, max_in_flight_appends); + m_metadata, max_in_flight_appends); +} + +void Journaler::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ceph_assert(m_recorder != nullptr); + m_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age); } void Journaler::stop_append(Context *on_safe) { diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index 1424cb757612..5a6e0c7c18a8 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -106,8 +106,9 @@ public: void stop_replay(Context *on_finish); uint64_t get_max_append_size() const; - void start_append(int flush_interval, uint64_t flush_bytes, double flush_age, - uint64_t max_in_flight_appends); + void start_append(uint64_t max_in_flight_appends); + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); Future append(uint64_t tag_tid, const bufferlist &bl); void flush_append(Context *on_safe); void stop_append(Context *on_safe); diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index 162dfe90c089..127731e95c32 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -21,17 +21,13 @@ namespace journal { ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, shared_ptr lock, ContextWQ *work_queue, Handler *handler, - uint8_t order, uint32_t flush_interval, - uint64_t flush_bytes, double flush_age, - int32_t max_in_flight_appends) + uint8_t order, int32_t max_in_flight_appends) : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler), m_order(order), m_soft_max_size(1 << m_order), - m_flush_interval(flush_interval), m_flush_bytes(flush_bytes), - m_flush_age(flush_age), m_max_in_flight_appends(max_in_flight_appends), - m_flush_handler(this), m_lock(lock), m_last_flush_time(ceph_clock_now()), - m_append_tid(0), m_overflowed(false), m_object_closed(false), - m_in_flight_flushes(false) { + m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this), + m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0), + m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); ceph_assert(m_handler != NULL); @@ -45,6 +41,19 @@ ObjectRecorder::~ObjectRecorder() { ceph_assert(m_in_flight_appends.empty()); } +void ObjectRecorder::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", " + << "flush_bytes=" << flush_bytes << ", " + << "flush_age=" << flush_age << dendl; + + ceph_assert(m_lock->is_locked()); + m_flush_interval = flush_interval; + m_flush_bytes = flush_bytes; + m_flush_age = flush_age; +} + bool ObjectRecorder::append(AppendBuffers &&append_buffers) { ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl; diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index d7cf6e668a97..ff00e0a0a1f0 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -41,10 +41,12 @@ public: ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, std::shared_ptr lock, ContextWQ *work_queue, Handler *handler, uint8_t order, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age, int32_t max_in_flight_appends); + int32_t max_in_flight_appends); ~ObjectRecorder() override; + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); + inline uint64_t get_object_number() const { return m_object_number; } @@ -115,9 +117,9 @@ private: uint8_t m_order; uint64_t m_soft_max_size; - uint32_t m_flush_interval; - uint64_t m_flush_bytes; - double m_flush_age; + uint32_t m_flush_interval = 0; + uint64_t m_flush_bytes = 0; + double m_flush_age = 0; int32_t m_max_in_flight_appends; FlushHandler m_flush_handler; diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index 45d103f43277..f1b72b0bf74d 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -1168,11 +1168,14 @@ void Journal::complete_event(typename Events::iterator it, int r) { template void Journal::start_append() { ceph_assert(m_lock.is_locked()); + m_journaler->start_append( + m_image_ctx.config.template get_val("rbd_journal_object_max_in_flight_appends")); + m_journaler->set_append_batch_options( m_image_ctx.config.template get_val("rbd_journal_object_flush_interval"), m_image_ctx.config.template get_val("rbd_journal_object_flush_bytes"), - m_image_ctx.config.template get_val("rbd_journal_object_flush_age"), - m_image_ctx.config.template get_val("rbd_journal_object_max_in_flight_appends")); + m_image_ctx.config.template get_val("rbd_journal_object_flush_age")); + transition_state(STATE_READY, 0); } diff --git a/src/librbd/journal/DemoteRequest.cc b/src/librbd/journal/DemoteRequest.cc index 59aa0365a3f8..66a1d19e3320 100644 --- a/src/librbd/journal/DemoteRequest.cc +++ b/src/librbd/journal/DemoteRequest.cc @@ -135,7 +135,7 @@ void DemoteRequest::append_event() { bufferlist event_entry_bl; encode(event_entry, event_entry_bl); - m_journaler->start_append(0, 0, 0, 0); + m_journaler->start_append(0); m_future = m_journaler->append(m_tag_tid, event_entry_bl); auto ctx = create_context_callback< diff --git a/src/librbd/journal/PromoteRequest.cc b/src/librbd/journal/PromoteRequest.cc index 695acc388aaa..17f2957e4e32 100644 --- a/src/librbd/journal/PromoteRequest.cc +++ b/src/librbd/journal/PromoteRequest.cc @@ -119,7 +119,7 @@ void PromoteRequest::append_event() { bufferlist event_entry_bl; encode(event_entry, event_entry_bl); - m_journaler->start_append(0, 0, 0, 0); + m_journaler->start_append(0); m_future = m_journaler->append(m_tag_tid, event_entry_bl); auto ctx = create_context_callback< diff --git a/src/test/journal/mock/MockJournaler.h b/src/test/journal/mock/MockJournaler.h index b925ddfebe9e..236a42f90f29 100644 --- a/src/test/journal/mock/MockJournaler.h +++ b/src/test/journal/mock/MockJournaler.h @@ -120,7 +120,8 @@ struct MockJournaler { MOCK_METHOD0(stop_replay, void()); MOCK_METHOD1(stop_replay, void(Context *on_finish)); - MOCK_METHOD4(start_append, void(int, uint64_t, double, uint64_t)); + MOCK_METHOD1(start_append, void(uint64_t)); + MOCK_METHOD3(set_append_batch_options, void(int, uint64_t, double)); MOCK_CONST_METHOD0(get_max_append_size, uint64_t()); MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id, const bufferlist &bl)); @@ -257,11 +258,14 @@ struct MockJournalerProxy { MockJournaler::get_instance().stop_replay(on_finish); } - void start_append(int flush_interval, uint64_t flush_bytes, double flush_age, - uint64_t max_in_flight_appends) { - MockJournaler::get_instance().start_append(flush_interval, flush_bytes, - flush_age, - max_in_flight_appends); + void start_append(uint64_t max_in_flight_appends) { + MockJournaler::get_instance().start_append(max_in_flight_appends); + } + + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age) { + MockJournaler::get_instance().set_append_batch_options( + flush_interval, flush_bytes, flush_age); } uint64_t get_max_append_size() const { diff --git a/src/test/journal/test_JournalRecorder.cc b/src/test/journal/test_JournalRecorder.cc index fb7c06772ecc..7197526a1ce4 100644 --- a/src/test/journal/test_JournalRecorder.cc +++ b/src/test/journal/test_JournalRecorder.cc @@ -22,8 +22,9 @@ public: journal::JournalRecorder *create_recorder( const std::string &oid, const journal::JournalMetadataPtr &metadata) { journal::JournalRecorder *recorder(new journal::JournalRecorder( - m_ioctx, oid + ".", metadata, 0, std::numeric_limits::max(), - 0, 0)); + m_ioctx, oid + ".", metadata, 0)); + recorder->set_append_batch_options(0, std::numeric_limits::max(), + 0); m_recorders.push_back(recorder); return recorder; } diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc index 7c3e7e99d166..3cc8e893cfe0 100644 --- a/src/test/journal/test_ObjectRecorder.cc +++ b/src/test/journal/test_ObjectRecorder.cc @@ -94,8 +94,13 @@ public: journal::ObjectRecorderPtr create_object(const std::string &oid, uint8_t order, shared_ptr lock) { journal::ObjectRecorderPtr object(new journal::ObjectRecorder( - m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, m_flush_interval, - m_flush_bytes, m_flush_age, m_max_in_flight_appends)); + m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, + m_max_in_flight_appends)); + { + Mutex::Locker locker(*lock); + object->set_append_batch_options(m_flush_interval, m_flush_bytes, + m_flush_age); + } m_object_recorders.push_back(object); m_object_recorder_locks.insert(std::make_pair(oid, lock)); m_handler.object_lock = lock; diff --git a/src/test/librbd/fsx.cc b/src/test/librbd/fsx.cc index e0b40937d316..24b0538e72ab 100644 --- a/src/test/librbd/fsx.cc +++ b/src/test/librbd/fsx.cc @@ -431,7 +431,7 @@ int replay_journal(rados_ioctx_t ioctx, const char *image_name, return r; } - replay_journaler.start_append(0, 0, 0, 0); + replay_journaler.start_append(0); C_SaferCond replay_ctx; ReplayHandler replay_handler(&journaler, &replay_journaler, diff --git a/src/test/librbd/journal/test_mock_PromoteRequest.cc b/src/test/librbd/journal/test_mock_PromoteRequest.cc index 0e61a88890d3..68a627a79a8d 100644 --- a/src/test/librbd/journal/test_mock_PromoteRequest.cc +++ b/src/test/librbd/journal/test_mock_PromoteRequest.cc @@ -120,7 +120,7 @@ public: } void expect_start_append(::journal::MockJournaler &mock_journaler) { - EXPECT_CALL(mock_journaler, start_append(_, _, _, _)); + EXPECT_CALL(mock_journaler, start_append(_)); } void expect_stop_append(::journal::MockJournaler &mock_journaler, int r) { diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index 12f445d6c88b..144aa120066c 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -396,7 +396,11 @@ public: } void expect_start_append(::journal::MockJournaler &mock_journaler) { - EXPECT_CALL(mock_journaler, start_append(_, _, _, _)); + EXPECT_CALL(mock_journaler, start_append(_)); + } + + void expect_set_append_batch_options(::journal::MockJournaler &mock_journaler) { + EXPECT_CALL(mock_journaler, set_append_batch_options(_, _, _)); } void expect_stop_append(::journal::MockJournaler &mock_journaler, int r) { @@ -518,6 +522,7 @@ public: expect_committed(mock_journaler, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); } @@ -585,6 +590,7 @@ TEST_F(TestMockJournal, StateTransitions) { expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); @@ -662,6 +668,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, 0); @@ -719,6 +726,7 @@ TEST_F(TestMockJournal, FlushReplayError) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, 0); @@ -773,6 +781,7 @@ TEST_F(TestMockJournal, CorruptEntry) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, -EINVAL); @@ -811,6 +820,7 @@ TEST_F(TestMockJournal, StopError) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, -EINVAL); @@ -876,6 +886,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); C_SaferCond ctx; mock_journal.open(&ctx); @@ -958,6 +969,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); C_SaferCond ctx; mock_journal.open(&ctx); @@ -1272,6 +1284,7 @@ TEST_F(TestMockJournal, ExternalReplay) { InSequence seq; expect_stop_append(mock_journaler, 0); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); expect_shut_down_journaler(mock_journaler); C_SaferCond start_ctx; @@ -1303,6 +1316,7 @@ TEST_F(TestMockJournal, ExternalReplayFailure) { InSequence seq; expect_stop_append(mock_journaler, -EINVAL); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); expect_shut_down_journaler(mock_journaler); C_SaferCond start_ctx; diff --git a/src/tools/rbd/action/Journal.cc b/src/tools/rbd/action/Journal.cc index e36c6a6a5073..d3a54f94f848 100644 --- a/src/tools/rbd/action/Journal.cc +++ b/src/tools/rbd/action/Journal.cc @@ -832,7 +832,7 @@ public: if (r < 0) { return r; } - m_journaler.start_append(0, 0, 0, 0); + m_journaler.start_append(0); int r1 = 0; bufferlist bl;