]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: support streaming entry playback
authorJason Dillaman <dillaman@redhat.com>
Mon, 18 Jul 2016 19:34:53 +0000 (15:34 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 17 Aug 2016 17:22:05 +0000 (13:22 -0400)
Now that it's possible for the ObjectPlayer to only read a
partial subset of available entries, the JournalPlayer needs
to detect that more entries might be available.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 28d5ca16cbcb445f985469413b2a8a3048ab66b7)

src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h
src/journal/ObjectPlayer.cc
src/journal/ObjectPlayer.h
src/test/journal/RadosTestFixture.cc
src/test/journal/RadosTestFixture.h
src/test/journal/test_JournalPlayer.cc
src/test/journal/test_ObjectPlayer.cc

index 184db8eeb668a0328eef5d93f1889ad6558ecf14..30d6cdd36cfa08e160dc048471bf466e016715fd 100644 (file)
@@ -53,8 +53,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
   : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
     m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
     m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0),
-    m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0),
-    m_commit_object(0) {
+    m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0) {
   m_replay_handler->get();
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
@@ -69,8 +68,9 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
     uint8_t splay_width = m_journal_metadata->get_splay_width();
     auto &active_position = commit_position.object_positions.front();
     m_active_tag_tid = active_position.tag_tid;
-    m_commit_object = active_position.object_number;
-    m_splay_offset = m_commit_object % splay_width;
+    m_commit_position_valid = true;
+    m_commit_position = active_position;
+    m_splay_offset = active_position.object_number % splay_width;
     for (auto &position : commit_position.object_positions) {
       uint8_t splay_offset = position.object_number % splay_width;
       m_commit_positions[splay_offset] = position;
@@ -94,6 +94,7 @@ void JournalPlayer::prefetch() {
   assert(m_state == STATE_INIT);
   m_state = STATE_PREFETCH;
 
+  m_active_set = m_journal_metadata->get_active_set();
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
     m_prefetch_splay_offsets.insert(splay_offset);
@@ -177,12 +178,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
     if (!is_object_set_ready()) {
       m_handler_notified = false;
     } else {
-      if (!m_watch_enabled) {
-        notify_complete(0);
-      } else if (!m_watch_scheduled) {
-        m_handler_notified = false;
-        schedule_watch();
-      }
+      refetch(true);
     }
     return false;
   }
@@ -264,10 +260,7 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
   ObjectPlayerPtr object_player = m_object_players[splay_offset];
 
   // prefetch in-order since a newer splay object could prefetch first
-  while (m_fetch_object_numbers.count(
-           object_player->get_object_number()) == 0) {
-    uint64_t player_object_number = object_player->get_object_number();
-
+  if (m_fetch_object_numbers.count(object_player->get_object_number()) == 0) {
     // skip past known committed records
     if (m_commit_positions.count(splay_offset) != 0 &&
         !object_player->empty()) {
@@ -296,20 +289,6 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
         object_player->pop_front();
       }
 
-      // if this object contains the commit position, our read should start with
-      // the next consistent journal entry in the sequence
-      if (player_object_number == m_commit_object) {
-        if (object_player->empty()) {
-          advance_splay_object();
-        } else {
-          Entry entry;
-          object_player->front(&entry);
-          if (entry.get_tag_tid() == position.tag_tid) {
-            advance_splay_object();
-          }
-        }
-      }
-
       // do not search for commit position for this object
       // if we've already seen it
       if (found_commit) {
@@ -318,9 +297,14 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
     }
 
     // if the object is empty, pre-fetch the next splay object
-    if (!remove_empty_object_player(object_player)) {
+    if (object_player->empty() && object_player->refetch_required()) {
+      ldout(m_cct, 10) << "refetching potentially partially decoded object"
+                       << dendl;
+      object_player->clear_refetch_required();
+      fetch(object_player);
+    } else if (!remove_empty_object_player(object_player)) {
+      ldout(m_cct, 10) << "prefetch of object complete" << dendl;
       prefetch_complete = true;
-      break;
     }
   }
 
@@ -333,17 +317,32 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
     return 0;
   }
 
+  ldout(m_cct, 10) << "switching to playback mode" << dendl;
   m_state = STATE_PLAYBACK;
+
+  // if we have a valid commit position, our read should start with
+  // the next consistent journal entry in the sequence
+  if (m_commit_position_valid) {
+    splay_offset = m_commit_position.object_number % splay_width;
+    object_player = m_object_players[splay_offset];
+
+    if (object_player->empty()) {
+      if (!object_player->refetch_required()) {
+        advance_splay_object();
+      }
+    } else {
+      Entry entry;
+      object_player->front(&entry);
+      if (entry.get_tag_tid() == m_commit_position.tag_tid) {
+        advance_splay_object();
+      }
+    }
+  }
+
   if (verify_playback_ready()) {
     notify_entries_available();
   } else if (is_object_set_ready()) {
-    if (m_watch_enabled) {
-      schedule_watch();
-    } else {
-      ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
-                       << dendl;
-      notify_complete(0);
-    }
+    refetch(false);
   }
   return 0;
 }
@@ -355,17 +354,7 @@ int JournalPlayer::process_playback(uint64_t object_number) {
   if (verify_playback_ready()) {
     notify_entries_available();
   } else if (is_object_set_ready()) {
-    if (m_watch_enabled) {
-      schedule_watch();
-    } else {
-      ObjectPlayerPtr object_player = get_object_player();
-      uint8_t splay_width = m_journal_metadata->get_splay_width();
-      uint64_t active_set = m_journal_metadata->get_active_set();
-      uint64_t object_set = object_player->get_object_number() / splay_width;
-      if (object_set == active_set) {
-        notify_complete(0);
-      }
-    }
+    refetch(false);
   }
   return 0;
 }
@@ -373,8 +362,10 @@ int JournalPlayer::process_playback(uint64_t object_number) {
 bool JournalPlayer::is_object_set_ready() const {
   assert(m_lock.is_locked());
   if (m_watch_scheduled || !m_fetch_object_numbers.empty()) {
+    ldout(m_cct, 20) << __func__ << ": waiting for in-flight fetch" << dendl;
     return false;
   }
+
   return true;
 }
 
@@ -449,13 +440,6 @@ bool JournalPlayer::verify_playback_ready() {
       } else if (m_prune_tag_tid && *m_prune_tag_tid == *m_active_tag_tid) {
         ldout(m_cct, 10) << __func__ << ": no more entries" << dendl;
         return false;
-      } else if (!m_watch_enabled) {
-        // current playback position is empty so this tag is done
-        ldout(m_cct, 10) << __func__ << ": no more in-sequence entries: "
-                         << "object_num=" << object_num << ", "
-                         << "active_tag=" << *m_active_tag_tid << dendl;
-        prune_active_tag(boost::none);
-        continue;
       } else if (m_watch_enabled && m_watch_prune_active_tag) {
         // detected current tag is now longer active and we have re-read the
         // current object but it's still empty, so this tag is done
@@ -464,12 +448,24 @@ bool JournalPlayer::verify_playback_ready() {
                          << "active_tag " << *m_active_tag_tid << dendl;
         prune_active_tag(boost::none);
         continue;
-      } else if (m_watch_enabled && object_player->refetch_required()) {
+      } else if (object_player->refetch_required()) {
         // if the active object requires a refetch, don't proceed looking for a
         // new tag before this process completes
         ldout(m_cct, 10) << __func__ << ": refetch required: "
                          << "object_num=" << object_num << dendl;
         return false;
+      } else if (!m_watch_enabled) {
+        // current playback position is empty so this tag is done
+        ldout(m_cct, 10) << __func__ << ": no more in-sequence entries: "
+                         << "object_num=" << object_num << ", "
+                         << "active_tag=" << *m_active_tag_tid << dendl;
+        prune_active_tag(boost::none);
+        continue;
+      } else if (!m_watch_scheduled) {
+        // no more entries and we don't have an active watch in-progress
+        ldout(m_cct, 10) << __func__ << ": no more entries -- watch required"
+                         << dendl;
+        return false;
       }
     }
   }
@@ -560,10 +556,18 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
   uint64_t active_set = m_journal_metadata->get_active_set();
   if (!player->empty() || object_set == active_set) {
     return false;
-  } else if (m_watch_enabled && player->refetch_required()) {
+  } else if (player->refetch_required()) {
     ldout(m_cct, 20) << __func__ << ": " << player->get_oid() << " requires "
                      << "a refetch" << dendl;
     return false;
+  } else if (m_active_set != active_set) {
+    ldout(m_cct, 20) << __func__ << ": new active set detected, all players "
+                     << "require refetch" << dendl;
+    m_active_set = active_set;
+    for (auto &pair : m_object_players) {
+      pair.second->set_refetch_required();
+    }
+    return false;
   }
 
   ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty"
@@ -580,13 +584,6 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
 void JournalPlayer::fetch(uint64_t object_num) {
   assert(m_lock.is_locked());
 
-  std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
-
-  assert(m_fetch_object_numbers.count(object_num) == 0);
-  m_fetch_object_numbers.insert(object_num);
-
-  ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
-  C_Fetch *fetch_ctx = new C_Fetch(this, object_num);
   ObjectPlayerPtr object_player(new ObjectPlayer(
     m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
     m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(),
@@ -594,6 +591,20 @@ void JournalPlayer::fetch(uint64_t object_num) {
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   m_object_players[object_num % splay_width] = object_player;
+  fetch(object_player);
+}
+
+void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) {
+  assert(m_lock.is_locked());
+
+  uint64_t object_num = object_player->get_object_number();
+  std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
+  assert(m_fetch_object_numbers.count(object_num) == 0);
+  m_fetch_object_numbers.insert(object_num);
+
+  ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
+  C_Fetch *fetch_ctx = new C_Fetch(this, object_num);
+
   object_player->fetch(fetch_ctx);
 }
 
@@ -610,9 +621,6 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
     return;
   }
 
-  if (r == -ENOENT) {
-    r = 0;
-  }
   if (r == 0) {
     ObjectPlayerPtr object_player = get_object_player(object_num);
     remove_empty_object_player(object_player);
@@ -620,7 +628,28 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
   process_state(object_num, r);
 }
 
-void JournalPlayer::schedule_watch() {
+void JournalPlayer::refetch(bool immediate) {
+  ldout(m_cct, 10) << __func__ << dendl;
+  assert(m_lock.is_locked());
+  m_handler_notified = false;
+
+  // if watching the object, handle the periodic re-fetch
+  if (m_watch_enabled) {
+    schedule_watch(immediate);
+    return;
+  }
+
+  ObjectPlayerPtr object_player = get_object_player();
+  if (object_player->refetch_required()) {
+    object_player->clear_refetch_required();
+    fetch(object_player);
+    return;
+  }
+
+  notify_complete(0);
+}
+
+void JournalPlayer::schedule_watch(bool immediate) {
   ldout(m_cct, 10) << __func__ << dendl;
   assert(m_lock.is_locked());
   if (m_watch_scheduled) {
@@ -654,7 +683,8 @@ void JournalPlayer::schedule_watch() {
       uint8_t splay_width = m_journal_metadata->get_splay_width();
       uint64_t active_set = m_journal_metadata->get_active_set();
       uint64_t object_set = object_player->get_object_number() / splay_width;
-      if (object_set < active_set && object_player->refetch_required()) {
+      if (immediate ||
+          (object_set < active_set && object_player->refetch_required())) {
         ldout(m_cct, 20) << __func__ << ": refetching "
                          << object_player->get_oid()
                          << dendl;
@@ -728,7 +758,7 @@ void JournalPlayer::handle_watch_assert_active(int r) {
 
   m_watch_step = WATCH_STEP_FETCH_CURRENT;
   if (!m_shut_down && m_watch_enabled) {
-    schedule_watch();
+    schedule_watch(false);
   }
   m_async_op_tracker.finish_op();
 }
index 8ede43c57040ab02469b28fe6d3cb89edd5b41fd..690eccdd47191a1d6597bb51c88f46fc2b4fb786 100644 (file)
@@ -115,8 +115,11 @@ private:
 
   PrefetchSplayOffsets m_prefetch_splay_offsets;
   SplayedObjectPlayers m_object_players;
-  uint64_t m_commit_object;
+
+  bool m_commit_position_valid = false;
+  ObjectPosition m_commit_position;
   SplayedObjectPositions m_commit_positions;
+  uint64_t m_active_set;
 
   boost::optional<uint64_t> m_active_tag_tid = boost::none;
   boost::optional<uint64_t> m_prune_tag_tid = boost::none;
@@ -137,9 +140,11 @@ private:
   int process_playback(uint64_t object_number);
 
   void fetch(uint64_t object_num);
+  void fetch(const ObjectPlayerPtr &object_player);
   void handle_fetched(uint64_t object_num, int r);
+  void refetch(bool immediate);
 
-  void schedule_watch();
+  void schedule_watch(bool immediate);
   void handle_watch(uint64_t object_num, int r);
   void handle_watch_assert_active(int r);
 
index 156da728e971365d17c7d56e04becaa58f3289f6..31939e9bb167fd9da015e91e07c45bd41d20bfc7 100644 (file)
@@ -110,17 +110,19 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
                    << bl.length() << dendl;
 
   *refetch = false;
-  if (r < 0) {
+  if (r == -ENOENT) {
+    return 0;
+  } else if (r < 0) {
     return r;
-  }
-  if (bl.length() == 0) {
-    return -ENOENT;
+  } else if (bl.length() == 0) {
+    return 0;
   }
 
   Mutex::Locker locker(m_lock);
   assert(m_fetch_in_progress);
   m_read_off += bl.length();
   m_read_bl.append(bl);
+  m_refetch_required = true;
 
   bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
   bool partial_entry = false;
@@ -268,11 +270,6 @@ void ObjectPlayer::handle_watch_fetched(int r) {
   Context *watch_ctx = nullptr;
   {
     Mutex::Locker timer_locker(m_timer_lock);
-    if (r == -ENOENT) {
-      r = 0;
-    } else {
-      m_refetch_required = true;
-    }
     std::swap(watch_ctx, m_watch_ctx);
 
     if (m_unwatched) {
index 74ec94bd31b7a8c29b5d5b14e5c589c871cd7ae2..a9d2d9e985ec41b473f0b7b541ddd81c89cbc754 100644 (file)
@@ -65,6 +65,9 @@ public:
   inline bool refetch_required() const {
     return m_refetch_required;
   }
+  inline void set_refetch_required() {
+    m_refetch_required = true;
+  }
   inline void clear_refetch_required() {
     m_refetch_required = false;
   }
index 40ff485203aff4195c290afc6009e7265a96740f..dba3ec6a244a19329fca64370996749cce2505a9 100644 (file)
@@ -68,9 +68,10 @@ int RadosTestFixture::create(const std::string &oid, uint8_t order,
 
 journal::JournalMetadataPtr RadosTestFixture::create_metadata(
     const std::string &oid, const std::string &client_id,
-    double commit_interval) {
+    double commit_interval, uint64_t max_fetch_bytes) {
   journal::Settings settings;
   settings.commit_interval = commit_interval;
+  settings.max_fetch_bytes = max_fetch_bytes;
 
   journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
     m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings));
index bbd6dde608ddf287a7f96d708b97c81ecb5d62b3..4ea22e7cefcf96acd74bdd146d9e3c5e3567e8c1 100644 (file)
@@ -25,7 +25,8 @@ public:
              uint8_t splay_width = 2);
   journal::JournalMetadataPtr create_metadata(const std::string &oid,
                                               const std::string &client_id = "client",
-                                              double commit_internal = 0.1);
+                                              double commit_internal = 0.1,
+                                              uint64_t max_fetch_bytes = 0);
   int append(const std::string &oid, const bufferlist &bl);
 
   int client_register(const std::string &oid, const std::string &id = "client",
index 000f13ba4d87e15395a9d907fc82b3c44f8fb765..03255d06b7662630e84256983658079b1371202f 100644 (file)
 #include <list>
 #include <boost/scope_exit.hpp>
 
+typedef std::list<journal::Entry> Entries;
+
+template <typename T>
 class TestJournalPlayer : public RadosTestFixture {
 public:
   typedef std::list<journal::JournalPlayer *> JournalPlayers;
-  typedef std::list<journal::Entry> Entries;
+
+  static const uint64_t max_fetch_bytes = T::max_fetch_bytes;
 
   struct ReplayHandler : public journal::ReplayHandler {
     Mutex lock;
@@ -54,19 +58,25 @@ public:
     RadosTestFixture::TearDown();
   }
 
+  journal::JournalMetadataPtr create_metadata(const std::string &oid) {
+    return RadosTestFixture::create_metadata(oid, "client", 0.1,
+                                             max_fetch_bytes);
+  }
+
   int client_commit(const std::string &oid,
                     journal::JournalPlayer::ObjectSetPosition position) {
     return RadosTestFixture::client_commit(oid, "client", position);
   }
 
   journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) {
+    std::string payload(128, '0');
     bufferlist payload_bl;
-    payload_bl.append("playload");
+    payload_bl.append(payload);
     return journal::Entry(tag_tid, entry_tid, payload_bl);
   }
 
   journal::JournalPlayer *create_player(const std::string &oid,
-                                          const journal::JournalMetadataPtr &metadata) {
+                                        const journal::JournalMetadataPtr &metadata) {
     journal::JournalPlayer *player(new journal::JournalPlayer(
       m_ioctx, oid + ".", metadata, &m_replay_hander));
     m_players.push_back(player);
@@ -100,12 +110,12 @@ public:
   }
 
   bool wait_for_complete(journal::JournalPlayer *player) {
-    journal::Entry entry;
-    uint64_t commit_tid;
-    player->try_pop_front(&entry, &commit_tid);
-
     Mutex::Locker locker(m_replay_hander.lock);
     while (!m_replay_hander.complete) {
+      journal::Entry entry;
+      uint64_t commit_tid;
+      player->try_pop_front(&entry, &commit_tid);
+
       if (m_replay_hander.cond.WaitInterval(
             reinterpret_cast<CephContext*>(m_ioctx.cct()),
             m_replay_hander.lock, utime_t(10, 0)) != 0) {
@@ -127,44 +137,54 @@ public:
   ReplayHandler m_replay_hander;
 };
 
-TEST_F(TestJournalPlayer, Prefetch) {
-  std::string oid = get_temp_oid();
+template <uint64_t _max_fetch_bytes>
+class TestJournalPlayerParams {
+public:
+  static const uint64_t max_fetch_bytes = _max_fetch_bytes;
+};
+
+typedef ::testing::Types<TestJournalPlayerParams<0>,
+                         TestJournalPlayerParams<16> > TestJournalPlayerTypes;
+TYPED_TEST_CASE(TestJournalPlayer, TestJournalPlayerTypes);
+
+TYPED_TEST(TestJournalPlayer, Prefetch) {
+  std::string oid = this->get_temp_oid();
 
   journal::JournalPlayer::ObjectPositions positions;
   positions = {
     cls::journal::ObjectPosition(0, 234, 122) };
   cls::journal::ObjectSetPosition commit_position(positions);
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 125));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
 
   player->prefetch();
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
-  ASSERT_TRUE(wait_for_complete(player));
+  ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+  ASSERT_TRUE(this->wait_for_complete(player));
 
   Entries expected_entries;
   expected_entries = {
-    create_entry(234, 123),
-    create_entry(234, 124),
-    create_entry(234, 125)};
+    this->create_entry(234, 123),
+    this->create_entry(234, 124),
+    this->create_entry(234, 125)};
   ASSERT_EQ(expected_entries, entries);
 
   uint64_t last_tid;
@@ -172,8 +192,8 @@ TEST_F(TestJournalPlayer, Prefetch) {
   ASSERT_EQ(125U, last_tid);
 }
 
-TEST_F(TestJournalPlayer, PrefetchSkip) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchSkip) {
+  std::string oid = this->get_temp_oid();
 
   journal::JournalPlayer::ObjectPositions positions;
   positions = {
@@ -181,73 +201,73 @@ TEST_F(TestJournalPlayer, PrefetchSkip) {
     cls::journal::ObjectPosition(1, 234, 124) };
   cls::journal::ObjectSetPosition commit_position(positions);
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 125));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
 
   player->prefetch();
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 0, &entries));
-  ASSERT_TRUE(wait_for_complete(player));
+  ASSERT_TRUE(this->wait_for_entries(player, 0, &entries));
+  ASSERT_TRUE(this->wait_for_complete(player));
 
   uint64_t last_tid;
   ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
   ASSERT_EQ(125U, last_tid);
 }
 
-TEST_F(TestJournalPlayer, PrefetchWithoutCommit) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
 
   player->prefetch();
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 2, &entries));
-  ASSERT_TRUE(wait_for_complete(player));
+  ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
+  ASSERT_TRUE(this->wait_for_complete(player));
 
   Entries expected_entries;
   expected_entries = {
-    create_entry(234, 122),
-    create_entry(234, 123)};
+    this->create_entry(234, 122),
+    this->create_entry(234, 123)};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) {
+  std::string oid = this->get_temp_oid();
 
   journal::JournalPlayer::ObjectPositions positions;
   positions = {
@@ -256,32 +276,32 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
     cls::journal::ObjectPosition(0, 234, 120)};
   cls::journal::ObjectSetPosition commit_position(positions);
 
-  ASSERT_EQ(0, create(oid, 14, 3));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid, 14, 3));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
-  ASSERT_EQ(0, write_entry(oid, 2, 234, 122));
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 123));
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 124));
-  ASSERT_EQ(0, write_entry(oid, 0, 236, 0)); // new tag allocated
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
+  ASSERT_EQ(0, this->write_entry(oid, 2, 234, 122));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 123));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 234, 124));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 236, 0)); // new tag allocated
 
   player->prefetch();
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
-  ASSERT_TRUE(wait_for_complete(player));
+  ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+  ASSERT_TRUE(this->wait_for_complete(player));
 
   uint64_t last_tid;
   ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
