]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: optionally fetch entries in small chunks during replay
authorJason Dillaman <dillaman@redhat.com>
Mon, 18 Jul 2016 15:01:26 +0000 (11:01 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 17 Aug 2016 17:22:05 +0000 (13:22 -0400)
Support fetching the full object or incremental chunks (with a
minimum of at least a single decoded entry if available).

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit f7362e9a57e484fffd840ca0eef01778dcacb65b)

src/journal/JournalMetadata.h
src/journal/JournalPlayer.cc
src/journal/ObjectPlayer.cc
src/journal/ObjectPlayer.h
src/journal/Settings.h
src/test/journal/test_ObjectPlayer.cc

index 046e77dac7b47ccf20a7549d2809ced0bafe65e2..01116d7d1e016f5a4e983dfc8f1fb3d68d46ed62 100644 (file)
@@ -74,6 +74,9 @@ public:
   void get_tags(const boost::optional<uint64_t> &tag_class, Tags *tags,
                 Context *on_finish);
 
+  inline const Settings &get_settings() const {
+    return m_settings;
+  }
   inline const std::string &get_client_id() const {
     return m_client_id;
   }
index 28905a2ed1304df65b4fad77ea9fe836bfa63f5f..a06ca620f7cd00d086ae399a5a4fc403f2c11930 100644 (file)
@@ -624,7 +624,8 @@ void JournalPlayer::fetch(uint64_t object_num) {
   C_Fetch *fetch_ctx = new C_Fetch(this, object_num);
   ObjectPlayerPtr object_player(new ObjectPlayer(
     m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
-    m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order()));
+    m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(),
+    m_journal_metadata->get_settings().max_fetch_bytes));
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   m_object_players[object_num % splay_width][object_num] = object_player;
index f86e3ef93702e62402f0b09485aee8619c5e8db5..156da728e971365d17c7d56e04becaa58f3289f6 100644 (file)
@@ -15,13 +15,15 @@ namespace journal {
 ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
                            const std::string &object_oid_prefix,
                            uint64_t object_num, SafeTimer &timer,
-                           Mutex &timer_lock, uint8_t order)
+                           Mutex &timer_lock, uint8_t order,
+                           uint64_t max_fetch_bytes)
   : RefCountedObject(NULL, 0), m_object_num(object_num),
     m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
     m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
+    m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
     m_watch_interval(0), m_watch_task(NULL),
     m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
-    m_fetch_in_progress(false), m_read_off(0) {
+    m_fetch_in_progress(false) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
 }
