]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librbd: avoid using lock within AIO completion where possible
authorJason Dillaman <dillaman@redhat.com>
Fri, 26 Apr 2019 18:24:15 +0000 (14:24 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 2 May 2019 13:30:45 +0000 (09:30 -0400)
'perf' shows several percent of CPU is being utilized handling the
heavyweight locking semantics of AIO completion. With these changes,
the lock contention disappears.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/api/Group.cc
src/librbd/io/AioCompletion.cc
src/librbd/io/AioCompletion.h
src/librbd/io/ReadResult.cc
src/librbd/librbd.cc

index 9dfbc19018d1bc04ef1b422c9519863b495ba622..04f155b5189b4bb3e411098f74ba0231dc5b13d5 100644 (file)
@@ -1,6 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include "common/Cond.h"
 #include "common/errno.h"
 
 #include "librbd/ExclusiveLock.h"
index 73c58167446a7040caeef91bba053e40d8506510..fce9529d5f614bd4ec6b2c610e703d63ea95fd08 100644 (file)
@@ -31,32 +31,40 @@ namespace io {
 
 int AioCompletion::wait_for_complete() {
   tracepoint(librbd, aio_wait_for_complete_enter, this);
-  lock.Lock();
-  while (state != AIO_STATE_COMPLETE)
-    cond.Wait(lock);
-  lock.Unlock();
+  {
+    std::unique_lock<std::mutex> locker(lock);
+    while (state != AIO_STATE_COMPLETE) {
+      cond.wait(locker);
+    }
+  }
   tracepoint(librbd, aio_wait_for_complete_exit, 0);
   return 0;
 }
 
-void AioCompletion::finalize(ssize_t rval)
-{
-  ceph_assert(lock.is_locked());
+void AioCompletion::finalize() {
   ceph_assert(ictx != nullptr);
   CephContext *cct = ictx->cct;
 
-  ldout(cct, 20) << "r=" << rval << dendl;
-  if (rval >= 0 && aio_type == AIO_TYPE_READ) {
+  // finalize any pending error results since we won't be
+  // atomically incrementing rval anymore
+  int err_r = error_rval;
+  if (err_r < 0) {
+    rval = err_r;
+  }
+
+  ssize_t r = rval;
+  ldout(cct, 20) << "r=" << r << dendl;
+  if (r >= 0 && aio_type == AIO_TYPE_READ) {
     read_result.assemble_result(cct);
   }
 }
 
 void AioCompletion::complete() {
-  ceph_assert(lock.is_locked());
   ceph_assert(ictx != nullptr);
   CephContext *cct = ictx->cct;
 
-  tracepoint(librbd, aio_complete_enter, this, rval);
+  ssize_t r = rval;
+  tracepoint(librbd, aio_complete_enter, this, r);
   if (ictx->perfcounter != nullptr) {
     ceph::timespan elapsed = coarse_mono_clock::now() - start_time;
     switch (aio_type) {
@@ -83,7 +91,7 @@ void AioCompletion::complete() {
   }
 
   if ((aio_type == AIO_TYPE_CLOSE) ||
-      (aio_type == AIO_TYPE_OPEN && rval < 0)) {
+      (aio_type == AIO_TYPE_OPEN && r < 0)) {
     // must destroy ImageCtx prior to invoking callback
     delete ictx;
     ictx = nullptr;
@@ -91,9 +99,7 @@ void AioCompletion::complete() {
 
   state = AIO_STATE_CALLBACK;
   if (complete_cb) {
-    lock.Unlock();
     complete_cb(rbd_comp, complete_arg);
-    lock.Lock();
   }
 
   if (ictx != nullptr && event_notify && ictx->event_socket.is_valid()) {
@@ -102,9 +108,12 @@ void AioCompletion::complete() {
     ictx->completed_reqs_lock.Unlock();
     ictx->event_socket.notify();
   }
-
   state = AIO_STATE_COMPLETE;
-  cond.Signal();
+
+  {
+    std::unique_lock<std::mutex> locker(lock);
+    cond.notify_all();
+  }
 
   // note: possible for image to be closed after op marked finished
   if (async_op.started()) {
@@ -114,7 +123,6 @@ void AioCompletion::complete() {
 }
 
 void AioCompletion::init_time(ImageCtx *i, aio_type_t t) {
-  Mutex::Locker locker(lock);
   if (ictx == nullptr) {
     ictx = i;
     aio_type = t;
@@ -123,7 +131,6 @@ void AioCompletion::init_time(ImageCtx *i, aio_type_t t) {
 }
 
 void AioCompletion::start_op() {
-  Mutex::Locker locker(lock);
   ceph_assert(ictx != nullptr);
 
   if (aio_type == AIO_TYPE_OPEN || aio_type == AIO_TYPE_CLOSE) {
@@ -137,7 +144,6 @@ void AioCompletion::start_op() {
 
 void AioCompletion::fail(int r)
 {
-  lock.Lock();
   ceph_assert(ictx != nullptr);
   CephContext *cct = ictx->cct;
 
@@ -145,69 +151,62 @@ void AioCompletion::fail(int r)
   ceph_assert(pending_count == 0);
   rval = r;
   complete();
-  put_unlock();
+  put();
 }
 
 void AioCompletion::set_request_count(uint32_t count) {
-  lock.Lock();
   ceph_assert(ictx != nullptr);
   CephContext *cct = ictx->cct;
 
-  ldout(cct, 20) << "pending=" << count << dendl;
-  ceph_assert(pending_count == 0);
-
-  if (count > 0) {
-    pending_count = count;
-    lock.Unlock();
-  } else {
-    pending_count = 1;
-    lock.Unlock();
+  uint32_t previous_pending_count = pending_count.exchange(
+    count == 0 ? 1 : count);
+  ceph_assert(previous_pending_count == 0);
 
+  ldout(cct, 20) << "pending=" << count << dendl;
+  if (count == 0) {
     // ensure completion fires in clean lock context
     ictx->op_work_queue->queue(new C_AioRequest(this), 0);
+    return;
   }
 }
 
 void AioCompletion::complete_request(ssize_t r)
 {
-  lock.Lock();
+  uint32_t previous_pending_count = pending_count--;
+  ceph_assert(previous_pending_count > 0);
+  auto pending_count = previous_pending_count - 1;
+
   ceph_assert(ictx != nullptr);
   CephContext *cct = ictx->cct;
 
-  if (rval >= 0) {
-    if (r < 0 && r != -EEXIST)
-      rval = r;
-    else if (r > 0)
-      rval += r;
+  if (r > 0) {
+    rval += r;
+  } else if (r != -EEXIST) {
+    // might race w/ another thread setting an error code but
+    // first one wins
+    int zero = 0;
+    error_rval.compare_exchange_strong(zero, r);
   }
-  ceph_assert(pending_count);
-  int count = --pending_count;
 
   ldout(cct, 20) << "cb=" << complete_cb << ", "
                  << "pending=" << pending_count << dendl;
-  if (!count) {
-    finalize(rval);
+  if (pending_count == 0) {
+    finalize();
     complete();
   }
-  put_unlock();
+  put();
 }
 
 bool AioCompletion::is_complete() {
   tracepoint(librbd, aio_is_complete_enter, this);
-  bool done;
-  {
-    Mutex::Locker l(lock);
-    done = this->state == AIO_STATE_COMPLETE;
-  }
+  bool done = (this->state != AIO_STATE_PENDING);
   tracepoint(librbd, aio_is_complete_exit, done);
   return done;
 }
 
 ssize_t AioCompletion::get_return_value() {
   tracepoint(librbd, aio_get_return_value_enter, this);
-  lock.Lock();
   ssize_t r = rval;
-  lock.Unlock();
   tracepoint(librbd, aio_get_return_value_exit, r);
   return r;
 }
index f3551a0272cc4ce52a9d9f0164658eb9a36548a7..0e6874126e28f01e612e81a90595e22d00592aed 100644 (file)
@@ -4,8 +4,6 @@
 #ifndef CEPH_LIBRBD_IO_AIO_COMPLETION_H
 #define CEPH_LIBRBD_IO_AIO_COMPLETION_H
 
-#include "common/Cond.h"
-#include "common/Mutex.h"
 #include "common/ceph_time.h"
 #include "include/Context.h"
 #include "include/utime.h"
 #include "librbd/io/ReadResult.h"
 #include "librbd/io/Types.h"
 
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+
 class CephContext;
 
 namespace librbd {
@@ -42,26 +44,32 @@ struct AioCompletion {
     AIO_STATE_COMPLETE,
   } aio_state_t;
 
-  mutable Mutex lock;
-  Cond cond;
-  aio_state_t state;
-  ssize_t rval;
-  callback_t complete_cb;
-  void *complete_arg;
-  rbd_completion_t rbd_comp;
-  uint32_t pending_count;   ///< number of requests
-  int ref;
-  bool released;
-  ImageCtx *ictx;
+  mutable std::mutex lock;
+  std::condition_variable cond;
+
+  callback_t complete_cb = nullptr;
+  void *complete_arg = nullptr;
+  rbd_completion_t rbd_comp = nullptr;
+
+  /// note: only using atomic for built-in memory barrier
+  std::atomic<aio_state_t> state{AIO_STATE_PENDING};
+
+  std::atomic<ssize_t> rval{0};
+  std::atomic<int> error_rval{0};
+  std::atomic<uint32_t> ref{1};
+  std::atomic<uint32_t> pending_count{0};   ///< number of requests
+  std::atomic<bool> released{false};
+
+  ImageCtx *ictx = nullptr;
   coarse_mono_time start_time;
-  aio_type_t aio_type;
+  aio_type_t aio_type = AIO_TYPE_NONE;
 
   ReadResult read_result;
 
   AsyncOperation async_op;
 
   xlist<AioCompletion*>::item m_xlist_item;
-  bool event_notify;
+  bool event_notify = false;
 
   template <typename T, void (T::*MF)(int)>
   static void callback_adapter(completion_t cb, void *arg) {
@@ -96,12 +104,7 @@ struct AioCompletion {
     return comp;
   }
 
-  AioCompletion() : lock("AioCompletion::lock", true, false),
-                    state(AIO_STATE_PENDING), rval(0), complete_cb(NULL),
-                    complete_arg(NULL), rbd_comp(NULL),
-                    pending_count(0), ref(1), released(false), ictx(NULL),
-                    aio_type(AIO_TYPE_NONE), m_xlist_item(this),
-                    event_notify(false) {
+  AioCompletion() : m_xlist_item(this) {
   }
 
   ~AioCompletion() {
@@ -109,14 +112,14 @@ struct AioCompletion {
 
   int wait_for_complete();
 
-  void finalize(ssize_t rval);
+  void finalize();
 
   inline bool is_initialized(aio_type_t type) const {
-    Mutex::Locker locker(lock);
+    std::unique_lock<std::mutex> locker(lock);
     return ((ictx != nullptr) && (aio_type == type));
   }
   inline bool is_started() const {
-    Mutex::Locker locker(lock);
+    std::unique_lock<std::mutex> locker(lock);
     return async_op.started();
   }
 
@@ -133,9 +136,7 @@ struct AioCompletion {
 
   void set_request_count(uint32_t num);
   void add_request() {
-    lock.Lock();
     ceph_assert(pending_count > 0);
-    lock.Unlock();
     get();
   }
   void complete_request(ssize_t r);
@@ -145,26 +146,19 @@ struct AioCompletion {
   ssize_t get_return_value();
 
   void get() {
-    lock.Lock();
     ceph_assert(ref > 0);
-    ref++;
-    lock.Unlock();
+    ++ref;
   }
   void release() {
-    lock.Lock();
-    ceph_assert(!released);
-    released = true;
-    put_unlock();
+    bool previous_released = released.exchange(true);
+    ceph_assert(!previous_released);
+    put();
   }
   void put() {
-    lock.Lock();
-    put_unlock();
-  }
-  void put_unlock() {
-    ceph_assert(ref > 0);
-    int n = --ref;
-    lock.Unlock();
-    if (!n) {
+    uint32_t previous_ref = ref--;
+    ceph_assert(previous_ref > 0);
+
+    if (previous_ref == 1) {
       if (ictx != nullptr && event_notify) {
         ictx->completed_reqs_lock.Lock();
         m_xlist_item.remove_myself();
@@ -175,7 +169,6 @@ struct AioCompletion {
   }
 
   void set_event_notify(bool s) {
-    Mutex::Locker l(lock);
     event_notify = s;
   }
 
index c24d8b4a7fc515b7fad60ea7c0cf9bd39744487a..8ce351c618a8dd9bad602aef56d08a52cb8ef6f2 100644 (file)
@@ -96,10 +96,10 @@ void ReadResult::C_ImageReadRequest::finish(int r) {
     }
     ceph_assert(length == bl.length());
 
-    aio_completion->lock.Lock();
+    aio_completion->lock.lock();
     aio_completion->read_result.m_destriper.add_partial_result(
       cct, bl, image_extents);
-    aio_completion->lock.Unlock();
+    aio_completion->lock.unlock();
     r = length;
   }
 
@@ -131,10 +131,10 @@ void ReadResult::C_ObjectReadRequest::finish(int r) {
       extent_map[object_off] = bl.length();
     }
 
-    aio_completion->lock.Lock();
+    aio_completion->lock.lock();
     aio_completion->read_result.m_destriper.add_partial_sparse_result(
       cct, bl, extent_map, object_off, buffer_extents);
-    aio_completion->lock.Unlock();
+    aio_completion->lock.unlock();
 
     r = object_len;
   }
index 48c438f7eeebe47915a7be879268acf1be3271d6..f6d8ca2c22d9850b70c331e4c31ac12974e60271 100644 (file)
@@ -101,9 +101,8 @@ struct C_AioCompletion : public Context {
     if (r < 0) {
       aio_comp->fail(r);
     } else {
-      aio_comp->lock.Lock();
       aio_comp->complete();
-      aio_comp->put_unlock();
+      aio_comp->put();
     }
   }
 };