]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: properly handle tag transition
authorJason Dillaman <dillaman@redhat.com>
Wed, 2 Mar 2016 15:09:07 +0000 (10:09 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 8 Mar 2016 14:03:44 +0000 (09:03 -0500)
Now that the tag concept has been re-used for delineating epochs for
librbd, we need playback to properly handle the cases where the active
playback tag abruptly ends and a newer tag is inserted in the first
splay offset object.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h
src/journal/ObjectPlayer.cc
src/test/journal/test_JournalPlayer.cc
src/test/journal/test_ObjectPlayer.cc

index 0478cec91be98d64f6fe37592618cd517d9dee96..664b4b001024f6b1b5d794f5140f471b4b6b188b 100644 (file)
@@ -68,7 +68,9 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
 
     // start replay after the last committed entry's object
     uint8_t splay_width = m_journal_metadata->get_splay_width();
-    m_commit_object = commit_position.object_positions.front().object_number;
+    auto &active_position = commit_position.object_positions.front();
+    m_active_tag_tid = active_position.tag_tid;
+    m_commit_object = active_position.object_number;
     m_splay_offset = m_commit_object % splay_width;
     for (auto &position : commit_position.object_positions) {
       uint8_t splay_offset = position.object_number % splay_width;
@@ -134,13 +136,13 @@ void JournalPlayer::prefetch_and_watch(double interval) {
 }
 
 void JournalPlayer::unwatch() {
+  ldout(m_cct, 20) << __func__ << dendl;
   Mutex::Locker locker(m_lock);
   m_watch_enabled = false;
   if (m_watch_scheduled) {
-    ObjectPlayerPtr object_player = get_object_player();
-    assert(object_player);
-
-    object_player->unwatch();
+    for (auto &players : m_object_players) {
+      players.second.begin()->second->unwatch();
+    }
     m_watch_scheduled = false;
   }
 }
@@ -152,30 +154,38 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
     return false;
   }
 
-  ObjectPlayerPtr object_player = get_object_player();
-  assert(object_player);
+  if (!is_object_set_ready()) {
+    return false;
+  }
 
-  if (object_player->empty()) {
-    if (m_watch_enabled && !m_watch_scheduled) {
-      object_player->watch(
-        new C_Watch(this, object_player->get_object_number()),
-        m_watch_interval);
-      m_watch_scheduled = true;
-    } else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) {
+  if (!verify_playback_ready()) {
+    if (!m_watch_enabled) {
       ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
       m_journal_metadata->get_finisher().queue(new C_HandleComplete(
         m_replay_handler), 0);
+    } else if (!m_watch_scheduled) {
+      schedule_watch();
     }
     return false;
   }
 
+  ObjectPlayerPtr object_player = get_object_player();
+  assert(object_player && !object_player->empty());
+
   object_player->front(entry);
   object_player->pop_front();
 
   uint64_t last_entry_tid;
-  if (m_journal_metadata->get_last_allocated_entry_tid(
-        entry->get_tag_tid(), &last_entry_tid) &&
-      entry->get_entry_tid() != last_entry_tid + 1) {
+  if (m_active_tag_tid && *m_active_tag_tid != entry->get_tag_tid()) {
+    lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl;
+
+    m_state = STATE_ERROR;
+    m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+      m_replay_handler), -ENOMSG);
+    return false;
+  } else if (m_journal_metadata->get_last_allocated_entry_tid(
+               entry->get_tag_tid(), &last_entry_tid) &&
+             entry->get_entry_tid() != last_entry_tid + 1) {
     lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
 
     m_state = STATE_ERROR;
@@ -184,20 +194,9 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
     return false;
   }
 
-  // skip to next splay offset if we cannot apply the next entry in-sequence
-  if (!object_player->empty()) {
-    Entry peek_entry;
-    object_player->front(&peek_entry);
-    if (peek_entry.get_tag_tid() == entry->get_tag_tid() ||
-        (m_journal_metadata->get_last_allocated_entry_tid(
-           peek_entry.get_tag_tid(), &last_entry_tid) &&
-         last_entry_tid + 1 != peek_entry.get_entry_tid())) {
-      advance_splay_object();
-    }
-  } else {
-    advance_splay_object();
-    remove_empty_object_player(object_player);
-  }
+  m_active_tag_tid = entry->get_tag_tid();
+  advance_splay_object();
+  remove_empty_object_player(object_player);
 
   m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(),
                                         entry->get_entry_tid());
