ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries "
<< "for tag " << *m_active_tag_tid << dendl;
return prune_active_tag();
+ } else if (m_watch_enabled && object_player->refetch_required()) {
+ // if the active object requires a refetch, don't proceed looking for a
+ // new tag before this process completes
+ ldout(m_cct, 10) << __func__ << ": object requires refetch" << dendl;
+ return false;
}
// NOTE: replay currently does not check tag class to playback multiple tags
uint64_t active_set = m_journal_metadata->get_active_set();
if (!player->empty() || object_set == active_set) {
return false;
- } else if (m_watch_enabled && object_set < active_set &&
- player->refetch_required()) {
- ldout(m_cct, 20) << __func__ << ": refetching " << player->get_oid()
+ } else if (m_watch_enabled && player->refetch_required()) {
+ ldout(m_cct, 20) << __func__ << ": delaying removal of empty object"
<< dendl;
- player->clear_refetch_required();
return false;
}
ObjectPlayerPtr object_player = get_object_player();
switch (m_watch_step) {
case WATCH_STEP_FETCH_CURRENT:
- object_player = get_object_player();
+ {
+ object_player = get_object_player();
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ uint64_t object_set = object_player->get_object_number() / splay_width;
+ if (object_set < active_set && object_player->refetch_required()) {
+ ldout(m_cct, 20) << __func__ << ": refetching "
+ << object_player->get_oid()
+ << dendl;
+ object_player->clear_refetch_required();
+ }
+ }
break;
case WATCH_STEP_FETCH_FIRST:
object_player = m_object_players.begin()->second.begin()->second;
m_watch_scheduled = false;
ObjectPlayerPtr object_player = get_object_player(object_num);
- if (r == 0) {
- if (object_player->empty() && !object_player->refetch_required()) {
- // already re-read object after trying to remove it before ... it's
- // still empty so it's safe to remove
- remove_empty_object_player(object_player);
- }
+ if (r == 0 && object_player->empty()) {
+ // possibly need to prune this empty object player if we've
+ // already fetched it after the active set was advanced with no
+ // new records
+ remove_empty_object_player(object_player);
}
// determine what object to query on next watch schedule tick
ASSERT_EQ(expected_entries, entries);
}
+TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
+ std::string oid = get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(1, 0, 1),
+ cls::journal::ObjectPosition(0, 0, 0)};
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, metadata->set_active_set(1));
+ ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, write_entry(oid, 3, 0, 3));
+ ASSERT_EQ(0, write_entry(oid, 2, 1, 0));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries = {
+ create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+
+ // should remove player for offset 3 after refetching
+ ASSERT_EQ(0, metadata->set_active_set(3));
+ ASSERT_EQ(0, write_entry(oid, 7, 1, 1));
+
+ ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+ expected_entries = {
+ create_entry(1, 1)};
+ ASSERT_EQ(expected_entries, entries);
+}
+