@@ -39,11 +41,12 @@ void ObjectPlayer::fetch(Context *on_finish) {
   ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
 
   Mutex::Locker locker(m_lock);
+  assert(!m_fetch_in_progress);
   m_fetch_in_progress = true;
 
   C_Fetch *context = new C_Fetch(this, on_finish);
   librados::ObjectReadOperation op;
-  op.read(m_read_off, 2 << m_order, &context->read_bl, NULL);
+  op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
   op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
 
   librados::AioCompletion *rados_completion =
@@ -95,14 +98,18 @@ void ObjectPlayer::front(Entry *entry) const {
 void ObjectPlayer::pop_front() {
   Mutex::Locker locker(m_lock);
   assert(!m_entries.empty());
+
+  auto &entry = m_entries.front();
+  m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
   m_entries.pop_front();
 }
 
-int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) {
+int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
+                                        bool *refetch) {
   ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
                    << bl.length() << dendl;
 
-  m_fetch_in_progress = false;
+  *refetch = false;
   if (r < 0) {
     return r;
   }
@@ -111,25 +118,37 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) {
   }
 
   Mutex::Locker locker(m_lock);
+  assert(m_fetch_in_progress);
+  m_read_off += bl.length();
   m_read_bl.append(bl);
 
+  bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
+  bool partial_entry = false;
   bool invalid = false;
   uint32_t invalid_start_off = 0;
 
-  bufferlist::iterator iter(&m_read_bl, m_read_off);
+  clear_invalid_range(m_read_bl_off, m_read_bl.length());
+  bufferlist::iterator iter(&m_read_bl, 0);
   while (!iter.end()) {
     uint32_t bytes_needed;
+    uint32_t bl_off = iter.get_off();
     if (!Entry::is_readable(iter, &bytes_needed)) {
       if (bytes_needed != 0) {
-        invalid_start_off = iter.get_off();
+        invalid_start_off = m_read_bl_off + bl_off;
         invalid = true;
-        lderr(m_cct) << ": partial record at offset " << iter.get_off()
-                     << dendl;
+        partial_entry = true;
+        if (full_fetch) {
+          lderr(m_cct) << ": partial record at offset " << invalid_start_off
+                       << dendl;
+        } else {
+          ldout(m_cct, 20) << ": partial record detected, will re-fetch"
+                           << dendl;
+        }
         break;
       }
 
       if (!invalid) {
-        invalid_start_off = iter.get_off();
+        invalid_start_off = m_read_bl_off + bl_off;
         invalid = true;
         lderr(m_cct) << ": detected corrupt journal entry at offset "
                      << invalid_start_off << dendl;
@@ -138,18 +157,21 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) {
       continue;
     }
 
+    Entry entry;
+    ::decode(entry, iter);
+    ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
+
+    uint32_t entry_len = iter.get_off() - bl_off;
     if (invalid) {
-      uint32_t invalid_end_off = iter.get_off();
+      // new corrupt region detected
+      uint32_t invalid_end_off = m_read_bl_off + bl_off;
       lderr(m_cct) << ": corruption range [" << invalid_start_off
                    << ", " << invalid_end_off << ")" << dendl;
-      m_invalid_ranges.insert(invalid_start_off, invalid_end_off);
+      m_invalid_ranges.insert(invalid_start_off,
+                              invalid_end_off - invalid_start_off);
       invalid = false;
     }
 
-    Entry entry;
-    ::decode(entry, iter);
-    ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
-
     EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
                                       entry.get_entry_tid()));
     if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
@@ -158,20 +180,49 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) {
       ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
       *m_entry_keys[entry_key] = entry;
     }
+
+    // prune decoded / corrupted journal entries from front of bl
+    bufferlist sub_bl;
+    sub_bl.substr_of(m_read_bl, iter.get_off(),
+                     m_read_bl.length() - iter.get_off());
+    sub_bl.swap(m_read_bl);
+    iter = bufferlist::iterator(&m_read_bl, 0);
+
+    // advance the decoded entry offset
+    m_read_bl_off += entry_len;
   }
 
-  m_read_off = m_read_bl.length();
   if (invalid) {
-    uint32_t invalid_end_off = m_read_bl.length();
-    lderr(m_cct) << ": corruption range [" << invalid_start_off
-                 << ", " << invalid_end_off << ")" << dendl;
-    m_invalid_ranges.insert(invalid_start_off, invalid_end_off);
+    uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length();
+    if (!partial_entry) {
+      lderr(m_cct) << ": corruption range [" << invalid_start_off
+                   << ", " << invalid_end_off << ")" << dendl;
+    }
+    m_invalid_ranges.insert(invalid_start_off,
+                            invalid_end_off - invalid_start_off);
   }
 
-  if (!m_invalid_ranges.empty()) {
-    r = -EBADMSG;
+  if (!m_invalid_ranges.empty() && !partial_entry) {
+    return -EBADMSG;
+  } else if (partial_entry && (full_fetch || m_entries.empty())) {
+    *refetch = true;
+    return -EAGAIN;
+  }
+
+  return 0;
+}
+
+void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
+  // possibly remove previously partial record region
+  InvalidRanges decode_range;
+  decode_range.insert(off, len);
+  InvalidRanges intersect_range;
+  intersect_range.intersection_of(m_invalid_ranges, decode_range);
+  if (!intersect_range.empty()) {
+    ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range
+                     << dendl;
+    m_invalid_ranges.subtract(intersect_range);
   }