@@ -210,8 +209,9 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
 void JournalPlayer::process_state(uint64_t object_number, int r) {
   ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
                    << "r=" << r << dendl;
+
+  assert(m_lock.is_locked());
   if (r >= 0) {
-    Mutex::Locker locker(m_lock);
     switch (m_state) {
     case STATE_PREFETCH:
       ldout(m_cct, 10) << "PREFETCH" << dendl;
@@ -232,11 +232,9 @@ void JournalPlayer::process_state(uint64_t object_number, int r) {
   }
 
   if (r < 0) {
-    {
-      Mutex::Locker locker(m_lock);
-      m_state = STATE_ERROR;
-    }
-    m_replay_handler->handle_complete(r);
+    m_state = STATE_ERROR;
+    m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+      m_replay_handler), r);
   }
 }
 
@@ -328,16 +326,14 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
   }
 
   m_state = STATE_PLAYBACK;
-  ObjectPlayerPtr object_player = get_object_player();
-  if (!object_player->empty()) {
+  if (!is_object_set_ready()) {
+    ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
+  } else if (verify_playback_ready()) {
     ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
     m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
       m_replay_handler), 0);
   } else if (m_watch_enabled) {
-    object_player->watch(
-      new C_Watch(this, object_player->get_object_number()),
-      m_watch_interval);
-    m_watch_scheduled = true;
+    schedule_watch();
   } else {
     ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
                      << dendl;
@@ -351,18 +347,20 @@ int JournalPlayer::process_playback(uint64_t object_number) {
   ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
   assert(m_lock.is_locked());
 
-  m_watch_scheduled = false;
+  if (!is_object_set_ready()) {
+    return 0;
+  }
 
   ObjectPlayerPtr object_player = get_object_player();
-  if (object_player->get_object_number() == object_number) {
+  if (verify_playback_ready()) {
+    ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
+    m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+      m_replay_handler), 0);
+  } else if (!m_watch_enabled && is_object_set_ready()) {
     uint8_t splay_width = m_journal_metadata->get_splay_width();
     uint64_t active_set = m_journal_metadata->get_active_set();
     uint64_t object_set = object_player->get_object_number() / splay_width;
-    if (!object_player->empty()) {
-      ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
-      m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
-        m_replay_handler), 0);
-    } else if (object_set == active_set) {
+    if (object_set == active_set) {
       ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
       m_journal_metadata->get_finisher().queue(new C_HandleComplete(
         m_replay_handler), 0);
@@ -371,6 +369,61 @@ int JournalPlayer::process_playback(uint64_t object_number) {
   return 0;
 }
 
+bool JournalPlayer::is_object_set_ready() const {
+  assert(m_lock.is_locked());
+  if (m_watch_scheduled) {
+    return false;
+  }
+  for (auto &players : m_object_players) {
+    if (players.second.begin()->second->is_fetch_in_progress()) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool JournalPlayer::verify_playback_ready() {
+  assert(m_lock.is_locked());
+  assert(is_object_set_ready());
+
+  ObjectPlayerPtr object_player = get_object_player();
+  assert(object_player);
+
+  // Verify is the active object player has another entry available
+  // in the sequence
+  Entry entry;
+  bool entry_available = false;
+  if (!object_player->empty()) {
+    entry_available = true;
+    object_player->front(&entry);
+    if (!m_active_tag_tid || entry.get_tag_tid() == *m_active_tag_tid) {
+      return true;
+    }
+  }
+
+  // NOTE: replay currently does not check tag class to playback multiple tags
+  // from different classes (issue #14909).  When a new tag is discovered, it
+  // is assumed that the previous tag was closed at the last replayable entry.
+  object_player = m_object_players.begin()->second.begin()->second;
+  if (!object_player->empty() && m_active_tag_tid) {
+    object_player->front(&entry);
+    if (entry.get_tag_tid() > *m_active_tag_tid &&
+        entry.get_entry_tid() == 0) {
+      uint8_t splay_width = m_journal_metadata->get_splay_width();
+      m_active_tag_tid = entry.get_tag_tid();
+      m_splay_offset = object_player->get_object_number() / splay_width;
+
+      ldout(m_cct, 20) << __func__ << ": new tag " << entry.get_tag_tid() << " "
+                       << "detected, adjusting offset to "
+                       << static_cast<uint32_t>(m_splay_offset) << dendl;
+      return true;
+    }
+  }
+
+  // if any entry is available, we can test if the sequence is corrupt
+  return entry_available;
+}
+
 const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
   assert(m_lock.is_locked());
 
@@ -405,6 +458,7 @@ void JournalPlayer::advance_splay_object() {
 
 bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
   assert(m_lock.is_locked());
+  assert(!m_watch_scheduled);
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   uint64_t object_set = player->get_object_number() / splay_width;
@@ -447,11 +501,12 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
   ldout(m_cct, 10) << __func__ << ": "
                    << utils::get_object_name(m_object_oid_prefix, object_num)
                    << ": r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
   if (r == -ENOENT) {
     r = 0;
   }
   if (r == 0) {
-    Mutex::Locker locker(m_lock);
     uint8_t splay_width = m_journal_metadata->get_splay_width();
     uint8_t splay_offset = object_num % splay_width;
     assert(m_object_players.count(splay_offset) == 1);
@@ -461,15 +516,46 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
     ObjectPlayerPtr object_player = object_players[object_num];
     remove_empty_object_player(object_player);
   }
-
   process_state(object_num, r);
 }
 
-void JournalPlayer::handle_watch(uint64_t object_num, int r) {
-  ldout(m_cct, 10) << __func__ << ": "
-                   << utils::get_object_name(m_object_oid_prefix, object_num)
-                   << ": r=" << r << dendl;
-  process_state(object_num, r);
+void JournalPlayer::schedule_watch() {
+  ldout(m_cct, 10) << __func__ << dendl;
+  assert(m_lock.is_locked());
+  if (m_watch_scheduled) {
+    return;
+  }
+
+  // poll first splay offset and active splay offset since
+  // new records should only appear in those two objects
+  C_Watch *ctx = new C_Watch(this);
+  ObjectPlayerPtr object_player = get_object_player();
+  object_player->watch(ctx, m_watch_interval);
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  if (object_player->get_object_number() % splay_width != 0) {
+    ++ctx->pending_fetches;
+
+    object_player = m_object_players.begin()->second.begin()->second;
+    object_player->watch(ctx, m_watch_interval);
+  }
+  m_watch_scheduled = true;
+}
+
+void JournalPlayer::handle_watch(int r) {
+  ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+  m_watch_scheduled = false;
+  std::set<uint64_t> object_numbers;
+  for (auto &players : m_object_players) {
+    object_numbers.insert(
+      players.second.begin()->second->get_object_number());
+  }
+
+  for (auto object_num : object_numbers) {
+    process_state(object_num, r);
+  }
 }
 
 } // namespace journal
index a07728ec0a2106696a2ae068ed509a71f2a082f6..bdb93d7d107b76ca2cb8eda85cf713cb73c144a4 100644 (file)
@@ -12,6 +12,8 @@
 #include "journal/JournalMetadata.h"
 #include "journal/ObjectPlayer.h"
 #include "cls/journal/cls_journal_types.h"
+#include <boost/none.hpp>
+#include <boost/optional.hpp>
 #include <map>
 
 class SafeTimer;
@@ -51,17 +53,6 @@ private:
     STATE_ERROR
   };
 
-  struct C_Watch : public Context {
-    JournalPlayer *player;
-    uint64_t object_num;
-
-    C_Watch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) {
-    }
-    virtual void finish(int r) {
-      player->handle_watch(object_num, r);
-    }
-  };
-
   struct C_Fetch : public Context {
     JournalPlayer *player;
     uint64_t object_num;
@@ -76,6 +67,34 @@ private:
     }
   };
 
+  struct C_Watch : public Context {
+    JournalPlayer *player;
+    uint8_t pending_fetches = 1;
+    int ret_val = 0;
+
+    C_Watch(JournalPlayer *player) : player(player) {
+    }
+
+    virtual void complete(int r) override {
+      player->m_lock.Lock();
+      if (ret_val == 0 && r < 0) {
+        ret_val = r;
+      }
+
+      assert(pending_fetches > 0);
+      if (--pending_fetches == 0) {
+        player->m_lock.Unlock();
+        Context::complete(ret_val);
+      } else {
+        player->m_lock.Unlock();
+      }
+    }
+
+    virtual void finish(int r) override {
+      player->handle_watch(r);
+    }
+  };
+
   librados::IoCtx m_ioctx;
   CephContext *m_cct;
   std::string m_object_oid_prefix;
@@ -97,9 +116,12 @@ private:
   SplayedObjectPlayers m_object_players;
   uint64_t m_commit_object;
   SplayedObjectPositions m_commit_positions;
+  boost::optional<uint64_t> m_active_tag_tid = boost::none;
 
   void advance_splay_object();
 
+  bool is_object_set_ready() const;
+  bool verify_playback_ready();
   const ObjectPlayers &get_object_players() const;
   ObjectPlayerPtr get_object_player() const;
   ObjectPlayerPtr get_next_set_object_player() const;
@@ -111,7 +133,9 @@ private:
 
   void fetch(uint64_t object_num);
   void handle_fetched(uint64_t object_num, int r);
-  void handle_watch(uint64_t object_num, int r);
+
+  void schedule_watch();
+  void handle_watch(int r);
 };
 
 } // namespace journal
