From: Jason Dillaman <dillaman@redhat.com>
Date: Tue, 16 Jun 2020 16:59:11 +0000 (-0400)
Subject: librbd: switch external API callbacks to use dedicated asio strand
X-Git-Tag: v17.0.0~1717^2~10
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=11ae84df8badc955e45f2f092d77926a744cfb48;p=ceph.git

librbd: switch external API callbacks to use dedicated asio strand

This ensures that the API callers will not receive concurrent callbacks
and allows internal AioCompletion users to be able to use all available
asio dispatch threads.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
---

diff --git a/src/librbd/AsioEngine.cc b/src/librbd/AsioEngine.cc
index 55916ab654e66..3e62988283be7 100644
--- a/src/librbd/AsioEngine.cc
+++ b/src/librbd/AsioEngine.cc
@@ -20,6 +20,7 @@ AsioEngine::AsioEngine(std::shared_ptr<librados::Rados> rados)
       neorados::RADOS::make_with_librados(*rados))),
     m_cct(m_rados_api->cct()),
     m_io_context(m_rados_api->get_io_context()),
+    m_api_strand(m_io_context),
     m_context_wq(std::make_unique<asio::ContextWQ>(m_io_context)) {
   ldout(m_cct, 20) << dendl;
diff --git a/src/librbd/AsioEngine.h b/src/librbd/AsioEngine.h
index a06b611b49d13..d338431372498 100644
--- a/src/librbd/AsioEngine.h
+++ b/src/librbd/AsioEngine.h
@@ -8,6 +8,7 @@
 #include "include/rados/librados_fwd.hpp"
 #include <memory>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/io_context_strand.hpp>
 
 namespace neorados { struct RADOS; }
 
@@ -39,6 +40,11 @@ public:
     return m_io_context.get_executor();
   }
 
+  inline boost::asio::io_context::strand& get_api_strand() {
+    // API client callbacks should never fire concurrently
+    return m_api_strand;
+  }
+
   inline asio::ContextWQ* get_work_queue() {
     return m_context_wq.get();
   }
@@ -48,6 +54,7 @@ private:
   CephContext* m_cct;
   boost::asio::io_context& m_io_context;
+  boost::asio::io_context::strand m_api_strand;
   std::unique_ptr<asio::ContextWQ> m_context_wq;
 };
diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc
index f8730a40cc651..2c238abbde73a 100644
--- a/src/librbd/ImageCtx.cc
+++ b/src/librbd/ImageCtx.cc
@@ -117,7 +117,6 @@ librados::IoCtx duplicate_io_ctx(librados::IoCtx& io_ctx) {
     exclusive_lock(nullptr), object_map(nullptr),
     op_work_queue(asio_engine->get_work_queue()),
     plugin_registry(new PluginRegistry<ImageCtx>(this)),
-    external_callback_completions(32),
     event_socket_completions(32),
     asok_hook(nullptr),
     trace_endpoint("librbd")
diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h
index af501ced21647..de783e0fb3c68 100644
--- a/src/librbd/ImageCtx.h
+++ b/src/librbd/ImageCtx.h
@@ -204,9 +204,6 @@ namespace librbd {
         io::AioCompletion*,
         boost::lockfree::allocator<ceph::allocator<io::AioCompletion*>>>
       Completions;
 
-    Completions external_callback_completions;
-    std::atomic<bool> external_callback_in_progress = {false};
-
     Completions event_socket_completions;
     EventSocket event_socket;
 
diff --git a/src/librbd/io/AioCompletion.cc b/src/librbd/io/AioCompletion.cc
index dc933a12084f1..24e7afa4c75b7 100644
--- a/src/librbd/io/AioCompletion.cc
+++ b/src/librbd/io/AioCompletion.cc
@@ -9,11 +9,13 @@
 #include "common/errno.h"
 #include "common/perf_counters.h"
+#include "librbd/AsioEngine.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/internal.h"
 #include "librbd/Journal.h"
 #include "librbd/Types.h"
-#include "librbd/asio/ContextWQ.h"
+#include <boost/asio/dispatch.hpp>
+#include <boost/asio/post.hpp>
 
 #ifdef WITH_LTTNG
 #include "tracing/librbd.h"
@@ -152,8 +154,13 @@ void AioCompletion::queue_complete() {
   pending_count.compare_exchange_strong(zero, 1);
   ceph_assert(zero == 0);
 
+  add_request();
+
   // ensure completion fires in clean lock context
-  ictx->op_work_queue->queue(new C_AioRequest(this), 0);
+  boost::asio::post(ictx->asio_engine, boost::asio::bind_executor(
+    ictx->asio_engine.get_api_strand(), [this]() {
+      complete_request(0);
+    }));
 }
 
 void AioCompletion::block(CephContext* cct) {
@@ -250,29 +257,16 @@ ssize_t AioCompletion::get_return_value() {
 }
 
 void AioCompletion::complete_external_callback() {
+  get();
+
   // ensure librbd external users never experience concurrent callbacks
   // from multiple librbd-internal threads.
-  ictx->external_callback_completions.push(this);
-
-  while (true) {
-    if (ictx->external_callback_in_progress.exchange(true)) {
-      // another thread is concurrently invoking external callbacks
-      break;
-    }
-
-    AioCompletion* aio_comp;
-    while (ictx->external_callback_completions.pop(aio_comp)) {
-      aio_comp->complete_cb(aio_comp->rbd_comp, aio_comp->complete_arg);
-      aio_comp->complete_event_socket();
-    }
-
-    ictx->external_callback_in_progress.store(false);
-    if (ictx->external_callback_completions.empty()) {
-      // queue still empty implies we didn't have a race between the last failed
-      // pop and resetting the in-progress state
-      break;
-    }
-  }
+  boost::asio::dispatch(ictx->asio_engine, boost::asio::bind_executor(
+    ictx->asio_engine.get_api_strand(), [this]() {
+      complete_cb(rbd_comp, complete_arg);
+      complete_event_socket();
+      put();
+    }));
 }
 
 void AioCompletion::complete_event_socket() {