@@ -290,53 +310,53 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
   ASSERT_EQ(0U, last_tid);
 }
 
-TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
 
   player->prefetch();
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 2, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
 
   journal::Entry entry;
   uint64_t commit_tid;
   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
-  ASSERT_TRUE(wait_for_complete(player));
-  ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result);
+  ASSERT_TRUE(this->wait_for_complete(player));
+  ASSERT_EQ(-ENOMSG, this->m_replay_hander.complete_result);
 }
 
-TEST_F(TestJournalPlayer, PrefetchMissingSequence) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid, 14, 4));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid, 14, 4));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
@@ -344,49 +364,49 @@ TEST_F(TestJournalPlayer, PrefetchMissingSequence) {
   };
 
   ASSERT_EQ(0, metadata->set_active_set(1));
-  ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
-  ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
-  ASSERT_EQ(0, write_entry(oid, 0, 2, 860));
-  ASSERT_EQ(0, write_entry(oid, 1, 2, 853));
-  ASSERT_EQ(0, write_entry(oid, 1, 2, 857));
-  ASSERT_EQ(0, write_entry(oid, 5, 2, 861));
-  ASSERT_EQ(0, write_entry(oid, 2, 2, 854));
-  ASSERT_EQ(0, write_entry(oid, 0, 3, 0));
-  ASSERT_EQ(0, write_entry(oid, 5, 3, 1));
-  ASSERT_EQ(0, write_entry(oid, 2, 3, 2));
-  ASSERT_EQ(0, write_entry(oid, 3, 3, 3));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
+  ASSERT_EQ(0, this->write_entry(oid, 5, 2, 861));
+  ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 5, 3, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
+  ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
 
   player->prefetch();
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 7, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 7, &entries));
 
   Entries expected_entries = {
-    create_entry(2, 852),
-    create_entry(2, 853),
-    create_entry(2, 854),
-    create_entry(3, 0),
-    create_entry(3, 1),
-    create_entry(3, 2),
-    create_entry(3, 3)};
+    this->create_entry(2, 852),
+    this->create_entry(2, 853),
+    this->create_entry(2, 854),
+    this->create_entry(3, 0),
+    this->create_entry(3, 1),
+    this->create_entry(3, 2),
+    this->create_entry(3, 3)};
   ASSERT_EQ(expected_entries, entries);
 
