From: Jason Dillaman Date: Sun, 15 May 2016 13:52:41 +0000 (-0400) Subject: journal: skip partially complete tag entries during playback X-Git-Tag: v11.0.0~518^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9454f7e4c62437b1c288f371009feba1fd374584;p=ceph.git journal: skip partially complete tag entries during playback If a journal client does not fully write out its buffered entries before quiting, replay should skip over all remaining out-of- sequence entries for the tag. Fixes: http://tracker.ceph.com/issues/15864 Signed-off-by: Jason Dillaman --- diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index d0d62694a9af..cb728edea7b8 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -83,6 +83,7 @@ JournalPlayer::~JournalPlayer() { { Mutex::Locker locker(m_lock); assert(m_fetch_object_numbers.empty()); + assert(!m_watch_scheduled); } m_replay_handler->put(); } @@ -134,6 +135,7 @@ void JournalPlayer::prefetch_and_watch(double interval) { Mutex::Locker locker(m_lock); m_watch_enabled = true; m_watch_interval = interval; + m_watch_step = WATCH_STEP_FETCH_CURRENT; } prefetch(); } @@ -159,17 +161,16 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { return false; } - if (!is_object_set_ready()) { - m_handler_notified = false; - return false; - } - if (!verify_playback_ready()) { - if (!m_watch_enabled) { - notify_complete(0); - } else if (!m_watch_scheduled) { + if (!is_object_set_ready()) { m_handler_notified = false; - schedule_watch(); + } else { + if (!m_watch_enabled) { + notify_complete(0); + } else if (!m_watch_scheduled) { + m_handler_notified = false; + schedule_watch(); + } } return false; } @@ -182,7 +183,10 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { uint64_t last_entry_tid; if (m_active_tag_tid && *m_active_tag_tid != entry->get_tag_tid()) { - lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl; + lderr(m_cct) << "unexpected tag in journal entry: " + << "entry=" << *entry << ", " + << "tag_tid=" << (m_active_tag_tid ? *m_active_tag_tid : -1) + << dendl; m_state = STATE_ERROR; notify_complete(-ENOMSG); @@ -329,16 +333,16 @@ int JournalPlayer::process_prefetch(uint64_t object_number) { } m_state = STATE_PLAYBACK; - if (!is_object_set_ready()) { - ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl; - } else if (verify_playback_ready()) { + if (verify_playback_ready()) { notify_entries_available(); - } else if (m_watch_enabled) { - schedule_watch(); - } else { - ldout(m_cct, 10) << __func__ << ": no uncommitted entries available" - << dendl; - notify_complete(0); + } else if (is_object_set_ready()) { + if (m_watch_enabled) { + schedule_watch(); + } else { + ldout(m_cct, 10) << __func__ << ": no uncommitted entries available" + << dendl; + notify_complete(0); + } } return 0; } @@ -347,21 +351,19 @@ int JournalPlayer::process_playback(uint64_t object_number) { ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl; assert(m_lock.is_locked()); - if (!is_object_set_ready()) { - return 0; - } - ObjectPlayerPtr object_player = get_object_player(); if (verify_playback_ready()) { notify_entries_available(); - } else if (m_watch_enabled) { - schedule_watch(); - } else { - uint8_t splay_width = m_journal_metadata->get_splay_width(); - uint64_t active_set = m_journal_metadata->get_active_set(); - uint64_t object_set = object_player->get_object_number() / splay_width; - if (object_set == active_set) { - notify_complete(0); + } else if (is_object_set_ready()) { + if (m_watch_enabled) { + schedule_watch(); + } else { + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint64_t active_set = m_journal_metadata->get_active_set(); + uint64_t object_set = object_player->get_object_number() / splay_width; + if (object_set == active_set) { + notify_complete(0); + } } } return 0; @@ -377,7 +379,11 @@ bool JournalPlayer::is_object_set_ready() const { bool JournalPlayer::verify_playback_ready() { assert(m_lock.is_locked()); - assert(is_object_set_ready()); + + if (!is_object_set_ready()) { + ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl; + return false; + } ObjectPlayerPtr object_player = get_object_player(); assert(object_player); @@ -388,10 +394,36 @@ bool JournalPlayer::verify_playback_ready() { bool entry_available = false; if (!object_player->empty()) { entry_available = true; + m_watch_prune_active_tag = false; object_player->front(&entry); if (!m_active_tag_tid || entry.get_tag_tid() == *m_active_tag_tid) { return true; + } else if (m_active_tag_tid && entry.get_tag_tid() < m_active_tag_tid) { + // new tag is registered as primary commit position -- other object + // commit positions might still be associated with old tags + ldout(m_cct, 10) << __func__ << ": detected stale entry: " << entry + << dendl; + return prune_tag(entry.get_tag_tid()); + } else if (m_active_tag_tid && entry.get_tag_tid() > *m_active_tag_tid) { + // new tag at current playback position -- implies that previous + // tag ended abruptly without flushing out all records + // search for the start record for the next tag + ldout(m_cct, 10) << __func__ << ": new tag detected at current playback " + << "position: active_tag=" << *m_active_tag_tid << ", " + << "new_tag=" << entry.get_tag_tid() << dendl; + return prune_active_tag(); } + } else if (!m_watch_enabled && m_active_tag_tid) { + // current playback position is empty so this tag is done + ldout(m_cct, 10) << __func__ << ": no more in-sequence entries for tag " + << *m_active_tag_tid << dendl; + return prune_active_tag(); + } else if (m_watch_enabled && m_active_tag_tid && m_watch_prune_active_tag) { + // detected current tag is now longer active and we have re-read the current + // object but it's still empty, so this tag is done + ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries " + << "for tag " << *m_active_tag_tid << dendl; + return prune_active_tag(); } // NOTE: replay currently does not check tag class to playback multiple tags @@ -417,6 +449,55 @@ bool JournalPlayer::verify_playback_ready() { return entry_available; } +bool JournalPlayer::prune_tag(uint64_t tag_tid) { + assert(m_lock.is_locked()); + ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag " + << tag_tid << dendl; + + for (auto &players : m_object_players) { + for (auto player_pair : players.second) { + ObjectPlayerPtr object_player = player_pair.second; + ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid() + << dendl; + while (!object_player->empty()) { + Entry entry; + object_player->front(&entry); + if (entry.get_tag_tid() == tag_tid) { + ldout(m_cct, 20) << __func__ << ": pruned " << entry << dendl; + object_player->pop_front(); + } else { + break; + } + } + } + + // trim any empty players to prefetch the next available object + ObjectPlayers object_players(players.second); + for (auto player_pair : object_players) { + remove_empty_object_player(player_pair.second); + } + } + + // if we removed an empty object, a fetch will be in-flight + if (!is_object_set_ready()) { + return false; + } + + // search for the start record for the next tag + return verify_playback_ready(); +} + +bool JournalPlayer::prune_active_tag() { + assert(m_lock.is_locked()); + assert(m_active_tag_tid); + + uint64_t tag_tid = *m_active_tag_tid; + m_active_tag_tid = boost::none; + m_splay_offset = 0; + + return prune_tag(tag_tid); +} + const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const { assert(m_lock.is_locked()); @@ -434,6 +515,20 @@ ObjectPlayerPtr JournalPlayer::get_object_player() const { return object_players.begin()->second; } +ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const { + assert(m_lock.is_locked()); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + auto splay_it = m_object_players.find(splay_offset); + assert(splay_it != m_object_players.end()); + + const ObjectPlayers &object_players = splay_it->second; + auto player_it = object_players.find(object_number); + assert(player_it != object_players.end()); + return player_it->second; +} + ObjectPlayerPtr JournalPlayer::get_next_set_object_player() const { assert(m_lock.is_locked()); @@ -445,6 +540,7 @@ void JournalPlayer::advance_splay_object() { assert(m_lock.is_locked()); ++m_splay_offset; m_splay_offset %= m_journal_metadata->get_splay_width(); + m_watch_step = WATCH_STEP_FETCH_CURRENT; ldout(m_cct, 20) << __func__ << ": new offset " << static_cast(m_splay_offset) << dendl; } @@ -513,13 +609,7 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) { r = 0; } if (r == 0) { - uint8_t splay_width = m_journal_metadata->get_splay_width(); - uint8_t splay_offset = object_num % splay_width; - assert(m_object_players.count(splay_offset) == 1); - ObjectPlayers &object_players = m_object_players[splay_offset]; - - assert(object_players.count(object_num) == 1); - ObjectPlayerPtr object_player = object_players[object_num]; + ObjectPlayerPtr object_player = get_object_player(object_num); remove_empty_object_player(object_player); } process_state(object_num, r); @@ -532,25 +622,39 @@ void JournalPlayer::schedule_watch() { return; } - // poll first splay offset and active splay offset since - // new records should only appear in those two objects - C_Watch *ctx = new C_Watch(this); - - ObjectPlayerPtr object_player = get_object_player(); - uint8_t splay_width = m_journal_metadata->get_splay_width(); - if (object_player->get_object_number() % splay_width != 0) { - ++ctx->pending_fetches; + m_watch_scheduled = true; - ObjectPlayerPtr first_object_player = - m_object_players.begin()->second.begin()->second; - first_object_player->watch(ctx, m_watch_interval); + if (m_watch_step == WATCH_STEP_ASSERT_ACTIVE) { + // detect if a new tag has been created in case we are blocked + // by an incomplete tag sequence + ldout(m_cct, 20) << __func__ << ": asserting active tag" << dendl; + assert(m_active_tag_tid); + FunctionContext *ctx = new FunctionContext([this](int r) { + handle_watch_assert_active(r); + }); + m_journal_metadata->assert_active_tag(*m_active_tag_tid, ctx); + return; } + ObjectPlayerPtr object_player = get_object_player(); + switch (m_watch_step) { + case WATCH_STEP_FETCH_CURRENT: + object_player = get_object_player(); + break; + case WATCH_STEP_FETCH_FIRST: + object_player = m_object_players.begin()->second.begin()->second; + break; + default: + assert(false); + } + + ldout(m_cct, 20) << __func__ << ": scheduling watch on " + << object_player->get_oid() << dendl; + C_Watch *ctx = new C_Watch(this, object_player->get_object_number()); object_player->watch(ctx, m_watch_interval); - m_watch_scheduled = true; } -void JournalPlayer::handle_watch(int r) { +void JournalPlayer::handle_watch(uint64_t object_num, int r) { ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; if (r == -ECANCELED) { // unwatch of object player(s) @@ -558,17 +662,49 @@ void JournalPlayer::handle_watch(int r) { } Mutex::Locker locker(m_lock); + assert(m_watch_scheduled); m_watch_scheduled = false; - std::set object_numbers; - for (auto &players : m_object_players) { - object_numbers.insert( - players.second.begin()->second->get_object_number()); + ObjectPlayerPtr object_player = get_object_player(object_num); + if (r == 0) { + if (object_player->empty() && !object_player->refetch_required()) { + // already re-read object after trying to remove it before ... it's + // still empty so it's safe to remove + remove_empty_object_player(object_player); + } + } + + // determine what object to query on next watch schedule tick + uint8_t splay_width = m_journal_metadata->get_splay_width(); + if (m_watch_step == WATCH_STEP_FETCH_CURRENT && + object_player->get_object_number() % splay_width != 0) { + m_watch_step = WATCH_STEP_FETCH_FIRST; + } else if (m_active_tag_tid) { + m_watch_step = WATCH_STEP_ASSERT_ACTIVE; + } else { + m_watch_step = WATCH_STEP_FETCH_CURRENT; } - for (auto object_num : object_numbers) { - process_state(object_num, r); + process_state(object_num, r); +} + +void JournalPlayer::handle_watch_assert_active(int r) { + ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; + + Mutex::Locker locker(m_lock); + assert(m_watch_scheduled); + m_watch_scheduled = false; + + if (m_active_tag_tid && r == -ESTALE) { + // newer tag exists -- since we are at this step in the watch sequence, + // we know we can prune the active tag if watch fails again + ldout(m_cct, 10) << __func__ << ": tag " << *m_active_tag_tid << " " + << "no longer active" << dendl; + m_watch_prune_active_tag = true; } + + m_watch_step = WATCH_STEP_FETCH_CURRENT; + schedule_watch(); } void JournalPlayer::notify_entries_available() { diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h index 85a8b28da25f..d82edb1261e1 100644 --- a/src/journal/JournalPlayer.h +++ b/src/journal/JournalPlayer.h @@ -54,6 +54,12 @@ private: STATE_ERROR }; + enum WatchStep { + WATCH_STEP_FETCH_CURRENT, + WATCH_STEP_FETCH_FIRST, + WATCH_STEP_ASSERT_ACTIVE + }; + struct C_Fetch : public Context { JournalPlayer *player; uint64_t object_num; @@ -70,31 +76,13 @@ private: struct C_Watch : public Context { JournalPlayer *player; - Mutex lock; - uint8_t pending_fetches = 1; - int ret_val = 0; - - C_Watch(JournalPlayer *player) - : player(player), lock("JournalPlayer::C_Watch::lock") { - } - - virtual void complete(int r) override { - lock.Lock(); - if (ret_val == 0 && r < 0) { - ret_val = r; - } - - assert(pending_fetches > 0); - if (--pending_fetches == 0) { - lock.Unlock(); - Context::complete(ret_val); - } else { - lock.Unlock(); - } + uint64_t object_num; + C_Watch(JournalPlayer *player, uint64_t object_num) + : player(player), object_num(object_num) { } virtual void finish(int r) override { - player->handle_watch(r); + player->handle_watch(object_num, r); } }; @@ -114,6 +102,8 @@ private: bool m_watch_enabled; bool m_watch_scheduled; double m_watch_interval; + WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT; + bool m_watch_prune_active_tag = false; bool m_handler_notified = false; @@ -129,8 +119,12 @@ private: bool is_object_set_ready() const; bool verify_playback_ready(); + bool prune_tag(uint64_t tag_tid); + bool prune_active_tag(); + const ObjectPlayers &get_object_players() const; ObjectPlayerPtr get_object_player() const; + ObjectPlayerPtr get_object_player(uint64_t object_number) const; ObjectPlayerPtr get_next_set_object_player() const; bool remove_empty_object_player(const ObjectPlayerPtr &object_player); @@ -142,7 +136,8 @@ private: void handle_fetched(uint64_t object_num, int r); void schedule_watch(); - void handle_watch(int r); + void handle_watch(uint64_t object_num, int r); + void handle_watch_assert_active(int r); void notify_entries_available(); void notify_complete(int r); diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc index 54e555fc37d6..ed3e241630de 100644 --- a/src/test/journal/test_JournalPlayer.cc +++ b/src/test/journal/test_JournalPlayer.cc @@ -298,6 +298,147 @@ TEST_F(TestJournalPlayer, PrefetchCorruptSequence) { ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result); } +TEST_F(TestJournalPlayer, PrefetchMissingSequence) { + std::string oid = get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, create(oid, 14, 4)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + ASSERT_EQ(0, metadata->set_active_set(1)); + ASSERT_EQ(0, write_entry(oid, 0, 2, 852)); + ASSERT_EQ(0, write_entry(oid, 0, 2, 856)); + ASSERT_EQ(0, write_entry(oid, 0, 2, 860)); + ASSERT_EQ(0, write_entry(oid, 1, 2, 853)); + ASSERT_EQ(0, write_entry(oid, 1, 2, 857)); + ASSERT_EQ(0, write_entry(oid, 5, 2, 861)); + ASSERT_EQ(0, write_entry(oid, 2, 2, 854)); + ASSERT_EQ(0, write_entry(oid, 0, 3, 0)); + ASSERT_EQ(0, write_entry(oid, 5, 3, 1)); + ASSERT_EQ(0, write_entry(oid, 2, 3, 2)); + ASSERT_EQ(0, write_entry(oid, 3, 3, 3)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 7, &entries)); + + Entries expected_entries = { + create_entry(2, 852), + create_entry(2, 853), + create_entry(2, 854), + create_entry(3, 0), + create_entry(3, 1), + create_entry(3, 2), + create_entry(3, 3)}; + ASSERT_EQ(expected_entries, entries); + + ASSERT_TRUE(wait_for_complete(player)); + ASSERT_EQ(0, m_replay_hander.complete_result); +} + +TEST_F(TestJournalPlayer, PrefetchLargeMissingSequence) { + std::string oid = get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + ASSERT_EQ(0, metadata->set_active_set(2)); + ASSERT_EQ(0, write_entry(oid, 0, 0, 0)); + ASSERT_EQ(0, write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, write_entry(oid, 3, 0, 3)); + ASSERT_EQ(0, write_entry(oid, 4, 1, 0)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 3, &entries)); + + Entries expected_entries = { + create_entry(0, 0), + create_entry(0, 1), + create_entry(1, 0)}; + ASSERT_EQ(expected_entries, entries); +} + +TEST_F(TestJournalPlayer, PrefetchBlockedNewTag) { + std::string oid = get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + ASSERT_EQ(0, write_entry(oid, 0, 0, 0)); + ASSERT_EQ(0, write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, write_entry(oid, 0, 0, 2)); + ASSERT_EQ(0, write_entry(oid, 0, 0, 4)); + ASSERT_EQ(0, write_entry(oid, 0, 1, 0)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 4, &entries)); + + Entries expected_entries = { + create_entry(0, 0), + create_entry(0, 1), + create_entry(0, 2), + create_entry(1, 0)}; + ASSERT_EQ(expected_entries, entries); +} + +TEST_F(TestJournalPlayer, PrefetchStaleEntries) { + std::string oid = get_temp_oid(); + + journal::JournalPlayer::ObjectPositions positions = { + cls::journal::ObjectPosition(0, 1, 0) }; + cls::journal::ObjectSetPosition commit_position(positions); + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + ASSERT_EQ(0, write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, write_entry(oid, 1, 0, 3)); + ASSERT_EQ(0, write_entry(oid, 0, 1, 0)); + ASSERT_EQ(0, write_entry(oid, 1, 1, 1)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 1, &entries)); + + Entries expected_entries = { + create_entry(1, 1)}; + ASSERT_EQ(expected_entries, entries); + + ASSERT_TRUE(wait_for_complete(player)); + ASSERT_EQ(0, m_replay_hander.complete_result); +} + TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) { std::string oid = get_temp_oid(); @@ -324,7 +465,7 @@ TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) { uint64_t commit_tid; ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); ASSERT_TRUE(wait_for_complete(player)); - ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result); + ASSERT_EQ(0, m_replay_hander.complete_result); } TEST_F(TestJournalPlayer, PrefetchAndWatch) { @@ -497,3 +638,167 @@ TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) { create_entry(0, 5)}; ASSERT_EQ(expected_entries, entries); } + +TEST_F(TestJournalPlayer, LiveReplayMissingSequence) { + std::string oid = get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, create(oid, 14, 4)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + ASSERT_EQ(0, write_entry(oid, 0, 2, 852)); + ASSERT_EQ(0, write_entry(oid, 0, 2, 856)); + ASSERT_EQ(0, write_entry(oid, 0, 2, 860)); + ASSERT_EQ(0, write_entry(oid, 1, 2, 853)); + ASSERT_EQ(0, write_entry(oid, 1, 2, 857)); + ASSERT_EQ(0, write_entry(oid, 2, 2, 854)); + ASSERT_EQ(0, write_entry(oid, 0, 2, 856)); + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 3, &entries)); + + Entries expected_entries = { + create_entry(2, 852), + create_entry(2, 853), + create_entry(2, 854)}; + ASSERT_EQ(expected_entries, entries); + + journal::Entry entry; + uint64_t commit_tid; + ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); + + ASSERT_EQ(0, write_entry(oid, 3, 3, 3)); + ASSERT_EQ(0, write_entry(oid, 2, 3, 2)); + ASSERT_EQ(0, write_entry(oid, 1, 3, 1)); + ASSERT_EQ(0, write_entry(oid, 0, 3, 0)); + ASSERT_TRUE(wait_for_entries(player, 4, &entries)); + + expected_entries = { + create_entry(3, 0), + create_entry(3, 1), + create_entry(3, 2), + create_entry(3, 3)}; + ASSERT_EQ(expected_entries, entries); +} + +TEST_F(TestJournalPlayer, LiveReplayLargeMissingSequence) { + std::string oid = get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + ASSERT_EQ(0, metadata->set_active_set(2)); + ASSERT_EQ(0, write_entry(oid, 0, 0, 0)); + ASSERT_EQ(0, write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, write_entry(oid, 3, 0, 3)); + ASSERT_EQ(0, write_entry(oid, 4, 1, 0)); + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 3, &entries)); + + Entries expected_entries = { + create_entry(0, 0), + create_entry(0, 1), + create_entry(1, 0)}; + ASSERT_EQ(expected_entries, entries); +} + +TEST_F(TestJournalPlayer, LiveReplayBlockedNewTag) { + std::string oid = get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + C_SaferCond ctx1; + cls::journal::Tag tag1; + metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1); + ASSERT_EQ(0, ctx1.wait()); + + ASSERT_EQ(0, metadata->set_active_set(0)); + ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 0)); + ASSERT_EQ(0, write_entry(oid, 1, tag1.tid, 1)); + ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 2)); + ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 4)); + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 3, &entries)); + + Entries expected_entries = { + create_entry(tag1.tid, 0), + create_entry(tag1.tid, 1), + create_entry(tag1.tid, 2)}; + ASSERT_EQ(expected_entries, entries); + + journal::Entry entry; + uint64_t commit_tid; + ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); + + C_SaferCond ctx2; + cls::journal::Tag tag2; + metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2); + ASSERT_EQ(0, ctx2.wait()); + + ASSERT_EQ(0, write_entry(oid, 0, tag2.tid, 0)); + ASSERT_TRUE(wait_for_entries(player, 1, &entries)); + + expected_entries = { + create_entry(tag2.tid, 0)}; + ASSERT_EQ(expected_entries, entries); +} + +TEST_F(TestJournalPlayer, LiveReplayStaleEntries) { + std::string oid = get_temp_oid(); + + journal::JournalPlayer::ObjectPositions positions = { + cls::journal::ObjectPosition(0, 1, 0) }; + cls::journal::ObjectSetPosition commit_position(positions); + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + ASSERT_EQ(0, write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, write_entry(oid, 1, 0, 3)); + ASSERT_EQ(0, write_entry(oid, 0, 1, 0)); + ASSERT_EQ(0, write_entry(oid, 1, 1, 1)); + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 1, &entries)); + + Entries expected_entries = { + create_entry(1, 1)}; + ASSERT_EQ(expected_entries, entries); +} +