bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
+
+ m_handler_notified = false;
if (m_state != STATE_PLAYBACK) {
return false;
}
if (!verify_playback_ready()) {
if (!m_watch_enabled) {
- ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleComplete(
- m_replay_handler), 0);
+ notify_complete(0);
} else if (!m_watch_scheduled) {
schedule_watch();
}
lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl;
m_state = STATE_ERROR;
- m_journal_metadata->get_finisher().queue(new C_HandleComplete(
- m_replay_handler), -ENOMSG);
+ notify_complete(-ENOMSG);
return false;
} else if (m_journal_metadata->get_last_allocated_entry_tid(
entry->get_tag_tid(), &last_entry_tid) &&
lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
m_state = STATE_ERROR;
- m_journal_metadata->get_finisher().queue(new C_HandleComplete(
- m_replay_handler), -ENOMSG);
+ notify_complete(-ENOMSG);
return false;
}
if (r < 0) {
m_state = STATE_ERROR;
- m_journal_metadata->get_finisher().queue(new C_HandleComplete(
- m_replay_handler), r);
+ notify_complete(r);
}
}
if (!is_object_set_ready()) {
ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
} else if (verify_playback_ready()) {
- ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
- m_replay_handler), 0);
+ notify_entries_available();
} else if (m_watch_enabled) {
schedule_watch();
} else {
ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
<< dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleComplete(
- m_replay_handler), 0);
+ notify_complete(0);
}
return 0;
}
ObjectPlayerPtr object_player = get_object_player();
if (verify_playback_ready()) {
- ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
- m_replay_handler), 0);
+ notify_entries_available();
} else if (!m_watch_enabled && is_object_set_ready()) {
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint64_t active_set = m_journal_metadata->get_active_set();
uint64_t object_set = object_player->get_object_number() / splay_width;
if (object_set == active_set) {
- ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleComplete(
- m_replay_handler), 0);
+ notify_complete(0);
}
}
return 0;
}
}
+void JournalPlayer::notify_entries_available() {
+ assert(m_lock.is_locked());
+ if (m_handler_notified) {
+ return;
+ }
+ m_handler_notified = true;
+
+ ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
+ m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+ m_replay_handler), 0);
+}
+
+void JournalPlayer::notify_complete(int r) {
+ assert(m_lock.is_locked());
+ m_handler_notified = true;
+
+ ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
+ m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+ m_replay_handler), r);
+}
+
} // namespace journal