]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: clean up playback notification handling
authorJason Dillaman <dillaman@redhat.com>
Tue, 8 Mar 2016 15:03:14 +0000 (10:03 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 8 Mar 2016 15:08:13 +0000 (10:08 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h

index 664b4b001024f6b1b5d794f5140f471b4b6b188b..e001c018c6e359ede7f8926708f593dad6387871 100644 (file)
@@ -150,6 +150,8 @@ void JournalPlayer::unwatch() {
 bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
   ldout(m_cct, 20) << __func__ << dendl;
   Mutex::Locker locker(m_lock);
+
+  m_handler_notified = false;
   if (m_state != STATE_PLAYBACK) {
     return false;
   }
@@ -160,9 +162,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
 
   if (!verify_playback_ready()) {
     if (!m_watch_enabled) {
-      ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
-      m_journal_metadata->get_finisher().queue(new C_HandleComplete(
-        m_replay_handler), 0);
+      notify_complete(0);
     } else if (!m_watch_scheduled) {
       schedule_watch();
     }
@@ -180,8 +180,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
     lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl;
 
     m_state = STATE_ERROR;
-    m_journal_metadata->get_finisher().queue(new C_HandleComplete(
-      m_replay_handler), -ENOMSG);
+    notify_complete(-ENOMSG);
     return false;
   } else if (m_journal_metadata->get_last_allocated_entry_tid(
                entry->get_tag_tid(), &last_entry_tid) &&
@@ -189,8 +188,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
     lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
 
     m_state = STATE_ERROR;
-    m_journal_metadata->get_finisher().queue(new C_HandleComplete(
-      m_replay_handler), -ENOMSG);
+    notify_complete(-ENOMSG);
     return false;
   }
 
@@ -233,8 +231,7 @@ void JournalPlayer::process_state(uint64_t object_number, int r) {
 
   if (r < 0) {
     m_state = STATE_ERROR;
-    m_journal_metadata->get_finisher().queue(new C_HandleComplete(
-      m_replay_handler), r);
+    notify_complete(r);
   }
 }
 
@@ -329,16 +326,13 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
   if (!is_object_set_ready()) {
     ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
   } else if (verify_playback_ready()) {
-    ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
-    m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
-      m_replay_handler), 0);
+    notify_entries_available();
   } else if (m_watch_enabled) {
     schedule_watch();
   } else {
     ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
                      << dendl;
-    m_journal_metadata->get_finisher().queue(new C_HandleComplete(
-      m_replay_handler), 0);
+    notify_complete(0);
   }
   return 0;
 }
@@ -353,17 +347,13 @@ int JournalPlayer::process_playback(uint64_t object_number) {
 
   ObjectPlayerPtr object_player = get_object_player();
   if (verify_playback_ready()) {
-    ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
-    m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
-      m_replay_handler), 0);
+    notify_entries_available();
   } else if (!m_watch_enabled && is_object_set_ready()) {
     uint8_t splay_width = m_journal_metadata->get_splay_width();
     uint64_t active_set = m_journal_metadata->get_active_set();
     uint64_t object_set = object_player->get_object_number() / splay_width;
     if (object_set == active_set) {
-      ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
-      m_journal_metadata->get_finisher().queue(new C_HandleComplete(
-        m_replay_handler), 0);
+      notify_complete(0);
     }
   }
   return 0;
@@ -558,4 +548,25 @@ void JournalPlayer::handle_watch(int r) {
   }
 }
 
+void JournalPlayer::notify_entries_available() {
+  assert(m_lock.is_locked());
+  if (m_handler_notified) {
+    return;
+  }
+  m_handler_notified = true;
+
+  ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
+  m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+    m_replay_handler), 0);
+}
+
+void JournalPlayer::notify_complete(int r) {
+  assert(m_lock.is_locked());
+  m_handler_notified = true;
+
+  ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
+  m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+    m_replay_handler), r);
+}
+
 } // namespace journal
index bdb93d7d107b76ca2cb8eda85cf713cb73c144a4..007a4a160b59746d092d10e3bec7013055b94fcc 100644 (file)
@@ -112,6 +112,8 @@ private:
   bool m_watch_scheduled;
   double m_watch_interval;
 
+  bool m_handler_notified = false;
+
   PrefetchSplayOffsets m_prefetch_splay_offsets;
   SplayedObjectPlayers m_object_players;
   uint64_t m_commit_object;
@@ -136,6 +138,9 @@ private:
 
   void schedule_watch();
   void handle_watch(int r);
+
+  void notify_entries_available();
+  void notify_complete(int r);
 };
 
 } // namespace journal