]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: playback should notify when complete
authorJason Dillaman <dillaman@redhat.com>
Mon, 13 Jul 2015 19:24:18 +0000 (15:24 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 6 Nov 2015 01:42:41 +0000 (20:42 -0500)
When not performing a live-playback, the client should be
notified when there are no more records to read.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalPlayer.cc
src/journal/ReplayHandler.h

index ed70519ab63f8aca6b7ddce656c90f835c1ed304..9172a7046f3595349fc1f04e90150eb861f960a4 100644 (file)
@@ -15,8 +15,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
                              const std::string &object_oid_prefix,
                              const JournalMetadataPtr& journal_metadata,
                              ReplayHandler *replay_handler)
-  : RefCountedObject(NULL, 0), m_cct(NULL),
-    m_object_oid_prefix(object_oid_prefix),
+  : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
     m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
     m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT),
     m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false),
@@ -95,6 +94,10 @@ bool JournalPlayer::try_pop_front(Entry *entry,
     if (m_watch_enabled && !m_watch_scheduled) {
       object_player->watch(&m_process_state, m_watch_interval);
       m_watch_scheduled = true;
+    } else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) {
+      m_lock.Unlock();
+      m_replay_handler->handle_complete(0);
+      m_lock.Lock();
     }
     return false;
   }
@@ -109,7 +112,7 @@ bool JournalPlayer::try_pop_front(Entry *entry,
 
     m_state = STATE_ERROR;
     m_lock.Unlock();
-    m_replay_handler->handle_error(-EINVAL);
+    m_replay_handler->handle_complete(-EINVAL);
     m_lock.Lock();
     return false;
   }
@@ -121,10 +124,10 @@ bool JournalPlayer::try_pop_front(Entry *entry,
     if (peek_entry.get_tag() == entry->get_tag() ||
         (m_journal_metadata->get_last_allocated_tid(peek_entry.get_tag(),
                                                     &last_tid) &&
-         last_tid +1 != peek_entry.get_tid())) {
+         last_tid + 1 != peek_entry.get_tid())) {
       advance_splay_object();
     }
-    } else {
+  } else {
     advance_splay_object();
 
     ObjectPlayerPtr next_set_object_player = get_next_set_object_player();
@@ -167,7 +170,7 @@ void JournalPlayer::process_state(int r) {
       Mutex::Locker locker(m_lock);
       m_state = STATE_ERROR;
     }
-    m_replay_handler->handle_error(r);
+    m_replay_handler->handle_complete(r);
   }
 }
 
@@ -188,7 +191,7 @@ int JournalPlayer::process_prefetch() {
     ldout(m_cct, 15) << "seeking known commit position in "
                      << object_player->get_oid() << dendl;
     Entry entry;
-    while (!object_player->empty()) {
+    while (!m_commit_tids.empty() && !object_player->empty()) {
       object_player->front(&entry);
       if (entry.get_tid() > m_commit_tids[entry.get_tag()]) {
         ldout(m_cct, 10) << "located next uncommitted entry: " << entry
@@ -202,7 +205,8 @@ int JournalPlayer::process_prefetch() {
 
     // if this object contains the commit position, our read should start with
     // the next consistent journal entry in the sequence
-    if (object_player->get_object_number() == m_commit_object) {
+    if (!m_commit_tids.empty() &&
+        object_player->get_object_number() == m_commit_object) {
       if (object_player->empty()) {
         advance_splay_object();
       } else {
@@ -231,6 +235,12 @@ int JournalPlayer::process_prefetch() {
   } else if (m_watch_enabled) {
     object_player->watch(&m_process_state, m_watch_interval);
     m_watch_scheduled = true;
+  } else {
+    ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
+                     << dendl;
+    m_lock.Unlock();
+    m_replay_handler->handle_complete(0);
+    m_lock.Lock();
   }
   return 0;
 }
index 47126847c8d44091a25dd69bc4bdb541c6a65685..208350461fe47e29b656391d3805aa14a7317764 100644 (file)
@@ -10,7 +10,7 @@ struct ReplayHandler  {
   virtual ~ReplayHandler() {}
 
   virtual void handle_entries_available() = 0;
-  virtual void handle_error(int r) = 0;
+  virtual void handle_complete(int r) = 0;
 };
 
 } // namespace journal