-  ASSERT_TRUE(wait_for_complete(player));
-  ASSERT_EQ(0, m_replay_hander.complete_result);
+  ASSERT_TRUE(this->wait_for_complete(player));
+  ASSERT_EQ(0, this->m_replay_hander.complete_result);
 }
 
-TEST_F(TestJournalPlayer, PrefetchLargeMissingSequence) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
@@ -394,211 +414,211 @@ TEST_F(TestJournalPlayer, PrefetchLargeMissingSequence) {
   };
 
   ASSERT_EQ(0, metadata->set_active_set(2));
-  ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
-  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
-  ASSERT_EQ(0, write_entry(oid, 3, 0, 3));
-  ASSERT_EQ(0, write_entry(oid, 4, 1, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
+  ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
 
   player->prefetch();
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
 
   Entries expected_entries = {
-    create_entry(0, 0),
-    create_entry(0, 1),
-    create_entry(1, 0)};
+    this->create_entry(0, 0),
+    this->create_entry(0, 1),
+    this->create_entry(1, 0)};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestJournalPlayer, PrefetchBlockedNewTag) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
-  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
-  ASSERT_EQ(0, write_entry(oid, 0, 0, 2));
-  ASSERT_EQ(0, write_entry(oid, 0, 0, 4));
-  ASSERT_EQ(0, write_entry(oid, 0, 1, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
 
   player->prefetch();
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 4, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
 
   Entries expected_entries = {
-    create_entry(0, 0),
-    create_entry(0, 1),
-    create_entry(0, 2),
-    create_entry(1, 0)};
+    this->create_entry(0, 0),
+    this->create_entry(0, 1),
+    this->create_entry(0, 2),
+    this->create_entry(1, 0)};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestJournalPlayer, PrefetchStaleEntries) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) {
+  std::string oid = this->get_temp_oid();
 
   journal::JournalPlayer::ObjectPositions positions = {
     cls::journal::ObjectPosition(0, 1, 0) };
   cls::journal::ObjectSetPosition commit_position(positions);
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
-  ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
-  ASSERT_EQ(0, write_entry(oid, 0, 1, 0));
-  ASSERT_EQ(0, write_entry(oid, 1, 1, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
 
   player->prefetch();
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
 
   Entries expected_entries = {
-    create_entry(1, 1)};
+    this->create_entry(1, 1)};
   ASSERT_EQ(expected_entries, entries);
 
-  ASSERT_TRUE(wait_for_complete(player));
-  ASSERT_EQ(0, m_replay_hander.complete_result);
+  ASSERT_TRUE(this->wait_for_complete(player));
+  ASSERT_EQ(0, this->m_replay_hander.complete_result);
 }
 
-TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
-  ASSERT_EQ(0, write_entry(oid, 1, 235, 121));
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 235, 121));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
 
   player->prefetch();
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
 
   journal::Entry entry;
   uint64_t commit_tid;
   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
