]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: switch external API callbacks to use dedicated asio strand
authorJason Dillaman <dillaman@redhat.com>
Tue, 16 Jun 2020 16:59:11 +0000 (12:59 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 16 Jul 2020 19:59:31 +0000 (15:59 -0400)
This ensures that the API callers will not receive concurrent
callbacks and allows internal AioCompletion users to be able to
use all available asio dispatch threads.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/AsioEngine.cc
src/librbd/AsioEngine.h
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/io/AioCompletion.cc

index 55916ab654e66a8079f65799d280cc557b975211..3e62988283be78e28ccb86bc30394679b6901f95 100644 (file)
@@ -20,6 +20,7 @@ AsioEngine::AsioEngine(std::shared_ptr<librados::Rados> rados)
       neorados::RADOS::make_with_librados(*rados))),
     m_cct(m_rados_api->cct()),
     m_io_context(m_rados_api->get_io_context()),
+    m_api_strand(m_io_context),
     m_context_wq(std::make_unique<asio::ContextWQ>(m_io_context)) {
   ldout(m_cct, 20) << dendl;
 
index a06b611b49d13ac0421cecaf09aa91c90b47ceee..d338431372498fc6a97ea37610ec426ef771518f 100644 (file)
@@ -8,6 +8,7 @@
 #include "include/rados/librados_fwd.hpp"
 #include <memory>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/io_context_strand.hpp>
 
 namespace neorados { struct RADOS; }
 
@@ -39,6 +40,11 @@ public:
     return m_io_context.get_executor();
   }
 
+  inline boost::asio::io_context::strand& get_api_strand() {
+    // API client callbacks should never fire concurrently
+    return m_api_strand;
+  }
+
   inline asio::ContextWQ* get_work_queue() {
     return m_context_wq.get();
   }
@@ -48,6 +54,7 @@ private:
   CephContext* m_cct;
 
   boost::asio::io_context& m_io_context;
+  boost::asio::io_context::strand m_api_strand;
   std::unique_ptr<asio::ContextWQ> m_context_wq;
 };
 
index f8730a40cc651ff63c7114d1d2646ced556147f4..2c238abbde73a5d396d51c152ce1321ad2789fdc 100644 (file)
@@ -117,7 +117,6 @@ librados::IoCtx duplicate_io_ctx(librados::IoCtx& io_ctx) {
       exclusive_lock(nullptr), object_map(nullptr),
       op_work_queue(asio_engine->get_work_queue()),
       plugin_registry(new PluginRegistry<ImageCtx>(this)),
-      external_callback_completions(32),
       event_socket_completions(32),
       asok_hook(nullptr),
       trace_endpoint("librbd")
index af501ced216470bb7ba1b936afcabd677896bcd2..de783e0fb3c68ceae2d0d5ac121bacda2d1adc2f 100644 (file)
@@ -204,9 +204,6 @@ namespace librbd {
       io::AioCompletion*,
       boost::lockfree::allocator<ceph::allocator<void>>> Completions;
 
-    Completions external_callback_completions;
-    std::atomic<bool> external_callback_in_progress = {false};
-
     Completions event_socket_completions;
     EventSocket event_socket;
 
index dc933a12084f12c05da7eb29305df83192c18ff9..24e7afa4c75b7fb0f6e2b5c3a3d5e6525a420274 100644 (file)
@@ -9,11 +9,13 @@
 #include "common/errno.h"
 #include "common/perf_counters.h"
 
+#include "librbd/AsioEngine.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/internal.h"
 #include "librbd/Journal.h"
 #include "librbd/Types.h"
-#include "librbd/asio/ContextWQ.h"
+#include <boost/asio/dispatch.hpp>
+#include <boost/asio/post.hpp>
 
 #ifdef WITH_LTTNG
 #include "tracing/librbd.h"
@@ -152,8 +154,13 @@ void AioCompletion::queue_complete() {
   pending_count.compare_exchange_strong(zero, 1);
   ceph_assert(zero == 0);
 
+  add_request();
+
   // ensure completion fires in clean lock context
-  ictx->op_work_queue->queue(new C_AioRequest(this), 0);
+  boost::asio::post(ictx->asio_engine, boost::asio::bind_executor(
+    ictx->asio_engine.get_api_strand(), [this]() {
+      complete_request(0);
+    }));
 }
 
 void AioCompletion::block(CephContext* cct) {
@@ -250,29 +257,16 @@ ssize_t AioCompletion::get_return_value() {
 }
 
 void AioCompletion::complete_external_callback() {
+  get();
+
   // ensure librbd external users never experience concurrent callbacks
   // from multiple librbd-internal threads.
-  ictx->external_callback_completions.push(this);
-
-  while (true) {
-    if (ictx->external_callback_in_progress.exchange(true)) {
-      // another thread is concurrently invoking external callbacks
-      break;
-    }
-
-    AioCompletion* aio_comp;
-    while (ictx->external_callback_completions.pop(aio_comp)) {
-      aio_comp->complete_cb(aio_comp->rbd_comp, aio_comp->complete_arg);
-      aio_comp->complete_event_socket();
-    }
-
-    ictx->external_callback_in_progress.store(false);
-    if (ictx->external_callback_completions.empty()) {
-      // queue still empty implies we didn't have a race between the last failed
-      // pop and resetting the in-progress state
-      break;
-    }
-  }
+  boost::asio::dispatch(ictx->asio_engine, boost::asio::bind_executor(
+    ictx->asio_engine.get_api_strand(), [this]() {
+      complete_cb(rbd_comp, complete_arg);
+      complete_event_socket();
+      put();
+    }));
 }
 
 void AioCompletion::complete_event_socket() {