}
}
- // if we just advanced to this object, make sure we have the latest
- // set of data before advancing to a new tag
- if (m_watch_enabled && m_watch_required) {
- m_watch_required = false;
- return false;
- }
-
// NOTE: replay currently does not check tag class to playback multiple tags
// from different classes (issue #14909). When a new tag is discovered, it
// is assumed that the previous tag was closed at the last replayable entry.
assert(m_lock.is_locked());
++m_splay_offset;
m_splay_offset %= m_journal_metadata->get_splay_width();
- m_watch_required = true;
ldout(m_cct, 20) << __func__ << ": new offset "
<< static_cast<uint32_t>(m_splay_offset) << dendl;
}
uint64_t active_set = m_journal_metadata->get_active_set();
if (!player->empty() || object_set == active_set) {
return false;
+ } else if (m_watch_enabled && object_set < active_set &&
+ player->refetch_required()) {
+ ldout(m_cct, 20) << __func__ << ": refetching " << player->get_oid()
+ << dendl;
+ player->clear_refetch_required();
+ return false;
}
- ldout(m_cct, 15) << player->get_oid() << " empty" << dendl;
+ ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty"
+ << dendl;
ObjectPlayers &object_players = m_object_players[
player->get_object_number() % splay_width];
assert(!object_players.empty());
bool m_watch_enabled;
bool m_watch_scheduled;
double m_watch_interval;
- bool m_watch_required = false;
bool m_handler_notified = false;
assert(m_watch_in_progress);
if (r == -ENOENT) {
r = 0;
+ } else {
+ m_refetch_required = true;
}
std::swap(on_finish, m_watch_ctx);
}
*invalid_ranges = m_invalid_ranges;
}
+ inline bool refetch_required() const {
+ return m_refetch_required;
+ }
+ inline void clear_refetch_required() {
+ m_refetch_required = false;
+ }
+
private:
typedef std::pair<uint64_t, uint64_t> EntryKey;
typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
Cond m_watch_in_progress_cond;
bool m_watch_in_progress;
+ bool m_refetch_required = true;
+
int handle_fetch_complete(int r, const bufferlist &bl);
void schedule_watch();
ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid));
ASSERT_EQ(3U, last_tid);
}
+
+TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) {
+ std::string oid = get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, write_entry(oid, 0, 0, 2));
+ ASSERT_EQ(0, write_entry(oid, 0, 0, 4));
+ ASSERT_EQ(0, write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
+ metadata->set_active_set(1);
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ create_entry(0, 0),
+ create_entry(0, 1),
+ create_entry(0, 2)};
+ ASSERT_EQ(expected_entries, entries);
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+ ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
+ ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+ expected_entries = {
+ create_entry(0, 3),
+ create_entry(0, 4),
+ create_entry(0, 5)};
+ ASSERT_EQ(expected_entries, entries);
+}