]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: increase concurrency of journal recorder
authorRicardo Dias <rdias@suse.com>
Mon, 25 Jul 2016 16:00:50 +0000 (17:00 +0100)
committerRicardo Dias <rdias@suse.com>
Mon, 26 Sep 2016 09:26:21 +0000 (10:26 +0100)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h
src/librbd/Journal.cc
src/test/journal/test_ObjectRecorder.cc

index 4cbe7391ee94d93dddd605e5a7e886f9ced24694..0113bd4a7b921c45ac545b8e361c20eaf9988d04 100644 (file)
@@ -10,6 +10,8 @@
 #undef dout_prefix
 #define dout_prefix *_dout << "JournalRecorder: " << this << " "
 
+using std::shared_ptr;
+
 namespace journal {
 
 namespace {
@@ -59,8 +61,11 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+    m_object_locks.push_back(shared_ptr<Mutex>(new Mutex("ObjectRecorder::m_lock::"+
+                                       std::to_string(splay_offset))));
     uint64_t object_number = splay_offset + (m_current_set * splay_width);
-    m_object_ptrs[splay_offset] = create_object_recorder(object_number);
+    m_object_ptrs[splay_offset] = create_object_recorder(object_number,
+                                                m_object_locks[splay_offset]);
   }
 
   m_journal_metadata->add_listener(&m_listener);
@@ -77,7 +82,7 @@ JournalRecorder::~JournalRecorder() {
 Future JournalRecorder::append(uint64_t tag_tid,
                                const bufferlist &payload_bl) {
 
-  Mutex::Locker locker(m_lock);
+  m_lock.Lock();
 
   uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
   uint8_t splay_width = m_journal_metadata->get_splay_width();
@@ -90,6 +95,9 @@ Future JournalRecorder::append(uint64_t tag_tid,
   future->init(m_prev_future);
   m_prev_future = future;
 
+  m_object_locks[splay_offset]->Lock();
+  m_lock.Unlock();
+
   bufferlist entry_bl;
   ::encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
            entry_bl);
@@ -97,11 +105,12 @@ Future JournalRecorder::append(uint64_t tag_tid,
 
   AppendBuffers append_buffers;
   append_buffers.push_back(std::make_pair(future, entry_bl));
-  bool object_full = object_ptr->append(append_buffers);
+  bool object_full = object_ptr->append_unlock(append_buffers);
 
   if (object_full) {
     ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
                      << dendl;
+    Mutex::Locker l(m_lock);
     close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
   }
   return Future(future);
@@ -117,6 +126,7 @@ void JournalRecorder::flush(Context *on_safe) {
          it != m_object_ptrs.end(); ++it) {
       it->second->flush(ctx);
     }
+
   }
 
   // avoid holding the lock in case there is nothing to flush
@@ -191,6 +201,8 @@ void JournalRecorder::open_object_set() {
                    << dendl;
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
+
+  lock_object_recorders();
   for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
        it != m_object_ptrs.end(); ++it) {
     ObjectRecorderPtr object_recorder = it->second;
@@ -201,6 +213,7 @@ void JournalRecorder::open_object_set() {
       create_next_object_recorder(object_recorder);
     }
   }
+  unlock_object_recorders();
 }
 
 bool JournalRecorder::close_object_set(uint64_t active_set) {
@@ -209,6 +222,7 @@ bool JournalRecorder::close_object_set(uint64_t active_set) {
   // object recorders will invoke overflow handler as they complete
   // closing the object to ensure correct order of future appends
   uint8_t splay_width = m_journal_metadata->get_splay_width();
+  lock_object_recorders();
   for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
        it != m_object_ptrs.end(); ++it) {
     ObjectRecorderPtr object_recorder = it->second;
@@ -224,14 +238,15 @@ bool JournalRecorder::close_object_set(uint64_t active_set) {
       }
     }
   }
+  unlock_object_recorders();
   return (m_in_flight_object_closes == 0);
 }
 
 ObjectRecorderPtr JournalRecorder::create_object_recorder(
-    uint64_t object_number) {
+    uint64_t object_number, shared_ptr<Mutex> lock) {
   ObjectRecorderPtr object_recorder(new ObjectRecorder(
     m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
-    object_number, m_journal_metadata->get_timer(),
+    object_number, lock, m_journal_metadata->get_timer(),
     m_journal_metadata->get_timer_lock(), &m_object_handler,
     m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes,
     m_flush_age));
@@ -246,8 +261,10 @@ void JournalRecorder::create_next_object_recorder(
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   uint8_t splay_offset = object_number % splay_width;
 
+  assert(m_object_locks[splay_offset]->is_locked());
+
   ObjectRecorderPtr new_object_recorder = create_object_recorder(
-     (m_current_set * splay_width) + splay_offset);
+     (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
 
   ldout(m_cct, 10) << __func__ << ": "
                    << "old oid=" << object_recorder->get_oid() << ", "
@@ -262,7 +279,7 @@ void JournalRecorder::create_next_object_recorder(
       new_object_recorder->get_object_number());
   }
 
-  new_object_recorder->append(append_buffers);
+  new_object_recorder->append(append_buffers, false);
 
   m_object_ptrs[splay_offset] = new_object_recorder;
 }
index 6ed2e6316236d3a439def0735ed6c13bb9c6db6e..acef0e4b17959d31271408c10d457bb342d0bbc7 100644 (file)
@@ -92,6 +92,7 @@ private:
   uint32_t m_in_flight_object_closes = 0;
   uint64_t m_current_set;
   ObjectRecorderPtrs m_object_ptrs;
+  std::vector<std::shared_ptr<Mutex>> m_object_locks;
 
   FutureImplPtr m_prev_future;
 
@@ -103,13 +104,26 @@ private:
 
   void close_and_advance_object_set(uint64_t object_set);
 
-  ObjectRecorderPtr create_object_recorder(uint64_t object_number);
+  ObjectRecorderPtr create_object_recorder(uint64_t object_number,
+                                           std::shared_ptr<Mutex> lock);
   void create_next_object_recorder(ObjectRecorderPtr object_recorder);
 
   void handle_update();
 
   void handle_closed(ObjectRecorder *object_recorder);
   void handle_overflow(ObjectRecorder *object_recorder);
+
+  void lock_object_recorders() {
+    for (auto& lock : m_object_locks) {
+      lock->Lock();
+    }
+  }
+
+  void unlock_object_recorders() {
+    for (auto& lock : m_object_locks) {
+      lock->Unlock();
+    }
+  }
 };
 
 } // namespace journal