index 2aca0d73b47b8a553b5a04a0a151497fc2d08970..e890dfa2d90b04c7ab1de4c14e7ee2ec0958e8ee 100644 (file)
@@ -207,11 +207,10 @@ void ObjectPlayer::handle_watch_fetched(int r) {
     Mutex::Locker timer_locker(m_timer_lock);
     assert(m_watch_in_progress);
     if (r == -ENOENT) {
-      schedule_watch();
-    } else {
-      on_finish = m_watch_ctx;
-      m_watch_ctx = NULL;
+      r = 0;
     }
+    on_finish = m_watch_ctx;
+    m_watch_ctx = NULL;
   }
 
   if (on_finish != NULL) {
index 7612b4982f5611a3ef2fc02ac8f985f88fb14553..4eb5628e59b44c47aaf7c955dc3808698df3705c 100644 (file)
@@ -249,11 +249,12 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
 
   journal::JournalPlayer::ObjectPositions positions;
   positions = {
-    cls::journal::ObjectPosition(0, 234, 122),
-    cls::journal::ObjectPosition(1, 345, 1)};
+    cls::journal::ObjectPosition(2, 234, 122),
+    cls::journal::ObjectPosition(1, 234, 121),
+    cls::journal::ObjectPosition(0, 234, 120)};
   cls::journal::ObjectSetPosition commit_position(positions);
 
