]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: re-fetch active object before advancing set during replay
authorJason Dillaman <dillaman@redhat.com>
Fri, 13 May 2016 18:49:07 +0000 (14:49 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 18 May 2016 15:02:28 +0000 (11:02 -0400)
During a live replay, it's possible that an append and and overflow
into the next object could race with the live playback of the same
object.  Re-fetch an "empty" object at least once before advancing
to next set to ensure all records have been read.

Fixes: http://tracker.ceph.com/issues/15665
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h
src/journal/ObjectPlayer.cc
src/journal/ObjectPlayer.h
src/test/journal/test_JournalPlayer.cc

index 2f1d96df15c68e7669f246d121a6fdca3000791d..d0d62694a9af38fed5169969eb4c84437ac4f4d2 100644 (file)
@@ -394,13 +394,6 @@ bool JournalPlayer::verify_playback_ready() {
     }
   }
 
-  // if we just advanced to this object, make sure we have the latest
-  // set of data before advancing to a new tag
-  if (m_watch_enabled && m_watch_required) {
-    m_watch_required = false;
-    return false;
-  }
-
   // NOTE: replay currently does not check tag class to playback multiple tags
   // from different classes (issue #14909).  When a new tag is discovered, it
   // is assumed that the previous tag was closed at the last replayable entry.
@@ -452,7 +445,6 @@ void JournalPlayer::advance_splay_object() {
   assert(m_lock.is_locked());
   ++m_splay_offset;
   m_splay_offset %= m_journal_metadata->get_splay_width();
-  m_watch_required = true;
   ldout(m_cct, 20) << __func__ << ": new offset "
                    << static_cast<uint32_t>(m_splay_offset) << dendl;
 }
@@ -466,9 +458,16 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
   uint64_t active_set = m_journal_metadata->get_active_set();
   if (!player->empty() || object_set == active_set) {
     return false;
+  } else if (m_watch_enabled && object_set < active_set &&
+             player->refetch_required()) {
+    ldout(m_cct, 20) << __func__ << ": refetching " << player->get_oid()
+                     << dendl;
+    player->clear_refetch_required();
+    return false;
   }
 
-  ldout(m_cct, 15) << player->get_oid() << " empty" << dendl;
+  ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty"
+                   << dendl;
   ObjectPlayers &object_players = m_object_players[
     player->get_object_number() % splay_width];
   assert(!object_players.empty());
index 80a9ff7c1f0898979cc59c3949d34da639bb9d6d..85a8b28da25f52d04449c0154265a90dc4c05e2d 100644 (file)
@@ -114,7 +114,6 @@ private:
   bool m_watch_enabled;
   bool m_watch_scheduled;
   double m_watch_interval;
-  bool m_watch_required = false;
 
   bool m_handler_notified = false;
 
index 894b56f5da2da1d77450d6656fd36d9dca13c951..2c2f3e33a40b8e1d3134613c3a57a465c2174141 100644 (file)
@@ -221,6 +221,8 @@ void ObjectPlayer::handle_watch_fetched(int r) {
     assert(m_watch_in_progress);
     if (r == -ENOENT) {
       r = 0;
+    } else {
+      m_refetch_required = true;
     }
     std::swap(on_finish, m_watch_ctx);
   }
index f68ee3741d8277fc02d4854fef476d2f442d4a29..73fa7d132b289877d71bf5ea3bd920478e7fbfa0 100644 (file)
@@ -62,6 +62,13 @@ public:
     *invalid_ranges = m_invalid_ranges;
   }
 
+  inline bool refetch_required() const {
+    return m_refetch_required;
+  }
+  inline void clear_refetch_required() {
+    m_refetch_required = false;
+  }
+
 private:
   typedef std::pair<uint64_t, uint64_t> EntryKey;
   typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
@@ -114,6 +121,8 @@ private:
   Cond m_watch_in_progress_cond;
   bool m_watch_in_progress;
 
+  bool m_refetch_required = true;
+
   int handle_fetch_complete(int r, const bufferlist &bl);
 
   void schedule_watch();
index f989dd536b7a0a770d68714c748da29cf6a6c03e..7ad92be3e2ca090d70a039b4b65a66d0583747b6 100644 (file)
@@ -452,3 +452,48 @@ TEST_F(TestJournalPlayer, ImbalancedJournal) {
   ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid));
   ASSERT_EQ(3U, last_tid);
 }
+
+TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) {
+  std::string oid = get_temp_oid();
+
+  cls::journal::ObjectSetPosition commit_position;
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
+  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, write_entry(oid, 0, 0, 2));
+  ASSERT_EQ(0, write_entry(oid, 0, 0, 4));
+  ASSERT_EQ(0, write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
+  metadata->set_active_set(1);
+  player->prefetch_and_watch(0.25);
+
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+  Entries expected_entries = {
+    create_entry(0, 0),
+    create_entry(0, 1),
+    create_entry(0, 2)};
+  ASSERT_EQ(expected_entries, entries);
+
+  journal::Entry entry;
+  uint64_t commit_tid;
+  ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+  ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
+  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+  expected_entries = {
+    create_entry(0, 3),
+    create_entry(0, 4),
+    create_entry(0, 5)};
+  ASSERT_EQ(expected_entries, entries);
+}