-  ASSERT_TRUE(wait_for_complete(player));
-  ASSERT_EQ(0, m_replay_hander.complete_result);
+  ASSERT_TRUE(this->wait_for_complete(player));
+  ASSERT_EQ(0, this->m_replay_hander.complete_result);
 }
 
-TEST_F(TestJournalPlayer, PrefetchAndWatch) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) {
+  std::string oid = this->get_temp_oid();
 
   journal::JournalPlayer::ObjectPositions positions;
   positions = {
     cls::journal::ObjectPosition(0, 234, 122)};
   cls::journal::ObjectSetPosition commit_position(positions);
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
 
   player->prefetch_and_watch(0.25);
 
   Entries entries;
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
-  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+  ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
 
   Entries expected_entries;
-  expected_entries = {create_entry(234, 123)};
+  expected_entries = {this->create_entry(234, 123)};
   ASSERT_EQ(expected_entries, entries);
 
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
-  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+  ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
 
-  expected_entries = {create_entry(234, 124)};
+  expected_entries = {this->create_entry(234, 124)};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestJournalPlayer, PrefetchSkippedObject) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid, 14, 3));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid, 14, 3));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
   ASSERT_EQ(0, metadata->set_active_set(2));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
-  ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
-  ASSERT_EQ(0, write_entry(oid, 5, 234, 124));
-  ASSERT_EQ(0, write_entry(oid, 6, 234, 125));
-  ASSERT_EQ(0, write_entry(oid, 7, 234, 126));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+  ASSERT_EQ(0, this->write_entry(oid, 5, 234, 124));
+  ASSERT_EQ(0, this->write_entry(oid, 6, 234, 125));
+  ASSERT_EQ(0, this->write_entry(oid, 7, 234, 126));
 
   player->prefetch();
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 5, &entries));
-  ASSERT_TRUE(wait_for_complete(player));
+  ASSERT_TRUE(this->wait_for_entries(player, 5, &entries));
+  ASSERT_TRUE(this->wait_for_complete(player));
 
   Entries expected_entries;
   expected_entries = {
-    create_entry(234, 122),
-    create_entry(234, 123),
-    create_entry(234, 124),
-    create_entry(234, 125),
-    create_entry(234, 126)};
+    this->create_entry(234, 122),
+    this->create_entry(234, 123),
+    this->create_entry(234, 124),
+    this->create_entry(234, 125),
+    this->create_entry(234, 126)};
   ASSERT_EQ(expected_entries, entries);
 
   uint64_t last_tid;
