{
Mutex::Locker locker(m_lock);
assert(m_fetch_object_numbers.empty());
+ assert(!m_watch_scheduled);
}
m_replay_handler->put();
}
Mutex::Locker locker(m_lock);
m_watch_enabled = true;
m_watch_interval = interval;
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
}
prefetch();
}
return false;
}
- if (!is_object_set_ready()) {
- m_handler_notified = false;
- return false;
- }
-
if (!verify_playback_ready()) {
- if (!m_watch_enabled) {
- notify_complete(0);
- } else if (!m_watch_scheduled) {
+ if (!is_object_set_ready()) {
m_handler_notified = false;
- schedule_watch();
+ } else {
+ if (!m_watch_enabled) {
+ notify_complete(0);
+ } else if (!m_watch_scheduled) {
+ m_handler_notified = false;
+ schedule_watch();
+ }
}
return false;
}
uint64_t last_entry_tid;
if (m_active_tag_tid && *m_active_tag_tid != entry->get_tag_tid()) {
- lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl;
+ lderr(m_cct) << "unexpected tag in journal entry: "
+ << "entry=" << *entry << ", "
+ << "tag_tid=" << (m_active_tag_tid ? *m_active_tag_tid : -1)
+ << dendl;
m_state = STATE_ERROR;
notify_complete(-ENOMSG);
}
m_state = STATE_PLAYBACK;
- if (!is_object_set_ready()) {
- ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
- } else if (verify_playback_ready()) {
+ if (verify_playback_ready()) {
notify_entries_available();
- } else if (m_watch_enabled) {
- schedule_watch();
- } else {
- ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
- << dendl;
- notify_complete(0);
+ } else if (is_object_set_ready()) {
+ if (m_watch_enabled) {
+ schedule_watch();
+ } else {
+ ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
+ << dendl;
+ notify_complete(0);
+ }
}
return 0;
}
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
assert(m_lock.is_locked());
- if (!is_object_set_ready()) {
- return 0;
- }
-
ObjectPlayerPtr object_player = get_object_player();
if (verify_playback_ready()) {
notify_entries_available();
- } else if (m_watch_enabled) {
- schedule_watch();
- } else {
- uint8_t splay_width = m_journal_metadata->get_splay_width();
- uint64_t active_set = m_journal_metadata->get_active_set();
- uint64_t object_set = object_player->get_object_number() / splay_width;
- if (object_set == active_set) {
- notify_complete(0);
+ } else if (is_object_set_ready()) {
+ if (m_watch_enabled) {
+ schedule_watch();
+ } else {
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ uint64_t object_set = object_player->get_object_number() / splay_width;
+ if (object_set == active_set) {
+ notify_complete(0);
+ }
}
}
return 0;
bool JournalPlayer::verify_playback_ready() {
assert(m_lock.is_locked());
- assert(is_object_set_ready());
+
+ if (!is_object_set_ready()) {
+ ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
+ return false;
+ }
ObjectPlayerPtr object_player = get_object_player();
assert(object_player);
bool entry_available = false;
if (!object_player->empty()) {
entry_available = true;
+ m_watch_prune_active_tag = false;
object_player->front(&entry);
if (!m_active_tag_tid || entry.get_tag_tid() == *m_active_tag_tid) {
return true;
+ } else if (m_active_tag_tid && entry.get_tag_tid() < m_active_tag_tid) {
+ // new tag is registered as primary commit position -- other object
+ // commit positions might still be associated with old tags
+ ldout(m_cct, 10) << __func__ << ": detected stale entry: " << entry
+ << dendl;
+ return prune_tag(entry.get_tag_tid());
+ } else if (m_active_tag_tid && entry.get_tag_tid() > *m_active_tag_tid) {
+ // new tag at current playback position -- implies that previous
+ // tag ended abruptly without flushing out all records
+ // search for the start record for the next tag
+ ldout(m_cct, 10) << __func__ << ": new tag detected at current playback "
+ << "position: active_tag=" << *m_active_tag_tid << ", "
+ << "new_tag=" << entry.get_tag_tid() << dendl;
+ return prune_active_tag();
}
+ } else if (!m_watch_enabled && m_active_tag_tid) {
+ // current playback position is empty so this tag is done
+ ldout(m_cct, 10) << __func__ << ": no more in-sequence entries for tag "
+ << *m_active_tag_tid << dendl;
+ return prune_active_tag();
+ } else if (m_watch_enabled && m_active_tag_tid && m_watch_prune_active_tag) {
+ // detected current tag is now longer active and we have re-read the current
+ // object but it's still empty, so this tag is done
+ ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries "
+ << "for tag " << *m_active_tag_tid << dendl;
+ return prune_active_tag();
}
// NOTE: replay currently does not check tag class to playback multiple tags
return entry_available;
}
+bool JournalPlayer::prune_tag(uint64_t tag_tid) {
+ assert(m_lock.is_locked());
+ ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag "
+ << tag_tid << dendl;
+
+ for (auto &players : m_object_players) {
+ for (auto player_pair : players.second) {
+ ObjectPlayerPtr object_player = player_pair.second;
+ ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid()
+ << dendl;
+ while (!object_player->empty()) {
+ Entry entry;
+ object_player->front(&entry);
+ if (entry.get_tag_tid() == tag_tid) {
+ ldout(m_cct, 20) << __func__ << ": pruned " << entry << dendl;
+ object_player->pop_front();
+ } else {
+ break;
+ }
+ }
+ }
+
+ // trim any empty players to prefetch the next available object
+ ObjectPlayers object_players(players.second);
+ for (auto player_pair : object_players) {
+ remove_empty_object_player(player_pair.second);
+ }
+ }
+
+ // if we removed an empty object, a fetch will be in-flight
+ if (!is_object_set_ready()) {
+ return false;
+ }
+
+ // search for the start record for the next tag
+ return verify_playback_ready();
+}
+
+bool JournalPlayer::prune_active_tag() {
+ assert(m_lock.is_locked());
+ assert(m_active_tag_tid);
+
+ uint64_t tag_tid = *m_active_tag_tid;
+ m_active_tag_tid = boost::none;
+ m_splay_offset = 0;
+
+ return prune_tag(tag_tid);
+}
+
const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
assert(m_lock.is_locked());
return object_players.begin()->second;
}
+ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
+ assert(m_lock.is_locked());
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = object_number % splay_width;
+ auto splay_it = m_object_players.find(splay_offset);
+ assert(splay_it != m_object_players.end());
+
+ const ObjectPlayers &object_players = splay_it->second;
+ auto player_it = object_players.find(object_number);
+ assert(player_it != object_players.end());
+ return player_it->second;
+}
+
ObjectPlayerPtr JournalPlayer::get_next_set_object_player() const {
assert(m_lock.is_locked());
assert(m_lock.is_locked());
++m_splay_offset;
m_splay_offset %= m_journal_metadata->get_splay_width();
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
ldout(m_cct, 20) << __func__ << ": new offset "
<< static_cast<uint32_t>(m_splay_offset) << dendl;
}
r = 0;
}
if (r == 0) {
- uint8_t splay_width = m_journal_metadata->get_splay_width();
- uint8_t splay_offset = object_num % splay_width;
- assert(m_object_players.count(splay_offset) == 1);
- ObjectPlayers &object_players = m_object_players[splay_offset];
-
- assert(object_players.count(object_num) == 1);
- ObjectPlayerPtr object_player = object_players[object_num];
+ ObjectPlayerPtr object_player = get_object_player(object_num);
remove_empty_object_player(object_player);
}
process_state(object_num, r);
return;
}
- // poll first splay offset and active splay offset since
- // new records should only appear in those two objects
- C_Watch *ctx = new C_Watch(this);
-
- ObjectPlayerPtr object_player = get_object_player();
- uint8_t splay_width = m_journal_metadata->get_splay_width();
- if (object_player->get_object_number() % splay_width != 0) {
- ++ctx->pending_fetches;
+ m_watch_scheduled = true;
- ObjectPlayerPtr first_object_player =
- m_object_players.begin()->second.begin()->second;
- first_object_player->watch(ctx, m_watch_interval);
+ if (m_watch_step == WATCH_STEP_ASSERT_ACTIVE) {
+ // detect if a new tag has been created in case we are blocked
+ // by an incomplete tag sequence
+ ldout(m_cct, 20) << __func__ << ": asserting active tag" << dendl;
+ assert(m_active_tag_tid);
+ FunctionContext *ctx = new FunctionContext([this](int r) {
+ handle_watch_assert_active(r);
+ });
+ m_journal_metadata->assert_active_tag(*m_active_tag_tid, ctx);
+ return;
}
+ ObjectPlayerPtr object_player = get_object_player();
+ switch (m_watch_step) {
+ case WATCH_STEP_FETCH_CURRENT:
+ object_player = get_object_player();
+ break;
+ case WATCH_STEP_FETCH_FIRST:
+ object_player = m_object_players.begin()->second.begin()->second;
+ break;
+ default:
+ assert(false);
+ }
+
+ ldout(m_cct, 20) << __func__ << ": scheduling watch on "
+ << object_player->get_oid() << dendl;
+ C_Watch *ctx = new C_Watch(this, object_player->get_object_number());
object_player->watch(ctx, m_watch_interval);
- m_watch_scheduled = true;
}
-void JournalPlayer::handle_watch(int r) {
+void JournalPlayer::handle_watch(uint64_t object_num, int r) {
ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
if (r == -ECANCELED) {
// unwatch of object player(s)
}
Mutex::Locker locker(m_lock);
+ assert(m_watch_scheduled);
m_watch_scheduled = false;
- std::set<uint64_t> object_numbers;
- for (auto &players : m_object_players) {
- object_numbers.insert(
- players.second.begin()->second->get_object_number());
+ ObjectPlayerPtr object_player = get_object_player(object_num);
+ if (r == 0) {
+ if (object_player->empty() && !object_player->refetch_required()) {
+ // already re-read object after trying to remove it before ... it's
+ // still empty so it's safe to remove
+ remove_empty_object_player(object_player);
+ }
+ }
+
+ // determine what object to query on next watch schedule tick
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ if (m_watch_step == WATCH_STEP_FETCH_CURRENT &&
+ object_player->get_object_number() % splay_width != 0) {
+ m_watch_step = WATCH_STEP_FETCH_FIRST;
+ } else if (m_active_tag_tid) {
+ m_watch_step = WATCH_STEP_ASSERT_ACTIVE;
+ } else {
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
}
- for (auto object_num : object_numbers) {
- process_state(object_num, r);
+ process_state(object_num, r);
+}
+
+void JournalPlayer::handle_watch_assert_active(int r) {
+ ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_watch_scheduled);
+ m_watch_scheduled = false;
+
+ if (m_active_tag_tid && r == -ESTALE) {
+ // newer tag exists -- since we are at this step in the watch sequence,
+ // we know we can prune the active tag if watch fails again
+ ldout(m_cct, 10) << __func__ << ": tag " << *m_active_tag_tid << " "
+ << "no longer active" << dendl;
+ m_watch_prune_active_tag = true;
}
+
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
+ schedule_watch();
}
void JournalPlayer::notify_entries_available() {
ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result);
}
+TEST_F(TestJournalPlayer, PrefetchMissingSequence) {
+ std::string oid = get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, create(oid, 14, 4));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, metadata->set_active_set(1));
+ ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
+ ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
+ ASSERT_EQ(0, write_entry(oid, 0, 2, 860));
+ ASSERT_EQ(0, write_entry(oid, 1, 2, 853));
+ ASSERT_EQ(0, write_entry(oid, 1, 2, 857));
+ ASSERT_EQ(0, write_entry(oid, 5, 2, 861));
+ ASSERT_EQ(0, write_entry(oid, 2, 2, 854));
+ ASSERT_EQ(0, write_entry(oid, 0, 3, 0));
+ ASSERT_EQ(0, write_entry(oid, 5, 3, 1));
+ ASSERT_EQ(0, write_entry(oid, 2, 3, 2));
+ ASSERT_EQ(0, write_entry(oid, 3, 3, 3));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 7, &entries));
+
+ Entries expected_entries = {
+ create_entry(2, 852),
+ create_entry(2, 853),
+ create_entry(2, 854),
+ create_entry(3, 0),
+ create_entry(3, 1),
+ create_entry(3, 2),
+ create_entry(3, 3)};
+ ASSERT_EQ(expected_entries, entries);
+
+ ASSERT_TRUE(wait_for_complete(player));
+ ASSERT_EQ(0, m_replay_hander.complete_result);
+}
+
+TEST_F(TestJournalPlayer, PrefetchLargeMissingSequence) {
+ std::string oid = get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, metadata->set_active_set(2));
+ ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, write_entry(oid, 3, 0, 3));
+ ASSERT_EQ(0, write_entry(oid, 4, 1, 0));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ create_entry(0, 0),
+ create_entry(0, 1),
+ create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, PrefetchBlockedNewTag) {
+ std::string oid = get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, write_entry(oid, 0, 0, 2));
+ ASSERT_EQ(0, write_entry(oid, 0, 0, 4));
+ ASSERT_EQ(0, write_entry(oid, 0, 1, 0));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 4, &entries));
+
+ Entries expected_entries = {
+ create_entry(0, 0),
+ create_entry(0, 1),
+ create_entry(0, 2),
+ create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, PrefetchStaleEntries) {
+ std::string oid = get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(0, 1, 0) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
+ ASSERT_EQ(0, write_entry(oid, 0, 1, 0));
+ ASSERT_EQ(0, write_entry(oid, 1, 1, 1));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries = {
+ create_entry(1, 1)};
+ ASSERT_EQ(expected_entries, entries);
+
+ ASSERT_TRUE(wait_for_complete(player));
+ ASSERT_EQ(0, m_replay_hander.complete_result);
+}
+
TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) {
std::string oid = get_temp_oid();
uint64_t commit_tid;
ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
ASSERT_TRUE(wait_for_complete(player));
- ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result);
+ ASSERT_EQ(0, m_replay_hander.complete_result);
}
TEST_F(TestJournalPlayer, PrefetchAndWatch) {
create_entry(0, 5)};
ASSERT_EQ(expected_entries, entries);
}
+
+TEST_F(TestJournalPlayer, LiveReplayMissingSequence) {
+ std::string oid = get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, create(oid, 14, 4));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
+ ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
+ ASSERT_EQ(0, write_entry(oid, 0, 2, 860));
+ ASSERT_EQ(0, write_entry(oid, 1, 2, 853));
+ ASSERT_EQ(0, write_entry(oid, 1, 2, 857));
+ ASSERT_EQ(0, write_entry(oid, 2, 2, 854));
+ ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ create_entry(2, 852),
+ create_entry(2, 853),
+ create_entry(2, 854)};
+ ASSERT_EQ(expected_entries, entries);
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+ ASSERT_EQ(0, write_entry(oid, 3, 3, 3));
+ ASSERT_EQ(0, write_entry(oid, 2, 3, 2));
+ ASSERT_EQ(0, write_entry(oid, 1, 3, 1));
+ ASSERT_EQ(0, write_entry(oid, 0, 3, 0));
+ ASSERT_TRUE(wait_for_entries(player, 4, &entries));
+
+ expected_entries = {
+ create_entry(3, 0),
+ create_entry(3, 1),
+ create_entry(3, 2),
+ create_entry(3, 3)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, LiveReplayLargeMissingSequence) {
+ std::string oid = get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, metadata->set_active_set(2));
+ ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, write_entry(oid, 3, 0, 3));
+ ASSERT_EQ(0, write_entry(oid, 4, 1, 0));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ create_entry(0, 0),
+ create_entry(0, 1),
+ create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, LiveReplayBlockedNewTag) {
+ std::string oid = get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ C_SaferCond ctx1;
+ cls::journal::Tag tag1;
+ metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ ASSERT_EQ(0, metadata->set_active_set(0));
+ ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 0));
+ ASSERT_EQ(0, write_entry(oid, 1, tag1.tid, 1));
+ ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 2));
+ ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 4));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ create_entry(tag1.tid, 0),
+ create_entry(tag1.tid, 1),
+ create_entry(tag1.tid, 2)};
+ ASSERT_EQ(expected_entries, entries);
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+ C_SaferCond ctx2;
+ cls::journal::Tag tag2;
+ metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+
+ ASSERT_EQ(0, write_entry(oid, 0, tag2.tid, 0));
+ ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+ expected_entries = {
+ create_entry(tag2.tid, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, LiveReplayStaleEntries) {
+ std::string oid = get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(0, 1, 0) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
+ ASSERT_EQ(0, write_entry(oid, 0, 1, 0));
+ ASSERT_EQ(0, write_entry(oid, 1, 1, 1));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries = {
+ create_entry(1, 1)};
+ ASSERT_EQ(expected_entries, entries);
+}
+