From 18f663d834edd03b42e2c08b9428e72fdc6bae9d Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Sat, 14 May 2016 18:58:41 -0400 Subject: [PATCH] journal: close, advance, and open object set ordering Flush in-flight appends to open objects before advancing the active object set. Additionally, don't start recording to the new objects until after advancing the active set. Signed-off-by: Jason Dillaman (cherry picked from commit de830057d0f7286914e019540c6263423cb60428) --- src/journal/JournalRecorder.cc | 150 ++++++++++++++++++++++++++++----- src/journal/JournalRecorder.h | 22 ++++- 2 files changed, 151 insertions(+), 21 deletions(-) diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index 02836b87c8619..ce66a13144373 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "journal/JournalRecorder.h" +#include "common/errno.h" #include "journal/Entry.h" #include "journal/Utils.h" @@ -67,6 +68,10 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, JournalRecorder::~JournalRecorder() { m_journal_metadata->remove_listener(&m_listener); + + Mutex::Locker locker(m_lock); + assert(m_in_flight_advance_sets == 0); + assert(m_in_flight_object_closes == 0); } Future JournalRecorder::append(uint64_t tag_tid, @@ -97,9 +102,8 @@ Future JournalRecorder::append(uint64_t tag_tid, if (object_full) { ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" << dendl; - close_object_set(object_ptr->get_object_number() / splay_width); + close_and_advance_object_set(object_ptr->get_object_number() / splay_width); } - return Future(future); } @@ -127,36 +131,100 @@ ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) { return object_recoder; } -void JournalRecorder::close_object_set(uint64_t object_set) { +void JournalRecorder::close_and_advance_object_set(uint64_t object_set) { assert(m_lock.is_locked()); - uint8_t splay_width = m_journal_metadata->get_splay_width(); - if (object_set != m_current_set) { + // entry overflow from open object + if (m_current_set != object_set) { + ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl; return; } + // we shouldn't overflow upon append if already closed and we + // shouldn't receive an overflowed callback if already closed + assert(m_in_flight_advance_sets == 0); + assert(m_in_flight_object_closes == 0); + uint64_t active_set = m_journal_metadata->get_active_set(); - if (active_set < m_current_set + 1) { - m_journal_metadata->set_active_set(m_current_set + 1); + assert(m_current_set == active_set); + ++m_current_set; + ++m_in_flight_advance_sets; + + ldout(m_cct, 20) << __func__ << ": closing active object set " + << object_set << dendl; + if (close_object_set(m_current_set)) { + advance_object_set(); + } +} + +void JournalRecorder::advance_object_set() { + assert(m_lock.is_locked()); + + assert(m_in_flight_object_closes == 0); + ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set + << dendl; + m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet( + this)); +} + +void JournalRecorder::handle_advance_object_set(int r) { + Mutex::Locker locker(m_lock); + ldout(m_cct, 20) << __func__ << ": r=" << r << dendl; + + assert(m_in_flight_advance_sets > 0); + --m_in_flight_advance_sets; + + if (r < 0 && r != -ESTALE) { + lderr(m_cct) << __func__ << ": failed to advance object set: " + << cpp_strerror(r) << dendl; + } + + if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { + open_object_set(); + } +} + +void JournalRecorder::open_object_set() { + assert(m_lock.is_locked()); + + ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set + << dendl; + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); + it != m_object_ptrs.end(); ++it) { + ObjectRecorderPtr object_recorder = it->second; + if (object_recorder->get_object_number() / splay_width != m_current_set) { + assert(object_recorder->is_closed()); + + // ready to close object and open object in active set + create_next_object_recorder(object_recorder); + } } - m_current_set = m_journal_metadata->get_active_set(); +} - ldout(m_cct, 10) << __func__ << ": advancing to object set " - << m_current_set << dendl; +bool JournalRecorder::close_object_set(uint64_t active_set) { + assert(m_lock.is_locked()); // object recorders will invoke overflow handler as they complete // closing the object to ensure correct order of future appends + uint8_t splay_width = m_journal_metadata->get_splay_width(); for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); it != m_object_ptrs.end(); ++it) { ObjectRecorderPtr object_recorder = it->second; - if (object_recorder != NULL && - object_recorder->get_object_number() / splay_width == m_current_set) { - if (object_recorder->close()) { - // no in-flight ops, immediately create new recorder - create_next_object_recorder(object_recorder); + if (object_recorder->get_object_number() / splay_width != active_set) { + ldout(m_cct, 10) << __func__ << ": closing object " + << object_recorder->get_oid() << dendl; + // flush out all queued appends and hold future appends + if (!object_recorder->close()) { + ++m_in_flight_object_closes; + } else { + ldout(m_cct, 20) << __func__ << ": object " + << object_recorder->get_oid() << " closed" << dendl; } } } + return (m_in_flight_object_closes == 0); } ObjectRecorderPtr JournalRecorder::create_object_recorder( @@ -181,6 +249,9 @@ void JournalRecorder::create_next_object_recorder( ObjectRecorderPtr new_object_recorder = create_object_recorder( (m_current_set * splay_width) + splay_offset); + ldout(m_cct, 10) << __func__ << ": " + << "old oid=" << object_recorder->get_oid() << ", " + << "new oid=" << new_object_recorder->get_oid() << dendl; AppendBuffers append_buffers; object_recorder->claim_append_buffers(&append_buffers); new_object_recorder->append(append_buffers); @@ -192,13 +263,50 @@ void JournalRecorder::handle_update() { Mutex::Locker locker(m_lock); uint64_t active_set = m_journal_metadata->get_active_set(); - if (active_set > m_current_set) { - close_object_set(m_current_set); + if (m_current_set < active_set) { + // peer journal client advanced the active set + ldout(m_cct, 20) << __func__ << ": " + << "current_set=" << m_current_set << ", " + << "active_set=" << active_set << dendl; + + uint64_t current_set = m_current_set; + m_current_set = active_set; + if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { + ldout(m_cct, 20) << __func__ << ": closing current object set " + << current_set << dendl; + if (close_object_set(active_set)) { + open_object_set(); + } + } } } void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { - // TODO + ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; + + Mutex::Locker locker(m_lock); + + uint64_t object_number = object_recorder->get_object_number(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; + assert(active_object_recorder->get_object_number() == object_number); + + assert(m_in_flight_object_closes > 0); + --m_in_flight_object_closes; + + // object closed after advance active set committed + ldout(m_cct, 20) << __func__ << ": object " + << active_object_recorder->get_oid() << " closed" << dendl; + if (m_in_flight_object_closes == 0) { + if (m_in_flight_advance_sets == 0) { + // peer forced closing of object set + open_object_set(); + } else { + // local overflow advanced object set + advance_object_set(); + } + } } void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { @@ -212,8 +320,10 @@ void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; assert(active_object_recorder->get_object_number() == object_number); - close_object_set(object_number / splay_width); - create_next_object_recorder(active_object_recorder); + ldout(m_cct, 20) << __func__ << ": object " + << active_object_recorder->get_oid() << " overflowed" + << dendl; + close_and_advance_object_set(object_number / splay_width); } } // namespace journal diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index dbd289883d46c..68a1d8f50e38e 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -62,6 +62,17 @@ private: } }; + struct C_AdvanceObjectSet : public Context { + JournalRecorder *journal_recorder; + + C_AdvanceObjectSet(JournalRecorder *_journal_recorder) + : journal_recorder(_journal_recorder) { + } + virtual void finish(int r) { + journal_recorder->handle_advance_object_set(r); + } + }; + librados::IoCtx m_ioctx; CephContext *m_cct; std::string m_object_oid_prefix; @@ -77,12 +88,21 @@ private: Mutex m_lock; + uint32_t m_in_flight_advance_sets = 0; + uint32_t m_in_flight_object_closes = 0; uint64_t m_current_set; ObjectRecorderPtrs m_object_ptrs; FutureImplPtr m_prev_future; - void close_object_set(uint64_t object_set); + void open_object_set(); + bool close_object_set(uint64_t active_set); + + void advance_object_set(); + void handle_advance_object_set(int r); + + void close_and_advance_object_set(uint64_t object_set); + ObjectRecorderPtr create_object_recorder(uint64_t object_number); void create_next_object_recorder(ObjectRecorderPtr object_recorder); -- 2.39.5