neorados::RADOS::make_with_librados(*rados))),
m_cct(m_rados_api->cct()),
m_io_context(m_rados_api->get_io_context()),
+ m_api_strand(m_io_context),
m_context_wq(std::make_unique<asio::ContextWQ>(m_io_context)) {
ldout(m_cct, 20) << dendl;
#include "include/rados/librados_fwd.hpp"
#include <memory>
#include <boost/asio/io_context.hpp>
+#include <boost/asio/io_context_strand.hpp>
namespace neorados { struct RADOS; }
return m_io_context.get_executor();
}
+ inline boost::asio::io_context::strand& get_api_strand() {
+ // API client callbacks should never fire concurrently
+ return m_api_strand;
+ }
+
inline asio::ContextWQ* get_work_queue() {
return m_context_wq.get();
}
CephContext* m_cct;
boost::asio::io_context& m_io_context;
+ boost::asio::io_context::strand m_api_strand;
std::unique_ptr<asio::ContextWQ> m_context_wq;
};
exclusive_lock(nullptr), object_map(nullptr),
op_work_queue(asio_engine->get_work_queue()),
plugin_registry(new PluginRegistry<ImageCtx>(this)),
- external_callback_completions(32),
event_socket_completions(32),
asok_hook(nullptr),
trace_endpoint("librbd")
#include "common/errno.h"
#include "common/perf_counters.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
#include "librbd/Journal.h"
#include "librbd/Types.h"
-#include "librbd/asio/ContextWQ.h"
+#include <boost/asio/dispatch.hpp>
+#include <boost/asio/post.hpp>
#ifdef WITH_LTTNG
#include "tracing/librbd.h"
pending_count.compare_exchange_strong(zero, 1);
ceph_assert(zero == 0);
+ add_request();
+
// ensure completion fires in clean lock context
- ictx->op_work_queue->queue(new C_AioRequest(this), 0);
+ boost::asio::post(ictx->asio_engine, boost::asio::bind_executor(
+ ictx->asio_engine.get_api_strand(), [this]() {
+ complete_request(0);
+ }));
}
void AioCompletion::block(CephContext* cct) {
}
void AioCompletion::complete_external_callback() {
+ get();
+
// ensure librbd external users never experience concurrent callbacks
// from multiple librbd-internal threads.
- ictx->external_callback_completions.push(this);
-
- while (true) {
- if (ictx->external_callback_in_progress.exchange(true)) {
- // another thread is concurrently invoking external callbacks
- break;
- }
-
- AioCompletion* aio_comp;
- while (ictx->external_callback_completions.pop(aio_comp)) {
- aio_comp->complete_cb(aio_comp->rbd_comp, aio_comp->complete_arg);
- aio_comp->complete_event_socket();
- }
-
- ictx->external_callback_in_progress.store(false);
- if (ictx->external_callback_completions.empty()) {
- // queue still empty implies we didn't have a race between the last failed
- // pop and resetting the in-progress state
- break;
- }
- }
+ boost::asio::dispatch(ictx->asio_engine, boost::asio::bind_executor(
+ ictx->asio_engine.get_api_strand(), [this]() {
+ complete_cb(rbd_comp, complete_arg);
+ complete_event_socket();
+ put();
+ }));
}
void AioCompletion::complete_event_socket() {