-  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, create(oid, 3));
   ASSERT_EQ(0, client_register(oid));
   ASSERT_EQ(0, client_commit(oid, commit_position));
 
@@ -263,13 +264,11 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
   journal::JournalPlayer *player = create_player(oid, metadata);
 
   ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
-  ASSERT_EQ(0, write_entry(oid, 0, 345, 0));
   ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
-  ASSERT_EQ(0, write_entry(oid, 1, 345, 1));
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
-  ASSERT_EQ(0, write_entry(oid, 0, 345, 2));
+  ASSERT_EQ(0, write_entry(oid, 2, 234, 122));
+  ASSERT_EQ(0, write_entry(oid, 0, 234, 123));
+  ASSERT_EQ(0, write_entry(oid, 1, 234, 124));
+  ASSERT_EQ(0, write_entry(oid, 0, 236, 0)); // new tag allocated
 
   player->prefetch();
 
@@ -280,8 +279,8 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
   uint64_t last_tid;
   ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
   ASSERT_EQ(124U, last_tid);
-  ASSERT_TRUE(metadata->get_last_allocated_entry_tid(345, &last_tid));
-  ASSERT_EQ(2U, last_tid);
+  ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid));
+  ASSERT_EQ(0U, last_tid);
 }
 
 TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
@@ -299,13 +298,41 @@ TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
   journal::JournalPlayer *player = create_player(oid, metadata);
 
   ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
-  ASSERT_EQ(0, write_entry(oid, 0, 345, 0));
   ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
   ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
 
   player->prefetch();
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+  ASSERT_TRUE(wait_for_entries(player, 2, &entries));
+
+  journal::Entry entry;
+  uint64_t commit_tid;
+  ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+  ASSERT_TRUE(wait_for_complete(player));
+  ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result);
+}
+
+TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) {
+  std::string oid = get_temp_oid();
+
+  cls::journal::ObjectSetPosition commit_position;
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
+  ASSERT_EQ(0, write_entry(oid, 1, 235, 121));
+  ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
+
+  player->prefetch();
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
 
   journal::Entry entry;
   uint64_t commit_tid;
index 586f97094a6da50d662f0feb9dd1f4877bf7785a..6103ee6b67f01eb6930c79d3aae1950cf7c0ae86 100644 (file)
@@ -267,7 +267,7 @@ TEST_F(TestObjectPlayer, Unwatch) {
   bool done = false;
   int rval = 0;
   C_SafeCond *ctx = new C_SafeCond(&mutex, &cond, &done, &rval);
-  object->watch(ctx, 0.1);
+  object->watch(ctx, 600);
 
   usleep(200000);
   ASSERT_FALSE(done);