git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: make librados call async in ObjectRecorder
author: Ricardo Dias <rdias@suse.com>
Wed, 7 Sep 2016 14:26:34 +0000 (15:26 +0100)
committer: Ricardo Dias <rdias@suse.com>
Mon, 26 Sep 2016 13:25:25 +0000 (14:25 +0100)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/journal/JournalMetadata.h
src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h
src/librbd/Journal.cc
src/librbd/Journal.h
src/test/journal/test_ObjectRecorder.cc
src/test/librbd/test_mock_Journal.cc

index 880130126dd9eb877192601cea78ca5353aa8165..d28710b3c15d3b446b2a97db0f6ac201251452ad 100644 (file)
@@ -98,6 +98,10 @@ public:
     m_work_queue->queue(on_finish, r);
   }
 
+  inline ContextWQ *get_work_queue() {
+    return m_work_queue;
+  }
+
   inline SafeTimer &get_timer() {
     return *m_timer;
   }
index 0113bd4a7b921c45ac545b8e361c20eaf9988d04..1b0e5704f71b8c192669b4857c523b2eeb496ce5 100644 (file)
@@ -61,10 +61,12 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
-    m_object_locks.push_back(shared_ptr<Mutex>(new Mutex("ObjectRecorder::m_lock::"+
-                                       std::to_string(splay_offset))));
+    m_object_locks.push_back(shared_ptr<Mutex>(
+                                          new Mutex("ObjectRecorder::m_lock::"+
+                                          std::to_string(splay_offset))));
     uint64_t object_number = splay_offset + (m_current_set * splay_width);
-    m_object_ptrs[splay_offset] = create_object_recorder(object_number,
+    m_object_ptrs[splay_offset] = create_object_recorder(
+                                                object_number,
                                                 m_object_locks[splay_offset]);
   }
 
@@ -206,14 +208,17 @@ void JournalRecorder::open_object_set() {
   for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
        it != m_object_ptrs.end(); ++it) {
     ObjectRecorderPtr object_recorder = it->second;
-    if (object_recorder->get_object_number() / splay_width != m_current_set) {
+    uint64_t object_number = object_recorder->get_object_number();
+    if (object_number / splay_width != m_current_set) {
       assert(object_recorder->is_closed());
 
       // ready to close object and open object in active set
-      create_next_object_recorder(object_recorder);
+      create_next_object_recorder_unlock(object_recorder);
+    } else {
+      uint8_t splay_offset = object_number % splay_width;
+      m_object_locks[splay_offset]->Unlock();
     }
   }
-  unlock_object_recorders();
 }
 
 bool JournalRecorder::close_object_set(uint64_t active_set) {
@@ -246,14 +251,14 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder(
     uint64_t object_number, shared_ptr<Mutex> lock) {
   ObjectRecorderPtr object_recorder(new ObjectRecorder(
     m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
-    object_number, lock, m_journal_metadata->get_timer(),
-    m_journal_metadata->get_timer_lock(), &m_object_handler,
-    m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes,
-    m_flush_age));
+    object_number, lock, m_journal_metadata->get_work_queue(),
+    m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
+    &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
+    m_flush_bytes, m_flush_age));
   return object_recorder;
 }
 
-void JournalRecorder::create_next_object_recorder(
+void JournalRecorder::create_next_object_recorder_unlock(
     ObjectRecorderPtr object_recorder) {
   assert(m_lock.is_locked());
 
@@ -279,7 +284,7 @@ void JournalRecorder::create_next_object_recorder(
       new_object_recorder->get_object_number());
   }
 
-  new_object_recorder->append(append_buffers, false);
+  new_object_recorder->append_unlock(append_buffers);
 
   m_object_ptrs[splay_offset] = new_object_recorder;
 }
index acef0e4b17959d31271408c10d457bb342d0bbc7..7a4af52ee83ab5feca0664ab1bab0303adef94e1 100644 (file)
@@ -106,7 +106,7 @@ private:
 
   ObjectRecorderPtr create_object_recorder(uint64_t object_number,
                                            std::shared_ptr<Mutex> lock);
