From: Jason Dillaman Date: Mon, 13 Jul 2015 19:24:18 +0000 (-0400) Subject: journal: playback should notify when complete X-Git-Tag: v10.0.1~102^2~23 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cd3c378d03223e7c669756cfae9d6a228d784a30;p=ceph.git journal: playback should notify when complete When not performing a live-playback, the client should be notified when there are no more records to read. Signed-off-by: Jason Dillaman --- diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index ed70519ab63f8..9172a7046f359 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -15,8 +15,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, const JournalMetadataPtr& journal_metadata, ReplayHandler *replay_handler) - : RefCountedObject(NULL, 0), m_cct(NULL), - m_object_oid_prefix(object_oid_prefix), + : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), m_journal_metadata(journal_metadata), m_replay_handler(replay_handler), m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false), @@ -95,6 +94,10 @@ bool JournalPlayer::try_pop_front(Entry *entry, if (m_watch_enabled && !m_watch_scheduled) { object_player->watch(&m_process_state, m_watch_interval); m_watch_scheduled = true; + } else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) { + m_lock.Unlock(); + m_replay_handler->handle_complete(0); + m_lock.Lock(); } return false; } @@ -109,7 +112,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, m_state = STATE_ERROR; m_lock.Unlock(); - m_replay_handler->handle_error(-EINVAL); + m_replay_handler->handle_complete(-EINVAL); m_lock.Lock(); return false; } @@ -121,10 +124,10 @@ bool JournalPlayer::try_pop_front(Entry *entry, if (peek_entry.get_tag() == entry->get_tag() || (m_journal_metadata->get_last_allocated_tid(peek_entry.get_tag(), &last_tid) && - last_tid +1 != peek_entry.get_tid())) { + last_tid + 1 != peek_entry.get_tid())) { advance_splay_object(); } - } else { + } else { advance_splay_object(); ObjectPlayerPtr next_set_object_player = get_next_set_object_player(); @@ -167,7 +170,7 @@ void JournalPlayer::process_state(int r) { Mutex::Locker locker(m_lock); m_state = STATE_ERROR; } - m_replay_handler->handle_error(r); + m_replay_handler->handle_complete(r); } } @@ -188,7 +191,7 @@ int JournalPlayer::process_prefetch() { ldout(m_cct, 15) << "seeking known commit position in " << object_player->get_oid() << dendl; Entry entry; - while (!object_player->empty()) { + while (!m_commit_tids.empty() && !object_player->empty()) { object_player->front(&entry); if (entry.get_tid() > m_commit_tids[entry.get_tag()]) { ldout(m_cct, 10) << "located next uncommitted entry: " << entry @@ -202,7 +205,8 @@ int JournalPlayer::process_prefetch() { // if this object contains the commit position, our read should start with // the next consistent journal entry in the sequence - if (object_player->get_object_number() == m_commit_object) { + if (!m_commit_tids.empty() && + object_player->get_object_number() == m_commit_object) { if (object_player->empty()) { advance_splay_object(); } else { @@ -231,6 +235,12 @@ int JournalPlayer::process_prefetch() { } else if (m_watch_enabled) { object_player->watch(&m_process_state, m_watch_interval); m_watch_scheduled = true; + } else { + ldout(m_cct, 10) << __func__ << ": no uncommitted entries available" + << dendl; + m_lock.Unlock(); + m_replay_handler->handle_complete(0); + m_lock.Lock(); } return 0; } diff --git a/src/journal/ReplayHandler.h b/src/journal/ReplayHandler.h index 47126847c8d44..208350461fe47 100644 --- a/src/journal/ReplayHandler.h +++ b/src/journal/ReplayHandler.h @@ -10,7 +10,7 @@ struct ReplayHandler { virtual ~ReplayHandler() {} virtual void handle_entries_available() = 0; - virtual void handle_error(int r) = 0; + virtual void handle_complete(int r) = 0; }; } // namespace journal