]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
journal: skip partially complete tag entries during playback
authorJason Dillaman <dillaman@redhat.com>
Sun, 15 May 2016 13:52:41 +0000 (09:52 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 18 May 2016 15:02:29 +0000 (11:02 -0400)
If a journal client does not fully write out its buffered entries
before quiting, replay should skip over all remaining out-of-
sequence entries for the tag.

Fixes: http://tracker.ceph.com/issues/15864
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h
src/test/journal/test_JournalPlayer.cc

index d0d62694a9af38fed5169969eb4c84437ac4f4d2..cb728edea7b8b496b0b7c34d743660272a3769c4 100644 (file)
@@ -83,6 +83,7 @@ JournalPlayer::~JournalPlayer() {
   {
     Mutex::Locker locker(m_lock);
     assert(m_fetch_object_numbers.empty());
+    assert(!m_watch_scheduled);
   }
   m_replay_handler->put();
 }
@@ -134,6 +135,7 @@ void JournalPlayer::prefetch_and_watch(double interval) {
     Mutex::Locker locker(m_lock);
     m_watch_enabled = true;
     m_watch_interval = interval;
+    m_watch_step = WATCH_STEP_FETCH_CURRENT;
   }
   prefetch();
 }
@@ -159,17 +161,16 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
     return false;
   }
 
