]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/journal: s/Mutex/ceph::mutex/
authorKefu Chai <kchai@redhat.com>
Sun, 7 Jul 2019 04:41:22 +0000 (12:41 +0800)
committerKefu Chai <kchai@redhat.com>
Sat, 3 Aug 2019 03:27:19 +0000 (11:27 +0800)
as we cannot create a `smart_pointer<ceph::mutex>` due to limitation of
`ceph::make_mutex()`, we need to refactor `TestObjectRecorder` to remove
its `TearDown()` method, as when `TearDown()` is called, all local
variables have been destroyed, including the `ceph::mutex` instances. so
we need to introduce a helper of `ObjectRecorderFlusher`, to flush the
objects before `mutex` instances are destroyed. to simplify the
interfaces, it flushes in its dtor. so its lifecycle should be shorter
than those of mutexes. that's why, mutexes are created before `flusher`.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/test/journal/RadosTestFixture.cc
src/test/journal/RadosTestFixture.h
src/test/journal/mock/MockJournaler.h
src/test/journal/test_FutureImpl.cc
src/test/journal/test_JournalMetadata.cc
src/test/journal/test_JournalPlayer.cc
src/test/journal/test_ObjectPlayer.cc
src/test/journal/test_ObjectRecorder.cc

index 78fadb2b72ebf55d5531c66409f5ecdc6d584f46..573253801860f6b6824ad7ea2cde3c71118a3e29 100644 (file)
@@ -9,7 +9,8 @@
 #include "journal/Settings.h"
 
 RadosTestFixture::RadosTestFixture()