-  void create_next_object_recorder(ObjectRecorderPtr object_recorder);
+  void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder);
 
   void handle_update();
 
index c12ecff02c6ae95c1f42f055aff3a6c89cedf903..cbd3842da390a0fe196482f9515f22f878ac434d 100644 (file)
@@ -19,17 +19,18 @@ namespace journal {
 
 ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                                uint64_t object_number, shared_ptr<Mutex> lock,
-                               SafeTimer &timer, Mutex &timer_lock,
-                               Handler *handler, uint8_t order,
-                               uint32_t flush_interval, uint64_t flush_bytes,
-                               double flush_age)
+                               ContextWQ *work_queue, SafeTimer &timer,
+                               Mutex &timer_lock, Handler *handler,
+                               uint8_t order, uint32_t flush_interval,
+                               uint64_t flush_bytes, double flush_age)
   : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
-    m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock),
-    m_handler(handler), m_order(order), m_soft_max_size(1 << m_order),
-    m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
-    m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL),
-    m_lock(lock), m_append_tid(0), m_pending_bytes(0), m_size(0),
-    m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
+    m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
+    m_timer_lock(timer_lock), m_handler(handler), m_order(order),
+    m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
+    m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
+    m_append_task(NULL), m_lock(lock), m_append_tid(0), m_pending_bytes(0),
+    m_size(0), m_overflowed(false), m_object_closed(false),
+    m_in_flight_flushes(false), m_aio_scheduled(false) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
   assert(m_handler != NULL);
@@ -40,51 +41,39 @@ ObjectRecorder::~ObjectRecorder() {
   assert(m_append_buffers.empty());
   assert(m_in_flight_tids.empty());
   assert(m_in_flight_appends.empty());
+  assert(!m_aio_scheduled);
 }
 
 bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
