m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0),
m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0),
- m_commit_object(0), m_commit_tag_tid(0) {
+ m_commit_object(0) {
m_replay_handler->get();
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
ObjectSetPosition commit_position;
m_journal_metadata->get_commit_position(&commit_position);
- // TODO
- /*
if (!commit_position.object_positions.empty()) {
+ ldout(m_cct, 5) << "commit position: " << commit_position << dendl;
+
+ // start replay after the last committed entry's object
uint8_t splay_width = m_journal_metadata->get_splay_width();
- m_splay_offset = commit_position.object_number % splay_width;
- m_commit_object = commit_position.object_number;
- m_commit_tag_tid = commit_position.object_positions.front().tag_tid;
-
- for (ObjectPositions::const_iterator it =
- commit_position.object_positions.begin();
- it != commit_position.object_positions.end(); ++it) {
- const ObjectPosition &object_position = *it;
- m_commit_tids[object_position.tag_tid] = object_position.entry_tid;
+ m_commit_object = commit_position.object_positions.front().object_number;
+ m_splay_offset = m_commit_object % splay_width;
+ for (auto &position : commit_position.object_positions) {
+ uint8_t splay_offset = position.object_number % splay_width;
+ m_commit_positions[splay_offset] = position;
}
}
- */
}
JournalPlayer::~JournalPlayer() {
m_state = STATE_PREFETCH;
uint8_t splay_width = m_journal_metadata->get_splay_width();
- for (uint8_t splay_index = 0; splay_index < splay_width; ++splay_index) {
- m_prefetch_splay_offsets.insert(splay_index);
+ for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ m_prefetch_splay_offsets.insert(splay_offset);
+ }
+
+ // compute active object for each splay offset (might be before
+ // active set)
+ std::map<uint8_t, uint64_t> splay_offset_to_objects;
+ for (auto &position : m_commit_positions) {
+ assert(splay_offset_to_objects.count(position.first) == 0);
+ splay_offset_to_objects[position.first] = position.second.object_number;
}
- uint64_t object_set = m_commit_object / splay_width;
+ // prefetch the active object for each splay offset (and the following object)
uint64_t active_set = m_journal_metadata->get_active_set();
+ uint64_t max_object_number = (splay_width * (active_set + 1)) - 1;
+ std::set<uint64_t> prefetch_object_numbers;
+ for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ uint64_t object_number = splay_offset;
+ if (splay_offset_to_objects.count(splay_offset) != 0) {
+ object_number = splay_offset_to_objects[splay_offset];
+ }
- uint32_t object_count = splay_width *
- std::min<uint64_t>(2, active_set - object_set + 1);
- ldout(m_cct, 10) << __func__ << ": prefetching " << object_count << " "
- << "objects" << dendl;
+ prefetch_object_numbers.insert(object_number);
+ if (object_number + splay_width <= max_object_number) {
+ prefetch_object_numbers.insert(object_number + splay_width);
+ }
+ }
- // prefetch starting from the last known commit set
- uint64_t start_object = object_set * splay_width;
- for (uint64_t object_number = start_object;
- object_number < start_object + object_count; ++object_number) {
+ ldout(m_cct, 10) << __func__ << ": prefetching "
+ << prefetch_object_numbers.size() << " " << "objects"
+ << dendl;
+ for (auto object_number : prefetch_object_numbers) {
fetch(object_number);
}
}
// prefetch in-order since a newer splay object could prefetch first
while (!object_players.begin()->second->is_fetch_in_progress()) {
ObjectPlayerPtr object_player = object_players.begin()->second;
+ uint64_t player_object_number = object_player->get_object_number();
// skip past known committed records
- if (!m_commit_tids.empty() && !object_player->empty()) {
- ldout(m_cct, 15) << "seeking known commit position in "
+ if (m_commit_positions.count(splay_offset) != 0 &&
+ !object_player->empty()) {
+ ObjectPosition &position = m_commit_positions[splay_offset];
+
+ ldout(m_cct, 15) << "seeking known commit position " << position << " in "
<< object_player->get_oid() << dendl;
+
+ bool found_commit = false;
Entry entry;
- while (!m_commit_tids.empty() && !object_player->empty()) {
+ while (!object_player->empty()) {
object_player->front(&entry);
- if (entry.get_entry_tid() > m_commit_tids[entry.get_tag_tid()]) {
+
+ if (entry.get_tag_tid() == position.tag_tid &&
+ entry.get_entry_tid() == position.entry_tid) {
+ found_commit = true;
+ } else if (found_commit) {
ldout(m_cct, 10) << "located next uncommitted entry: " << entry
<< dendl;
break;
// if this object contains the commit position, our read should start with
// the next consistent journal entry in the sequence
- if (!m_commit_tids.empty() &&
- object_player->get_object_number() == m_commit_object) {
+ if (player_object_number == m_commit_object) {
if (object_player->empty()) {
advance_splay_object();
} else {
Entry entry;
object_player->front(&entry);
- if (entry.get_tag_tid() == m_commit_tag_tid) {
+ if (entry.get_tag_tid() == position.tag_tid) {
advance_splay_object();
}
}
}
+
+ // do not search for commit position for this object
+ // if we've already seen it
+ if (found_commit) {
+ m_commit_positions.erase(splay_offset);
+ }
}
// if the object is empty, pre-fetch the next splay object