ReplayHandler *replay_handler)
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
- m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT),
- m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false),
- m_watch_interval(0), m_commit_object(0) {
+ m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0),
+ m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0),
+ m_commit_object(0) {
m_replay_handler->get();
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
}
void JournalPlayer::prefetch() {
- m_lock.Lock();
+ Mutex::Locker locker(m_lock);  // guard keeps m_lock held until prefetch() returns
assert(m_state == STATE_INIT);
m_state = STATE_PREFETCH;
uint8_t splay_width = m_journal_metadata->get_splay_width();
- ldout(m_cct, 10) << __func__ << ": prefetching " << (2 * splay_width) << " "
+ for (uint8_t splay_index = 0; splay_index < splay_width; ++splay_index) {
+ m_prefetch_splay_offsets.insert(splay_index);  // every splay offset starts pending; process_prefetch() erases each as it completes
+ }
+
+ uint64_t object_set = m_commit_object / splay_width;  // object set containing the last known commit object
+ uint64_t active_set = m_journal_metadata->get_active_set();
+
+ uint32_t object_count = splay_width *
+ std::min<uint64_t>(2, active_set - object_set + 1);  // at most two sets, never past the active set; NOTE(review): unsigned math wraps if object_set > active_set - confirm that cannot happen
+ ldout(m_cct, 10) << __func__ << ": prefetching " << object_count << " "
<< "objects" << dendl;
// prefetch starting from the last known commit set
- C_PrefetchBatch *ctx = new C_PrefetchBatch(this);
- uint64_t start_object = (m_commit_object / splay_width) * splay_width;
+ uint64_t start_object = object_set * splay_width;  // first object number of the commit set
for (uint64_t object_number = start_object;
- object_number < start_object + (2 * splay_width); ++object_number) {
- ctx->add_fetch();
- fetch(object_number, ctx);
+ object_number < start_object + object_count; ++object_number) {
+ fetch(object_number);  // each result is delivered separately through C_Fetch to handle_fetched()
}
- m_lock.Unlock();
-
- ctx->complete(0);
}
void JournalPlayer::prefetch_and_watch(double interval) {
}
bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
+ ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
if (m_state != STATE_PLAYBACK) {
return false;
if (object_player->empty()) {
if (m_watch_enabled && !m_watch_scheduled) {
- object_player->watch(&m_process_state, m_watch_interval);
+ object_player->watch(
+ new C_Watch(this, object_player->get_object_number()),
+ m_watch_interval);
m_watch_scheduled = true;
} else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) {
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
}
} else {
advance_splay_object();
-
- ObjectPlayerPtr next_set_object_player = get_next_set_object_player();
- if (!next_set_object_player->empty()) {
- remove_object_player(object_player, &m_process_state);
- }
+ remove_empty_object_player(object_player);
}
m_journal_metadata->reserve_tid(entry->get_tag(), entry->get_tid());
return true;
}
-void JournalPlayer::process_state(int r) {
- ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+void JournalPlayer::process_state(uint64_t object_number, int r) {
+ ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
+ << "r=" << r << dendl;
if (r >= 0) {
Mutex::Locker locker(m_lock);
switch (m_state) {
case STATE_PREFETCH:
ldout(m_cct, 10) << "PREFETCH" << dendl;
- r = process_prefetch();
+ r = process_prefetch(object_number);
break;
case STATE_PLAYBACK:
ldout(m_cct, 10) << "PLAYBACK" << dendl;
}
}
-int JournalPlayer::process_prefetch() {
+int JournalPlayer::process_prefetch(uint64_t object_number) {
ldout(m_cct, 10) << __func__ << dendl;
assert(m_lock.is_locked());
uint8_t splay_width = m_journal_metadata->get_splay_width();
- for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
- assert(m_object_players.count(splay_offset) == 1);
+ uint8_t splay_offset = object_number % splay_width;
- ObjectPlayers &object_players = m_object_players[splay_offset];
- assert(object_players.size() == 2);
+ PrefetchSplayOffsets::iterator it = m_prefetch_splay_offsets.find(
+ splay_offset);
+ if (it == m_prefetch_splay_offsets.end()) {
+ return 0;
+ }
- ObjectPlayerPtr object_player = object_players.begin()->second;
- assert(!object_player->is_fetch_in_progress());
-
- ldout(m_cct, 15) << "seeking known commit position in "
- << object_player->get_oid() << dendl;
- Entry entry;
- while (!m_commit_tids.empty() && !object_player->empty()) {
- object_player->front(&entry);
- if (entry.get_tid() > m_commit_tids[entry.get_tag()]) {
- ldout(m_cct, 10) << "located next uncommitted entry: " << entry
- << dendl;
- break;
- }
+ bool prefetch_complete = false;
+ assert(m_object_players.count(splay_offset) == 1);
+ ObjectPlayers &object_players = m_object_players[splay_offset];
- ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl;
- m_journal_metadata->reserve_tid(entry.get_tag(), entry.get_tid());
- object_player->pop_front();
- }
+ // prefetch in-order since a newer splay object could prefetch first
+ while (!object_players.begin()->second->is_fetch_in_progress()) {
+ ObjectPlayerPtr object_player = object_players.begin()->second;
- // if this object contains the commit position, our read should start with
- // the next consistent journal entry in the sequence
- if (!m_commit_tids.empty() &&
- object_player->get_object_number() == m_commit_object) {
- if (object_player->empty()) {
- advance_splay_object();
- } else {
- Entry entry;
+ // skip past known committed records
+ if (!m_commit_tids.empty() && !object_player->empty()) {
+ ldout(m_cct, 15) << "seeking known commit position in "
+ << object_player->get_oid() << dendl;
+ Entry entry;
+ while (!m_commit_tids.empty() && !object_player->empty()) {
object_player->front(&entry);
- if (entry.get_tag() == m_commit_tag) {
+ if (entry.get_tid() > m_commit_tids[entry.get_tag()]) {
+ ldout(m_cct, 10) << "located next uncommitted entry: " << entry
+ << dendl;
+ break;
+ }
+
+ ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl;
+ m_journal_metadata->reserve_tid(entry.get_tag(), entry.get_tid());
+ object_player->pop_front();
+ }
+
+ // if this object contains the commit position, our read should start with
+ // the next consistent journal entry in the sequence
+ if (!m_commit_tids.empty() &&
+ object_player->get_object_number() == m_commit_object) {
+ if (object_player->empty()) {
advance_splay_object();
+ } else {
+ Entry entry;
+ object_player->front(&entry);
+ if (entry.get_tag() == m_commit_tag) {
+ advance_splay_object();
+ }
}
}
}
- ObjectPlayerPtr next_set_object_player = get_next_set_object_player();
- if (object_player->empty() && !next_set_object_player->empty()) {
- ldout(m_cct, 15) << object_player->get_oid() << " empty" << dendl;
- remove_object_player(object_player, &m_process_state);
+ // if the object is empty, pre-fetch the next splay object
+ if (!remove_empty_object_player(object_player)) {
+ prefetch_complete = true;
+ break;
}
}
+ if (!prefetch_complete) {
+ return 0;
+ }
+
+ m_prefetch_splay_offsets.erase(it);
+ if (!m_prefetch_splay_offsets.empty()) {
+ return 0;
+ }
+
m_state = STATE_PLAYBACK;
ObjectPlayerPtr object_player = get_object_player();
if (!object_player->empty()) {
m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
} else if (m_watch_enabled) {
- object_player->watch(&m_process_state, m_watch_interval);
+ object_player->watch(
+ new C_Watch(this, object_player->get_object_number()),
+ m_watch_interval);
m_watch_scheduled = true;
} else {
ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
assert(m_lock.is_locked());
- assert(m_object_players.count(m_splay_offset) == 1);
SplayedObjectPlayers::const_iterator it = m_object_players.find(
m_splay_offset);
assert(it != m_object_players.end());
- const ObjectPlayers &object_players = it->second;
- assert(object_players.size() == 2);
- return object_players;
+ return it->second;  // players tracked for m_splay_offset; the number of players is not checked here
}
ObjectPlayerPtr JournalPlayer::get_object_player() const {
<< static_cast<uint32_t>(m_splay_offset) << dendl;
}
-void JournalPlayer::remove_object_player(const ObjectPlayerPtr &object_player,
- Context *on_fetch) {
+bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {  // returns true only when player was erased
assert(m_lock.is_locked());
uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint64_t object_set = player->get_object_number() / splay_width;  // set index of the candidate player
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ if (!player->empty() || object_set == active_set) {  // keep players that still hold entries or belong to the active set
+ return false;
+ }
+
+ ldout(m_cct, 15) << player->get_oid() << " empty" << dendl;
ObjectPlayers &object_players = m_object_players[
- object_player->get_object_number() % splay_width];
+ player->get_object_number() % splay_width];
assert(!object_players.empty());
- assert(object_players.begin()->second == object_player);
- object_players.erase(object_players.begin());
- fetch(object_player->get_object_number() + (2 * splay_width), on_fetch);
+ uint64_t next_object_num = object_players.rbegin()->first + splay_width;  // one set beyond the highest object number tracked for this splay offset
+ uint64_t next_object_set = next_object_num / splay_width;
+ if (next_object_set <= active_set) {  // never fetch beyond the active set
+ fetch(next_object_num);
+ }
+ object_players.erase(player->get_object_number());  // erase by key, after the look-ahead fetch decision above
+ return true;
}
-void JournalPlayer::fetch(uint64_t object_num, Context *ctx) {
+void JournalPlayer::fetch(uint64_t object_num) {  // caller must hold m_lock (asserted on the next line)
assert(m_lock.is_locked());
std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
- C_Fetch *fetch_ctx = new C_Fetch(this, object_num, ctx);
+ C_Fetch *fetch_ctx = new C_Fetch(this, object_num);  // C_Fetch::finish() reports the result to handle_fetched()
ObjectPlayerPtr object_player(new ObjectPlayer(
m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order()));
object_player->fetch(fetch_ctx);
}
-int JournalPlayer::handle_fetched(int r, uint64_t object_num) {
- std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
-
- ldout(m_cct, 10) << __func__ << ": fetched "
+void JournalPlayer::handle_fetched(uint64_t object_num, int r) {  // invoked from C_Fetch::finish() with the fetch result
+ ldout(m_cct, 10) << __func__ << ": "
<< utils::get_object_name(m_object_oid_prefix, object_num)
<< ": r=" << r << dendl;
- if (r < 0 && r != -ENOENT) {
- return r;
+ if (r == -ENOENT) {  // a missing object is not treated as an error
+ r = 0;
}
- return 0;
-}
+ if (r == 0) {
+ Mutex::Locker locker(m_lock);  // scoped to this if-block; process_state() below takes m_lock itself
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = object_num % splay_width;
+ assert(m_object_players.count(splay_offset) == 1);
+ ObjectPlayers &object_players = m_object_players[splay_offset];
-JournalPlayer::C_PrefetchBatch::C_PrefetchBatch(JournalPlayer *p)
- : player(p), lock("JournalPlayer::C_PrefetchBatch::lock"), refs(1),
- return_value(0) {
- player->m_async_op_tracker.start_op();
-}
+ assert(object_players.count(object_num) == 1);
+ ObjectPlayerPtr object_player = object_players[object_num];  // local copy of the pointer, taken before any erase
+ remove_empty_object_player(object_player);  // drops the player if it is empty and outside the active set; result unused
+ }
-void JournalPlayer::C_PrefetchBatch::add_fetch() {
- Mutex::Locker locker(lock);
- ++refs;
+ process_state(object_num, r);  // always called, including when r < 0
}
-void JournalPlayer::C_PrefetchBatch::complete(int r) {
- {
- Mutex::Locker locker(lock);
- if (r < 0 && return_value == 0) {
- return_value = r;
- }
- --refs;
- }
-
- if (refs == 0) {
- player->process_state(return_value);
- delete this;
- }
+void JournalPlayer::handle_watch(uint64_t object_num, int r) {  // invoked from C_Watch::finish() with the watch result
+ ldout(m_cct, 10) << __func__ << ": "
+ << utils::get_object_name(m_object_oid_prefix, object_num)
+ << ": r=" << r << dendl;
+ process_state(object_num, r);  // same entry point that handle_fetched() uses; r is passed through unchanged
}
} // namespace journal
bool try_pop_front(Entry *entry, uint64_t *commit_tid);
private:
+ typedef std::set<uint8_t> PrefetchSplayOffsets;
typedef std::map<std::string, uint64_t> AllocatedTids;
typedef std::map<uint64_t, ObjectPlayerPtr> ObjectPlayers;
typedef std::map<uint8_t, ObjectPlayers> SplayedObjectPlayers;
STATE_ERROR
};
- struct C_ProcessState : public Context {
+ struct C_Watch : public Context {  // completion context that forwards a watch result to JournalPlayer::handle_watch()
JournalPlayer *player;
- C_ProcessState(JournalPlayer *p) : player(p) {}
- virtual void complete(int r) {
- player->process_state(r);
- }
- virtual void finish(int r) {}
- };
-
- struct C_PrefetchBatch : public Context {
- JournalPlayer *player;
- Mutex lock;
- uint32_t refs;
- int return_value;
+ uint64_t object_num;  // object number handed back to handle_watch()
- C_PrefetchBatch(JournalPlayer *p);
- virtual ~C_PrefetchBatch() {
- player->m_async_op_tracker.finish_op();
+ C_Watch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) {
+ }
+ virtual void finish(int r) {
+ player->handle_watch(object_num, r);  // r is forwarded unchanged
}
- void add_fetch();
- virtual void complete(int r);
- virtual void finish(int r) {}
};
struct C_Fetch : public Context {
JournalPlayer *player;
uint64_t object_num;
- Context *on_fetch;
- C_Fetch(JournalPlayer *p, uint64_t o, Context *c)
- : player(p), object_num(o), on_fetch(c) {
+ C_Fetch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) {  // registers an op with the tracker below; the destructor finishes it
player->m_async_op_tracker.start_op();
}
virtual ~C_Fetch() {
player->m_async_op_tracker.finish_op();
}
virtual void finish(int r) {
- r = player->handle_fetched(r, object_num);
- on_fetch->complete(r);
+ player->handle_fetched(object_num, r);  // handle_fetched() passes the result on to process_state()
}
};
ReplayHandler *m_replay_handler;
- C_ProcessState m_process_state;
-
AsyncOpTracker m_async_op_tracker;
mutable Mutex m_lock;
bool m_watch_scheduled;
double m_watch_interval;
+ PrefetchSplayOffsets m_prefetch_splay_offsets;
SplayedObjectPlayers m_object_players;
uint64_t m_commit_object;
std::string m_commit_tag;
const ObjectPlayers &get_object_players() const;
ObjectPlayerPtr get_object_player() const;
ObjectPlayerPtr get_next_set_object_player() const;
- void remove_object_player(const ObjectPlayerPtr &object_player,
- Context *on_fetch);
+ bool remove_empty_object_player(const ObjectPlayerPtr &object_player);
- void process_state(int r);
- int process_prefetch();
+ void process_state(uint64_t object_number, int r);
+ int process_prefetch(uint64_t object_number);
int process_playback();
- void fetch(uint64_t object_num, Context *ctx);
- int handle_fetched(int r, uint64_t object_num);
+ void fetch(uint64_t object_num);
+ void handle_fetched(uint64_t object_num, int r);
+ void handle_watch(uint64_t object_num, int r);
};
} // namespace journal