git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: support replay past skipped splay objects 6687/head
author Jason Dillaman <dillaman@redhat.com>
Mon, 23 Nov 2015 22:46:55 +0000 (17:46 -0500)
committer Jason Dillaman <dillaman@redhat.com>
Mon, 23 Nov 2015 22:46:55 +0000 (17:46 -0500)
It's possible for a splay object within a set to be skipped
if the set is closed due to a full object within the set.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h

index 2f971588abc7e66602321da341f99fddec1c7287..1fd674ad93ee14243cb2b8644c5a9e317d232aed 100644 (file)
@@ -53,9 +53,9 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
                              ReplayHandler *replay_handler)
   : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
     m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
-    m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT),
-    m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false),
-    m_watch_interval(0), m_commit_object(0) {
+    m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0),
+    m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0),
+    m_commit_object(0) {
   m_replay_handler->get();
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
@@ -83,25 +83,29 @@ JournalPlayer::~JournalPlayer() {
 }
 
 void JournalPlayer::prefetch() {
-  m_lock.Lock();
+  Mutex::Locker locker(m_lock);
   assert(m_state == STATE_INIT);
   m_state = STATE_PREFETCH;
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
-  ldout(m_cct, 10) << __func__ << ": prefetching " << (2 * splay_width) << " "
+  for (uint8_t splay_index = 0; splay_index < splay_width; ++splay_index) {
+    m_prefetch_splay_offsets.insert(splay_index);
+  }
+
+  uint64_t object_set = m_commit_object / splay_width;
+  uint64_t active_set = m_journal_metadata->get_active_set();
+
+  uint32_t object_count = splay_width *
+                          std::min<uint64_t>(2, active_set - object_set + 1);
+  ldout(m_cct, 10) << __func__ << ": prefetching " << object_count << " "
                    << "objects" << dendl;
 
   // prefetch starting from the last known commit set
-  C_PrefetchBatch *ctx = new C_PrefetchBatch(this);
-  uint64_t start_object = (m_commit_object / splay_width) * splay_width;
+  uint64_t start_object = object_set * splay_width;
   for (uint64_t object_number = start_object;
-       object_number < start_object + (2 * splay_width); ++object_number) {
-    ctx->add_fetch();
-    fetch(object_number, ctx);
+       object_number < start_object + object_count; ++object_number) {
+    fetch(object_number);
   }
-  m_lock.Unlock();
-
-  ctx->complete(0);
 }
 
 void JournalPlayer::prefetch_and_watch(double interval) {
@@ -126,6 +130,7 @@ void JournalPlayer::unwatch() {
 }
 
 bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
+  ldout(m_cct, 20) << __func__ << dendl;
   Mutex::Locker locker(m_lock);
   if (m_state != STATE_PLAYBACK) {
     return false;
@@ -136,7 +141,9 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
 
   if (object_player->empty()) {
     if (m_watch_enabled && !m_watch_scheduled) {
-      object_player->watch(&m_process_state, m_watch_interval);
+      object_player->watch(
+        new C_Watch(this, object_player->get_object_number()),
+        m_watch_interval);
       m_watch_scheduled = true;
     } else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) {
       m_journal_metadata->get_finisher().queue(new C_HandleComplete(
@@ -171,11 +178,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
     }
   } else {
     advance_splay_object();
-
-    ObjectPlayerPtr next_set_object_player = get_next_set_object_player();
-    if (!next_set_object_player->empty()) {
-      remove_object_player(object_player, &m_process_state);
-    }
+    remove_empty_object_player(object_player);
   }
 
   m_journal_metadata->reserve_tid(entry->get_tag(), entry->get_tid());
@@ -184,14 +187,15 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
   return true;
 }
 
-void JournalPlayer::process_state(int r) {
-  ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+void JournalPlayer::process_state(uint64_t object_number, int r) {
+  ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
+                   << "r=" << r << dendl;
   if (r >= 0) {
     Mutex::Locker locker(m_lock);
     switch (m_state) {
     case STATE_PREFETCH:
       ldout(m_cct, 10) << "PREFETCH" << dendl;
-      r = process_prefetch();
+      r = process_prefetch(object_number);
       break;
     case STATE_PLAYBACK:
       ldout(m_cct, 10) << "PLAYBACK" << dendl;
@@ -216,58 +220,77 @@ void JournalPlayer::process_state(int r) {
   }
 }
 
-int JournalPlayer::process_prefetch() {
+int JournalPlayer::process_prefetch(uint64_t object_number) {
   ldout(m_cct, 10) << __func__ << dendl;
   assert(m_lock.is_locked());
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
-  for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
-    assert(m_object_players.count(splay_offset) == 1);
+  uint8_t splay_offset = object_number % splay_width;
 
-    ObjectPlayers &object_players = m_object_players[splay_offset];
-    assert(object_players.size() == 2);
+  PrefetchSplayOffsets::iterator it = m_prefetch_splay_offsets.find(
+    splay_offset);
+  if (it == m_prefetch_splay_offsets.end()) {
+    return 0;
+  }
 
-    ObjectPlayerPtr object_player = object_players.begin()->second;
-    assert(!object_player->is_fetch_in_progress());
-
-    ldout(m_cct, 15) << "seeking known commit position in "
-                     << object_player->get_oid() << dendl;
-    Entry entry;
-    while (!m_commit_tids.empty() && !object_player->empty()) {
-      object_player->front(&entry);
-      if (entry.get_tid() > m_commit_tids[entry.get_tag()]) {
-        ldout(m_cct, 10) << "located next uncommitted entry: " << entry
-                         << dendl;
-        break;
-      }
+  bool prefetch_complete = false;
+  assert(m_object_players.count(splay_offset) == 1);
+  ObjectPlayers &object_players = m_object_players[splay_offset];
 
-      ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl;
-      m_journal_metadata->reserve_tid(entry.get_tag(), entry.get_tid());
-      object_player->pop_front();
-    }
+  // prefetch in-order since a newer splay object could prefetch first
+  while (!object_players.begin()->second->is_fetch_in_progress()) {
+    ObjectPlayerPtr object_player = object_players.begin()->second;
 
-    // if this object contains the commit position, our read should start with
-    // the next consistent journal entry in the sequence
-    if (!m_commit_tids.empty() &&
-        object_player->get_object_number() == m_commit_object) {
-      if (object_player->empty()) {
-        advance_splay_object();
-      } else {
-        Entry entry;
+    // skip past known committed records
+    if (!m_commit_tids.empty() && !object_player->empty()) {
+      ldout(m_cct, 15) << "seeking known commit position in "
+                       << object_player->get_oid() << dendl;
+      Entry entry;
+      while (!m_commit_tids.empty() && !object_player->empty()) {
         object_player->front(&entry);
-        if (entry.get_tag() == m_commit_tag) {
+        if (entry.get_tid() > m_commit_tids[entry.get_tag()]) {
+          ldout(m_cct, 10) << "located next uncommitted entry: " << entry
+                           << dendl;
+          break;
+        }
+
+        ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl;
+        m_journal_metadata->reserve_tid(entry.get_tag(), entry.get_tid());
+        object_player->pop_front();
+      }
+
+      // if this object contains the commit position, our read should start with
+      // the next consistent journal entry in the sequence
+      if (!m_commit_tids.empty() &&
+          object_player->get_object_number() == m_commit_object) {
+        if (object_player->empty()) {
           advance_splay_object();
+        } else {
+          Entry entry;
+          object_player->front(&entry);
+          if (entry.get_tag() == m_commit_tag) {
+            advance_splay_object();
+          }
         }
       }
     }
 
-    ObjectPlayerPtr next_set_object_player = get_next_set_object_player();
-    if (object_player->empty() && !next_set_object_player->empty()) {
-      ldout(m_cct, 15) << object_player->get_oid() << " empty" << dendl;
-      remove_object_player(object_player, &m_process_state);
+    // if the object is empty, pre-fetch the next splay object
+    if (!remove_empty_object_player(object_player)) {
+      prefetch_complete = true;
+      break;
     }
   }
 
+  if (!prefetch_complete) {
+    return 0;
+  }
+
+  m_prefetch_splay_offsets.erase(it);
+  if (!m_prefetch_splay_offsets.empty()) {
+    return 0;
+  }
+
   m_state = STATE_PLAYBACK;
   ObjectPlayerPtr object_player = get_object_player();
   if (!object_player->empty()) {
@@ -275,7 +298,9 @@ int JournalPlayer::process_prefetch() {
     m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
       m_replay_handler), 0);
   } else if (m_watch_enabled) {
-    object_player->watch(&m_process_state, m_watch_interval);
+    object_player->watch(
+      new C_Watch(this, object_player->get_object_number()),
+      m_watch_interval);
     m_watch_scheduled = true;
   } else {
     ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
@@ -304,14 +329,11 @@ int JournalPlayer::process_playback() {
 const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
   assert(m_lock.is_locked());
 
-  assert(m_object_players.count(m_splay_offset) == 1);
   SplayedObjectPlayers::const_iterator it = m_object_players.find(
     m_splay_offset);
   assert(it != m_object_players.end());
 
-  const ObjectPlayers &object_players = it->second;
-  assert(object_players.size() == 2);
-  return object_players;
+  return it->second;
 }
 
 ObjectPlayerPtr JournalPlayer::get_object_player() const {
@@ -336,27 +358,37 @@ void JournalPlayer::advance_splay_object() {
                    << static_cast<uint32_t>(m_splay_offset) << dendl;
 }
 
-void JournalPlayer::remove_object_player(const ObjectPlayerPtr &object_player,
-                                         Context *on_fetch) {
+bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
   assert(m_lock.is_locked());
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
+  uint64_t object_set = player->get_object_number() / splay_width;
+  uint64_t active_set = m_journal_metadata->get_active_set();
+  if (!player->empty() || object_set == active_set) {
+    return false;
+  }
+
+  ldout(m_cct, 15) << player->get_oid() << " empty" << dendl;
   ObjectPlayers &object_players = m_object_players[
-    object_player->get_object_number() % splay_width];
+    player->get_object_number() % splay_width];
   assert(!object_players.empty());
-  assert(object_players.begin()->second == object_player);
-  object_players.erase(object_players.begin());
 
-  fetch(object_player->get_object_number() + (2 * splay_width), on_fetch);
+  uint64_t next_object_num = object_players.rbegin()->first + splay_width;
+  uint64_t next_object_set = next_object_num / splay_width;
+  if (next_object_set <= active_set) {
+    fetch(next_object_num);
+  }
+  object_players.erase(player->get_object_number());
+  return true;
 }
 
-void JournalPlayer::fetch(uint64_t object_num, Context *ctx) {
+void JournalPlayer::fetch(uint64_t object_num) {
   assert(m_lock.is_locked());
 
   std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
 
   ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
-  C_Fetch *fetch_ctx = new C_Fetch(this, object_num, ctx);
+  C_Fetch *fetch_ctx = new C_Fetch(this, object_num);
   ObjectPlayerPtr object_player(new ObjectPlayer(
     m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
     m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order()));
@@ -366,42 +398,33 @@ void JournalPlayer::fetch(uint64_t object_num, Context *ctx) {
   object_player->fetch(fetch_ctx);
 }
 
-int JournalPlayer::handle_fetched(int r, uint64_t object_num) {
-  std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
-
-  ldout(m_cct, 10) << __func__ << ": fetched "
+void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
+  ldout(m_cct, 10) << __func__ << ": "
                    << utils::get_object_name(m_object_oid_prefix, object_num)
                    << ": r=" << r << dendl;
-  if (r < 0 && r != -ENOENT) {
-    return r;
+  if (r == -ENOENT) {
+    r = 0;
   }
-  return 0;
-}
+  if (r == 0) {
+    Mutex::Locker locker(m_lock);
+    uint8_t splay_width = m_journal_metadata->get_splay_width();
+    uint8_t splay_offset = object_num % splay_width;
+    assert(m_object_players.count(splay_offset) == 1);
+    ObjectPlayers &object_players = m_object_players[splay_offset];
 
-JournalPlayer::C_PrefetchBatch::C_PrefetchBatch(JournalPlayer *p)
-  : player(p), lock("JournalPlayer::C_PrefetchBatch::lock"), refs(1),
-    return_value(0) {
-  player->m_async_op_tracker.start_op();
-}
+    assert(object_players.count(object_num) == 1);
+    ObjectPlayerPtr object_player = object_players[object_num];
+    remove_empty_object_player(object_player);
+  }
 
-void JournalPlayer::C_PrefetchBatch::add_fetch() {
-  Mutex::Locker locker(lock);
-  ++refs;
+  process_state(object_num, r);
 }
 
-void JournalPlayer::C_PrefetchBatch::complete(int r) {
-  {
-    Mutex::Locker locker(lock);
-    if (r < 0 && return_value == 0) {
-      return_value = r;
-    }
-    --refs;
-  }
-
-  if (refs == 0) {
-    player->process_state(return_value);
-    delete this;
-  }
+void JournalPlayer::handle_watch(uint64_t object_num, int r) {
+  ldout(m_cct, 10) << __func__ << ": "
+                   << utils::get_object_name(m_object_oid_prefix, object_num)
+                   << ": r=" << r << dendl;
+  process_state(object_num, r);
 }
 
 } // namespace journal
index 7d4855960d71859a5ff5931e825aefee719dd008..93347ea3f22edcdea14fd040ae4f1341ec67a9b7 100644 (file)
@@ -39,6 +39,7 @@ public:
   bool try_pop_front(Entry *entry, uint64_t *commit_tid);
 
 private:
+  typedef std::set<uint8_t> PrefetchSplayOffsets;
   typedef std::map<std::string, uint64_t> AllocatedTids;
   typedef std::map<uint64_t, ObjectPlayerPtr> ObjectPlayers;
   typedef std::map<uint8_t, ObjectPlayers> SplayedObjectPlayers;
@@ -50,44 +51,28 @@ private:
     STATE_ERROR
   };
 
-  struct C_ProcessState : public Context {
+  struct C_Watch : public Context {
     JournalPlayer *player;
-    C_ProcessState(JournalPlayer *p) : player(p) {}
-    virtual void complete(int r) {
-      player->process_state(r);
-    }
-    virtual void finish(int r) {}
-  };
-
-  struct C_PrefetchBatch : public Context {
-    JournalPlayer *player;
-    Mutex lock;
-    uint32_t refs;
-    int return_value;
+    uint64_t object_num;
 
-    C_PrefetchBatch(JournalPlayer *p);
-    virtual ~C_PrefetchBatch() {
-      player->m_async_op_tracker.finish_op();
+    C_Watch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) {
+    }
+    virtual void finish(int r) {
+      player->handle_watch(object_num, r);
     }
-    void add_fetch();
-    virtual void complete(int r);
-    virtual void finish(int r) {}
   };
 
   struct C_Fetch : public Context {
     JournalPlayer *player;
     uint64_t object_num;
-    Context *on_fetch;
-    C_Fetch(JournalPlayer *p, uint64_t o, Context *c)
-      : player(p), object_num(o), on_fetch(c) {
+    C_Fetch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) {
       player->m_async_op_tracker.start_op();
     }
     virtual ~C_Fetch() {
       player->m_async_op_tracker.finish_op();
     }
     virtual void finish(int r) {
-      r = player->handle_fetched(r, object_num);
-      on_fetch->complete(r);
+      player->handle_fetched(object_num, r);
     }
   };
 
@@ -98,8 +83,6 @@ private:
 
   ReplayHandler *m_replay_handler;
 
-  C_ProcessState m_process_state;
-
   AsyncOpTracker m_async_op_tracker;
 
   mutable Mutex m_lock;
@@ -110,6 +93,7 @@ private:
   bool m_watch_scheduled;
   double m_watch_interval;
 
+  PrefetchSplayOffsets m_prefetch_splay_offsets;
   SplayedObjectPlayers m_object_players;
   uint64_t m_commit_object;
   std::string m_commit_tag;
@@ -120,15 +104,15 @@ private:
   const ObjectPlayers &get_object_players() const;
   ObjectPlayerPtr get_object_player() const;
   ObjectPlayerPtr get_next_set_object_player() const;
-  void remove_object_player(const ObjectPlayerPtr &object_player,
-                            Context *on_fetch);
+  bool remove_empty_object_player(const ObjectPlayerPtr &object_player);
 
-  void process_state(int r);
-  int process_prefetch();
+  void process_state(uint64_t object_number, int r);
+  int process_prefetch(uint64_t object_number);
   int process_playback();
 
-  void fetch(uint64_t object_num, Context *ctx);
-  int handle_fetched(int r, uint64_t object_num);
+  void fetch(uint64_t object_num);
+  void handle_fetched(uint64_t object_num, int r);
+  void handle_watch(uint64_t object_num, int r);
 };
 
 } // namespace journal