From: Jason Dillaman Date: Mon, 23 Nov 2015 22:46:55 +0000 (-0500) Subject: journal: support replay passed skipped splay objects X-Git-Tag: v10.0.2~194^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d1c9bf63ead96f0e82ee3cf2eff213ffb64a530a;p=ceph.git journal: support replay passed skipped splay objects It's possible for a splay object within a set to be skipped if the set is closed due to a full object within the set. Signed-off-by: Jason Dillaman --- diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index 2f971588abc7..1fd674ad93ee 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -53,9 +53,9 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, ReplayHandler *replay_handler) : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), m_journal_metadata(journal_metadata), m_replay_handler(replay_handler), - m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), - m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false), - m_watch_interval(0), m_commit_object(0) { + m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0), + m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0), + m_commit_object(0) { m_replay_handler->get(); m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); @@ -83,25 +83,29 @@ JournalPlayer::~JournalPlayer() { } void JournalPlayer::prefetch() { - m_lock.Lock(); + Mutex::Locker locker(m_lock); assert(m_state == STATE_INIT); m_state = STATE_PREFETCH; uint8_t splay_width = m_journal_metadata->get_splay_width(); - ldout(m_cct, 10) << __func__ << ": prefetching " << (2 * splay_width) << " " + for (uint8_t splay_index = 0; splay_index < splay_width; ++splay_index) { + m_prefetch_splay_offsets.insert(splay_index); + } + + uint64_t object_set = m_commit_object / splay_width; + uint64_t active_set = m_journal_metadata->get_active_set(); + + uint32_t object_count = splay_width * + std::min(2, active_set - object_set + 1); + ldout(m_cct, 10) << __func__ << ": prefetching " << object_count << " " << "objects" << dendl; // prefetch starting from the last known commit set - C_PrefetchBatch *ctx = new C_PrefetchBatch(this); - uint64_t start_object = (m_commit_object / splay_width) * splay_width; + uint64_t start_object = object_set * splay_width; for (uint64_t object_number = start_object; - object_number < start_object + (2 * splay_width); ++object_number) { - ctx->add_fetch(); - fetch(object_number, ctx); + object_number < start_object + object_count; ++object_number) { + fetch(object_number); } - m_lock.Unlock(); - - ctx->complete(0); } void JournalPlayer::prefetch_and_watch(double interval) { @@ -126,6 +130,7 @@ void JournalPlayer::unwatch() { } bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { + ldout(m_cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); if (m_state != STATE_PLAYBACK) { return false; @@ -136,7 +141,9 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { if (object_player->empty()) { if (m_watch_enabled && !m_watch_scheduled) { - object_player->watch(&m_process_state, m_watch_interval); + object_player->watch( + new C_Watch(this, object_player->get_object_number()), + m_watch_interval); m_watch_scheduled = true; } else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) { m_journal_metadata->get_finisher().queue(new C_HandleComplete( @@ -171,11 +178,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { } } else { advance_splay_object(); - - ObjectPlayerPtr next_set_object_player = get_next_set_object_player(); - if (!next_set_object_player->empty()) { - remove_object_player(object_player, &m_process_state); - } + remove_empty_object_player(object_player); } m_journal_metadata->reserve_tid(entry->get_tag(), entry->get_tid()); @@ -184,14 +187,15 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { return true; } -void JournalPlayer::process_state(int r) { - ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; +void JournalPlayer::process_state(uint64_t object_number, int r) { + ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", " + << "r=" << r << dendl; if (r >= 0) { Mutex::Locker locker(m_lock); switch (m_state) { case STATE_PREFETCH: ldout(m_cct, 10) << "PREFETCH" << dendl; - r = process_prefetch(); + r = process_prefetch(object_number); break; case STATE_PLAYBACK: ldout(m_cct, 10) << "PLAYBACK" << dendl; @@ -216,58 +220,77 @@ void JournalPlayer::process_state(int r) { } } -int JournalPlayer::process_prefetch() { +int JournalPlayer::process_prefetch(uint64_t object_number) { ldout(m_cct, 10) << __func__ << dendl; assert(m_lock.is_locked()); uint8_t splay_width = m_journal_metadata->get_splay_width(); - for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { - assert(m_object_players.count(splay_offset) == 1); + uint8_t splay_offset = object_number % splay_width; - ObjectPlayers &object_players = m_object_players[splay_offset]; - assert(object_players.size() == 2); + PrefetchSplayOffsets::iterator it = m_prefetch_splay_offsets.find( + splay_offset); + if (it == m_prefetch_splay_offsets.end()) { + return 0; + } - ObjectPlayerPtr object_player = object_players.begin()->second; - assert(!object_player->is_fetch_in_progress()); - - ldout(m_cct, 15) << "seeking known commit position in " - << object_player->get_oid() << dendl; - Entry entry; - while (!m_commit_tids.empty() && !object_player->empty()) { - object_player->front(&entry); - if (entry.get_tid() > m_commit_tids[entry.get_tag()]) { - ldout(m_cct, 10) << "located next uncommitted entry: " << entry - << dendl; - break; - } + bool prefetch_complete = false; + assert(m_object_players.count(splay_offset) == 1); + ObjectPlayers &object_players = m_object_players[splay_offset]; - ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl; - m_journal_metadata->reserve_tid(entry.get_tag(), entry.get_tid()); - object_player->pop_front(); - } + // prefetch in-order since a newer splay object could prefetch first + while (!object_players.begin()->second->is_fetch_in_progress()) { + ObjectPlayerPtr object_player = object_players.begin()->second; - // if this object contains the commit position, our read should start with - // the next consistent journal entry in the sequence - if (!m_commit_tids.empty() && - object_player->get_object_number() == m_commit_object) { - if (object_player->empty()) { - advance_splay_object(); - } else { - Entry entry; + // skip past known committed records + if (!m_commit_tids.empty() && !object_player->empty()) { + ldout(m_cct, 15) << "seeking known commit position in " + << object_player->get_oid() << dendl; + Entry entry; + while (!m_commit_tids.empty() && !object_player->empty()) { object_player->front(&entry); - if (entry.get_tag() == m_commit_tag) { + if (entry.get_tid() > m_commit_tids[entry.get_tag()]) { + ldout(m_cct, 10) << "located next uncommitted entry: " << entry + << dendl; + break; + } + + ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl; + m_journal_metadata->reserve_tid(entry.get_tag(), entry.get_tid()); + object_player->pop_front(); + } + + // if this object contains the commit position, our read should start with + // the next consistent journal entry in the sequence + if (!m_commit_tids.empty() && + object_player->get_object_number() == m_commit_object) { + if (object_player->empty()) { advance_splay_object(); + } else { + Entry entry; + object_player->front(&entry); + if (entry.get_tag() == m_commit_tag) { + advance_splay_object(); + } } } } - ObjectPlayerPtr next_set_object_player = get_next_set_object_player(); - if (object_player->empty() && !next_set_object_player->empty()) { - ldout(m_cct, 15) << object_player->get_oid() << " empty" << dendl; - remove_object_player(object_player, &m_process_state); + // if the object is empty, pre-fetch the next splay object + if (!remove_empty_object_player(object_player)) { + prefetch_complete = true; + break; } } + if (!prefetch_complete) { + return 0; + } + + m_prefetch_splay_offsets.erase(it); + if (!m_prefetch_splay_offsets.empty()) { + return 0; + } + m_state = STATE_PLAYBACK; ObjectPlayerPtr object_player = get_object_player(); if (!object_player->empty()) { @@ -275,7 +298,9 @@ int JournalPlayer::process_prefetch() { m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( m_replay_handler), 0); } else if (m_watch_enabled) { - object_player->watch(&m_process_state, m_watch_interval); + object_player->watch( + new C_Watch(this, object_player->get_object_number()), + m_watch_interval); m_watch_scheduled = true; } else { ldout(m_cct, 10) << __func__ << ": no uncommitted entries available" @@ -304,14 +329,11 @@ int JournalPlayer::process_playback() { const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const { assert(m_lock.is_locked()); - assert(m_object_players.count(m_splay_offset) == 1); SplayedObjectPlayers::const_iterator it = m_object_players.find( m_splay_offset); assert(it != m_object_players.end()); - const ObjectPlayers &object_players = it->second; - assert(object_players.size() == 2); - return object_players; + return it->second; } ObjectPlayerPtr JournalPlayer::get_object_player() const { @@ -336,27 +358,37 @@ void JournalPlayer::advance_splay_object() { << static_cast(m_splay_offset) << dendl; } -void JournalPlayer::remove_object_player(const ObjectPlayerPtr &object_player, - Context *on_fetch) { +bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) { assert(m_lock.is_locked()); uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint64_t object_set = player->get_object_number() / splay_width; + uint64_t active_set = m_journal_metadata->get_active_set(); + if (!player->empty() || object_set == active_set) { + return false; + } + + ldout(m_cct, 15) << player->get_oid() << " empty" << dendl; ObjectPlayers &object_players = m_object_players[ - object_player->get_object_number() % splay_width]; + player->get_object_number() % splay_width]; assert(!object_players.empty()); - assert(object_players.begin()->second == object_player); - object_players.erase(object_players.begin()); - fetch(object_player->get_object_number() + (2 * splay_width), on_fetch); + uint64_t next_object_num = object_players.rbegin()->first + splay_width; + uint64_t next_object_set = next_object_num / splay_width; + if (next_object_set <= active_set) { + fetch(next_object_num); + } + object_players.erase(player->get_object_number()); + return true; } -void JournalPlayer::fetch(uint64_t object_num, Context *ctx) { +void JournalPlayer::fetch(uint64_t object_num) { assert(m_lock.is_locked()); std::string oid = utils::get_object_name(m_object_oid_prefix, object_num); ldout(m_cct, 10) << __func__ << ": " << oid << dendl; - C_Fetch *fetch_ctx = new C_Fetch(this, object_num, ctx); + C_Fetch *fetch_ctx = new C_Fetch(this, object_num); ObjectPlayerPtr object_player(new ObjectPlayer( m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order())); @@ -366,42 +398,33 @@ void JournalPlayer::fetch(uint64_t object_num, Context *ctx) { object_player->fetch(fetch_ctx); } -int JournalPlayer::handle_fetched(int r, uint64_t object_num) { - std::string oid = utils::get_object_name(m_object_oid_prefix, object_num); - - ldout(m_cct, 10) << __func__ << ": fetched " +void JournalPlayer::handle_fetched(uint64_t object_num, int r) { + ldout(m_cct, 10) << __func__ << ": " << utils::get_object_name(m_object_oid_prefix, object_num) << ": r=" << r << dendl; - if (r < 0 && r != -ENOENT) { - return r; + if (r == -ENOENT) { + r = 0; } - return 0; -} + if (r == 0) { + Mutex::Locker locker(m_lock); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_num % splay_width; + assert(m_object_players.count(splay_offset) == 1); + ObjectPlayers &object_players = m_object_players[splay_offset]; -JournalPlayer::C_PrefetchBatch::C_PrefetchBatch(JournalPlayer *p) - : player(p), lock("JournalPlayer::C_PrefetchBatch::lock"), refs(1), - return_value(0) { - player->m_async_op_tracker.start_op(); -} + assert(object_players.count(object_num) == 1); + ObjectPlayerPtr object_player = object_players[object_num]; + remove_empty_object_player(object_player); + } -void JournalPlayer::C_PrefetchBatch::add_fetch() { - Mutex::Locker locker(lock); - ++refs; + process_state(object_num, r); } -void JournalPlayer::C_PrefetchBatch::complete(int r) { - { - Mutex::Locker locker(lock); - if (r < 0 && return_value == 0) { - return_value = r; - } - --refs; - } - - if (refs == 0) { - player->process_state(return_value); - delete this; - } +void JournalPlayer::handle_watch(uint64_t object_num, int r) { + ldout(m_cct, 10) << __func__ << ": " + << utils::get_object_name(m_object_oid_prefix, object_num) + << ": r=" << r << dendl; + process_state(object_num, r); } } // namespace journal diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h index 7d4855960d71..93347ea3f22e 100644 --- a/src/journal/JournalPlayer.h +++ b/src/journal/JournalPlayer.h @@ -39,6 +39,7 @@ public: bool try_pop_front(Entry *entry, uint64_t *commit_tid); private: + typedef std::set PrefetchSplayOffsets; typedef std::map AllocatedTids; typedef std::map ObjectPlayers; typedef std::map SplayedObjectPlayers; @@ -50,44 +51,28 @@ private: STATE_ERROR }; - struct C_ProcessState : public Context { + struct C_Watch : public Context { JournalPlayer *player; - C_ProcessState(JournalPlayer *p) : player(p) {} - virtual void complete(int r) { - player->process_state(r); - } - virtual void finish(int r) {} - }; - - struct C_PrefetchBatch : public Context { - JournalPlayer *player; - Mutex lock; - uint32_t refs; - int return_value; + uint64_t object_num; - C_PrefetchBatch(JournalPlayer *p); - virtual ~C_PrefetchBatch() { - player->m_async_op_tracker.finish_op(); + C_Watch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) { + } + virtual void finish(int r) { + player->handle_watch(object_num, r); } - void add_fetch(); - virtual void complete(int r); - virtual void finish(int r) {} }; struct C_Fetch : public Context { JournalPlayer *player; uint64_t object_num; - Context *on_fetch; - C_Fetch(JournalPlayer *p, uint64_t o, Context *c) - : player(p), object_num(o), on_fetch(c) { + C_Fetch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) { player->m_async_op_tracker.start_op(); } virtual ~C_Fetch() { player->m_async_op_tracker.finish_op(); } virtual void finish(int r) { - r = player->handle_fetched(r, object_num); - on_fetch->complete(r); + player->handle_fetched(object_num, r); } }; @@ -98,8 +83,6 @@ private: ReplayHandler *m_replay_handler; - C_ProcessState m_process_state; - AsyncOpTracker m_async_op_tracker; mutable Mutex m_lock; @@ -110,6 +93,7 @@ private: bool m_watch_scheduled; double m_watch_interval; + PrefetchSplayOffsets m_prefetch_splay_offsets; SplayedObjectPlayers m_object_players; uint64_t m_commit_object; std::string m_commit_tag; @@ -120,15 +104,15 @@ private: const ObjectPlayers &get_object_players() const; ObjectPlayerPtr get_object_player() const; ObjectPlayerPtr get_next_set_object_player() const; - void remove_object_player(const ObjectPlayerPtr &object_player, - Context *on_fetch); + bool remove_empty_object_player(const ObjectPlayerPtr &object_player); - void process_state(int r); - int process_prefetch(); + void process_state(uint64_t object_number, int r); + int process_prefetch(uint64_t object_number); int process_playback(); - void fetch(uint64_t object_num, Context *ctx); - int handle_fetched(int r, uint64_t object_num); + void fetch(uint64_t object_num); + void handle_fetched(uint64_t object_num, int r); + void handle_watch(uint64_t object_num, int r); }; } // namespace journal