@@ -606,8 +626,8 @@ TEST_F(TestJournalPlayer, PrefetchSkippedObject) {
   ASSERT_EQ(126U, last_tid);
 }
 
-TEST_F(TestJournalPlayer, ImbalancedJournal) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, ImbalancedJournal) {
+  std::string oid = this->get_temp_oid();
 
   journal::JournalPlayer::ObjectPositions positions = {
     cls::journal::ObjectPosition(9, 300, 1),
@@ -616,43 +636,43 @@ TEST_F(TestJournalPlayer, ImbalancedJournal) {
     cls::journal::ObjectPosition(11, 200, 4331) };
   cls::journal::ObjectSetPosition commit_position(positions);
 
-  ASSERT_EQ(0, create(oid, 14, 4));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid, 14, 4));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
   ASSERT_EQ(0, metadata->set_active_set(2));
   metadata->set_minimum_set(2);
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 8, 300, 0));
-  ASSERT_EQ(0, write_entry(oid, 8, 301, 0));
-  ASSERT_EQ(0, write_entry(oid, 9, 300, 1));
-  ASSERT_EQ(0, write_entry(oid, 9, 301, 1));
-  ASSERT_EQ(0, write_entry(oid, 10, 200, 4334));
-  ASSERT_EQ(0, write_entry(oid, 10, 301, 2));
-  ASSERT_EQ(0, write_entry(oid, 11, 200, 4331));
-  ASSERT_EQ(0, write_entry(oid, 11, 301, 3));
+  ASSERT_EQ(0, this->write_entry(oid, 8, 300, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 8, 301, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 9, 300, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 9, 301, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 10, 200, 4334));
+  ASSERT_EQ(0, this->write_entry(oid, 10, 301, 2));
+  ASSERT_EQ(0, this->write_entry(oid, 11, 200, 4331));
+  ASSERT_EQ(0, this->write_entry(oid, 11, 301, 3));
 
   player->prefetch();
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 4, &entries));
-  ASSERT_TRUE(wait_for_complete(player));
+  ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
+  ASSERT_TRUE(this->wait_for_complete(player));
 
   Entries expected_entries;
   expected_entries = {
-    create_entry(301, 0),
-    create_entry(301, 1),
-    create_entry(301, 2),
-    create_entry(301, 3)};
+    this->create_entry(301, 0),
+    this->create_entry(301, 1),
+    this->create_entry(301, 2),
+    this->create_entry(301, 3)};
   ASSERT_EQ(expected_entries, entries);
 
   uint64_t last_tid;
