From 46159af76cda5e2f4a54c1aefdc6838dabd890b2 Mon Sep 17 00:00:00 2001 From: Mykola Golub Date: Mon, 9 Jul 2018 15:32:13 +0300 Subject: [PATCH] journal: limit in-flight appends Signed-off-by: Mykola Golub --- src/journal/JournalRecorder.cc | 8 ++- src/journal/JournalRecorder.h | 3 +- src/journal/Journaler.cc | 4 +- src/journal/Journaler.h | 3 +- src/journal/ObjectRecorder.cc | 79 ++++++++++++------------ src/journal/ObjectRecorder.h | 4 +- src/test/journal/mock/MockJournaler.h | 9 +-- src/test/journal/test_JournalRecorder.cc | 7 ++- src/test/journal/test_ObjectRecorder.cc | 4 +- 9 files changed, 67 insertions(+), 54 deletions(-) diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index 0cfebe480f4ab..6fd5d7fc189f5 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -50,10 +50,12 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, const JournalMetadataPtr& journal_metadata, uint32_t flush_interval, uint64_t flush_bytes, - double flush_age) + double flush_age, + uint64_t max_in_flight_appends) : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), m_journal_metadata(journal_metadata), m_flush_interval(flush_interval), - m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this), + m_flush_bytes(flush_bytes), m_flush_age(flush_age), + m_max_in_flight_appends(max_in_flight_appends), m_listener(this), m_object_handler(this), m_lock("JournalerRecorder::m_lock"), m_current_set(m_journal_metadata->get_active_set()) { @@ -253,7 +255,7 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder( object_number, lock, m_journal_metadata->get_work_queue(), m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(), &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, - m_flush_bytes, m_flush_age)); + m_flush_bytes, m_flush_age, m_max_in_flight_appends)); return object_recorder; } diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index a16339faddf73..93c0e9e5bb7f6 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -24,7 +24,7 @@ public: JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, const JournalMetadataPtr &journal_metadata, uint32_t flush_interval, uint64_t flush_bytes, - double flush_age); + double flush_age, uint64_t max_in_flight_appends); ~JournalRecorder(); Future append(uint64_t tag_tid, const bufferlist &bl); @@ -81,6 +81,7 @@ private: uint32_t m_flush_interval; uint64_t m_flush_bytes; double m_flush_age; + uint64_t m_max_in_flight_appends; Listener m_listener; ObjectHandler m_object_handler; diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 65d32de3d1e95..946b8192a296c 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -392,14 +392,14 @@ void Journaler::committed(const Future &future) { } void Journaler::start_append(int flush_interval, uint64_t flush_bytes, - double flush_age) { + double flush_age, uint64_t max_in_flight_appends) { ceph_assert(m_recorder == nullptr); // TODO verify active object set >= current replay object set m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix, m_metadata, flush_interval, flush_bytes, - flush_age); + flush_age, max_in_flight_appends); } void Journaler::stop_append(Context *on_safe) { diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index bd3529d06a755..c5194ced51249 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -106,7 +106,8 @@ public: void stop_replay(Context *on_finish); uint64_t get_max_append_size() const; - void start_append(int flush_interval, uint64_t flush_bytes, double flush_age); + void start_append(int flush_interval, uint64_t flush_bytes, double flush_age, + uint64_t max_in_flight_appends); Future append(uint64_t tag_tid, const bufferlist &bl); void flush_append(Context *on_safe); void stop_append(Context *on_safe); diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index b54ba9ce8f35a..bc7b927b8c86a 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -22,12 +22,14 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock, Handler *handler, uint8_t order, uint32_t flush_interval, - uint64_t flush_bytes, double flush_age) + uint64_t flush_bytes, double flush_age, + uint64_t max_in_flight_appends) : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock), m_handler(handler), m_order(order), m_soft_max_size(1 << m_order), m_flush_interval(flush_interval), - m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this), + m_flush_bytes(flush_bytes), m_flush_age(flush_age), + m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this), m_lock(lock), m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false), m_aio_scheduled(false) { @@ -99,6 +101,8 @@ void ObjectRecorder::flush(Context *on_safe) { future = Future(m_append_buffers.rbegin()->first); flush_appends(true); + } else if (!m_pending_buffers.empty()) { + future = Future(m_pending_buffers.rbegin()->first); } else if (!m_in_flight_appends.empty()) { AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second; ceph_assert(!append_buffers.empty()); @@ -163,7 +167,7 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { } bool ObjectRecorder::close() { - assert (m_lock->is_locked()); + ceph_assert(m_lock->is_locked()); ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; @@ -263,6 +267,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { // notify of overflow once all in-flight ops are complete if (m_in_flight_tids.empty() && !m_aio_scheduled) { + m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers); append_overflowed(); notify_handler_unlock(); } else { @@ -293,9 +298,16 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { m_in_flight_flushes = false; m_in_flight_flushes_cond.Signal(); - if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) { - // all remaining unsent appends should be redirected to new object - notify_handler_unlock(); + if (!m_aio_scheduled) { + if (m_in_flight_appends.empty() && m_object_closed) { + // all remaining unsent appends should be redirected to new object + m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers); + notify_handler_unlock(); + } else { + m_aio_scheduled = true; + m_lock->Unlock(); + send_appends_aio(); + } } else { m_lock->Unlock(); } @@ -357,18 +369,32 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { } void ObjectRecorder::send_appends_aio() { - librados::ObjectWriteOperation op; - client::guard_append(&op, m_soft_max_size); - C_Gather *gather_ctx; + librados::AioCompletion *rados_completion; { Mutex::Locker locker(*m_lock); + m_aio_scheduled = false; + + if (m_pending_buffers.empty()) { + ldout(m_cct, 20) << __func__ << ": " << m_oid << " pending buffers empty" + << dendl; + return; + } + + if (m_max_in_flight_appends != 0 && + m_in_flight_tids.size() >= m_max_in_flight_appends) { + ldout(m_cct, 20) << __func__ << ": " << m_oid + << " max in flight appends reached" << dendl; + return; + } + uint64_t append_tid = m_append_tid++; m_in_flight_tids.insert(append_tid); ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid=" << append_tid << dendl; - gather_ctx = new C_Gather(m_cct, new C_AppendFlush(this, append_tid)); + librados::ObjectWriteOperation op; + client::guard_append(&op, m_soft_max_size); auto append_buffers = &m_in_flight_appends[append_tid]; for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) { @@ -382,36 +408,13 @@ void ObjectRecorder::send_appends_aio() { break; } } + rados_completion = librados::Rados::aio_create_completion( + new C_AppendFlush(this, append_tid), nullptr, + utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, rados_completion, &op); + ceph_assert(r == 0); } - - librados::AioCompletion *rados_completion = - librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr, - utils::rados_ctx_callback); - int r = m_ioctx.aio_operate(m_oid, rados_completion, &op); - ceph_assert(r == 0); rados_completion->release(); - - { - m_lock->Lock(); - if (m_pending_buffers.empty()) { - m_aio_scheduled = false; - if (m_in_flight_appends.empty() && m_object_closed) { - // all remaining unsent appends should be redirected to new object - notify_handler_unlock(); - } else { - m_lock->Unlock(); - } - } else { - // additional pending items -- reschedule - m_op_work_queue->queue(new FunctionContext([this] (int r) { - send_appends_aio(); - })); - m_lock->Unlock(); - } - } - - // allow append op to complete - gather_ctx->activate(); } void ObjectRecorder::notify_handler_unlock() { diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index fd4e88ffafca3..2b754e5be3fe1 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -41,7 +41,8 @@ public: uint64_t object_number, std::shared_ptr lock, ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock, Handler *handler, uint8_t order, uint32_t flush_interval, - uint64_t flush_bytes, double flush_age); + uint64_t flush_bytes, double flush_age, + uint64_t max_in_flight_appends); ~ObjectRecorder() override; inline uint64_t get_object_number() const { @@ -121,6 +122,7 @@ private: uint32_t m_flush_interval; uint64_t m_flush_bytes; double m_flush_age; + uint32_t m_max_in_flight_appends; FlushHandler m_flush_handler; diff --git a/src/test/journal/mock/MockJournaler.h b/src/test/journal/mock/MockJournaler.h index 8a72537fcc64c..0b159a8efb2db 100644 --- a/src/test/journal/mock/MockJournaler.h +++ b/src/test/journal/mock/MockJournaler.h @@ -120,8 +120,7 @@ struct MockJournaler { MOCK_METHOD0(stop_replay, void()); MOCK_METHOD1(stop_replay, void(Context *on_finish)); - MOCK_METHOD3(start_append, void(int flush_interval, uint64_t flush_bytes, - double flush_age)); + MOCK_METHOD4(start_append, void(int, uint64_t, double, uint64_t)); MOCK_CONST_METHOD0(get_max_append_size, uint64_t()); MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id, const bufferlist &bl)); @@ -258,9 +257,11 @@ struct MockJournalerProxy { MockJournaler::get_instance().stop_replay(on_finish); } - void start_append(int flush_interval, uint64_t flush_bytes, double flush_age) { + void start_append(int flush_interval, uint64_t flush_bytes, double flush_age, + uint64_t max_in_flight_appends) { MockJournaler::get_instance().start_append(flush_interval, flush_bytes, - flush_age); + flush_age, + max_in_flight_appends); } uint64_t get_max_append_size() const { diff --git a/src/test/journal/test_JournalRecorder.cc b/src/test/journal/test_JournalRecorder.cc index 59cc3907f8b92..fb7c06772eccb 100644 --- a/src/test/journal/test_JournalRecorder.cc +++ b/src/test/journal/test_JournalRecorder.cc @@ -19,10 +19,11 @@ public: RadosTestFixture::TearDown(); } - journal::JournalRecorder *create_recorder(const std::string &oid, - const journal::JournalMetadataPtr &metadata) { + journal::JournalRecorder *create_recorder( + const std::string &oid, const journal::JournalMetadataPtr &metadata) { journal::JournalRecorder *recorder(new journal::JournalRecorder( - m_ioctx, oid + ".", metadata, 0, std::numeric_limits::max(), 0)); + m_ioctx, oid + ".", metadata, 0, std::numeric_limits::max(), + 0, 0)); m_recorders.push_back(recorder); return recorder; } diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc index ed071c8d56b45..21c741e589637 100644 --- a/src/test/journal/test_ObjectRecorder.cc +++ b/src/test/journal/test_ObjectRecorder.cc @@ -57,6 +57,7 @@ public: uint32_t m_flush_interval; uint64_t m_flush_bytes; double m_flush_age; + uint64_t m_max_in_flight_appends = 0; Handler m_handler; void TearDown() override { @@ -96,7 +97,8 @@ public: uint8_t order, shared_ptr lock) { journal::ObjectRecorderPtr object(new journal::ObjectRecorder( m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler, - order, m_flush_interval, m_flush_bytes, m_flush_age)); + order, m_flush_interval, m_flush_bytes, m_flush_age, + m_max_in_flight_appends)); m_object_recorders.push_back(object); m_object_recorder_locks.insert(std::make_pair(oid, lock)); m_handler.object_lock = lock; -- 2.39.5