]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: keep active tag to assist with pruning watched objects
authorJason Dillaman <dillaman@redhat.com>
Mon, 23 May 2016 18:57:03 +0000 (14:57 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 31 May 2016 15:49:37 +0000 (11:49 -0400)
It's possible that there might be additional entries to prune in
objects that haven't been prefetched yet. Keep the active tag
to allow these entries to be pruned after they have been loaded.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 0fb730e6be632e25d76695e1f37eba542162d620)

src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h
src/test/journal/test_JournalPlayer.cc

index f3a27748fb8523324dcf284315f41d193a28c32c..66cd7afce8943fd575c637202d0f9cae19d785d3 100644 (file)
@@ -182,18 +182,9 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
   object_player->pop_front();
 
   uint64_t last_entry_tid;
-  if (m_active_tag_tid && *m_active_tag_tid != entry->get_tag_tid()) {
-    lderr(m_cct) << "unexpected tag in journal entry: "
-                 << "entry=" << *entry << ", "
-                 << "tag_tid=" << (m_active_tag_tid ? *m_active_tag_tid : -1)
-                 << dendl;
-
-    m_state = STATE_ERROR;
-    notify_complete(-ENOMSG);
-    return false;
-  } else if (m_journal_metadata->get_last_allocated_entry_tid(
-               entry->get_tag_tid(), &last_entry_tid) &&
-             entry->get_entry_tid() != last_entry_tid + 1) {
+  if (m_journal_metadata->get_last_allocated_entry_tid(
+        entry->get_tag_tid(), &last_entry_tid) &&
+      entry->get_entry_tid() != last_entry_tid + 1) {
     lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
 
     m_state = STATE_ERROR;
@@ -201,7 +192,6 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
     return false;
   }
 
-  m_active_tag_tid = entry->get_tag_tid();
   advance_splay_object();
   remove_empty_object_player(object_player);
 
@@ -380,85 +370,111 @@ bool JournalPlayer::is_object_set_ready() const {
 bool JournalPlayer::verify_playback_ready() {
   assert(m_lock.is_locked());
 
-  if (!is_object_set_ready()) {
-    ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
-    return false;
-  }
-
-  ObjectPlayerPtr object_player = get_object_player();
-  assert(object_player);
-
-  // Verify is the active object player has another entry available
-  // in the sequence
-  Entry entry;
-  bool entry_available = false;
-  if (!object_player->empty()) {
-    entry_available = true;
-    m_watch_prune_active_tag = false;
-    object_player->front(&entry);
-    if (!m_active_tag_tid || entry.get_tag_tid() == *m_active_tag_tid) {
-      return true;
-    } else if (m_active_tag_tid && entry.get_tag_tid() < m_active_tag_tid) {
-      // new tag is registered as primary commit position -- other object
-      // commit positions might still be associated with old tags
-      ldout(m_cct, 10) << __func__ << ": detected stale entry: " << entry
-                       << dendl;
-      return prune_tag(entry.get_tag_tid());
-    } else if (m_active_tag_tid && entry.get_tag_tid() > *m_active_tag_tid) {
-      // new tag at current playback position -- implies that previous
-      // tag ended abruptly without flushing out all records
-      // search for the start record for the next tag
-      ldout(m_cct, 10) << __func__ << ": new tag detected at current playback "
-                       << "position: active_tag=" << *m_active_tag_tid << ", "
-                       << "new_tag=" << entry.get_tag_tid() << dendl;
-      return prune_active_tag();
+  while (true) {
+    if (!is_object_set_ready()) {
+      ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
+      return false;
     }
-  } else if (!m_watch_enabled && m_active_tag_tid) {
-    // current playback position is empty so this tag is done
-    ldout(m_cct, 10) << __func__ << ": no more in-sequence entries for tag "
-                     << *m_active_tag_tid << dendl;
-    return prune_active_tag();
-  } else if (m_watch_enabled && m_active_tag_tid && m_watch_prune_active_tag) {
-    // detected current tag is now longer active and we have re-read the current
-    // object but it's still empty, so this tag is done
-    ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries "
-                     << "for tag " << *m_active_tag_tid << dendl;
-    return prune_active_tag();
-  } else if (m_watch_enabled && object_player->refetch_required()) {
-    // if the active object requires a refetch, don't proceed looking for a
-    // new tag before this process completes
-    ldout(m_cct, 10) << __func__ << ": object requires refetch" << dendl;
-    return false;
-  }
 
-  // NOTE: replay currently does not check tag class to playback multiple tags
-  // from different classes (issue #14909).  When a new tag is discovered, it
-  // is assumed that the previous tag was closed at the last replayable entry.
-  object_player = m_object_players.begin()->second.begin()->second;
-  if (!object_player->empty() && m_active_tag_tid) {
-    object_player->front(&entry);
-    if (entry.get_tag_tid() > *m_active_tag_tid &&
-        entry.get_entry_tid() == 0) {
-      uint8_t splay_width = m_journal_metadata->get_splay_width();
-      m_active_tag_tid = entry.get_tag_tid();
-      m_splay_offset = object_player->get_object_number() % splay_width;
-
-      ldout(m_cct, 20) << __func__ << ": new tag " << entry.get_tag_tid() << " "
-                       << "detected, adjusting offset to "
-                       << static_cast<uint32_t>(m_splay_offset) << dendl;
-      return true;
+    ObjectPlayerPtr object_player = get_object_player();
+    assert(object_player);
+    uint64_t object_num = object_player->get_object_number();
+
+    // Verify is the active object player has another entry available
+    // in the sequence
+    // NOTE: replay currently does not check tag class to playback multiple tags
+    // from different classes (issue #14909).  When a new tag is discovered, it
+    // is assumed that the previous tag was closed at the last replayable entry.
+    Entry entry;
+    if (!object_player->empty()) {
+      m_watch_prune_active_tag = false;
+      object_player->front(&entry);
+
+      if (!m_active_tag_tid) {
+        ldout(m_cct, 10) << __func__ << ": "
+                         << "object_num=" << object_num << ", "
+                         << "initial tag=" << entry.get_tag_tid()
+                         << dendl;
+        m_active_tag_tid = entry.get_tag_tid();
+        return true;
+      } else if (entry.get_tag_tid() < *m_active_tag_tid ||
+                 (m_prune_tag_tid && entry.get_tag_tid() <= *m_prune_tag_tid)) {
+        // entry occurred before the current active tag
+        ldout(m_cct, 10) << __func__ << ": detected stale entry: "
+                         << "object_num=" << object_num << ", "
+                         << "entry=" << entry << dendl;
+        prune_tag(entry.get_tag_tid());
+        continue;
+      } else if (entry.get_tag_tid() > *m_active_tag_tid) {
+        // new tag at current playback position -- implies that previous
+        // tag ended abruptly without flushing out all records
+        // search for the start record for the next tag
+        ldout(m_cct, 10) << __func__ << ": new tag detected: "
+                         << "object_num=" << object_num << ", "
+                         << "active_tag=" << *m_active_tag_tid << ", "
+                         << "new_tag=" << entry.get_tag_tid() << dendl;
+        if (entry.get_entry_tid() == 0) {
+          // first entry in new tag -- can promote to active
+          prune_active_tag(entry.get_tag_tid());
+          return true;
+        } else {
+          // prune current active and wait for initial entry for new tag
+          prune_active_tag(boost::none);
+          continue;
+        }
+      } else {
+        ldout(m_cct, 20) << __func__ << ": "
+                         << "object_num=" << object_num << ", "
+                         << "entry: " << entry << dendl;
+        assert(entry.get_tag_tid() == *m_active_tag_tid);
+        return true;
+      }
+    } else {
+      if (!m_active_tag_tid) {
+        // waiting for our first entry
+        ldout(m_cct, 10) << __func__ << ": waiting for first entry: "
+                         << "object_num=" << object_num << dendl;
+        return false;
+      } else if (m_prune_tag_tid && *m_prune_tag_tid == *m_active_tag_tid) {
+        ldout(m_cct, 10) << __func__ << ": no more entries" << dendl;
+        return false;
+      } else if (!m_watch_enabled) {
+        // current playback position is empty so this tag is done
+        ldout(m_cct, 10) << __func__ << ": no more in-sequence entries: "
+                         << "object_num=" << object_num << ", "
+                         << "active_tag=" << *m_active_tag_tid << dendl;
+        prune_active_tag(boost::none);
+        continue;
+      } else if (m_watch_enabled && m_watch_prune_active_tag) {
+        // detected current tag is now longer active and we have re-read the
+        // current object but it's still empty, so this tag is done
+        ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries: "
+                         << "object_num=" << object_num << ", "
+                         << "active_tag " << *m_active_tag_tid << dendl;
+        prune_active_tag(boost::none);
+        continue;
+      } else if (m_watch_enabled && object_player->refetch_required()) {
+        // if the active object requires a refetch, don't proceed looking for a
+        // new tag before this process completes
+        ldout(m_cct, 10) << __func__ << ": refetch required: "
+                         << "object_num=" << object_num << dendl;
+        return false;
+      }
     }
   }
-
-  // if any entry is available, we can test if the sequence is corrupt
-  return entry_available;
+  return false;
 }
 
-bool JournalPlayer::prune_tag(uint64_t tag_tid) {
+void JournalPlayer::prune_tag(uint64_t tag_tid) {
   assert(m_lock.is_locked());
   ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag "
                    << tag_tid << dendl;
 
+  // prune records that are at or below the largest prune tag tid
+  if (!m_prune_tag_tid || *m_prune_tag_tid < tag_tid) {
+    m_prune_tag_tid = tag_tid;
+  }
+
   for (auto &players : m_object_players) {
     for (auto player_pair : players.second) {
       ObjectPlayerPtr object_player = player_pair.second;
@@ -482,26 +498,20 @@ bool JournalPlayer::prune_tag(uint64_t tag_tid) {
       remove_empty_object_player(player_pair.second);
     }
   }
-
-  // if we removed an empty object, a fetch will be in-flight
-  if (!is_object_set_ready()) {
-    return false;
-  }
-
-  // search for the start record for the next tag
-  return verify_playback_ready();
 }
 
-bool JournalPlayer::prune_active_tag() {
+void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
   assert(m_lock.is_locked());
   assert(m_active_tag_tid);
 
-  uint64_t tag_tid = *m_active_tag_tid;
-  m_active_tag_tid = boost::none;
+  uint64_t active_tag_tid = *m_active_tag_tid;
+  if (tag_tid) {
+    m_active_tag_tid = tag_tid;
+  }
   m_splay_offset = 0;
   m_watch_step = WATCH_STEP_FETCH_CURRENT;
 
-  return prune_tag(tag_tid);
+  prune_tag(active_tag_tid);
 }
 
 const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
@@ -561,13 +571,17 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
   if (!player->empty() || object_set == active_set) {
     return false;
   } else if (m_watch_enabled && player->refetch_required()) {
-    ldout(m_cct, 20) << __func__ << ": delaying removal of empty object"
-                     << dendl;
+    ldout(m_cct, 20) << __func__ << ": " << player->get_oid() << " requires "
+                     << "a refetch" << dendl;
     return false;
   }
 
   ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty"
                    << dendl;
+
+  m_watch_prune_active_tag = false;
+  m_watch_step = WATCH_STEP_FETCH_CURRENT;
+
   ObjectPlayers &object_players = m_object_players[
     player->get_object_number() % splay_width];
   assert(!object_players.empty());
@@ -631,8 +645,8 @@ void JournalPlayer::schedule_watch() {
   if (m_watch_step == WATCH_STEP_ASSERT_ACTIVE) {
     // detect if a new tag has been created in case we are blocked
     // by an incomplete tag sequence
-    ldout(m_cct, 20) << __func__ << ": asserting active tag" << dendl;
-    assert(m_active_tag_tid);
+    ldout(m_cct, 20) << __func__ << ": asserting active tag="
+                     << *m_active_tag_tid << dendl;
     FunctionContext *ctx = new FunctionContext([this](int r) {
         handle_watch_assert_active(r);
       });
@@ -710,7 +724,7 @@ void JournalPlayer::handle_watch_assert_active(int r) {
   assert(m_watch_scheduled);
   m_watch_scheduled = false;
 
-  if (m_active_tag_tid && r == -ESTALE) {
+  if (r == -ESTALE) {
     // newer tag exists -- since we are at this step in the watch sequence,
     // we know we can prune the active tag if watch fails again
     ldout(m_cct, 10) << __func__ << ": tag " << *m_active_tag_tid << " "
index d82edb1261e11cff0ae5d514f310d12302b30636..eb156b3e711f54d27bb18d724bcf43931a9b1156 100644 (file)
@@ -113,14 +113,16 @@ private:
   SplayedObjectPlayers m_object_players;
   uint64_t m_commit_object;
   SplayedObjectPositions m_commit_positions;
+
   boost::optional<uint64_t> m_active_tag_tid = boost::none;
+  boost::optional<uint64_t> m_prune_tag_tid = boost::none;
 
   void advance_splay_object();
 
   bool is_object_set_ready() const;
   bool verify_playback_ready();
-  bool prune_tag(uint64_t tag_tid);
-  bool prune_active_tag();
+  void prune_tag(uint64_t tag_tid);
+  void prune_active_tag(const boost::optional<uint64_t>& tag_tid);
 
   const ObjectPlayers &get_object_players() const;
   ObjectPlayerPtr get_object_player() const;
index 24db07d2755e65f2182905092a12c9d9521fe2be..d106ac4e28d983a71e52bd11bb08ff13df12ef33 100644 (file)
@@ -613,7 +613,6 @@ TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) {
   ASSERT_EQ(0, write_entry(oid, 0, 0, 2));
   ASSERT_EQ(0, write_entry(oid, 0, 0, 4));
   ASSERT_EQ(0, write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
-  ASSERT_EQ(0, metadata->set_active_set(1));
   player->prefetch_and_watch(0.25);
 
   Entries entries;
@@ -630,6 +629,7 @@ TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) {
   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
 
   ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
+  ASSERT_EQ(0, metadata->set_active_set(1));
   ASSERT_TRUE(wait_for_entries(player, 3, &entries));
 
   expected_entries = {