@@ -660,124 +680,124 @@ TEST_F(TestJournalPlayer, ImbalancedJournal) {
   ASSERT_EQ(3U, last_tid);
 }
 
-TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
-  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
-  ASSERT_EQ(0, write_entry(oid, 0, 0, 2));
-  ASSERT_EQ(0, write_entry(oid, 0, 0, 4));
-  ASSERT_EQ(0, write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
+  ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
+  ASSERT_EQ(0, this->write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
   player->prefetch_and_watch(0.25);
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
 
   Entries expected_entries = {
-    create_entry(0, 0),
-    create_entry(0, 1),
-    create_entry(0, 2)};
+    this->create_entry(0, 0),
+    this->create_entry(0, 1),
+    this->create_entry(0, 2)};
   ASSERT_EQ(expected_entries, entries);
 
   journal::Entry entry;
   uint64_t commit_tid;
   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
 
-  ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
   ASSERT_EQ(0, metadata->set_active_set(1));
-  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
 
   expected_entries = {
-    create_entry(0, 3),
-    create_entry(0, 4),
-    create_entry(0, 5)};
+    this->create_entry(0, 3),
+    this->create_entry(0, 4),
+    this->create_entry(0, 5)};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestJournalPlayer, LiveReplayMissingSequence) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid, 14, 4));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid, 14, 4));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
-  ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
-  ASSERT_EQ(0, write_entry(oid, 0, 2, 860));
-  ASSERT_EQ(0, write_entry(oid, 1, 2, 853));
-  ASSERT_EQ(0, write_entry(oid, 1, 2, 857));
-  ASSERT_EQ(0, write_entry(oid, 2, 2, 854));
-  ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
+  ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
   player->prefetch_and_watch(0.25);
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
 
   Entries expected_entries = {
-    create_entry(2, 852),
-    create_entry(2, 853),
-    create_entry(2, 854)};
+    this->create_entry(2, 852),
+    this->create_entry(2, 853),
+    this->create_entry(2, 854)};
   ASSERT_EQ(expected_entries, entries);
 
   journal::Entry entry;
   uint64_t commit_tid;
   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
 
