// start replay after the last committed entry's object
uint8_t splay_width = m_journal_metadata->get_splay_width();
- m_commit_object = commit_position.object_positions.front().object_number;
+ auto &active_position = commit_position.object_positions.front();
+ m_active_tag_tid = active_position.tag_tid;
+ m_commit_object = active_position.object_number;
m_splay_offset = m_commit_object % splay_width;
for (auto &position : commit_position.object_positions) {
uint8_t splay_offset = position.object_number % splay_width;
}
void JournalPlayer::unwatch() {
+ ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
m_watch_enabled = false;
if (m_watch_scheduled) {
- ObjectPlayerPtr object_player = get_object_player();
- assert(object_player);
-
- object_player->unwatch();
+ for (auto &players : m_object_players) {
+ players.second.begin()->second->unwatch();
+ }
m_watch_scheduled = false;
}
}
return false;
}
- ObjectPlayerPtr object_player = get_object_player();
- assert(object_player);
+ if (!is_object_set_ready()) {
+ return false;
+ }
- if (object_player->empty()) {
- if (m_watch_enabled && !m_watch_scheduled) {
- object_player->watch(
- new C_Watch(this, object_player->get_object_number()),
- m_watch_interval);
- m_watch_scheduled = true;
- } else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) {
+ if (!verify_playback_ready()) {
+ if (!m_watch_enabled) {
ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), 0);
+ } else if (!m_watch_scheduled) {
+ schedule_watch();
}
return false;
}
+ ObjectPlayerPtr object_player = get_object_player();
+ assert(object_player && !object_player->empty());
+
object_player->front(entry);
object_player->pop_front();
uint64_t last_entry_tid;
- if (m_journal_metadata->get_last_allocated_entry_tid(
- entry->get_tag_tid(), &last_entry_tid) &&
- entry->get_entry_tid() != last_entry_tid + 1) {
+ if (m_active_tag_tid && *m_active_tag_tid != entry->get_tag_tid()) {
+ lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl;
+
+ m_state = STATE_ERROR;
+ m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+ m_replay_handler), -ENOMSG);
+ return false;
+ } else if (m_journal_metadata->get_last_allocated_entry_tid(
+ entry->get_tag_tid(), &last_entry_tid) &&
+ entry->get_entry_tid() != last_entry_tid + 1) {
lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
m_state = STATE_ERROR;
return false;
}
- // skip to next splay offset if we cannot apply the next entry in-sequence
- if (!object_player->empty()) {
- Entry peek_entry;
- object_player->front(&peek_entry);
- if (peek_entry.get_tag_tid() == entry->get_tag_tid() ||
- (m_journal_metadata->get_last_allocated_entry_tid(
- peek_entry.get_tag_tid(), &last_entry_tid) &&
- last_entry_tid + 1 != peek_entry.get_entry_tid())) {
- advance_splay_object();
- }
- } else {
- advance_splay_object();
- remove_empty_object_player(object_player);
- }
+ m_active_tag_tid = entry->get_tag_tid();
+ advance_splay_object();
+ remove_empty_object_player(object_player);
m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(),
entry->get_entry_tid());
void JournalPlayer::process_state(uint64_t object_number, int r) {
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
<< "r=" << r << dendl;
+
+ assert(m_lock.is_locked());
if (r >= 0) {
- Mutex::Locker locker(m_lock);
switch (m_state) {
case STATE_PREFETCH:
ldout(m_cct, 10) << "PREFETCH" << dendl;
}
if (r < 0) {
- {
- Mutex::Locker locker(m_lock);
- m_state = STATE_ERROR;
- }
- m_replay_handler->handle_complete(r);
+ m_state = STATE_ERROR;
+ m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+ m_replay_handler), r);
}
}
}
m_state = STATE_PLAYBACK;
- ObjectPlayerPtr object_player = get_object_player();
- if (!object_player->empty()) {
+ if (!is_object_set_ready()) {
+ ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
+ } else if (verify_playback_ready()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
} else if (m_watch_enabled) {
- object_player->watch(
- new C_Watch(this, object_player->get_object_number()),
- m_watch_interval);
- m_watch_scheduled = true;
+ schedule_watch();
} else {
ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
<< dendl;
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
assert(m_lock.is_locked());
- m_watch_scheduled = false;
+ if (!is_object_set_ready()) {
+ return 0;
+ }
ObjectPlayerPtr object_player = get_object_player();
- if (object_player->get_object_number() == object_number) {
+ if (verify_playback_ready()) {
+ ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
+ m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+ m_replay_handler), 0);
+ } else if (!m_watch_enabled && is_object_set_ready()) {
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint64_t active_set = m_journal_metadata->get_active_set();
uint64_t object_set = object_player->get_object_number() / splay_width;
- if (!object_player->empty()) {
- ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
- m_replay_handler), 0);
- } else if (object_set == active_set) {
+ if (object_set == active_set) {
ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), 0);
return 0;
}
+bool JournalPlayer::is_object_set_ready() const {
+ assert(m_lock.is_locked());
+ if (m_watch_scheduled) {
+ return false;
+ }
+ for (auto &players : m_object_players) {
+ if (players.second.begin()->second->is_fetch_in_progress()) {
+ return false;
+ }
+ }
+ return true;
+}
+
+bool JournalPlayer::verify_playback_ready() {
+ assert(m_lock.is_locked());
+ assert(is_object_set_ready());
+
+ ObjectPlayerPtr object_player = get_object_player();
+ assert(object_player);
+
+ // Verify is the active object player has another entry available
+ // in the sequence
+ Entry entry;
+ bool entry_available = false;
+ if (!object_player->empty()) {
+ entry_available = true;
+ object_player->front(&entry);
+ if (!m_active_tag_tid || entry.get_tag_tid() == *m_active_tag_tid) {
+ return true;
+ }
+ }
+
+ // NOTE: replay currently does not check tag class to playback multiple tags
+ // from different classes (issue #14909). When a new tag is discovered, it
+ // is assumed that the previous tag was closed at the last replayable entry.
+ object_player = m_object_players.begin()->second.begin()->second;
+ if (!object_player->empty() && m_active_tag_tid) {
+ object_player->front(&entry);
+ if (entry.get_tag_tid() > *m_active_tag_tid &&
+ entry.get_entry_tid() == 0) {
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ m_active_tag_tid = entry.get_tag_tid();
+ m_splay_offset = object_player->get_object_number() / splay_width;
+
+ ldout(m_cct, 20) << __func__ << ": new tag " << entry.get_tag_tid() << " "
+ << "detected, adjusting offset to "
+ << static_cast<uint32_t>(m_splay_offset) << dendl;
+ return true;
+ }
+ }
+
+ // if any entry is available, we can test if the sequence is corrupt
+ return entry_available;
+}
+
const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
assert(m_lock.is_locked());
bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
assert(m_lock.is_locked());
+ assert(!m_watch_scheduled);
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint64_t object_set = player->get_object_number() / splay_width;
ldout(m_cct, 10) << __func__ << ": "
<< utils::get_object_name(m_object_oid_prefix, object_num)
<< ": r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
if (r == -ENOENT) {
r = 0;
}
if (r == 0) {
- Mutex::Locker locker(m_lock);
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_num % splay_width;
assert(m_object_players.count(splay_offset) == 1);
ObjectPlayerPtr object_player = object_players[object_num];
remove_empty_object_player(object_player);
}
-
process_state(object_num, r);
}
-void JournalPlayer::handle_watch(uint64_t object_num, int r) {
- ldout(m_cct, 10) << __func__ << ": "
- << utils::get_object_name(m_object_oid_prefix, object_num)
- << ": r=" << r << dendl;
- process_state(object_num, r);
+void JournalPlayer::schedule_watch() {
+ ldout(m_cct, 10) << __func__ << dendl;
+ assert(m_lock.is_locked());
+ if (m_watch_scheduled) {
+ return;
+ }
+
+ // poll first splay offset and active splay offset since
+ // new records should only appear in those two objects
+ C_Watch *ctx = new C_Watch(this);
+ ObjectPlayerPtr object_player = get_object_player();
+ object_player->watch(ctx, m_watch_interval);
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ if (object_player->get_object_number() % splay_width != 0) {
+ ++ctx->pending_fetches;
+
+ object_player = m_object_players.begin()->second.begin()->second;
+ object_player->watch(ctx, m_watch_interval);
+ }
+ m_watch_scheduled = true;
+}
+
+void JournalPlayer::handle_watch(int r) {
+ ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_watch_scheduled = false;
+ std::set<uint64_t> object_numbers;
+ for (auto &players : m_object_players) {
+ object_numbers.insert(
+ players.second.begin()->second->get_object_number());
+ }
+
+ for (auto object_num : object_numbers) {
+ process_state(object_num, r);
+ }
}
} // namespace journal
#include "journal/JournalMetadata.h"
#include "journal/ObjectPlayer.h"
#include "cls/journal/cls_journal_types.h"
+#include <boost/none.hpp>
+#include <boost/optional.hpp>
#include <map>
class SafeTimer;
STATE_ERROR
};
- struct C_Watch : public Context {
- JournalPlayer *player;
- uint64_t object_num;
-
- C_Watch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) {
- }
- virtual void finish(int r) {
- player->handle_watch(object_num, r);
- }
- };
-
struct C_Fetch : public Context {
JournalPlayer *player;
uint64_t object_num;
}
};
+ struct C_Watch : public Context {
+ JournalPlayer *player;
+ uint8_t pending_fetches = 1;
+ int ret_val = 0;
+
+ C_Watch(JournalPlayer *player) : player(player) {
+ }
+
+ virtual void complete(int r) override {
+ player->m_lock.Lock();
+ if (ret_val == 0 && r < 0) {
+ ret_val = r;
+ }
+
+ assert(pending_fetches > 0);
+ if (--pending_fetches == 0) {
+ player->m_lock.Unlock();
+ Context::complete(ret_val);
+ } else {
+ player->m_lock.Unlock();
+ }
+ }
+
+ virtual void finish(int r) override {
+ player->handle_watch(r);
+ }
+ };
+
librados::IoCtx m_ioctx;
CephContext *m_cct;
std::string m_object_oid_prefix;
SplayedObjectPlayers m_object_players;
uint64_t m_commit_object;
SplayedObjectPositions m_commit_positions;
+ boost::optional<uint64_t> m_active_tag_tid = boost::none;
void advance_splay_object();
+ bool is_object_set_ready() const;
+ bool verify_playback_ready();
const ObjectPlayers &get_object_players() const;
ObjectPlayerPtr get_object_player() const;
ObjectPlayerPtr get_next_set_object_player() const;
void fetch(uint64_t object_num);
void handle_fetched(uint64_t object_num, int r);
- void handle_watch(uint64_t object_num, int r);
+
+ void schedule_watch();
+ void handle_watch(int r);
};
} // namespace journal
journal::JournalPlayer::ObjectPositions positions;
positions = {
- cls::journal::ObjectPosition(0, 234, 122),
- cls::journal::ObjectPosition(1, 345, 1)};
+ cls::journal::ObjectPosition(2, 234, 122),
+ cls::journal::ObjectPosition(1, 234, 121),
+ cls::journal::ObjectPosition(0, 234, 120)};
cls::journal::ObjectSetPosition commit_position(positions);
- ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, create(oid, 3));
ASSERT_EQ(0, client_register(oid));
ASSERT_EQ(0, client_commit(oid, commit_position));
journal::JournalPlayer *player = create_player(oid, metadata);
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
- ASSERT_EQ(0, write_entry(oid, 0, 345, 0));
ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
- ASSERT_EQ(0, write_entry(oid, 1, 345, 1));
- ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
- ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
- ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
- ASSERT_EQ(0, write_entry(oid, 0, 345, 2));
+ ASSERT_EQ(0, write_entry(oid, 2, 234, 122));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 123));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 124));
+ ASSERT_EQ(0, write_entry(oid, 0, 236, 0)); // new tag allocated
player->prefetch();
uint64_t last_tid;
ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
ASSERT_EQ(124U, last_tid);
- ASSERT_TRUE(metadata->get_last_allocated_entry_tid(345, &last_tid));
- ASSERT_EQ(2U, last_tid);
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid));
+ ASSERT_EQ(0U, last_tid);
}
TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
journal::JournalPlayer *player = create_player(oid, metadata);
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
- ASSERT_EQ(0, write_entry(oid, 0, 345, 0));
ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
player->prefetch();
Entries entries;
- ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+ ASSERT_TRUE(wait_for_entries(player, 2, &entries));
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+ ASSERT_TRUE(wait_for_complete(player));
+ ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result);
+}
+
+TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) {
+ std::string oid = get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
+ ASSERT_EQ(0, write_entry(oid, 1, 235, 121));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 1, &entries));
journal::Entry entry;
uint64_t commit_tid;