object_player->pop_front();
uint64_t last_entry_tid;
- if (m_active_tag_tid && *m_active_tag_tid != entry->get_tag_tid()) {
- lderr(m_cct) << "unexpected tag in journal entry: "
- << "entry=" << *entry << ", "
- << "tag_tid=" << (m_active_tag_tid ? *m_active_tag_tid : -1)
- << dendl;
-
- m_state = STATE_ERROR;
- notify_complete(-ENOMSG);
- return false;
- } else if (m_journal_metadata->get_last_allocated_entry_tid(
- entry->get_tag_tid(), &last_entry_tid) &&
- entry->get_entry_tid() != last_entry_tid + 1) {
+ if (m_journal_metadata->get_last_allocated_entry_tid(
+ entry->get_tag_tid(), &last_entry_tid) &&
+ entry->get_entry_tid() != last_entry_tid + 1) {
lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
m_state = STATE_ERROR;
return false;
}
- m_active_tag_tid = entry->get_tag_tid();
advance_splay_object();
remove_empty_object_player(object_player);
bool JournalPlayer::verify_playback_ready() {
assert(m_lock.is_locked());
- if (!is_object_set_ready()) {
- ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
- return false;
- }
-
- ObjectPlayerPtr object_player = get_object_player();
- assert(object_player);
-
- // Verify is the active object player has another entry available
- // in the sequence
- Entry entry;
- bool entry_available = false;
- if (!object_player->empty()) {
- entry_available = true;
- m_watch_prune_active_tag = false;
- object_player->front(&entry);
- if (!m_active_tag_tid || entry.get_tag_tid() == *m_active_tag_tid) {
- return true;
- } else if (m_active_tag_tid && entry.get_tag_tid() < m_active_tag_tid) {
- // new tag is registered as primary commit position -- other object
- // commit positions might still be associated with old tags
- ldout(m_cct, 10) << __func__ << ": detected stale entry: " << entry
- << dendl;
- return prune_tag(entry.get_tag_tid());
- } else if (m_active_tag_tid && entry.get_tag_tid() > *m_active_tag_tid) {
- // new tag at current playback position -- implies that previous
- // tag ended abruptly without flushing out all records
- // search for the start record for the next tag
- ldout(m_cct, 10) << __func__ << ": new tag detected at current playback "
- << "position: active_tag=" << *m_active_tag_tid << ", "
- << "new_tag=" << entry.get_tag_tid() << dendl;
- return prune_active_tag();
+ while (true) {
+ if (!is_object_set_ready()) {
+ ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
+ return false;
}
- } else if (!m_watch_enabled && m_active_tag_tid) {
- // current playback position is empty so this tag is done
- ldout(m_cct, 10) << __func__ << ": no more in-sequence entries for tag "
- << *m_active_tag_tid << dendl;
- return prune_active_tag();
- } else if (m_watch_enabled && m_active_tag_tid && m_watch_prune_active_tag) {
- // detected current tag is now longer active and we have re-read the current
- // object but it's still empty, so this tag is done
- ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries "
- << "for tag " << *m_active_tag_tid << dendl;
- return prune_active_tag();
- } else if (m_watch_enabled && object_player->refetch_required()) {
- // if the active object requires a refetch, don't proceed looking for a
- // new tag before this process completes
- ldout(m_cct, 10) << __func__ << ": object requires refetch" << dendl;
- return false;
- }
- // NOTE: replay currently does not check tag class to playback multiple tags
- // from different classes (issue #14909). When a new tag is discovered, it
- // is assumed that the previous tag was closed at the last replayable entry.
- object_player = m_object_players.begin()->second.begin()->second;
- if (!object_player->empty() && m_active_tag_tid) {
- object_player->front(&entry);
- if (entry.get_tag_tid() > *m_active_tag_tid &&
- entry.get_entry_tid() == 0) {
- uint8_t splay_width = m_journal_metadata->get_splay_width();
- m_active_tag_tid = entry.get_tag_tid();
- m_splay_offset = object_player->get_object_number() % splay_width;
-
- ldout(m_cct, 20) << __func__ << ": new tag " << entry.get_tag_tid() << " "
- << "detected, adjusting offset to "
- << static_cast<uint32_t>(m_splay_offset) << dendl;
- return true;
+ ObjectPlayerPtr object_player = get_object_player();
+ assert(object_player);
+ uint64_t object_num = object_player->get_object_number();
+
+ // Verify is the active object player has another entry available
+ // in the sequence
+ // NOTE: replay currently does not check tag class to playback multiple tags
+ // from different classes (issue #14909). When a new tag is discovered, it
+ // is assumed that the previous tag was closed at the last replayable entry.
+ Entry entry;
+ if (!object_player->empty()) {
+ m_watch_prune_active_tag = false;
+ object_player->front(&entry);
+
+ if (!m_active_tag_tid) {
+ ldout(m_cct, 10) << __func__ << ": "
+ << "object_num=" << object_num << ", "
+ << "initial tag=" << entry.get_tag_tid()
+ << dendl;
+ m_active_tag_tid = entry.get_tag_tid();
+ return true;
+ } else if (entry.get_tag_tid() < *m_active_tag_tid ||
+ (m_prune_tag_tid && entry.get_tag_tid() <= *m_prune_tag_tid)) {
+ // entry occurred before the current active tag
+ ldout(m_cct, 10) << __func__ << ": detected stale entry: "
+ << "object_num=" << object_num << ", "
+ << "entry=" << entry << dendl;
+ prune_tag(entry.get_tag_tid());
+ continue;
+ } else if (entry.get_tag_tid() > *m_active_tag_tid) {
+ // new tag at current playback position -- implies that previous
+ // tag ended abruptly without flushing out all records
+ // search for the start record for the next tag
+ ldout(m_cct, 10) << __func__ << ": new tag detected: "
+ << "object_num=" << object_num << ", "
+ << "active_tag=" << *m_active_tag_tid << ", "
+ << "new_tag=" << entry.get_tag_tid() << dendl;
+ if (entry.get_entry_tid() == 0) {
+ // first entry in new tag -- can promote to active
+ prune_active_tag(entry.get_tag_tid());
+ return true;
+ } else {
+ // prune current active and wait for initial entry for new tag
+ prune_active_tag(boost::none);
+ continue;
+ }
+ } else {
+ ldout(m_cct, 20) << __func__ << ": "
+ << "object_num=" << object_num << ", "
+ << "entry: " << entry << dendl;
+ assert(entry.get_tag_tid() == *m_active_tag_tid);
+ return true;
+ }
+ } else {
+ if (!m_active_tag_tid) {
+ // waiting for our first entry
+ ldout(m_cct, 10) << __func__ << ": waiting for first entry: "
+ << "object_num=" << object_num << dendl;
+ return false;
+ } else if (m_prune_tag_tid && *m_prune_tag_tid == *m_active_tag_tid) {
+ ldout(m_cct, 10) << __func__ << ": no more entries" << dendl;
+ return false;
+ } else if (!m_watch_enabled) {
+ // current playback position is empty so this tag is done
+ ldout(m_cct, 10) << __func__ << ": no more in-sequence entries: "
+ << "object_num=" << object_num << ", "
+ << "active_tag=" << *m_active_tag_tid << dendl;
+ prune_active_tag(boost::none);
+ continue;
+ } else if (m_watch_enabled && m_watch_prune_active_tag) {
+ // detected current tag is now longer active and we have re-read the
+ // current object but it's still empty, so this tag is done
+ ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries: "
+ << "object_num=" << object_num << ", "
+ << "active_tag " << *m_active_tag_tid << dendl;
+ prune_active_tag(boost::none);
+ continue;
+ } else if (m_watch_enabled && object_player->refetch_required()) {
+ // if the active object requires a refetch, don't proceed looking for a
+ // new tag before this process completes
+ ldout(m_cct, 10) << __func__ << ": refetch required: "
+ << "object_num=" << object_num << dendl;
+ return false;
+ }
}
}
-
- // if any entry is available, we can test if the sequence is corrupt
- return entry_available;
+ return false;
}
-bool JournalPlayer::prune_tag(uint64_t tag_tid) {
+void JournalPlayer::prune_tag(uint64_t tag_tid) {
assert(m_lock.is_locked());
ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag "
<< tag_tid << dendl;
+ // prune records that are at or below the largest prune tag tid
+ if (!m_prune_tag_tid || *m_prune_tag_tid < tag_tid) {
+ m_prune_tag_tid = tag_tid;
+ }
+
for (auto &players : m_object_players) {
for (auto player_pair : players.second) {
ObjectPlayerPtr object_player = player_pair.second;
remove_empty_object_player(player_pair.second);
}
}
-
- // if we removed an empty object, a fetch will be in-flight
- if (!is_object_set_ready()) {
- return false;
- }
-
- // search for the start record for the next tag
- return verify_playback_ready();
}
-bool JournalPlayer::prune_active_tag() {
+void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
assert(m_lock.is_locked());
assert(m_active_tag_tid);
- uint64_t tag_tid = *m_active_tag_tid;
- m_active_tag_tid = boost::none;
+ uint64_t active_tag_tid = *m_active_tag_tid;
+ if (tag_tid) {
+ m_active_tag_tid = tag_tid;
+ }
m_splay_offset = 0;
m_watch_step = WATCH_STEP_FETCH_CURRENT;
- return prune_tag(tag_tid);
+ prune_tag(active_tag_tid);
}
const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
if (!player->empty() || object_set == active_set) {
return false;
} else if (m_watch_enabled && player->refetch_required()) {
- ldout(m_cct, 20) << __func__ << ": delaying removal of empty object"
- << dendl;
+ ldout(m_cct, 20) << __func__ << ": " << player->get_oid() << " requires "
+ << "a refetch" << dendl;
return false;
}
ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty"
<< dendl;
+
+ m_watch_prune_active_tag = false;
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
+
ObjectPlayers &object_players = m_object_players[
player->get_object_number() % splay_width];
assert(!object_players.empty());
if (m_watch_step == WATCH_STEP_ASSERT_ACTIVE) {
// detect if a new tag has been created in case we are blocked
// by an incomplete tag sequence
- ldout(m_cct, 20) << __func__ << ": asserting active tag" << dendl;
- assert(m_active_tag_tid);
+ ldout(m_cct, 20) << __func__ << ": asserting active tag="
+ << *m_active_tag_tid << dendl;
FunctionContext *ctx = new FunctionContext([this](int r) {
handle_watch_assert_active(r);
});
assert(m_watch_scheduled);
m_watch_scheduled = false;
- if (m_active_tag_tid && r == -ESTALE) {
+ if (r == -ESTALE) {
// newer tag exists -- since we are at this step in the watch sequence,
// we know we can prune the active tag if watch fails again
ldout(m_cct, 10) << __func__ << ": tag " << *m_active_tag_tid << " "