-  ASSERT_EQ(0, write_entry(oid, 3, 3, 3));
-  ASSERT_EQ(0, write_entry(oid, 2, 3, 2));
-  ASSERT_EQ(0, write_entry(oid, 1, 3, 1));
-  ASSERT_EQ(0, write_entry(oid, 0, 3, 0));
-  ASSERT_TRUE(wait_for_entries(player, 4, &entries));
+  ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
+  ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 3, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
+  ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
 
   expected_entries = {
-    create_entry(3, 0),
-    create_entry(3, 1),
-    create_entry(3, 2),
-    create_entry(3, 3)};
+    this->create_entry(3, 0),
+    this->create_entry(3, 1),
+    this->create_entry(3, 2),
+    this->create_entry(3, 3)};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestJournalPlayer, LiveReplayLargeMissingSequence) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
@@ -785,35 +805,35 @@ TEST_F(TestJournalPlayer, LiveReplayLargeMissingSequence) {
   };
 
   ASSERT_EQ(0, metadata->set_active_set(2));
-  ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
-  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
-  ASSERT_EQ(0, write_entry(oid, 3, 0, 3));
-  ASSERT_EQ(0, write_entry(oid, 4, 1, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
+  ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
   player->prefetch_and_watch(0.25);
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
 
   Entries expected_entries = {
-    create_entry(0, 0),
-    create_entry(0, 1),
-    create_entry(1, 0)};
+    this->create_entry(0, 0),
+    this->create_entry(0, 1),
+    this->create_entry(1, 0)};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestJournalPlayer, LiveReplayBlockedNewTag) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) {
+  std::string oid = this->get_temp_oid();
 
   cls::journal::ObjectSetPosition commit_position;
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
@@ -826,19 +846,19 @@ TEST_F(TestJournalPlayer, LiveReplayBlockedNewTag) {
   ASSERT_EQ(0, ctx1.wait());
 
   ASSERT_EQ(0, metadata->set_active_set(0));
-  ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 0));
-  ASSERT_EQ(0, write_entry(oid, 1, tag1.tid, 1));
-  ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 2));
-  ASSERT_EQ(0, write_entry(oid, 0, tag1.tid, 4));
+  ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 1, tag1.tid, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 2));
+  ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 4));
   player->prefetch_and_watch(0.25);
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
 
   Entries expected_entries = {
-    create_entry(tag1.tid, 0),
-    create_entry(tag1.tid, 1),
-    create_entry(tag1.tid, 2)};
+    this->create_entry(tag1.tid, 0),
+    this->create_entry(tag1.tid, 1),
+    this->create_entry(tag1.tid, 2)};
   ASSERT_EQ(expected_entries, entries);
 
   journal::Entry entry;
