git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: update JournalPlayer to support new commit tracking
author Jason Dillaman <dillaman@redhat.com>
Wed, 24 Feb 2016 01:44:29 +0000 (20:44 -0500)
committer Jason Dillaman <dillaman@redhat.com>
Fri, 26 Feb 2016 16:54:52 +0000 (11:54 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h
src/test/journal/test_JournalPlayer.cc

index 7b0ad8a1faa4f0d3d7a993b02478f11f5b5ff110..0478cec91be98d64f6fe37592618cd517d9dee96 100644 (file)
@@ -55,7 +55,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
     m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
     m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0),
     m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0),
-    m_commit_object(0), m_commit_tag_tid(0) {
+    m_commit_object(0) {
   m_replay_handler->get();
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
@@ -63,22 +63,18 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
   ObjectSetPosition commit_position;
   m_journal_metadata->get_commit_position(&commit_position);
 
-  // TODO
-  /*
   if (!commit_position.object_positions.empty()) {
+    ldout(m_cct, 5) << "commit position: " << commit_position << dendl;
+
+    // start replay after the last committed entry's object
     uint8_t splay_width = m_journal_metadata->get_splay_width();
-    m_splay_offset = commit_position.object_number % splay_width;
-    m_commit_object = commit_position.object_number;
-    m_commit_tag_tid = commit_position.object_positions.front().tag_tid;
-
-    for (ObjectPositions::const_iterator it =
-           commit_position.object_positions.begin();
-         it != commit_position.object_positions.end(); ++it) {
-      const ObjectPosition &object_position = *it;
-      m_commit_tids[object_position.tag_tid] = object_position.entry_tid;
+    m_commit_object = commit_position.object_positions.front().object_number;
+    m_splay_offset = m_commit_object % splay_width;
+    for (auto &position : commit_position.object_positions) {
+      uint8_t splay_offset = position.object_number % splay_width;
+      m_commit_positions[splay_offset] = position;
     }
   }
-  */
 }
 
 JournalPlayer::~JournalPlayer() {
@@ -92,22 +88,38 @@ void JournalPlayer::prefetch() {
   m_state = STATE_PREFETCH;
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
-  for (uint8_t splay_index = 0; splay_index < splay_width; ++splay_index) {
-    m_prefetch_splay_offsets.insert(splay_index);
+  for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+    m_prefetch_splay_offsets.insert(splay_offset);
+  }
+
+  // compute active object for each splay offset (might be before
+  // active set)
+  std::map<uint8_t, uint64_t> splay_offset_to_objects;
+  for (auto &position : m_commit_positions) {
+    assert(splay_offset_to_objects.count(position.first) == 0);
+    splay_offset_to_objects[position.first] = position.second.object_number;
   }
 
-  uint64_t object_set = m_commit_object / splay_width;
+  // prefetch the active object for each splay offset (and the following object)
   uint64_t active_set = m_journal_metadata->get_active_set();
+  uint64_t max_object_number = (splay_width * (active_set + 1)) - 1;
+  std::set<uint64_t> prefetch_object_numbers;
+  for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+    uint64_t object_number = splay_offset;
+    if (splay_offset_to_objects.count(splay_offset) != 0) {
+      object_number = splay_offset_to_objects[splay_offset];
+    }
 
-  uint32_t object_count = splay_width *
-                          std::min<uint64_t>(2, active_set - object_set + 1);
-  ldout(m_cct, 10) << __func__ << ": prefetching " << object_count << " "
-                   << "objects" << dendl;
+    prefetch_object_numbers.insert(object_number);
+    if (object_number + splay_width <= max_object_number) {
+      prefetch_object_numbers.insert(object_number + splay_width);
+    }
+  }
 
-  // prefetch starting from the last known commit set
-  uint64_t start_object = object_set * splay_width;
-  for (uint64_t object_number = start_object;
-       object_number < start_object + object_count; ++object_number) {
+  ldout(m_cct, 10) << __func__ << ": prefetching "
+                   << prefetch_object_numbers.size() << " " << "objects"
+                   << dendl;
+  for (auto object_number : prefetch_object_numbers) {
     fetch(object_number);
   }
 }
@@ -248,15 +260,25 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
   // prefetch in-order since a newer splay object could prefetch first
   while (!object_players.begin()->second->is_fetch_in_progress()) {
     ObjectPlayerPtr object_player = object_players.begin()->second;
+    uint64_t player_object_number = object_player->get_object_number();
 
     // skip past known committed records
-    if (!m_commit_tids.empty() && !object_player->empty()) {
-      ldout(m_cct, 15) << "seeking known commit position in "
+    if (m_commit_positions.count(splay_offset) != 0 &&
+        !object_player->empty()) {
+      ObjectPosition &position = m_commit_positions[splay_offset];
+
+      ldout(m_cct, 15) << "seeking known commit position " << position << " in "
                        << object_player->get_oid() << dendl;
+
+      bool found_commit = false;
       Entry entry;
-      while (!m_commit_tids.empty() && !object_player->empty()) {
+      while (!object_player->empty()) {
         object_player->front(&entry);
-        if (entry.get_entry_tid() > m_commit_tids[entry.get_tag_tid()]) {
+
+        if (entry.get_tag_tid() == position.tag_tid &&
+            entry.get_entry_tid() == position.entry_tid) {
+          found_commit = true;
+        } else if (found_commit) {
           ldout(m_cct, 10) << "located next uncommitted entry: " << entry
                            << dendl;
           break;
@@ -270,18 +292,23 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
 
       // if this object contains the commit position, our read should start with
       // the next consistent journal entry in the sequence
-      if (!m_commit_tids.empty() &&
-          object_player->get_object_number() == m_commit_object) {
+      if (player_object_number == m_commit_object) {
         if (object_player->empty()) {
           advance_splay_object();
         } else {
           Entry entry;
           object_player->front(&entry);
-          if (entry.get_tag_tid() == m_commit_tag_tid) {
+          if (entry.get_tag_tid() == position.tag_tid) {
             advance_splay_object();
           }
         }
       }
+
+      // do not search for commit position for this object
+      // if we've already seen it
+      if (found_commit) {
+        m_commit_positions.erase(splay_offset);
+      }
     }
 
     // if the object is empty, pre-fetch the next splay object
index 54b4a93f2a11d499c1c84e95f743f2077ed634d7..a07728ec0a2106696a2ae068ed509a71f2a082f6 100644 (file)
@@ -40,9 +40,9 @@ public:
 
 private:
   typedef std::set<uint8_t> PrefetchSplayOffsets;
-  typedef std::map<uint64_t, uint64_t> AllocatedEntryTids;
   typedef std::map<uint64_t, ObjectPlayerPtr> ObjectPlayers;
   typedef std::map<uint8_t, ObjectPlayers> SplayedObjectPlayers;
+  typedef std::map<uint8_t, ObjectPosition> SplayedObjectPositions;
 
   enum State {
     STATE_INIT,
@@ -96,8 +96,7 @@ private:
   PrefetchSplayOffsets m_prefetch_splay_offsets;
   SplayedObjectPlayers m_object_players;
   uint64_t m_commit_object;
-  uint64_t m_commit_tag_tid;
-  AllocatedEntryTids m_commit_tids;
+  SplayedObjectPositions m_commit_positions;
 
   void advance_splay_object();
 
index beafc0a692227bba0896c7d0372fa807aa9eeee6..7612b4982f5611a3ef2fc02ac8f985f88fb14553 100644 (file)
@@ -185,7 +185,8 @@ TEST_F(TestJournalPlayer, PrefetchSkip) {
 
   journal::JournalPlayer::ObjectPositions positions;
   positions = {
-    cls::journal::ObjectPosition(0, 234, 125) };
+    cls::journal::ObjectPosition(0, 234, 125),
+    cls::journal::ObjectPosition(1, 234, 124) };
   cls::journal::ObjectSetPosition commit_position(positions);
 
   ASSERT_EQ(0, create(oid));