-  : m_timer_lock("m_timer_lock"), m_timer(NULL), m_listener(this) {
+  : m_timer_lock(ceph::make_mutex("m_timer_lock")),
+    m_listener(this) {
 }
 
 void RadosTestFixture::SetUpTestCase() {
@@ -53,7 +54,7 @@ void RadosTestFixture::TearDown() {
   }
 
   {
-    Mutex::Locker locker(m_timer_lock);
+    std::lock_guard locker{m_timer_lock};
     m_timer->shutdown();
   }
   delete m_timer;
@@ -115,10 +116,9 @@ int RadosTestFixture::init_metadata(journal::JournalMetadataPtr metadata) {
 }
 
 bool RadosTestFixture::wait_for_update(journal::JournalMetadataPtr metadata) {
-  Mutex::Locker locker(m_listener.mutex);
+  std::unique_lock locker{m_listener.mutex};
   while (m_listener.updates[metadata.get()] == 0) {
-    if (m_listener.cond.WaitInterval(
-         m_listener.mutex, utime_t(10, 0)) != 0) {
+    if (m_listener.cond.wait_for(locker, 10s) == std::cv_status::timeout) {
       return false;
     }
   }
index 68a96b7d5d7f64310008bc9f807b64ddecdf8e52..9600c839bd2b13008c11d7fadaac618170ff585e 100644 (file)
@@ -2,7 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "test/librados/test.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/Timer.h"
 #include "journal/JournalMetadata.h"
 #include "cls/journal/cls_journal_types.h"
@@ -38,17 +38,17 @@ public:
 
   struct Listener : public journal::JournalMetadataListener {
     RadosTestFixture *test_fixture;
-    Mutex mutex;
-    Cond cond;
+    ceph::mutex mutex = ceph::make_mutex("mutex");
+    ceph::condition_variable cond;
     std::map<journal::JournalMetadata*, uint32_t> updates;
 
     Listener(RadosTestFixture *_test_fixture)
-      : test_fixture(_test_fixture), mutex("mutex") {}
+      : test_fixture(_test_fixture) {}
 
     void handle_update(journal::JournalMetadata *metadata) override {
-      Mutex::Locker locker(mutex);
+      std::lock_guard locker{mutex};
       ++updates[metadata];
-      cond.Signal();
+      cond.notify_all();
     }
   };
 
@@ -65,8 +65,8 @@ public:
 
   ContextWQ *m_work_queue = nullptr;
 
-  Mutex m_timer_lock;
-  SafeTimer *m_timer;
+  ceph::mutex m_timer_lock;
+  SafeTimer *m_timer = nullptr;
 
   Listener m_listener;
 
index ab424cd6b092dfd82dcfab49153ec3486b02e75f..d4e0f6c2aec17ac7a45e4dc2351d36bd851f0307 100644 (file)
@@ -13,7 +13,6 @@
 #include <string>
 
 class Context;
-class Mutex;
 
 namespace journal {
 
@@ -150,7 +149,7 @@ struct MockJournalerProxy {
   }
 
   template <typename WorkQueue, typename Timer>
-  MockJournalerProxy(WorkQueue *work_queue, Timer *timer, Mutex *timer_lock,
+  MockJournalerProxy(WorkQueue *work_queue, Timer *timer, ceph::mutex *timer_lock,
                      librados::IoCtx &header_ioctx,
                      const std::string &journal_id,
                      const std::string &client_id, const Settings&,
index 14e9d530ded97c5432b9890308ee28113970c28c..e4e127d6ee9e395c6c9a0f54fce5a8343cbc1757 100644 (file)
@@ -3,7 +3,6 @@
 
 #include "journal/FutureImpl.h"
 #include "common/Cond.h"
-#include "common/Mutex.h"
 #include "gtest/gtest.h"
 #include "test/journal/RadosTestFixture.h"
 
index ce7c1425f15f93dcb14d314faae94b01aeed1c8c..7cecb6fb1ce7b80e7db45aed5b9edbb022fd2b6e 100644 (file)
@@ -4,7 +4,6 @@
 #include "journal/JournalMetadata.h"
 #include "test/journal/RadosTestFixture.h"
 #include "common/Cond.h"
-#include "common/Mutex.h"
 #include <map>
 
 class TestJournalMetadata : public RadosTestFixture {
index 0629038ba49ad9e79d6f0293292cd8e3ebb10b5e..94490c2357a65e6a647313ec1bcf3d1bed344f76 100644 (file)
@@ -6,8 +6,7 @@
 #include "journal/JournalMetadata.h"
 #include "journal/ReplayHandler.h"
 #include "include/stringify.h"
-#include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "gtest/gtest.h"
 #include "test/journal/RadosTestFixture.h"
 #include <list>
@@ -23,30 +22,30 @@ public:
   static const uint64_t max_fetch_bytes = T::max_fetch_bytes;
 
   struct ReplayHandler : public journal::ReplayHandler {
-    Mutex lock;
-    Cond cond;
+    ceph::mutex lock = ceph::make_mutex("lock");
+    ceph::condition_variable cond;
     bool entries_available;
     bool complete;
     int complete_result;
 
     ReplayHandler()
-      : lock("lock"), entries_available(false), complete(false),
+      : entries_available(false), complete(false),
         complete_result(0) {}
 
     void get() override {}
     void put() override {}
 
     void handle_entries_available() override {
-      Mutex::Locker locker(lock);
+      std::lock_guard locker{lock};
       entries_available = true;
-      cond.Signal();
+      cond.notify_all();
     }
 
     void handle_complete(int r) override {
-      Mutex::Locker locker(lock);
+      std::lock_guard locker{lock};
       complete = true;
       complete_result = r;
-      cond.Signal();
+      cond.notify_all();
     }
   };
 
@@ -97,11 +96,11 @@ public:
         break;
       }
 
-      Mutex::Locker locker(m_replay_hander.lock);
+      std::unique_lock locker{m_replay_hander.lock};
       if (m_replay_hander.entries_available) {
         m_replay_hander.entries_available = false;
-      } else if (m_replay_hander.cond.WaitInterval(
-          m_replay_hander.lock, utime_t(10, 0)) != 0) {
+      } else if (m_replay_hander.cond.wait_for(locker, 10s) ==
+                std::cv_status::timeout) {
         break;
       }
     }
@@ -109,14 +108,14 @@ public:
   }
 
   bool wait_for_complete(journal::JournalPlayer *player) {
-    Mutex::Locker locker(m_replay_hander.lock);
+    std::unique_lock locker{m_replay_hander.lock};
     while (!m_replay_hander.complete) {
       journal::Entry entry;
       uint64_t commit_tid;
       player->try_pop_front(&entry, &commit_tid);
 
-      if (m_replay_hander.cond.WaitInterval(
-            m_replay_hander.lock, utime_t(10, 0)) != 0) {
+      if (m_replay_hander.cond.wait_for(locker, 10s) ==
+         std::cv_status::timeout) {
         return false;
       }
     }
index c8774cab08524f971b03a73dbfdc6986c562a8e9..8e3f3323bba744df87e055c8ac5b5700cb5a2067 100644 (file)
@@ -4,7 +4,6 @@
 #include "journal/ObjectPlayer.h"
 #include "journal/Entry.h"
 #include "include/stringify.h"
-#include "common/Mutex.h"
 #include "common/Timer.h"
 #include "gtest/gtest.h"
 #include "test/librados/test.h"
index 3cc8e893cfe0c93bd7e4cb0669d9d8334a8b139e..2df5bf0faaf2e21550d758e5a7b3f65876665e8f 100644 (file)
@@ -3,7 +3,7 @@
 
 #include "journal/ObjectRecorder.h"
 #include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/Timer.h"
 #include "gtest/gtest.h"
 #include "test/librados/test.h"
@@ -14,71 +14,106 @@ using std::shared_ptr;
 
 class TestObjectRecorder : public RadosTestFixture {
 public:
-  TestObjectRecorder()
-    : m_flush_interval(std::numeric_limits<uint32_t>::max()),
-      m_flush_bytes(std::numeric_limits<uint64_t>::max()),
-      m_flush_age(600)
-  {
-  }
+  TestObjectRecorder() = default;
 
   struct Handler : public journal::ObjectRecorder::Handler {
-    Mutex lock;
-    shared_ptr<Mutex> object_lock;
-    Cond cond;
+    ceph::mutex lock = ceph::make_mutex("lock");
+    ceph::mutex* object_lock = nullptr;
+    ceph::condition_variable cond;
     bool is_closed = false;
     uint32_t overflows = 0;
 
-    Handler() : lock("lock") {
-    }
+    Handler() = default;
 
     void closed(journal::ObjectRecorder *object_recorder) override {
-      Mutex::Locker locker(lock);
+      std::lock_guard locker{lock};
       is_closed = true;
-      cond.Signal();
+      cond.notify_all();
     }
     void overflow(journal::ObjectRecorder *object_recorder) override {
-      Mutex::Locker locker(lock);
+      std::lock_guard locker{lock};
       journal::AppendBuffers append_buffers;
-      object_lock->Lock();
+      object_lock->lock();
       object_recorder->claim_append_buffers(&append_buffers);
-      object_lock->Unlock();
+      object_lock->unlock();
 
       ++overflows;
-      cond.Signal();
+      cond.notify_all();
     }
   };
 
-  typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
-  typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
-
-  ObjectRecorders m_object_recorders;
-  ObjectRecorderLocksMap m_object_recorder_locks;
-
-  uint32_t m_flush_interval;
-  uint64_t m_flush_bytes;
-  double m_flush_age;
-  uint64_t m_max_in_flight_appends = 0;
-  Handler m_handler;
-
-  void TearDown() override {
-    for (ObjectRecorders::iterator it = m_object_recorders.begin();
-         it != m_object_recorders.end(); ++it) {
-      C_SaferCond cond;
-      (*it)->flush(&cond);
-      cond.wait();
+  // flush the pending buffers in dtor
+  class ObjectRecorderFlusher {
+  public:
+    ObjectRecorderFlusher(librados::IoCtx& ioctx,
+                         ContextWQ* work_queue)
+      : m_ioctx{ioctx},
+       m_work_queue{work_queue}
+    {}
+    ObjectRecorderFlusher(librados::IoCtx& ioctx,
+                         ContextWQ* work_queue,
+                         uint32_t flush_interval,
+                         uint16_t flush_bytes,
+                         double flush_age,
+                         int max_in_flight)
+      : m_ioctx{ioctx},
+       m_work_queue{work_queue},
+       m_flush_interval{flush_interval},
+       m_flush_bytes{flush_bytes},
+       m_flush_age{flush_age},
+       m_max_in_flight_appends{max_in_flight < 0 ?
+                               std::numeric_limits<uint64_t>::max() :
+                               static_cast<uint64_t>(max_in_flight)}
+    {}
+    ~ObjectRecorderFlusher() {
+      for (auto& object_recorder : m_object_recorders) {
+       C_SaferCond cond;
+       object_recorder->flush(&cond);
+       cond.wait();
+      }
     }
-    m_object_recorders.clear();
-
-    RadosTestFixture::TearDown();
-  }
-
-  inline void set_batch_options(uint32_t flush_interval, uint64_t flush_bytes,
-                                double flush_age, int max_in_flight) {
-    m_flush_interval = flush_interval;
-    m_flush_bytes = flush_bytes;
-    m_flush_age = flush_age;
-    m_max_in_flight_appends = max_in_flight;
-  }
+    journal::ObjectRecorderPtr create_object(const std::string& oid,
+                                            uint8_t order,
+                                            ceph::mutex* lock) {
+      journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
+        m_ioctx, oid, 0, lock, m_work_queue, &m_handler,
+       order, m_max_in_flight_appends));
+      {
+       std::lock_guard locker{*lock};
+       object->set_append_batch_options(m_flush_interval,
+                                        m_flush_bytes,
+                                        m_flush_age);
+      }
+      m_object_recorders.push_back(object);
+      m_handler.object_lock = lock;
+      return object;
+    }
+    bool wait_for_closed() {
+      std::unique_lock locker{m_handler.lock};
+      return m_handler.cond.wait_for(locker, 10s,
+                                    [this] { return m_handler.is_closed; });
+    }
+    bool wait_for_overflow() {
+      std::unique_lock locker{m_handler.lock};
+      if (m_handler.cond.wait_for(locker, 10s,
+                                 [this] { return m_handler.overflows > 0; })) {
+       m_handler.overflows = 0;
+       return true;
+      } else {
+       return false;
+      }
+    }
+  private:
+    librados::IoCtx& m_ioctx;
+    ContextWQ *m_work_queue;
+    uint32_t m_flush_interval = std::numeric_limits<uint32_t>::max();
+    uint64_t m_flush_bytes = std::numeric_limits<uint64_t>::max();
+    double m_flush_age = 600;
+    uint64_t m_max_in_flight_appends = 0;
+    using ObjectRecorders = std::list<journal::ObjectRecorderPtr>;
+    ObjectRecorders m_object_recorders;
+    Handler m_handler;
+  };
 
   journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
                                              const std::string &payload) {
@@ -90,22 +125,6 @@ public:
     bl.append(payload);
     return std::make_pair(future, bl);
   }
-
-  journal::ObjectRecorderPtr create_object(const std::string &oid,
-                                           uint8_t order, shared_ptr<Mutex> lock) {
-    journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
-      m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order,
-      m_max_in_flight_appends));
-    {
-      Mutex::Locker locker(*lock);
-      object->set_append_batch_options(m_flush_interval, m_flush_bytes,
-                                       m_flush_age);
-    }
-    m_object_recorders.push_back(object);
-    m_object_recorder_locks.insert(std::make_pair(oid, lock));
-    m_handler.object_lock = lock;
-    return object;
-  }
 };
 
 TEST_F(TestObjectRecorder, Append) {
@@ -115,25 +134,25 @@ TEST_F(TestObjectRecorder, Append) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(0, 0, 0, 0);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0, 0);
+  journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(0U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -149,25 +168,25 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(2, 0, 0, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
+  journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -182,25 +201,25 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(0, 10, 0, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+  journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -215,24 +234,24 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(0, 0, 0.1, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.1, -1);
+  journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -247,24 +266,25 @@ TEST_F(TestObjectRecorder, AppendFilledObject) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+  journal::ObjectRecorderPtr object = flusher.create_object(oid, 12, &lock);
 
   std::string payload(2048, '1');
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               payload);
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               payload);
   append_buffers = {append_buffer2};
-  lock->Lock();
+  lock.lock();
   ASSERT_TRUE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -279,17 +299,17 @@ TEST_F(TestObjectRecorder, Flush) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(0, 10, 0, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+  journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond1;
@@ -309,17 +329,17 @@ TEST_F(TestObjectRecorder, FlushFuture) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(0, 10, 0, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+  journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
                                                              "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -337,8 +357,9 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+  journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
                                                              "payload");
@@ -348,9 +369,9 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
 
   object->flush(append_buffer.first);
   ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
 
   // should automatically flush once its attached to the object
   C_SaferCond cond;
@@ -365,35 +386,26 @@ TEST_F(TestObjectRecorder, Close) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(2, 0, 0, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
+  journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->close());
-  ASSERT_TRUE(lock->is_locked());
-  lock->Unlock();
-
-  {
-    Mutex::Locker locker(m_handler.lock);
-    while (!m_handler.is_closed) {
-      if (m_handler.cond.WaitInterval(
-            m_handler.lock, utime_t(10, 0)) != 0) {
-        break;
-      }
-    }
-  }
+  ASSERT_TRUE(ceph_mutex_is_locked(lock));
+  lock.unlock();
+
+  ASSERT_TRUE(flusher.wait_for_closed());
 
-  ASSERT_TRUE(m_handler.is_closed);
   ASSERT_EQ(0U, object->get_pending_appends());
 }
 
@@ -404,8 +416,11 @@ TEST_F(TestObjectRecorder, Overflow) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
-  journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
+  ceph::mutex lock1 = ceph::make_mutex("object_recorder_lock_1");
+  ceph::mutex lock2 = ceph::make_mutex("object_recorder_lock_2");
+
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+  journal::ObjectRecorderPtr object1 = flusher.create_object(oid, 12, &lock1);
 
   std::string payload(1 << 11, '1');
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
@@ -414,56 +429,26 @@ TEST_F(TestObjectRecorder, Overflow) {
                                                               payload);
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1, append_buffer2};
-  lock1->Lock();
+  lock1.lock();
   ASSERT_TRUE(object1->append(std::move(append_buffers)));
-  lock1->Unlock();
+  lock1.unlock();
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
   ASSERT_EQ(0, cond.wait());
   ASSERT_EQ(0U, object1->get_pending_appends());
 
-  bool overflowed = false;
-  {
-    Mutex::Locker locker(m_handler.lock);
-    while (m_handler.overflows == 0) {
-      if (m_handler.cond.WaitInterval(
-            m_handler.lock, utime_t(10, 0)) != 0) {
-        break;
-      }
-    }
-    if (m_handler.overflows != 0) {
-      overflowed = true;
-      m_handler.overflows = 0;
-    }
-  }
+  ASSERT_TRUE(flusher.wait_for_overflow());
 
-  ASSERT_TRUE(overflowed);
-
-  shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
-  journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
+  journal::ObjectRecorderPtr object2 = flusher.create_object(oid, 12, &lock2);
 
   journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
                                                               payload);
   append_buffers = {append_buffer3};
-  lock2->Lock();
+  lock2.lock();
   ASSERT_FALSE(object2->append(std::move(append_buffers)));
-  lock2->Unlock();
+  lock2.unlock();
   append_buffer3.first->flush(NULL);
 
-  overflowed = false;
-  {
-    Mutex::Locker locker(m_handler.lock);
-    while (m_handler.overflows == 0) {
-      if (m_handler.cond.WaitInterval(
-            m_handler.lock, utime_t(10, 0)) != 0) {
-        break;
-      }
-    }
-    if (m_handler.overflows != 0) {
-      overflowed = true;
-    }
-  }
-
-  ASSERT_TRUE(overflowed);
+  ASSERT_TRUE(flusher.wait_for_overflow());
 }