-  return append(append_buffers, true);
-}
-
-bool ObjectRecorder::append(const AppendBuffers &append_buffers, bool unlock) {
   assert(m_lock->is_locked());
 
   FutureImplPtr last_flushed_future;
   bool schedule_append = false;
-  {
-    if (m_overflowed) {
-      m_append_buffers.insert(m_append_buffers.end(),
-                              append_buffers.begin(), append_buffers.end());
-      if (unlock) {
-        m_lock->Unlock();
-      }
-      return false;
-    }
-
-    for (AppendBuffers::const_iterator iter = append_buffers.begin();
-         iter != append_buffers.end(); ++iter) {
-      if (append(*iter, &schedule_append)) {
-        last_flushed_future = iter->first;
-      }
-    }
+    
+  if (m_overflowed) {
+    m_append_buffers.insert(m_append_buffers.end(),
+                            append_buffers.begin(), append_buffers.end());
+    m_lock->Unlock();
+    return false;
+  }
 
-    if (unlock) {
-      m_lock->Unlock();
+  for (AppendBuffers::const_iterator iter = append_buffers.begin();
+       iter != append_buffers.end(); ++iter) {
+    if (append(*iter, &schedule_append)) {
+      last_flushed_future = iter->first;
     }
   }
 
   if (last_flushed_future) {
-    if (unlock) {
-      m_lock->Lock();
-    }
     flush(last_flushed_future);
-    if (unlock) {
-      m_lock->Unlock();
-    }
-  } else if (schedule_append) {
-    schedule_append_task();
+    m_lock->Unlock();
   } else {
-    cancel_append_task();
+    m_lock->Unlock();
+    if (schedule_append) {
+      schedule_append_task();
+    } else {
+      cancel_append_task();
+    }
   }
   return (!m_object_closed && !m_overflowed &&
           m_size + m_pending_bytes >= m_soft_max_size);
@@ -180,7 +169,7 @@ bool ObjectRecorder::close() {
 
   assert(!m_object_closed);
   m_object_closed = true;
-  return m_in_flight_tids.empty();
+  return m_in_flight_tids.empty() && !m_aio_scheduled;
 }
 
 void ObjectRecorder::handle_append_task() {
@@ -268,7 +257,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
       }
 
       // notify of overflow once all in-flight ops are complete
-      if (m_in_flight_tids.empty()) {
+      if (m_in_flight_tids.empty() && !m_aio_scheduled) {
         notify_handler();
       }
       return;
@@ -279,7 +268,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
     assert(!append_buffers.empty());
 
     m_in_flight_appends.erase(iter);
-    if (m_in_flight_appends.empty() && m_object_closed) {
+    if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
       // all remaining unsent appends should be redirected to new object
       notify_handler();
     }
@@ -331,6 +320,32 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
   assert(m_lock->is_locked());
   assert(!append_buffers->empty());
 
+  for (AppendBuffers::iterator it = append_buffers->begin();
+       it != append_buffers->end(); ++it) {
+    ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
+                     << dendl;
+    it->first->set_flush_in_progress();
+    m_size += it->second.length();
+  }
+
+  m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
+                           append_buffers->begin(), append_buffers->end());
+  if (!m_aio_scheduled) {
+    m_op_work_queue->queue(new FunctionContext([this] (int r) {
+        send_appends_aio();
+    }));
+    m_aio_scheduled = true;
+  }
+}
+
+void ObjectRecorder::send_appends_aio() {
+  Mutex::Locker locker(*m_lock);
+
+  m_aio_scheduled = false;
+
+  AppendBuffers append_buffers;
+  m_pending_buffers.swap(append_buffers);
+
   uint64_t append_tid = m_append_tid++;
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
                    << append_tid << dendl;
@@ -339,17 +354,15 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
   librados::ObjectWriteOperation op;
   client::guard_append(&op, m_soft_max_size);
 
-  for (AppendBuffers::iterator it = append_buffers->begin();
-       it != append_buffers->end(); ++it) {
+  for (AppendBuffers::iterator it = append_buffers.begin();
+       it != append_buffers.end(); ++it) {
     ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
                      << dendl;
-    it->first->set_flush_in_progress();
     op.append(it->second);
     op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
-    m_size += it->second.length();
   }
   m_in_flight_tids.insert(append_tid);
-  m_in_flight_appends[append_tid].swap(*append_buffers);
+  m_in_flight_appends[append_tid].swap(append_buffers);
 
   librados::AioCompletion *rados_completion =
     librados::Rados::aio_create_completion(append_flush, NULL,
index 7d93ca26c6f0efb284477c2a363f66f544173d89..f9cf10657860dee902b9c0397660cd15812cc283 100644 (file)
@@ -9,6 +9,7 @@
 #include "common/Cond.h"
 #include "common/Mutex.h"
 #include "common/RefCountedObj.h"
+#include "common/WorkQueue.h"
 #include "journal/FutureImpl.h"
 #include <list>
 #include <map>
@@ -38,9 +39,9 @@ public:
 
   ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                  uint64_t object_number, std::shared_ptr<Mutex> lock,
-                 SafeTimer &timer, Mutex &timer_lock, Handler *handler,
-                 uint8_t order, uint32_t flush_interval, uint64_t flush_bytes,
-                 double flush_age);
+                 ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
+                 Handler *handler, uint8_t order, uint32_t flush_interval,
+                 uint64_t flush_bytes, double flush_age);
   ~ObjectRecorder();
 
   inline uint64_t get_object_number() const {
@@ -51,7 +52,6 @@ public:
   }
 
   bool append_unlock(const AppendBuffers &append_buffers);
-  bool append(const AppendBuffers &append_buffers, bool unlock);
   void flush(Context *on_safe);
   void flush(const FutureImplPtr &future);
 
@@ -86,6 +86,7 @@ private:
       object_recorder->put();
     }
     virtual void flush(const FutureImplPtr &future) {
+      Mutex::Locker locker(*(object_recorder->m_lock));
       object_recorder->flush(future);
     }
   };
@@ -115,6 +116,8 @@ private:
   uint64_t m_object_number;
   CephContext *m_cct;
 
+  ContextWQ *m_op_work_queue;
+
   SafeTimer &m_timer;
   Mutex &m_timer_lock;
 
@@ -147,6 +150,9 @@ private:
   bool m_in_flight_flushes;
   Cond m_in_flight_flushes_cond;
 
+  AppendBuffers m_pending_buffers;
+  bool m_aio_scheduled;
+
   void handle_append_task();
   void cancel_append_task();
   void schedule_append_task();
@@ -156,6 +162,7 @@ private:
   void handle_append_flushed(uint64_t tid, int r);
   void append_overflowed(uint64_t tid);
   void send_appends(AppendBuffers *append_buffers);
+  void send_appends_aio();
 
   void notify_handler();
 };