-  if (!is_object_set_ready()) {
-    m_handler_notified = false;
-    return false;
-  }
-
   if (!verify_playback_ready()) {
-    if (!m_watch_enabled) {
-      notify_complete(0);
-    } else if (!m_watch_scheduled) {
+    if (!is_object_set_ready()) {
       m_handler_notified = false;
-      schedule_watch();
+    } else {
+      if (!m_watch_enabled) {
+        notify_complete(0);
+      } else if (!m_watch_scheduled) {
+        m_handler_notified = false;
+        schedule_watch();
+      }
     }
     return false;
   }
@@ -182,7 +183,10 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
 
   uint64_t last_entry_tid;
   if (m_active_tag_tid && *m_active_tag_tid != entry->get_tag_tid()) {
-    lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl;
+    lderr(m_cct) << "unexpected tag in journal entry: "
+                 << "entry=" << *entry << ", "
+                 << "tag_tid=" << (m_active_tag_tid ? *m_active_tag_tid : -1)
+                 << dendl;
 
     m_state = STATE_ERROR;
     notify_complete(-ENOMSG);
@@ -329,16 +333,16 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
   }
 
   m_state = STATE_PLAYBACK;
-  if (!is_object_set_ready()) {
-    ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
-  } else if (verify_playback_ready()) {
+  if (verify_playback_ready()) {
     notify_entries_available();
-  } else if (m_watch_enabled) {
-    schedule_watch();
-  } else {
-    ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
-                     << dendl;
-    notify_complete(0);
+  } else if (is_object_set_ready()) {
+    if (m_watch_enabled) {
+      schedule_watch();
+    } else {
+      ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
+                       << dendl;
+      notify_complete(0);
+    }
   }
   return 0;
 }
@@ -347,21 +351,19 @@ int JournalPlayer::process_playback(uint64_t object_number) {
   ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
   assert(m_lock.is_locked());
 
-  if (!is_object_set_ready()) {
-    return 0;
-  }
-
   ObjectPlayerPtr object_player = get_object_player();
   if (verify_playback_ready()) {
     notify_entries_available();
-  } else if (m_watch_enabled) {
-    schedule_watch();
-  } else {
-    uint8_t splay_width = m_journal_metadata->get_splay_width();
-    uint64_t active_set = m_journal_metadata->get_active_set();
-    uint64_t object_set = object_player->get_object_number() / splay_width;
-    if (object_set == active_set) {
-      notify_complete(0);
+  } else if (is_object_set_ready()) {
+    if (m_watch_enabled) {
+      schedule_watch();
+    } else {
+      uint8_t splay_width = m_journal_metadata->get_splay_width();
+      uint64_t active_set = m_journal_metadata->get_active_set();
+      uint64_t object_set = object_player->get_object_number() / splay_width;
+      if (object_set == active_set) {
+        notify_complete(0);
+      }
     }
   }
   return 0;
@@ -377,7 +379,11 @@ bool JournalPlayer::is_object_set_ready() const {
 
 bool JournalPlayer::verify_playback_ready() {
   assert(m_lock.is_locked());
-  assert(is_object_set_ready());
+
+  if (!is_object_set_ready()) {
+    ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
+    return false;
+  }
 
   ObjectPlayerPtr object_player = get_object_player();
   assert(object_player);
@@ -388,10 +394,36 @@ bool JournalPlayer::verify_playback_ready() {
   bool entry_available = false;
   if (!object_player->empty()) {
     entry_available = true;
+    m_watch_prune_active_tag = false;
     object_player->front(&entry);
     if (!m_active_tag_tid || entry.get_tag_tid() == *m_active_tag_tid) {
       return true;
+    } else if (m_active_tag_tid && entry.get_tag_tid() < m_active_tag_tid) {
+      // new tag is registered as primary commit position -- other object
+      // commit positions might still be associated with old tags
+      ldout(m_cct, 10) << __func__ << ": detected stale entry: " << entry
+                       << dendl;
+      return prune_tag(entry.get_tag_tid());
+    } else if (m_active_tag_tid && entry.get_tag_tid() > *m_active_tag_tid) {
+      // new tag at current playback position -- implies that previous
+      // tag ended abruptly without flushing out all records
+      // search for the start record for the next tag
+      ldout(m_cct, 10) << __func__ << ": new tag detected at current playback "
+                       << "position: active_tag=" << *m_active_tag_tid << ", "
+                       << "new_tag=" << entry.get_tag_tid() << dendl;
+      return prune_active_tag();
     }
+  } else if (!m_watch_enabled && m_active_tag_tid) {
+    // current playback position is empty so this tag is done
+    ldout(m_cct, 10) << __func__ << ": no more in-sequence entries for tag "
+                     << *m_active_tag_tid << dendl;
+    return prune_active_tag();
+  } else if (m_watch_enabled && m_active_tag_tid && m_watch_prune_active_tag) {
+    // detected current tag is now longer active and we have re-read the current
+    // object but it's still empty, so this tag is done
+    ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries "
+                     << "for tag " << *m_active_tag_tid << dendl;
+    return prune_active_tag();
   }
 
   // NOTE: replay currently does not check tag class to playback multiple tags
@@ -417,6 +449,55 @@ bool JournalPlayer::verify_playback_ready() {
   return entry_available;
 }
 
+bool JournalPlayer::prune_tag(uint64_t tag_tid) {
+  assert(m_lock.is_locked());
+  ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag "
+                   << tag_tid << dendl;
+
+  for (auto &players : m_object_players) {
+    for (auto player_pair : players.second) {
+      ObjectPlayerPtr object_player = player_pair.second;
+      ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid()
+                       << dendl;
+      while (!object_player->empty()) {
+        Entry entry;
+        object_player->front(&entry);
+        if (entry.get_tag_tid() == tag_tid) {
+          ldout(m_cct, 20) << __func__ << ": pruned " << entry << dendl;
+          object_player->pop_front();
+        } else {
+          break;
+        }
+      }
+    }
+
+    // trim any empty players to prefetch the next available object
+    ObjectPlayers object_players(players.second);
+    for (auto player_pair : object_players) {
+      remove_empty_object_player(player_pair.second);
+    }
+  }
+
+  // if we removed an empty object, a fetch will be in-flight
+  if (!is_object_set_ready()) {
+    return false;
+  }
+
+  // search for the start record for the next tag
+  return verify_playback_ready();
+}
+
+bool JournalPlayer::prune_active_tag() {
+  assert(m_lock.is_locked());
+  assert(m_active_tag_tid);
+
+  uint64_t tag_tid = *m_active_tag_tid;
+  m_active_tag_tid = boost::none;
+  m_splay_offset = 0;
+
+  return prune_tag(tag_tid);
+}
+
 const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
   assert(m_lock.is_locked());
 
@@ -434,6 +515,20 @@ ObjectPlayerPtr JournalPlayer::get_object_player() const {
   return object_players.begin()->second;
 }
 
+ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
+  assert(m_lock.is_locked());
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  uint8_t splay_offset = object_number % splay_width;
+  auto splay_it = m_object_players.find(splay_offset);
+  assert(splay_it != m_object_players.end());
+
+  const ObjectPlayers &object_players = splay_it->second;
+  auto player_it = object_players.find(object_number);
+  assert(player_it != object_players.end());
+  return player_it->second;
+}
+
 ObjectPlayerPtr JournalPlayer::get_next_set_object_player() const {
   assert(m_lock.is_locked());
 
@@ -445,6 +540,7 @@ void JournalPlayer::advance_splay_object() {
   assert(m_lock.is_locked());
   ++m_splay_offset;
   m_splay_offset %= m_journal_metadata->get_splay_width();
+  m_watch_step = WATCH_STEP_FETCH_CURRENT;
   ldout(m_cct, 20) << __func__ << ": new offset "
                    << static_cast<uint32_t>(m_splay_offset) << dendl;
 }
@@ -513,13 +609,7 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
     r = 0;
   }
   if (r == 0) {
-    uint8_t splay_width = m_journal_metadata->get_splay_width();
-    uint8_t splay_offset = object_num % splay_width;
-    assert(m_object_players.count(splay_offset) == 1);
-    ObjectPlayers &object_players = m_object_players[splay_offset];
-
-    assert(object_players.count(object_num) == 1);
-    ObjectPlayerPtr object_player = object_players[object_num];
+    ObjectPlayerPtr object_player = get_object_player(object_num);
     remove_empty_object_player(object_player);
   }
   process_state(object_num, r);
@@ -532,25 +622,39 @@ void JournalPlayer::schedule_watch() {
     return;
   }
 
-  // poll first splay offset and active splay offset since
-  // new records should only appear in those two objects
-  C_Watch *ctx = new C_Watch(this);
-
-  ObjectPlayerPtr object_player = get_object_player();
-  uint8_t splay_width = m_journal_metadata->get_splay_width();
-  if (object_player->get_object_number() % splay_width != 0) {
-    ++ctx->pending_fetches;
+  m_watch_scheduled = true;
 
-    ObjectPlayerPtr first_object_player =
-      m_object_players.begin()->second.begin()->second;
-    first_object_player->watch(ctx, m_watch_interval);
+  if (m_watch_step == WATCH_STEP_ASSERT_ACTIVE) {
+    // detect if a new tag has been created in case we are blocked
+    // by an incomplete tag sequence
+    ldout(m_cct, 20) << __func__ << ": asserting active tag" << dendl;
+    assert(m_active_tag_tid);
+    FunctionContext *ctx = new FunctionContext([this](int r) {
+        handle_watch_assert_active(r);
+      });
+    m_journal_metadata->assert_active_tag(*m_active_tag_tid, ctx);
+    return;
   }
 
+  ObjectPlayerPtr object_player = get_object_player();
+  switch (m_watch_step) {
+  case WATCH_STEP_FETCH_CURRENT:
+    object_player = get_object_player();
+    break;
+  case WATCH_STEP_FETCH_FIRST:
+    object_player = m_object_players.begin()->second.begin()->second;
+    break;
+  default:
+    assert(false);
+  }
+
+  ldout(m_cct, 20) << __func__ << ": scheduling watch on "
+                   << object_player->get_oid() << dendl;
+  C_Watch *ctx = new C_Watch(this, object_player->get_object_number());
   object_player->watch(ctx, m_watch_interval);
-  m_watch_scheduled = true;
 }
 
-void JournalPlayer::handle_watch(int r) {
+void JournalPlayer::handle_watch(uint64_t object_num, int r) {
   ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
   if (r == -ECANCELED) {
     // unwatch of object player(s)
@@ -558,17 +662,49 @@ void JournalPlayer::handle_watch(int r) {
   }
 
   Mutex::Locker locker(m_lock);
+  assert(m_watch_scheduled);
   m_watch_scheduled = false;
 
-  std::set<uint64_t> object_numbers;
-  for (auto &players : m_object_players) {
-    object_numbers.insert(
-      players.second.begin()->second->get_object_number());
+  ObjectPlayerPtr object_player = get_object_player(object_num);
+  if (r == 0) {
+    if (object_player->empty() && !object_player->refetch_required()) {
+      // already re-read object after trying to remove it before ... it's
+      // still empty so it's safe to remove
+      remove_empty_object_player(object_player);
+    }
+  }
+
+  // determine what object to query on next watch schedule tick
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  if (m_watch_step == WATCH_STEP_FETCH_CURRENT &&
+      object_player->get_object_number() % splay_width != 0) {
+    m_watch_step = WATCH_STEP_FETCH_FIRST;
+  } else if (m_active_tag_tid) {
+    m_watch_step = WATCH_STEP_ASSERT_ACTIVE;
+  } else {
+    m_watch_step = WATCH_STEP_FETCH_CURRENT;
   }
 
-  for (auto object_num : object_numbers) {
-    process_state(object_num, r);
+  process_state(object_num, r);
+}
+
+void JournalPlayer::handle_watch_assert_active(int r) {
+  ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_watch_scheduled);
+  m_watch_scheduled = false;
+
+  if (m_active_tag_tid && r == -ESTALE) {
+    // newer tag exists -- since we are at this step in the watch sequence,
+    // we know we can prune the active tag if watch fails again
+    ldout(m_cct, 10) << __func__ << ": tag " << *m_active_tag_tid << " "
+                     << "no longer active" << dendl;
+    m_watch_prune_active_tag = true;
   }
+
+  m_watch_step = WATCH_STEP_FETCH_CURRENT;
+  schedule_watch();
 }
 
 void JournalPlayer::notify_entries_available() {
index 85a8b28da25f52d04449c0154265a90dc4c05e2d..d82edb1261e11cff0ae5d514f310d12302b30636 100644 (file)
@@ -54,6 +54,12 @@ private:
     STATE_ERROR
   };
 
+  enum WatchStep {
+    WATCH_STEP_FETCH_CURRENT,
+    WATCH_STEP_FETCH_FIRST,
+    WATCH_STEP_ASSERT_ACTIVE
+  };
+
   struct C_Fetch : public Context {
     JournalPlayer *player;
     uint64_t object_num;
@@ -70,31 +76,13 @@ private:
 
   struct C_Watch : public Context {
     JournalPlayer *player;
-    Mutex lock;
-    uint8_t pending_fetches = 1;
-    int ret_val = 0;
-
-    C_Watch(JournalPlayer *player)
-      : player(player), lock("JournalPlayer::C_Watch::lock") {
-    }
-
-    virtual void complete(int r) override {
-      lock.Lock();
-      if (ret_val == 0 && r < 0) {
-        ret_val = r;
-      }
-
-      assert(pending_fetches > 0);
-      if (--pending_fetches == 0) {
-        lock.Unlock();
-        Context::complete(ret_val);
-      } else {
-        lock.Unlock();
-      }
+    uint64_t object_num;
+    C_Watch(JournalPlayer *player, uint64_t object_num)
+      : player(player), object_num(object_num) {
     }
 
     virtual void finish(int r) override {
-      player->handle_watch(r);
+      player->handle_watch(object_num, r);
     }
   };
 
@@ -114,6 +102,8 @@ private:
   bool m_watch_enabled;
   bool m_watch_scheduled;
   double m_watch_interval;
+  WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT;
+  bool m_watch_prune_active_tag = false;
 
   bool m_handler_notified = false;
 
@@ -129,8 +119,12 @@ private:
 
   bool is_object_set_ready() const;
   bool verify_playback_ready();
+  bool prune_tag(uint64_t tag_tid);
+  bool prune_active_tag();
+
   const ObjectPlayers &get_object_players() const;
   ObjectPlayerPtr get_object_player() const;
+  ObjectPlayerPtr get_object_player(uint64_t object_number) const;
   ObjectPlayerPtr get_next_set_object_player() const;
   bool remove_empty_object_player(const ObjectPlayerPtr &object_player);
 
@@ -142,7 +136,8 @@ private:
   void handle_fetched(uint64_t object_num, int r);
 
   void schedule_watch();
-  void handle_watch(int r);
+  void handle_watch(uint64_t object_num, int r);
+  void handle_watch_assert_active(int r);
 
   void notify_entries_available();
   void notify_complete(int r);
index 54e555fc37d6812ae16541bcd9ee065bcac01904..ed3e241630de89074c38df0de5045779d5318145 100644 (file)
@@ -298,6 +298,147 @@ TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
   ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result);
 }
 
+TEST_F(TestJournalPlayer, PrefetchMissingSequence) {
+  std::string oid = get_temp_oid();
+
+  cls::journal::ObjectSetPosition commit_position;
+
+  ASSERT_EQ(0, create(oid, 14, 4));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, metadata->set_active_set(1));
+  ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
+  ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
+  ASSERT_EQ(0, write_entry(oid, 0, 2, 860));
+  ASSERT_EQ(0, write_entry(oid, 1, 2, 853));
+  ASSERT_EQ(0, write_entry(oid, 1, 2, 857));
+  ASSERT_EQ(0, write_entry(oid, 5, 2, 861));
+  ASSERT_EQ(0, write_entry(oid, 2, 2, 854));
+  ASSERT_EQ(0, write_entry(oid, 0, 3, 0));
+  ASSERT_EQ(0, write_entry(oid, 5, 3, 1));
+  ASSERT_EQ(0, write_entry(oid, 2, 3, 2));
+  ASSERT_EQ(0, write_entry(oid, 3, 3, 3));
+
+  player->prefetch();
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 7, &entries));
+
+  Entries expected_entries = {
+    create_entry(2, 852),
+    create_entry(2, 853),
+    create_entry(2, 854),
+    create_entry(3, 0),
+    create_entry(3, 1),
+    create_entry(3, 2),
+    create_entry(3, 3)};
+  ASSERT_EQ(expected_entries, entries);
+
+  ASSERT_TRUE(wait_for_complete(player));
+  ASSERT_EQ(0, m_replay_hander.complete_result);
+}
+
+TEST_F(TestJournalPlayer, PrefetchLargeMissingSequence) {
+  std::string oid = get_temp_oid();
+
+  cls::journal::ObjectSetPosition commit_position;
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, metadata->set_active_set(2));
+  ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
+  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, write_entry(oid, 3, 0, 3));
+  ASSERT_EQ(0, write_entry(oid, 4, 1, 0));
+
+  player->prefetch();
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+  Entries expected_entries = {
+    create_entry(0, 0),
+    create_entry(0, 1),
+    create_entry(1, 0)};
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, PrefetchBlockedNewTag) {
+  std::string oid = get_temp_oid();
+
+  cls::journal::ObjectSetPosition commit_position;
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
+  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, write_entry(oid, 0, 0, 2));
+  ASSERT_EQ(0, write_entry(oid, 0, 0, 4));
+  ASSERT_EQ(0, write_entry(oid, 0, 1, 0));
+
+  player->prefetch();
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 4, &entries));
+
+  Entries expected_entries = {
+    create_entry(0, 0),
+    create_entry(0, 1),
+    create_entry(0, 2),
+    create_entry(1, 0)};
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, PrefetchStaleEntries) {
+  std::string oid = get_temp_oid();
+
+  journal::JournalPlayer::ObjectPositions positions = {
+    cls::journal::ObjectPosition(0, 1, 0) };
+  cls::journal::ObjectSetPosition commit_position(positions);
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
+  ASSERT_EQ(0, write_entry(oid, 0, 1, 0));
+  ASSERT_EQ(0, write_entry(oid, 1, 1, 1));
+
+  player->prefetch();
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+  Entries expected_entries = {
+    create_entry(1, 1)};
+  ASSERT_EQ(expected_entries, entries);
+
+  ASSERT_TRUE(wait_for_complete(player));
+  ASSERT_EQ(0, m_replay_hander.complete_result);
+}
+
 TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) {
   std::string oid = get_temp_oid();
 
@@ -324,7 +465,7 @@ TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) {
   uint64_t commit_tid;
   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
   ASSERT_TRUE(wait_for_complete(player));
-  ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result);
+  ASSERT_EQ(0, m_replay_hander.complete_result);
 }
 
 TEST_F(TestJournalPlayer, PrefetchAndWatch) {
@@ -497,3 +638,167 @@ TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) {
     create_entry(0, 5)};
   ASSERT_EQ(expected_entries, entries);
 }