index 0cf2fd1e8ea05b48c516732b659e08453afaffce..c12ecff02c6ae95c1f42f055aff3a6c89cedf903 100644 (file)
 #define dout_prefix *_dout << "ObjectRecorder: " << this << " "
 
 using namespace cls::journal;
+using std::shared_ptr;
 
 namespace journal {
 
 ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
-                               uint64_t object_number,
+                               uint64_t object_number, shared_ptr<Mutex> lock,
                                SafeTimer &timer, Mutex &timer_lock,
                                Handler *handler, uint8_t order,
                                uint32_t flush_interval, uint64_t flush_bytes,
@@ -27,9 +28,8 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
     m_handler(handler), m_order(order), m_soft_max_size(1 << m_order),
     m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
     m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL),
-    m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)),
-    m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false),
-    m_object_closed(false), m_in_flight_flushes(false) {
+    m_lock(lock), m_append_tid(0), m_pending_bytes(0), m_size(0),
+    m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
   assert(m_handler != NULL);
@@ -42,14 +42,22 @@ ObjectRecorder::~ObjectRecorder() {
   assert(m_in_flight_appends.empty());
 }
 
-bool ObjectRecorder::append(const AppendBuffers &append_buffers) {
+bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
+  return append(append_buffers, true);
+}
+
+bool ObjectRecorder::append(const AppendBuffers &append_buffers, bool unlock) {
+  assert(m_lock->is_locked());
+
   FutureImplPtr last_flushed_future;
   bool schedule_append = false;
   {
-    Mutex::Locker locker(m_lock);
     if (m_overflowed) {
       m_append_buffers.insert(m_append_buffers.end(),
                               append_buffers.begin(), append_buffers.end());
+      if (unlock) {
+        m_lock->Unlock();
+      }
       return false;
     }
 
@@ -59,10 +67,20 @@ bool ObjectRecorder::append(const AppendBuffers &append_buffers) {
         last_flushed_future = iter->first;
       }
     }
+
+    if (unlock) {
+      m_lock->Unlock();
+    }
   }
 
   if (last_flushed_future) {
+    if (unlock) {
+      m_lock->Lock();
+    }
     flush(last_flushed_future);
+    if (unlock) {
+      m_lock->Unlock();
+    }
   } else if (schedule_append) {
     schedule_append_task();
   } else {
@@ -78,13 +96,13 @@ void ObjectRecorder::flush(Context *on_safe) {
   cancel_append_task();
   Future future;
   {
-    Mutex::Locker locker(m_lock);
+    Mutex::Locker locker(*m_lock);
 
     // if currently handling flush notifications, wait so that
     // we notify in the correct order (since lock is dropped on
     // callback)
     if (m_in_flight_flushes) {
-      m_in_flight_flushes_cond.Wait(m_lock);
+      m_in_flight_flushes_cond.Wait(*(m_lock.get()));
     }
 
     // attach the flush to the most recent append
@@ -110,7 +128,8 @@ void ObjectRecorder::flush(const FutureImplPtr &future) {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
                    << dendl;
 
-  Mutex::Locker locker(m_lock);
+  assert(m_lock->is_locked());
+
   if (future->get_flush_handler().get() != &m_flush_handler) {
     // if we don't own this future, re-issue the flush so that it hits the
     // correct journal object owner
@@ -142,7 +161,7 @@ void ObjectRecorder::flush(const FutureImplPtr &future) {
 void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
 
-  Mutex::Locker locker(m_lock);
+  assert(m_lock->is_locked());
   assert(m_in_flight_tids.empty());
   assert(m_in_flight_appends.empty());
   assert(m_object_closed || m_overflowed);
@@ -151,11 +170,12 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
 }
 
 bool ObjectRecorder::close() {
+  assert (m_lock->is_locked());
+
   ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
 
   cancel_append_task();
 
-  Mutex::Locker locker(m_lock);
   flush_appends(true);
 
   assert(!m_object_closed);
@@ -167,7 +187,7 @@ void ObjectRecorder::handle_append_task() {
   assert(m_timer_lock.is_locked());
   m_append_task = NULL;
 
-  Mutex::Locker locker(m_lock);
+  Mutex::Locker locker(*m_lock);
   flush_appends(true);
 }
 
@@ -189,7 +209,7 @@ void ObjectRecorder::schedule_append_task() {
 
 bool ObjectRecorder::append(const AppendBuffer &append_buffer,
                             bool *schedule_append) {
-  assert(m_lock.is_locked());
+  assert(m_lock->is_locked());
 
   bool flush_requested = false;
   if (!m_object_closed && !m_overflowed) {
@@ -206,7 +226,7 @@ bool ObjectRecorder::append(const AppendBuffer &append_buffer,
 }
 
 bool ObjectRecorder::flush_appends(bool force) {
-  assert(m_lock.is_locked());
+  assert(m_lock->is_locked());
   if (m_object_closed || m_overflowed) {
     return true;
   }
@@ -232,7 +252,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
 
   AppendBuffers append_buffers;
   {
-    Mutex::Locker locker(m_lock);
+    Mutex::Locker locker(*m_lock);
     auto tid_iter = m_in_flight_tids.find(tid);
     assert(tid_iter != m_in_flight_tids.end());
     m_in_flight_tids.erase(tid_iter);
@@ -275,7 +295,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
   }
 
   // wake up any flush requests that raced with a RADOS callback
-  Mutex::Locker locker(m_lock);
+  Mutex::Locker locker(*m_lock);
   m_in_flight_flushes = false;
   m_in_flight_flushes_cond.Signal();
 }
@@ -284,7 +304,7 @@ void ObjectRecorder::append_overflowed(uint64_t tid) {
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
                    << dendl;
 
-  assert(m_lock.is_locked());
+  assert(m_lock->is_locked());
   assert(!m_in_flight_appends.empty());
   assert(m_in_flight_appends.begin()->first == tid);
 
@@ -308,7 +328,7 @@ void ObjectRecorder::append_overflowed(uint64_t tid) {
 }
 
 void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
-  assert(m_lock.is_locked());
+  assert(m_lock->is_locked());
   assert(!append_buffers->empty());
 
   uint64_t append_tid = m_append_tid++;
@@ -340,7 +360,7 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
 }
 
 void ObjectRecorder::notify_handler() {
-  assert(m_lock.is_locked());
+  assert(m_lock->is_locked());
 
   for (AppendBuffers::const_iterator it = m_append_buffers.begin();
        it != m_append_buffers.end(); ++it) {
@@ -350,14 +370,14 @@ void ObjectRecorder::notify_handler() {
   }
 
   if (m_object_closed) {
-    m_lock.Unlock();
+    m_lock->Unlock();
     m_handler->closed(this);
-    m_lock.Lock();
+    m_lock->Lock();
   } else {
     // TODO need to delay completion until after aio_notify completes
-    m_lock.Unlock();
+    m_lock->Unlock();
     m_handler->overflow(this);
-    m_lock.Lock();
+    m_lock->Lock();
   }
 }
 
index 53f8cc9ad0dbbb852535cc86be013b1fbaa55b2d..7d93ca26c6f0efb284477c2a363f66f544173d89 100644 (file)
@@ -37,9 +37,10 @@ public:
   };
 
   ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
-                 uint64_t object_number, SafeTimer &timer, Mutex &timer_lock,
-                 Handler *handler, uint8_t order, uint32_t flush_interval,
-                 uint64_t flush_bytes, double flush_age);
+                 uint64_t object_number, std::shared_ptr<Mutex> lock,
+                 SafeTimer &timer, Mutex &timer_lock, Handler *handler,
+                 uint8_t order, uint32_t flush_interval, uint64_t flush_bytes,
+                 double flush_age);
   ~ObjectRecorder();
 
   inline uint64_t get_object_number() const {
@@ -49,14 +50,15 @@ public:
     return m_oid;
   }
 
-  bool append(const AppendBuffers &append_buffers);
+  bool append_unlock(const AppendBuffers &append_buffers);
+  bool append(const AppendBuffers &append_buffers, bool unlock);
   void flush(Context *on_safe);
   void flush(const FutureImplPtr &future);
 
   void claim_append_buffers(AppendBuffers *append_buffers);
 
   bool is_closed() const {
-    Mutex::Locker locker(m_lock);
+    assert(m_lock->is_locked());
     return (m_object_closed && m_in_flight_appends.empty());
   }
   bool close();
@@ -66,7 +68,7 @@ public:
   }
 
   inline size_t get_pending_appends() const {
-    Mutex::Locker locker(m_lock);
+    Mutex::Locker locker(*m_lock);
     return m_append_buffers.size();
   }
 
@@ -129,7 +131,7 @@ private:
 
   C_AppendTask *m_append_task;
 
-  mutable Mutex m_lock;
+  mutable std::shared_ptr<Mutex> m_lock;
   AppendBuffers m_append_buffers;
   uint64_t m_append_tid;
   uint32_t m_pending_bytes;
index 180ac46be2caf7592d9dc22cb51f84bb84890c61..2109e1874a2e9c3ad63a0fcf1c0310b2f2815579 100644 (file)
@@ -861,17 +861,19 @@ uint64_t Journal<I>::append_io_events(journal::EventType event_type,
   Futures futures;
   uint64_t tid;
   {
-    Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_READY);
+    {
+      Mutex::Locker locker(m_lock);
+      assert(m_state == STATE_READY);
 
-    Mutex::Locker event_locker(m_event_lock);
-    tid = ++m_event_tid;
-    assert(tid != 0);
+      tid = ++m_event_tid;
+      assert(tid != 0);
+    }
 
     for (auto &bl : bufferlists) {
       assert(bl.length() <= m_max_append_size);
       futures.push_back(m_journaler->append(m_tag_tid, bl));
     }
+    Mutex::Locker event_locker(m_event_lock);
     m_events[tid] = Event(futures, requests, offset, length);
   }
 
index de82d06209517641588e2b8607f7c7786a0c7502..b6117918be3b38c50868da159675c44144c42abb 100644 (file)
@@ -10,6 +10,8 @@
 #include "test/journal/RadosTestFixture.h"
 #include <limits>
 
+using std::shared_ptr;
+
 class TestObjectRecorder : public RadosTestFixture {
 public:
   TestObjectRecorder()
@@ -21,6 +23,7 @@ public:
 
   struct Handler : public journal::ObjectRecorder::Handler {
     Mutex lock;
+    shared_ptr<Mutex> object_lock;
     Cond cond;
     bool is_closed = false;
     uint32_t overflows = 0;
@@ -36,7 +39,9 @@ public:
     virtual void overflow(journal::ObjectRecorder *object_recorder) {
       Mutex::Locker locker(lock);
       journal::AppendBuffers append_buffers;
+      object_lock->Lock();
       object_recorder->claim_append_buffers(&append_buffers);
+      object_lock->Unlock();
 
       ++overflows;
       cond.Signal();
@@ -44,8 +49,10 @@ public:
   };
 
   typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
+  typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
 
   ObjectRecorders m_object_recorders;
+  ObjectRecorderLocksMap m_object_recorder_locks;
 
   uint32_t m_flush_interval;
   uint64_t m_flush_bytes;
@@ -86,11 +93,13 @@ public:
   }
 
   journal::ObjectRecorderPtr create_object(const std::string &oid,
-                                           uint8_t order) {
+                                           uint8_t order, shared_ptr<Mutex> lock) {
     journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
-      m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_handler, order,
+      m_ioctx, oid, 0, lock, *m_timer, m_timer_lock, &m_handler, order,
       m_flush_interval, m_flush_bytes, m_flush_age));
     m_object_recorders.push_back(object);
+    m_object_recorder_locks.insert(std::make_pair(oid, lock));
+    m_handler.object_lock = lock;
     return object;
   }
 };
@@ -102,23 +111,31 @@ TEST_F(TestObjectRecorder, Append) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  journal::ObjectRecorderPtr object = create_object(oid, 24);
+  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(2U, object->get_pending_appends());
 
   C_SaferCond cond;
+  lock->Lock();
   append_buffer2.first->flush(&cond);
+  ASSERT_TRUE(lock->is_locked());
+  lock->Unlock();
   ASSERT_EQ(0, cond.wait());
   ASSERT_EQ(0U, object->get_pending_appends());
 }
@@ -131,19 +148,24 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   set_flush_interval(2);
-  journal::ObjectRecorderPtr object = create_object(oid, 24);
+  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -159,19 +181,24 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   set_flush_bytes(10);
-  journal::ObjectRecorderPtr object = create_object(oid, 24);
+  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -187,18 +214,23 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   set_flush_age(0.1);
-  journal::ObjectRecorderPtr object = create_object(oid, 24);
+  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -213,19 +245,24 @@ TEST_F(TestObjectRecorder, AppendFilledObject) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  journal::ObjectRecorderPtr object = create_object(oid, 12);
+  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+  journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
 
   std::string payload(2048, '1');
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               payload);
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               payload);
   append_buffers = {append_buffer2};
-  ASSERT_TRUE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_TRUE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -240,13 +277,16 @@ TEST_F(TestObjectRecorder, Flush) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  journal::ObjectRecorderPtr object = create_object(oid, 24);
+  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond1;
@@ -266,18 +306,24 @@ TEST_F(TestObjectRecorder, FlushFuture) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  journal::ObjectRecorderPtr object = create_object(oid, 24);
+  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
   journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
                                                              "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond;
   append_buffer.first->wait(&cond);
+  lock->Lock();
   object->flush(append_buffer.first);
+  ASSERT_TRUE(lock->is_locked());
+  lock->Unlock();
   ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
               append_buffer.first->is_complete());
   ASSERT_EQ(0, cond.wait());
@@ -290,7 +336,8 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  journal::ObjectRecorderPtr object = create_object(oid, 24);
+  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
   journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
                                                              "payload");