-  return r;
 }
 
 void ObjectPlayer::schedule_watch() {
@@ -236,9 +287,20 @@ void ObjectPlayer::handle_watch_fetched(int r) {
 }
 
 void ObjectPlayer::C_Fetch::finish(int r) {
-  r = object_player->handle_fetch_complete(r, read_bl);
-  object_player.reset();
+  bool refetch = false;
+  r = object_player->handle_fetch_complete(r, read_bl, &refetch);
 
+  {
+    Mutex::Locker locker(object_player->m_lock);
+    object_player->m_fetch_in_progress = false;
+  }
+
+  if (refetch) {
+    object_player->fetch(on_finish);
+    return;
+  }
+
+  object_player.reset();
   on_finish->complete(r);
 }
 
index d0809cec8fe26d350ce2cb53aab6bd25c37d01ab..74ec94bd31b7a8c29b5d5b14e5c589c871cd7ae2 100644 (file)
@@ -32,7 +32,7 @@ public:
 
   ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
                uint64_t object_num, SafeTimer &timer, Mutex &timer_lock,
-               uint8_t order);
+               uint8_t order, uint64_t max_fetch_bytes);
   ~ObjectPlayer();
 
   inline const std::string &get_oid() const {
@@ -77,8 +77,7 @@ private:
     ObjectPlayerPtr object_player;
     Context *on_finish;
     bufferlist read_bl;
-    C_Fetch(ObjectPlayer *o, Context *ctx)
-      : object_player(o), on_finish(ctx) {
+    C_Fetch(ObjectPlayer *o, Context *ctx) : object_player(o), on_finish(ctx) {
     }
     virtual void finish(int r);
   };
@@ -104,6 +103,7 @@ private:
   Mutex &m_timer_lock;
 
   uint8_t m_order;
+  uint64_t m_max_fetch_bytes;
 
   double m_watch_interval;
   Context *m_watch_task;
@@ -111,7 +111,8 @@ private:
   mutable Mutex m_lock;
   bool m_fetch_in_progress;
   bufferlist m_read_bl;
-  uint32_t m_read_off;
+  uint32_t m_read_off = 0;
+  uint32_t m_read_bl_off = 0;
 
   Entries m_entries;
   EntryKeys m_entry_keys;
@@ -122,7 +123,9 @@ private:
   bool m_unwatched = false;
   bool m_refetch_required = true;
 
-  int handle_fetch_complete(int r, const bufferlist &bl);
+  int handle_fetch_complete(int r, const bufferlist &bl, bool *refetch);
+
+  void clear_invalid_range(uint32_t off, uint32_t len);
 
   void schedule_watch();
   bool cancel_watch();
index 958073414588a892ad969f51ac9becff82a7744e..d8f5e7469301a974d9589fa1f722da08212b0889 100644 (file)
@@ -10,6 +10,7 @@ namespace journal {
 
 struct Settings {
   double commit_interval = 5;         ///< commit position throttle (in secs)
+  uint64_t max_fetch_bytes = 0;       ///< 0 implies no limit
 };
 
 } // namespace journal
index 67c35a1d12c641c05d4f0026022d4566f7dd322c..ed4c0b667e68d91dfc17b43bd521316a3c162a61 100644 (file)
 #include "test/librados/test.h"
 #include "test/journal/RadosTestFixture.h"
 
+template <typename T>
 class TestObjectPlayer : public RadosTestFixture {
 public:
+  static const uint32_t max_fetch_bytes = T::max_fetch_bytes;
+
   journal::ObjectPlayerPtr create_object(const std::string &oid,
                                          uint8_t order) {
     journal::ObjectPlayerPtr object(new journal::ObjectPlayer(
-      m_ioctx, oid + ".", 0, *m_timer, m_timer_lock, order));
+      m_ioctx, oid + ".", 0, *m_timer, m_timer_lock, order,
+      max_fetch_bytes));
     return object;
   }
 