index 2109e1874a2e9c3ad63a0fcf1c0310b2f2815579..a23d709d62412cbd7ab59f1946c4a9918708100e 100644 (file)
@@ -20,6 +20,7 @@
 #include "librbd/journal/CreateRequest.h"
 
 #include <boost/scope_exit.hpp>
+#include <utility>
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -670,50 +671,56 @@ int Journal<I>::demote() {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << __func__ << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_journaler != nullptr && is_tag_owner(m_lock));
+  int r;
+  C_SaferCond ctx;
+  Future future;
+  C_SaferCond flush_ctx;
 
-  cls::journal::Client client;
-  int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
-  if (r < 0) {
-    lderr(cct) << this << " " << __func__ << ": "
-               << "failed to retrieve client: " << cpp_strerror(r) << dendl;
-    return r;
-  }
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_journaler != nullptr && is_tag_owner(m_lock));
 
-  assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
-  journal::TagPredecessor predecessor;
-  predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
-  if (!client.commit_position.object_positions.empty()) {
-    auto position = client.commit_position.object_positions.front();
-    predecessor.commit_valid = true;
-    predecessor.tag_tid = position.tag_tid;
-    predecessor.entry_tid = position.entry_tid;
-  }
+    cls::journal::Client client;
+    r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
+    if (r < 0) {
+      lderr(cct) << this << " " << __func__ << ": "
+                 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+      return r;
+    }
 
-  cls::journal::Tag new_tag;
-  r = allocate_journaler_tag(cct, m_journaler, client, m_tag_class,
-                             predecessor, ORPHAN_MIRROR_UUID, &new_tag);
-  if (r < 0) {
-    return r;
-  }
+    assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
+    journal::TagPredecessor predecessor;
+    predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
+    if (!client.commit_position.object_positions.empty()) {
+      auto position = client.commit_position.object_positions.front();
+      predecessor.commit_valid = true;
+      predecessor.tag_tid = position.tag_tid;
+      predecessor.entry_tid = position.entry_tid;
+    }
 
-  bufferlist::iterator tag_data_bl_it = new_tag.data.begin();
-  r = C_DecodeTag::decode(&tag_data_bl_it, &m_tag_data);
-  if (r < 0) {
-    lderr(cct) << this << " " << __func__ << ": "
-               << "failed to decode newly allocated tag" << dendl;
-    return r;
-  }
+    cls::journal::Tag new_tag;
+    r = allocate_journaler_tag(cct, m_journaler, client, m_tag_class,
+                               predecessor, ORPHAN_MIRROR_UUID, &new_tag);
+    if (r < 0) {
+      return r;
+    }
+
+    bufferlist::iterator tag_data_bl_it = new_tag.data.begin();
+    r = C_DecodeTag::decode(&tag_data_bl_it, &m_tag_data);
+    if (r < 0) {
+      lderr(cct) << this << " " << __func__ << ": "
+                 << "failed to decode newly allocated tag" << dendl;
+      return r;
+    }
 
-  journal::EventEntry event_entry{journal::DemoteEvent{}};
-  bufferlist event_entry_bl;
-  ::encode(event_entry, event_entry_bl);
+    journal::EventEntry event_entry{journal::DemoteEvent{}};
+    bufferlist event_entry_bl;
+    ::encode(event_entry, event_entry_bl);
 