@@ -298,9 +345,14 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer};
 
+  lock->Lock();
   object->flush(append_buffer.first);
+  ASSERT_TRUE(lock->is_locked());
+  lock->Unlock();
   ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
 
   // should automatically flush once its attached to the object
   C_SaferCond cond;
@@ -316,16 +368,22 @@ TEST_F(TestObjectRecorder, Close) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   set_flush_interval(2);
-  journal::ObjectRecorderPtr object = create_object(oid, 24);
+  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  ASSERT_FALSE(object->append(append_buffers));
+  lock->Lock();
+  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
+  lock->Lock();
   ASSERT_FALSE(object->close());
+  ASSERT_TRUE(lock->is_locked());
+  lock->Unlock();
 
   {
     Mutex::Locker locker(m_handler.lock);
@@ -349,8 +407,10 @@ TEST_F(TestObjectRecorder, Overflow) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  journal::ObjectRecorderPtr object1 = create_object(oid, 12);
-  journal::ObjectRecorderPtr object2 = create_object(oid, 12);
+  shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
+  journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
+  shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
+  journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
 
   std::string payload(2048, '1');
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
@@ -359,7 +419,9 @@ TEST_F(TestObjectRecorder, Overflow) {
                                                               payload);
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1, append_buffer2};
-  ASSERT_TRUE(object1->append(append_buffers));
+  lock1->Lock();
+  ASSERT_TRUE(object1->append_unlock(append_buffers));
+  ASSERT_FALSE(lock1->is_locked());
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -370,8 +432,13 @@ TEST_F(TestObjectRecorder, Overflow) {
                                                               payload);
   append_buffers = {append_buffer3};
 
-  ASSERT_FALSE(object2->append(append_buffers));
+  lock2->Lock();
+  ASSERT_FALSE(object2->append_unlock(append_buffers));
+  ASSERT_FALSE(lock2->is_locked());
+  lock2->Lock();
   append_buffer3.first->flush(NULL);
+  ASSERT_TRUE(lock2->is_locked());
+  lock2->Unlock();
 
   bool overflowed = false;
   {