+  int fetch(journal::ObjectPlayerPtr object_player) {
+    while (true) {
+      C_SaferCond ctx;
+      object_player->clear_refetch_required();
+      object_player->fetch(&ctx);
+      int r = ctx.wait();
+      if (r < 0 || !object_player->refetch_required()) {
+        return r;
+      }
+    }
+    return 0;
+  }
+
+  int watch_and_wait_for_entries(journal::ObjectPlayerPtr object_player,
+                                 journal::ObjectPlayer::Entries *entries,
+                                 size_t count) {
+    for (size_t i = 0; i < 50; ++i) {
+      object_player->get_entries(entries);
+      if (entries->size() == count) {
+        break;
+      }
+
+      C_SaferCond ctx;
+      object_player->watch(&ctx, 0.1);
+
+      int r = ctx.wait();
+      if (r < 0) {
+        return r;
+      }
+    }
+    return 0;
+  }
+
   std::string get_object_name(const std::string &oid) {
     return oid + ".0";
   }
 };
 
-TEST_F(TestObjectPlayer, Fetch) {
-  std::string oid = get_temp_oid();
+template <uint32_t _max_fetch_bytes>
+struct TestObjectPlayerParams {
+  static const uint32_t max_fetch_bytes = _max_fetch_bytes;
+};
+
+typedef ::testing::Types<TestObjectPlayerParams<0>,
+                         TestObjectPlayerParams<10> > TestObjectPlayerTypes;
+TYPED_TEST_CASE(TestObjectPlayer, TestObjectPlayerTypes);
+
+TYPED_TEST(TestObjectPlayer, Fetch) {
+  std::string oid = this->get_temp_oid();
 
-  journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
-  journal::Entry entry2(234, 124, create_payload(std::string(24, '1')));
+  journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+  journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1')));
 
   bufferlist bl;
   ::encode(entry1, bl);
   ::encode(entry2, bl);
-  ASSERT_EQ(0, append(get_object_name(oid), bl));
+  ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
 
-  journal::ObjectPlayerPtr object = create_object(oid, 14);
-
-  C_SaferCond cond;
-  object->fetch(&cond);
-  ASSERT_LE(0, cond.wait());
+  journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+  ASSERT_LE(0, this->fetch(object));
 
   journal::ObjectPlayer::Entries entries;
   object->get_entries(&entries);
@@ -49,48 +92,42 @@ TEST_F(TestObjectPlayer, Fetch) {
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestObjectPlayer, FetchLarge) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, FetchLarge) {
+  std::string oid = this->get_temp_oid();
 
   journal::Entry entry1(234, 123,
-                        create_payload(std::string(8192 - 33, '1')));
-  journal::Entry entry2(234, 124, create_payload(""));
+                        this->create_payload(std::string(8192 - 32, '1')));
+  journal::Entry entry2(234, 124, this->create_payload(""));
 
   bufferlist bl;
   ::encode(entry1, bl);
   ::encode(entry2, bl);
-  ASSERT_EQ(0, append(get_object_name(oid), bl));
-
-  journal::ObjectPlayerPtr object = create_object(oid, 12);
+  ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
 
-  C_SaferCond cond;
-  object->fetch(&cond);
-  ASSERT_LE(0, cond.wait());
+  journal::ObjectPlayerPtr object = this->create_object(oid, 12);
+  ASSERT_LE(0, this->fetch(object));
 
   journal::ObjectPlayer::Entries entries;
   object->get_entries(&entries);
-  ASSERT_EQ(1U, entries.size());
+  ASSERT_EQ(2U, entries.size());
 
-  journal::ObjectPlayer::Entries expected_entries = {entry1};
+  journal::ObjectPlayer::Entries expected_entries = {entry1, entry2};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestObjectPlayer, FetchDeDup) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, FetchDeDup) {
+  std::string oid = this->get_temp_oid();
 
-  journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
-  journal::Entry entry2(234, 123, create_payload(std::string(24, '2')));
+  journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+  journal::Entry entry2(234, 123, this->create_payload(std::string(24, '2')));
 
   bufferlist bl;
   ::encode(entry1, bl);
   ::encode(entry2, bl);
-  ASSERT_EQ(0, append(get_object_name(oid), bl));
-
-  journal::ObjectPlayerPtr object = create_object(oid, 14);
+  ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
 
-  C_SaferCond cond;
-  object->fetch(&cond);
-  ASSERT_LE(0, cond.wait());
+  journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+  ASSERT_LE(0, this->fetch(object));
 
   journal::ObjectPlayer::Entries entries;
   object->get_entries(&entries);
@@ -100,48 +137,32 @@ TEST_F(TestObjectPlayer, FetchDeDup) {
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestObjectPlayer, FetchEmpty) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, FetchEmpty) {
+  std::string oid = this->get_temp_oid();
 
   bufferlist bl;
-  ASSERT_EQ(0, append(get_object_name(oid), bl));
+  ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
 
-  journal::ObjectPlayerPtr object = create_object(oid, 14);
+  journal::ObjectPlayerPtr object = this->create_object(oid, 14);
 
-  C_SaferCond cond;
-  object->fetch(&cond);
-  ASSERT_EQ(-ENOENT, cond.wait());
+  ASSERT_EQ(-ENOENT, this->fetch(object));
   ASSERT_TRUE(object->empty());
 }
 
