From 91ea6da859cd23a3a0e484c15e4511410e142f8c Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 23 Feb 2016 20:44:29 -0500 Subject: [PATCH] journal: update JournalPlayer to support new commit tracking Signed-off-by: Jason Dillaman --- src/journal/JournalPlayer.cc | 89 +++++++++++++++++--------- src/journal/JournalPlayer.h | 5 +- src/test/journal/test_JournalPlayer.cc | 3 +- 3 files changed, 62 insertions(+), 35 deletions(-) diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index 7b0ad8a1faa4f..0478cec91be98 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -55,7 +55,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, m_journal_metadata(journal_metadata), m_replay_handler(replay_handler), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0), - m_commit_object(0), m_commit_tag_tid(0) { + m_commit_object(0) { m_replay_handler->get(); m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); @@ -63,22 +63,18 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, ObjectSetPosition commit_position; m_journal_metadata->get_commit_position(&commit_position); - // TODO - /* if (!commit_position.object_positions.empty()) { + ldout(m_cct, 5) << "commit position: " << commit_position << dendl; + + // start replay after the last committed entry's object uint8_t splay_width = m_journal_metadata->get_splay_width(); - m_splay_offset = commit_position.object_number % splay_width; - m_commit_object = commit_position.object_number; - m_commit_tag_tid = commit_position.object_positions.front().tag_tid; - - for (ObjectPositions::const_iterator it = - commit_position.object_positions.begin(); - it != commit_position.object_positions.end(); ++it) { - const ObjectPosition &object_position = *it; - m_commit_tids[object_position.tag_tid] = object_position.entry_tid; + m_commit_object = commit_position.object_positions.front().object_number; + m_splay_offset = m_commit_object % splay_width; + for (auto &position : commit_position.object_positions) { + uint8_t splay_offset = position.object_number % splay_width; + m_commit_positions[splay_offset] = position; } } - */ } JournalPlayer::~JournalPlayer() { @@ -92,22 +88,38 @@ void JournalPlayer::prefetch() { m_state = STATE_PREFETCH; uint8_t splay_width = m_journal_metadata->get_splay_width(); - for (uint8_t splay_index = 0; splay_index < splay_width; ++splay_index) { - m_prefetch_splay_offsets.insert(splay_index); + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + m_prefetch_splay_offsets.insert(splay_offset); + } + + // compute active object for each splay offset (might be before + // active set) + std::map splay_offset_to_objects; + for (auto &position : m_commit_positions) { + assert(splay_offset_to_objects.count(position.first) == 0); + splay_offset_to_objects[position.first] = position.second.object_number; } - uint64_t object_set = m_commit_object / splay_width; + // prefetch the active object for each splay offset (and the following object) uint64_t active_set = m_journal_metadata->get_active_set(); + uint64_t max_object_number = (splay_width * (active_set + 1)) - 1; + std::set prefetch_object_numbers; + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + uint64_t object_number = splay_offset; + if (splay_offset_to_objects.count(splay_offset) != 0) { + object_number = splay_offset_to_objects[splay_offset]; + } - uint32_t object_count = splay_width * - std::min(2, active_set - object_set + 1); - ldout(m_cct, 10) << __func__ << ": prefetching " << object_count << " " - << "objects" << dendl; + prefetch_object_numbers.insert(object_number); + if (object_number + splay_width <= max_object_number) { + prefetch_object_numbers.insert(object_number + splay_width); + } + } - // prefetch starting from the last known commit set - uint64_t start_object = object_set * splay_width; - for (uint64_t object_number = start_object; - object_number < start_object + object_count; ++object_number) { + ldout(m_cct, 10) << __func__ << ": prefetching " + << prefetch_object_numbers.size() << " " << "objects" + << dendl; + for (auto object_number : prefetch_object_numbers) { fetch(object_number); } } @@ -248,15 +260,25 @@ int JournalPlayer::process_prefetch(uint64_t object_number) { // prefetch in-order since a newer splay object could prefetch first while (!object_players.begin()->second->is_fetch_in_progress()) { ObjectPlayerPtr object_player = object_players.begin()->second; + uint64_t player_object_number = object_player->get_object_number(); // skip past known committed records - if (!m_commit_tids.empty() && !object_player->empty()) { - ldout(m_cct, 15) << "seeking known commit position in " + if (m_commit_positions.count(splay_offset) != 0 && + !object_player->empty()) { + ObjectPosition &position = m_commit_positions[splay_offset]; + + ldout(m_cct, 15) << "seeking known commit position " << position << " in " << object_player->get_oid() << dendl; + + bool found_commit = false; Entry entry; - while (!m_commit_tids.empty() && !object_player->empty()) { + while (!object_player->empty()) { object_player->front(&entry); - if (entry.get_entry_tid() > m_commit_tids[entry.get_tag_tid()]) { + + if (entry.get_tag_tid() == position.tag_tid && + entry.get_entry_tid() == position.entry_tid) { + found_commit = true; + } else if (found_commit) { ldout(m_cct, 10) << "located next uncommitted entry: " << entry << dendl; break; @@ -270,18 +292,23 @@ int JournalPlayer::process_prefetch(uint64_t object_number) { // if this object contains the commit position, our read should start with // the next consistent journal entry in the sequence - if (!m_commit_tids.empty() && - object_player->get_object_number() == m_commit_object) { + if (player_object_number == m_commit_object) { if (object_player->empty()) { advance_splay_object(); } else { Entry entry; object_player->front(&entry); - if (entry.get_tag_tid() == m_commit_tag_tid) { + if (entry.get_tag_tid() == position.tag_tid) { advance_splay_object(); } } } + + // do not search for commit position for this object + // if we've already seen it + if (found_commit) { + m_commit_positions.erase(splay_offset); + } } // if the object is empty, pre-fetch the next splay object diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h index 54b4a93f2a11d..a07728ec0a210 100644 --- a/src/journal/JournalPlayer.h +++ b/src/journal/JournalPlayer.h @@ -40,9 +40,9 @@ public: private: typedef std::set PrefetchSplayOffsets; - typedef std::map AllocatedEntryTids; typedef std::map ObjectPlayers; typedef std::map SplayedObjectPlayers; + typedef std::map SplayedObjectPositions; enum State { STATE_INIT, @@ -96,8 +96,7 @@ private: PrefetchSplayOffsets m_prefetch_splay_offsets; SplayedObjectPlayers m_object_players; uint64_t m_commit_object; - uint64_t m_commit_tag_tid; - AllocatedEntryTids m_commit_tids; + SplayedObjectPositions m_commit_positions; void advance_splay_object(); diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc index beafc0a692227..7612b4982f561 100644 --- a/src/test/journal/test_JournalPlayer.cc +++ b/src/test/journal/test_JournalPlayer.cc @@ -185,7 +185,8 @@ TEST_F(TestJournalPlayer, PrefetchSkip) { journal::JournalPlayer::ObjectPositions positions; positions = { - cls::journal::ObjectPosition(0, 234, 125) }; + cls::journal::ObjectPosition(0, 234, 125), + cls::journal::ObjectPosition(1, 234, 124) }; cls::journal::ObjectSetPosition commit_position(positions); ASSERT_EQ(0, create(oid)); -- 2.39.5