const std::string &object_oid_prefix,
const JournalMetadataPtr& journal_metadata,
ReplayHandler *replay_handler)
- : RefCountedObject(NULL, 0), m_cct(NULL),
- m_object_oid_prefix(object_oid_prefix),
+ : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT),
m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false),
if (m_watch_enabled && !m_watch_scheduled) {
object_player->watch(&m_process_state, m_watch_interval);
m_watch_scheduled = true;
+ } else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) {
+ m_lock.Unlock();
+ m_replay_handler->handle_complete(0);
+ m_lock.Lock();
}
return false;
}
m_state = STATE_ERROR;
m_lock.Unlock();
- m_replay_handler->handle_error(-EINVAL);
+ m_replay_handler->handle_complete(-EINVAL);
m_lock.Lock();
return false;
}
if (peek_entry.get_tag() == entry->get_tag() ||
(m_journal_metadata->get_last_allocated_tid(peek_entry.get_tag(),
&last_tid) &&
- last_tid +1 != peek_entry.get_tid())) {
+ last_tid + 1 != peek_entry.get_tid())) {
advance_splay_object();
}
- } else {
+ } else {
advance_splay_object();
ObjectPlayerPtr next_set_object_player = get_next_set_object_player();
Mutex::Locker locker(m_lock);
m_state = STATE_ERROR;
}
- m_replay_handler->handle_error(r);
+ m_replay_handler->handle_complete(r);
}
}
ldout(m_cct, 15) << "seeking known commit position in "
<< object_player->get_oid() << dendl;
Entry entry;
- while (!object_player->empty()) {
+ while (!m_commit_tids.empty() && !object_player->empty()) {
object_player->front(&entry);
if (entry.get_tid() > m_commit_tids[entry.get_tag()]) {
ldout(m_cct, 10) << "located next uncommitted entry: " << entry
// if this object contains the commit position, our read should start with
// the next consistent journal entry in the sequence
- if (object_player->get_object_number() == m_commit_object) {
+ if (!m_commit_tids.empty() &&
+ object_player->get_object_number() == m_commit_object) {
if (object_player->empty()) {
advance_splay_object();
} else {
} else if (m_watch_enabled) {
object_player->watch(&m_process_state, m_watch_interval);
m_watch_scheduled = true;
+ } else {
+ ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
+ << dendl;
+ m_lock.Unlock();
+ m_replay_handler->handle_complete(0);
+ m_lock.Lock();
}
return 0;
}