]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: avoid holding lock while sending journal append
authorJason Dillaman <dillaman@redhat.com>
Wed, 28 Sep 2016 12:35:36 +0000 (08:35 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 11 Oct 2016 17:57:12 +0000 (13:57 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit dc77a629ed353d586b63f0bd8e20f54a7595afba)

src/journal/JournalRecorder.cc
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h
src/test/journal/test_ObjectRecorder.cc

index 1b0e5704f71b8c192669b4857c523b2eeb496ce5..1917165008fecc69854d62f09983ee9f029c2fda 100644 (file)
@@ -105,10 +105,7 @@ Future JournalRecorder::append(uint64_t tag_tid,
            entry_bl);
   assert(entry_bl.length() <= m_journal_metadata->get_object_size());
 
-  AppendBuffers append_buffers;
-  append_buffers.push_back(std::make_pair(future, entry_bl));
-  bool object_full = object_ptr->append_unlock(append_buffers);
-
+  bool object_full = object_ptr->append_unlock({{future, entry_bl}});
   if (object_full) {
     ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
                      << dendl;
@@ -284,8 +281,7 @@ void JournalRecorder::create_next_object_recorder_unlock(
       new_object_recorder->get_object_number());
   }
 
-  new_object_recorder->append_unlock(append_buffers);
-
+  new_object_recorder->append_unlock(std::move(append_buffers));
   m_object_ptrs[splay_offset] = new_object_recorder;
 }
 
index cbd3842da390a0fe196482f9515f22f878ac434d..1079b0a8db7874ead22a21465724a02722cf072e 100644 (file)
@@ -44,12 +44,12 @@ ObjectRecorder::~ObjectRecorder() {
   assert(!m_aio_scheduled);
 }
 
-bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
+bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
   assert(m_lock->is_locked());
 
   FutureImplPtr last_flushed_future;
   bool schedule_append = false;
-    
+
   if (m_overflowed) {
     m_append_buffers.insert(m_append_buffers.end(),
                             append_buffers.begin(), append_buffers.end());
@@ -339,37 +339,58 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
 }
 
 void ObjectRecorder::send_appends_aio() {
-  Mutex::Locker locker(*m_lock);
-
-  m_aio_scheduled = false;
+  AppendBuffers *append_buffers;
+  uint64_t append_tid;
+  {
+    Mutex::Locker locker(*m_lock);
+    append_tid = m_append_tid++;
+    m_in_flight_tids.insert(append_tid);
 
-  AppendBuffers append_buffers;
-  m_pending_buffers.swap(append_buffers);
+    // safe to hold pointer outside lock until op is submitted
+    append_buffers = &m_in_flight_appends[append_tid];
+    append_buffers->swap(m_pending_buffers);
+  }
 
-  uint64_t append_tid = m_append_tid++;
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
                    << append_tid << dendl;
   C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);
+  C_Gather *gather_ctx = new C_Gather(m_cct, append_flush);
 
   librados::ObjectWriteOperation op;
   client::guard_append(&op, m_soft_max_size);
-
-  for (AppendBuffers::iterator it = append_buffers.begin();
-       it != append_buffers.end(); ++it) {
+  for (AppendBuffers::iterator it = append_buffers->begin();
+       it != append_buffers->end(); ++it) {
     ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
                      << dendl;
     op.append(it->second);
     op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
   }
-  m_in_flight_tids.insert(append_tid);
-  m_in_flight_appends[append_tid].swap(append_buffers);
 
   librados::AioCompletion *rados_completion =
-    librados::Rados::aio_create_completion(append_flush, NULL,
+    librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr,
                                            utils::rados_ctx_callback);
   int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
   assert(r == 0);
   rados_completion->release();
+
+  {
+    Mutex::Locker locker(*m_lock);
+    if (m_pending_buffers.empty()) {
+      m_aio_scheduled = false;
+      if (m_in_flight_appends.empty() && m_object_closed) {
+        // all remaining unsent appends should be redirected to new object
+        notify_handler();
+      }
+    } else {
+      // additional pending items -- reschedule
+      m_op_work_queue->queue(new FunctionContext([this] (int r) {
+          send_appends_aio();
+        }));
+    }
+  }
+
+  // allow append op to complete
+  gather_ctx->activate();
 }
 
 void ObjectRecorder::notify_handler() {
index f9cf10657860dee902b9c0397660cd15812cc283..9b285f8190880fc16889a163b9a9b50c366236cf 100644 (file)
@@ -51,7 +51,7 @@ public:
     return m_oid;
   }
 
-  bool append_unlock(const AppendBuffers &append_buffers);
+  bool append_unlock(AppendBuffers &&append_buffers);
   void flush(Context *on_safe);
   void flush(const FutureImplPtr &future);
 
index a7cadcfbc7c2b4c6928bd736f004ef8f210d5fd6..e2113365af9c7701f2fe85422db2c528732a11d8 100644 (file)
@@ -119,14 +119,14 @@ TEST_F(TestObjectRecorder, Append) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
   ASSERT_EQ(2U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -151,14 +151,14 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -182,14 +182,14 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -213,13 +213,13 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -243,13 +243,13 @@ TEST_F(TestObjectRecorder, AppendFilledObject) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               payload);
   append_buffers = {append_buffer2};
   lock->Lock();
-  ASSERT_TRUE(object->append_unlock(append_buffers));
+  ASSERT_TRUE(object->append_unlock(std::move(append_buffers)));
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -272,7 +272,7 @@ TEST_F(TestObjectRecorder, Flush) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond1;
@@ -300,7 +300,7 @@ TEST_F(TestObjectRecorder, FlushFuture) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -336,7 +336,7 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
   lock->Unlock();
   ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
 
   // should automatically flush once its attached to the object
   C_SaferCond cond;
@@ -360,7 +360,7 @@ TEST_F(TestObjectRecorder, Close) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(append_buffers));
+  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
   ASSERT_EQ(1U, object->get_pending_appends());
 
   lock->Lock();
@@ -403,7 +403,7 @@ TEST_F(TestObjectRecorder, Overflow) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1, append_buffer2};
   lock1->Lock();
-  ASSERT_TRUE(object1->append_unlock(append_buffers));
+  ASSERT_TRUE(object1->append_unlock(std::move(append_buffers)));
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -415,7 +415,7 @@ TEST_F(TestObjectRecorder, Overflow) {
   append_buffers = {append_buffer3};
 
   lock2->Lock();
-  ASSERT_FALSE(object2->append_unlock(append_buffers));
+  ASSERT_FALSE(object2->append_unlock(std::move(append_buffers)));
   append_buffer3.first->flush(NULL);
 
   bool overflowed = false;