-  m_tag_tid = new_tag.tid;
-  Future future = m_journaler->append(m_tag_tid, event_entry_bl);
-  C_SaferCond ctx;
-  future.flush(&ctx);
+    m_tag_tid = new_tag.tid;
+    future = m_journaler->append(m_tag_tid, event_entry_bl);
+    future.flush(&ctx);
+  }
 
   r = ctx.wait();
   if (r < 0) {
@@ -723,9 +730,11 @@ int Journal<I>::demote() {
     return r;
   }
 
-  m_journaler->committed(future);
-  C_SaferCond flush_ctx;
-  m_journaler->flush_commit_position(&flush_ctx);
+  {
+    Mutex::Locker l(m_lock);
+    m_journaler->committed(future);
+    m_journaler->flush_commit_position(&flush_ctx);
+  }
 
   r = flush_ctx.wait();
   if (r < 0) {
@@ -858,21 +867,22 @@ uint64_t Journal<I>::append_io_events(journal::EventType event_type,
                                       bool flush_entry) {
   assert(!bufferlists.empty());
 
-  Futures futures;
   uint64_t tid;
   {
-    {
-      Mutex::Locker locker(m_lock);
-      assert(m_state == STATE_READY);
+    Mutex::Locker locker(m_lock);
+    assert(m_state == STATE_READY);
 
-      tid = ++m_event_tid;
-      assert(tid != 0);
-    }
+    tid = ++m_event_tid;
+    assert(tid != 0);
+  }
 
-    for (auto &bl : bufferlists) {
-      assert(bl.length() <= m_max_append_size);
-      futures.push_back(m_journaler->append(m_tag_tid, bl));
-    }
+  Futures futures;
+  for (auto &bl : bufferlists) {
+    assert(bl.length() <= m_max_append_size);
+    futures.push_back(m_journaler->append(m_tag_tid, bl));
+  }
+
+  {
     Mutex::Locker event_locker(m_event_lock);
     m_events[tid] = Event(futures, requests, offset, length);
   }
@@ -892,6 +902,7 @@ uint64_t Journal<I>::append_io_events(journal::EventType event_type,
   } else {
     futures.back().wait(on_safe);
   }
+
   return tid;
 }
 
index 374c0bedce1ac203776c0333eccc0e13558c163d..49a902db91791fca1f7d12e7c92d2ad9d759b3e2 100644 (file)
@@ -10,6 +10,7 @@
 #include "include/interval_set.h"
 #include "common/Cond.h"
 #include "common/Mutex.h"
+#include "common/Cond.h"
 #include "journal/Future.h"
 #include "journal/JournalMetadataListener.h"
 #include "journal/ReplayEntry.h"
@@ -166,6 +167,10 @@ public:
 
   int is_resync_requested(bool *do_resync);
 
+  inline ContextWQ *get_work_queue() {
+    return m_work_queue;
+  }
+
 private:
   ImageCtxT &m_image_ctx;
 
index b6117918be3b38c50868da159675c44144c42abb..a7cadcfbc7c2b4c6928bd736f004ef8f210d5fd6 100644 (file)
@@ -95,8 +95,8 @@ public:
   journal::ObjectRecorderPtr create_object(const std::string &oid,
                                            uint8_t order, shared_ptr<Mutex> lock) {
     journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
-      m_ioctx, oid, 0, lock, *m_timer, m_timer_lock, &m_handler, order,
-      m_flush_interval, m_flush_bytes, m_flush_age));
+      m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
+      order, m_flush_interval, m_flush_bytes, m_flush_age));
     m_object_recorders.push_back(object);
     m_object_recorder_locks.insert(std::make_pair(oid, lock));
     m_handler.object_lock = lock;