+
+TEST_F(TestJournalPlayer, LiveReplayMissingSequence) {
+  std::string oid = get_temp_oid();
+
+  cls::journal::ObjectSetPosition commit_position;
+
+  ASSERT_EQ(0, create(oid, 14, 4));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
+  ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
+  ASSERT_EQ(0, write_entry(oid, 0, 2, 860));
+  ASSERT_EQ(0, write_entry(oid, 1, 2, 853));
+  ASSERT_EQ(0, write_entry(oid, 1, 2, 857));
+  ASSERT_EQ(0, write_entry(oid, 2, 2, 854));
+  ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
+  player->prefetch_and_watch(0.25);
+
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+  Entries expected_entries = {
+    create_entry(2, 852),
+    create_entry(2, 853),
+    create_entry(2, 854)};
+  ASSERT_EQ(expected_entries, entries);
+
+  journal::Entry entry;
+  uint64_t commit_tid;
+  ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+  ASSERT_EQ(0, write_entry(oid, 3, 3, 3));
+  ASSERT_EQ(0, write_entry(oid, 2, 3, 2));
+  ASSERT_EQ(0, write_entry(oid, 1, 3, 1));
+  ASSERT_EQ(0, write_entry(oid, 0, 3, 0));
+  ASSERT_TRUE(wait_for_entries(player, 4, &entries));
+
+  expected_entries = {
+    create_entry(3, 0),
+    create_entry(3, 1),
+    create_entry(3, 2),
+    create_entry(3, 3)};
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, LiveReplayLargeMissingSequence) {
+  std::string oid = get_temp_oid();
+
+  cls::journal::ObjectSetPosition commit_position;
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, metadata->set_active_set(2));
+  ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
+  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, write_entry(oid, 3, 0, 3));
+  ASSERT_EQ(0, write_entry(oid, 4, 1, 0));
+  player->prefetch_and_watch(0.25);
+
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+  Entries expected_entries = {
+    create_entry(0, 0),
+    create_entry(0, 1),
+    create_entry(1, 0)};
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, LiveReplayBlockedNewTag) {
+  std::string oid = get_temp_oid();
+
+  cls::journal::ObjectSetPosition commit_position;
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+
+  C_SaferCond ctx1;
+  cls::journal::Tag tag1;
+  metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+
+  ASSERT_EQ(0, metadata->set_active_set(0));
+  ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 0));
+  ASSERT_EQ(0, write_entry(oid, 1, tag1.tid, 1));
+  ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 2));
+  ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 4));
+  player->prefetch_and_watch(0.25);
+
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+  Entries expected_entries = {
+    create_entry(tag1.tid, 0),
+    create_entry(tag1.tid, 1),
+    create_entry(tag1.tid, 2)};
+  ASSERT_EQ(expected_entries, entries);
+
+  journal::Entry entry;
+  uint64_t commit_tid;
+  ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+  C_SaferCond ctx2;
+  cls::journal::Tag tag2;
+  metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2);
+  ASSERT_EQ(0, ctx2.wait());
+
+  ASSERT_EQ(0, write_entry(oid, 0, tag2.tid, 0));
+  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+  expected_entries = {
+    create_entry(tag2.tid, 0)};
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, LiveReplayStaleEntries) {
+  std::string oid = get_temp_oid();
+
+  journal::JournalPlayer::ObjectPositions positions = {
+    cls::journal::ObjectPosition(0, 1, 0) };
+  cls::journal::ObjectSetPosition commit_position(positions);
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
+  ASSERT_EQ(0, write_entry(oid, 0, 1, 0));
+  ASSERT_EQ(0, write_entry(oid, 1, 1, 1));
+  player->prefetch_and_watch(0.25);
+
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+  Entries expected_entries = {
+    create_entry(1, 1)};
+  ASSERT_EQ(expected_entries, entries);
+}
+