@@ -850,65 +870,65 @@ TEST_F(TestJournalPlayer, LiveReplayBlockedNewTag) {
   metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2);
   ASSERT_EQ(0, ctx2.wait());
 
-  ASSERT_EQ(0, write_entry(oid, 0, tag2.tid, 0));
-  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+  ASSERT_EQ(0, this->write_entry(oid, 0, tag2.tid, 0));
+  ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
 
   expected_entries = {
-    create_entry(tag2.tid, 0)};
+    this->create_entry(tag2.tid, 0)};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestJournalPlayer, LiveReplayStaleEntries) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) {
+  std::string oid = this->get_temp_oid();
 
   journal::JournalPlayer::ObjectPositions positions = {
     cls::journal::ObjectPosition(0, 1, 0) };
   cls::journal::ObjectSetPosition commit_position(positions);
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
     ASSERT_EQ(0, unwatch_ctx.wait());
   };
 
-  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
-  ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
-  ASSERT_EQ(0, write_entry(oid, 0, 1, 0));
-  ASSERT_EQ(0, write_entry(oid, 1, 1, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
   player->prefetch_and_watch(0.25);
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
 
   Entries expected_entries = {
-    create_entry(1, 1)};
+    this->create_entry(1, 1)};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
+  std::string oid = this->get_temp_oid();
 
   journal::JournalPlayer::ObjectPositions positions = {
     cls::journal::ObjectPosition(1, 0, 1),
     cls::journal::ObjectPosition(0, 0, 0)};
   cls::journal::ObjectSetPosition commit_position(positions);
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, commit_position));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, commit_position));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
@@ -916,41 +936,41 @@ TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
   };
 
   ASSERT_EQ(0, metadata->set_active_set(1));
-  ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
-  ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
-  ASSERT_EQ(0, write_entry(oid, 3, 0, 3));
-  ASSERT_EQ(0, write_entry(oid, 2, 1, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+  ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
+  ASSERT_EQ(0, this->write_entry(oid, 2, 1, 0));
   player->prefetch_and_watch(0.25);
 
   Entries entries;
-  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
 
   Entries expected_entries = {
-    create_entry(1, 0)};
+    this->create_entry(1, 0)};
   ASSERT_EQ(expected_entries, entries);
 
   // should remove player for offset 3 after refetching
   ASSERT_EQ(0, metadata->set_active_set(3));
-  ASSERT_EQ(0, write_entry(oid, 7, 1, 1));
+  ASSERT_EQ(0, this->write_entry(oid, 7, 1, 1));
 
-  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+  ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
 
   expected_entries = {
-    create_entry(1, 1)};
+    this->create_entry(1, 1)};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestJournalPlayer, PrefechShutDown) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, PrefechShutDown) {
+  std::string oid = this->get_temp_oid();
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, {}));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, {}));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
@@ -959,17 +979,17 @@ TEST_F(TestJournalPlayer, PrefechShutDown) {
   player->prefetch();
 }
 
-TEST_F(TestJournalPlayer, LiveReplayShutDown) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) {
+  std::string oid = this->get_temp_oid();
 
-  ASSERT_EQ(0, create(oid));
-  ASSERT_EQ(0, client_register(oid));
-  ASSERT_EQ(0, client_commit(oid, {}));
+  ASSERT_EQ(0, this->create(oid));
+  ASSERT_EQ(0, this->client_register(oid));
+  ASSERT_EQ(0, this->client_commit(oid, {}));
 
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
-  ASSERT_EQ(0, init_metadata(metadata));
+  journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+  ASSERT_EQ(0, this->init_metadata(metadata));
 
-  journal::JournalPlayer *player = create_player(oid, metadata);
+  journal::JournalPlayer *player = this->create_player(oid, metadata);
   BOOST_SCOPE_EXIT_ALL( (player) ) {
     C_SaferCond unwatch_ctx;
     player->shut_down(&unwatch_ctx);
index ed4c0b667e68d91dfc17b43bd521316a3c162a61..880128ee8e19abae18cafe53a1e6228539647bf9 100644 (file)
@@ -145,7 +145,7 @@ TYPED_TEST(TestObjectPlayer, FetchEmpty) {
 
   journal::ObjectPlayerPtr object = this->create_object(oid, 14);
 
-  ASSERT_EQ(-ENOENT, this->fetch(object));
+  ASSERT_EQ(0, this->fetch(object));
   ASSERT_TRUE(object->empty());
 }