-TEST_F(TestObjectPlayer, FetchError) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, FetchCorrupt) {
+  std::string oid = this->get_temp_oid();
 
-  journal::ObjectPlayerPtr object = create_object(oid, 14);
-
-  C_SaferCond cond;
-  object->fetch(&cond);
-  ASSERT_EQ(-ENOENT, cond.wait());
-  ASSERT_TRUE(object->empty());
-}
-
-TEST_F(TestObjectPlayer, FetchCorrupt) {
-  std::string oid = get_temp_oid();
-
-  journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
-  journal::Entry entry2(234, 124, create_payload(std::string(24, '2')));
+  journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+  journal::Entry entry2(234, 124, this->create_payload(std::string(24, '2')));
 
   bufferlist bl;
   ::encode(entry1, bl);
-  ::encode(create_payload("corruption"), bl);
+  ::encode(this->create_payload("corruption"), bl);
   ::encode(entry2, bl);
-  ASSERT_EQ(0, append(get_object_name(oid), bl));
-
-  journal::ObjectPlayerPtr object = create_object(oid, 14);
+  ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
 
-  C_SaferCond cond;
-  object->fetch(&cond);
-  ASSERT_EQ(-EBADMSG, cond.wait());
+  journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+  ASSERT_EQ(-EBADMSG, this->fetch(object));
 
   journal::ObjectPlayer::Entries entries;
   object->get_entries(&entries);
@@ -151,21 +172,18 @@ TEST_F(TestObjectPlayer, FetchCorrupt) {
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestObjectPlayer, FetchAppend) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, FetchAppend) {
+  std::string oid = this->get_temp_oid();
 
-  journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
-  journal::Entry entry2(234, 124, create_payload(std::string(24, '2')));
+  journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+  journal::Entry entry2(234, 124, this->create_payload(std::string(24, '2')));
 
   bufferlist bl;
   ::encode(entry1, bl);
-  ASSERT_EQ(0, append(get_object_name(oid), bl));
+  ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
 
-  journal::ObjectPlayerPtr object = create_object(oid, 14);
-
-  C_SaferCond cond1;
-  object->fetch(&cond1);
-  ASSERT_LE(0, cond1.wait());
+  journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+  ASSERT_LE(0, this->fetch(object));
 
   journal::ObjectPlayer::Entries entries;
   object->get_entries(&entries);
@@ -176,11 +194,8 @@ TEST_F(TestObjectPlayer, FetchAppend) {
 
   bl.clear();
   ::encode(entry2, bl);
-  ASSERT_EQ(0, append(get_object_name(oid), bl));
-
-  C_SaferCond cond2;
-  object->fetch(&cond2);
-  ASSERT_LE(0, cond2.wait());
+  ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+  ASSERT_LE(0, this->fetch(object));
 
   object->get_entries(&entries);
   ASSERT_EQ(2U, entries.size());
@@ -189,22 +204,19 @@ TEST_F(TestObjectPlayer, FetchAppend) {
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestObjectPlayer, PopEntry) {
-  std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, PopEntry) {
+  std::string oid = this->get_temp_oid();
 
-  journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
-  journal::Entry entry2(234, 124, create_payload(std::string(24, '1')));
+  journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+  journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1')));
 
   bufferlist bl;
   ::encode(entry1, bl);
   ::encode(entry2, bl);
-  ASSERT_EQ(0, append(get_object_name(oid), bl));
+  ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
 
-  journal::ObjectPlayerPtr object = create_object(oid, 14);
-
-  C_SaferCond cond;
-  object->fetch(&cond);
-  ASSERT_LE(0, cond.wait());
+  journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+  ASSERT_LE(0, this->fetch(object));
 
   journal::ObjectPlayer::Entries entries;
   object->get_entries(&entries);
@@ -220,23 +232,23 @@ TEST_F(TestObjectPlayer, PopEntry) {
   ASSERT_TRUE(object->empty());
 }
 
-TEST_F(TestObjectPlayer, Watch) {
-  std::string oid = get_temp_oid();
-  journal::ObjectPlayerPtr object = create_object(oid, 14);
+TYPED_TEST(TestObjectPlayer, Watch) {
+  std::string oid = this->get_temp_oid();
+  journal::ObjectPlayerPtr object = this->create_object(oid, 14);
 
   C_SaferCond cond1;
   object->watch(&cond1, 0.1);
 
-  journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
-  journal::Entry entry2(234, 124, create_payload(std::string(24, '1')));
+  journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+  journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1')));
 
   bufferlist bl;
   ::encode(entry1, bl);
-  ASSERT_EQ(0, append(get_object_name(oid), bl));
+  ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
   ASSERT_LE(0, cond1.wait());
 
   journal::ObjectPlayer::Entries entries;
-  object->get_entries(&entries);
+  ASSERT_EQ(0, this->watch_and_wait_for_entries(object, &entries, 1U));
   ASSERT_EQ(1U, entries.size());
 
   journal::ObjectPlayer::Entries expected_entries;
@@ -248,19 +260,19 @@ TEST_F(TestObjectPlayer, Watch) {
 
   bl.clear();
   ::encode(entry2, bl);
-  ASSERT_EQ(0, append(get_object_name(oid), bl));
+  ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
   ASSERT_LE(0, cond2.wait());
 
-  object->get_entries(&entries);
+  ASSERT_EQ(0, this->watch_and_wait_for_entries(object, &entries, 2U));
   ASSERT_EQ(2U, entries.size());
 
   expected_entries = {entry1, entry2};
   ASSERT_EQ(expected_entries, entries);
 }
 
-TEST_F(TestObjectPlayer, Unwatch) {
-  std::string oid = get_temp_oid();
-  journal::ObjectPlayerPtr object = create_object(oid, 14);
+TYPED_TEST(TestObjectPlayer, Unwatch) {
+  std::string oid = this->get_temp_oid();
+  journal::ObjectPlayerPtr object = this->create_object(oid, 14);
 
   C_SaferCond watch_ctx;
   object->watch(&watch_ctx, 600);