@@ -120,7 +120,6 @@ TEST_F(TestObjectRecorder, Append) {
   append_buffers = {append_buffer1};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
@@ -128,14 +127,10 @@ TEST_F(TestObjectRecorder, Append) {
   append_buffers = {append_buffer2};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(2U, object->get_pending_appends());
 
   C_SaferCond cond;
-  lock->Lock();
   append_buffer2.first->flush(&cond);
-  ASSERT_TRUE(lock->is_locked());
-  lock->Unlock();
   ASSERT_EQ(0, cond.wait());
   ASSERT_EQ(0U, object->get_pending_appends());
 }
@@ -157,7 +152,6 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) {
   append_buffers = {append_buffer1};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
@@ -165,7 +159,6 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) {
   append_buffers = {append_buffer2};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -190,7 +183,6 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) {
   append_buffers = {append_buffer1};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
@@ -198,7 +190,6 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) {
   append_buffers = {append_buffer2};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -223,14 +214,12 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
   append_buffers = {append_buffer1};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -255,14 +244,12 @@ TEST_F(TestObjectRecorder, AppendFilledObject) {
   append_buffers = {append_buffer1};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               payload);
   append_buffers = {append_buffer2};
   lock->Lock();
   ASSERT_TRUE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -286,7 +273,6 @@ TEST_F(TestObjectRecorder, Flush) {
   append_buffers = {append_buffer1};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond1;
@@ -315,7 +301,6 @@ TEST_F(TestObjectRecorder, FlushFuture) {
   append_buffers = {append_buffer};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -352,7 +337,6 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
   ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
 
   // should automatically flush once its attached to the object
   C_SaferCond cond;
@@ -377,7 +361,6 @@ TEST_F(TestObjectRecorder, Close) {
   append_buffers = {append_buffer1};
   lock->Lock();
   ASSERT_FALSE(object->append_unlock(append_buffers));
-  ASSERT_FALSE(lock->is_locked());
   ASSERT_EQ(1U, object->get_pending_appends());
 
   lock->Lock();
@@ -421,7 +404,6 @@ TEST_F(TestObjectRecorder, Overflow) {
   append_buffers = {append_buffer1, append_buffer2};
   lock1->Lock();
   ASSERT_TRUE(object1->append_unlock(append_buffers));
-  ASSERT_FALSE(lock1->is_locked());
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -434,11 +416,7 @@ TEST_F(TestObjectRecorder, Overflow) {
 
   lock2->Lock();
   ASSERT_FALSE(object2->append_unlock(append_buffers));
-  ASSERT_FALSE(lock2->is_locked());
-  lock2->Lock();
   append_buffer3.first->flush(NULL);
-  ASSERT_TRUE(lock2->is_locked());
-  lock2->Unlock();
 
   bool overflowed = false;
   {
index 7cc56ad17b43d71317765e460b3d16394e13e2ed..6622cf349fafd735fc5c9d660c059dfe27c79e33 100644 (file)
@@ -950,11 +950,13 @@ TEST_F(TestMockJournal, EventAndIOCommitOrder) {
   expect_append_journaler(mock_journaler);
   expect_wait_future(mock_future, &on_journal_safe1);
   ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal));
+  mock_journal.get_work_queue()->drain();
 
   Context *on_journal_safe2;
   expect_append_journaler(mock_journaler);
   expect_wait_future(mock_future, &on_journal_safe2);
   ASSERT_EQ(2U, when_append_io_event(mock_image_ctx, mock_journal));
+  mock_journal.get_work_queue()->drain();
 
   // commit journal event followed by IO event (standard)
   on_journal_safe1->complete(0);
@@ -998,6 +1000,7 @@ TEST_F(TestMockJournal, AppendWriteEvent) {
   expect_append_journaler(mock_journaler);
   expect_wait_future(mock_future, &on_journal_safe);
   ASSERT_EQ(1U, when_append_write_event(mock_image_ctx, mock_journal, 1 << 17));
+  mock_journal.get_work_queue()->drain();
 
   on_journal_safe->complete(0);
   C_SaferCond event_ctx;
@@ -1037,6 +1040,7 @@ TEST_F(TestMockJournal, EventCommitError) {
   expect_wait_future(mock_future, &on_journal_safe);
   ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal,
                                      object_request));
+  mock_journal.get_work_queue()->drain();
 
   // commit the event in the journal w/o waiting writeback
   expect_future_committed(mock_journaler);
@@ -1076,6 +1080,7 @@ TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) {
   expect_wait_future(mock_future, &on_journal_safe);
   ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal,
                                      object_request));
+  mock_journal.get_work_queue()->drain();
 
   expect_future_is_valid(mock_future);
   C_SaferCond flush_ctx;
@@ -1111,6 +1116,7 @@ TEST_F(TestMockJournal, IOCommitError) {
   expect_append_journaler(mock_journaler);
   expect_wait_future(mock_future, &on_journal_safe);
   ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal));
+  mock_journal.get_work_queue()->drain();
 
   // failed IO remains uncommitted in journal
   on_journal_safe->complete(0);