// vim: ts=8 sw=2 smarttab
#include "librbd/AsioEngine.h"
+#include "include/Context.h"
#include "include/stringify.h"
#include "include/neorados/RADOS.hpp"
#include "include/rados/librados.hpp"
ldout(m_cct, 20) << dendl;
}
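+
+// Adapt legacy Context completions to the ASIO engine: dispatch() may
+// run the callback inline when the caller is already executing inside
+// the io_context, while post() always defers it.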
+void AsioEngine::dispatch(Context* ctx, int r) {
+ dispatch([ctx, r]() { ctx->complete(r); });
+}
+
+void AsioEngine::post(Context* ctx, int r) {
+ post([ctx, r]() { ctx->complete(r); });
+}
+
} // namespace librbd
#include "include/common_fwd.h"
#include "include/rados/librados_fwd.hpp"
#include <memory>
+#include <boost/asio/dispatch.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_context_strand.hpp>
+#include <boost/asio/post.hpp>
+struct Context;
namespace neorados { struct RADOS; }
namespace librbd {
return m_context_wq.get();
}
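+ // submit a handler to the engine's io_context; it may be executed
+ // inline if the caller is already running on the io_context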
+ template <typename T>
+ void dispatch(T&& t) {
+ boost::asio::dispatch(m_io_context, std::forward<T>(t));
+ }
+ void dispatch(Context* ctx, int r);
+
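+ // like dispatch(), but the handler is never run inline from within
+ // this call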
+ template <typename T>
+ void post(T&& t) {
+ boost::asio::post(m_io_context, std::forward<T>(t));
+ }
+ void post(Context* ctx, int r);
+
private:
std::shared_ptr<neorados::RADOS> m_rados_api;
CephContext* m_cct;
add_request();
// ensure the completion fires in a clean lock context
- boost::asio::post(ictx->asio_engine, boost::asio::bind_executor(
- ictx->asio_engine.get_api_strand(), [this]() {
+ boost::asio::post(*ictx->asio_engine, boost::asio::bind_executor(
+ ictx->asio_engine->get_api_strand(), [this]() {
complete_request(0);
}));
}
// ensure librbd external users never experience concurrent callbacks
// from multiple librbd-internal threads.
- boost::asio::dispatch(ictx->asio_engine, boost::asio::bind_executor(
- ictx->asio_engine.get_api_strand(), [this]() {
+ boost::asio::dispatch(*ictx->asio_engine, boost::asio::bind_executor(
+ ictx->asio_engine->get_api_strand(), [this]() {
complete_cb(rbd_comp, complete_arg);
complete_event_socket();
put();
// vim: ts=8 sw=2 smarttab
#include "librbd/io/AsyncOperation.h"
-#include "librbd/ImageCtx.h"
-#include "librbd/asio/ContextWQ.h"
-#include "common/dout.h"
#include "include/ceph_assert.h"
+#include "common/dout.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
if (!m_flush_contexts.empty()) {
C_CompleteFlushes *ctx = new C_CompleteFlushes(m_image_ctx,
std::move(m_flush_contexts));
- m_image_ctx->op_work_queue->queue(ctx);
+ m_image_ctx->asio_engine->post(ctx, 0);
}
}
}
}
- m_image_ctx->op_work_queue->queue(on_finish);
+ m_image_ctx->asio_engine->post(on_finish, 0);
}
} // namespace io
#include "common/ceph_mutex.h"
#include "common/dout.h"
#include "common/errno.h"
+#include "librbd/AsioEngine.h"
#include "librbd/AsyncObjectThrottle.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
if (m_image_ctx->parent == nullptr) {
ldout(cct, 5) << "parent detached" << dendl;
- m_image_ctx->op_work_queue->queue(
- util::create_context_callback<
- CopyupRequest<I>, &CopyupRequest<I>::handle_read_from_parent>(this),
- -ENOENT);
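+ // parent detached mid-request: complete asynchronously with -ENOENT
+ // rather than invoking the callback inline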
+ m_image_ctx->asio_engine->post(
+ [this]() { handle_read_from_parent(-ENOENT); });
return;
} else if (is_deep_copy()) {
deep_copy();
void CopyupRequest<I>::handle_copyup(int r) {
auto cct = m_image_ctx->cct;
unsigned pending_copyups;
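+ // determine the error to report: an earlier recorded failure takes
+ // precedence over this completion's result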
+ int copyup_ret_val = r;
{
std::lock_guard locker{m_lock};
ceph_assert(m_pending_copyups > 0);
pending_copyups = --m_pending_copyups;
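+ // remember the first failure so it can be reported once the last
+ // copyup completes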
+ if (m_copyup_ret_val < 0) {
+ copyup_ret_val = m_copyup_ret_val;
+ } else if (r < 0) {
+ m_copyup_ret_val = r;
+ }
}
ldout(cct, 20) << "r=" << r << ", "
<< "pending=" << pending_copyups << dendl;
- if (r < 0 && r != -ENOENT) {
- lderr(cct) << "failed to copyup object: " << cpp_strerror(r) << dendl;
- complete_requests(false, r);
- }
-
if (pending_copyups == 0) {
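+ // only once every copyup has completed: surface the recorded error
+ // (tolerating -ENOENT) before finishing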
+ if (copyup_ret_val < 0 && copyup_ret_val != -ENOENT) {
+ lderr(cct) << "failed to copyup object: " << cpp_strerror(copyup_ret_val)
+ << dendl;
+ complete_requests(false, copyup_ret_val);
+ }
+
finish(0);
}
}
deep_copied.insert(it.second.front());
}
}
+ ldout(m_image_ctx->cct, 15) << "deep_copied=" << deep_copied << dendl;
std::copy_if(m_image_ctx->snaps.rbegin(), m_image_ctx->snaps.rend(),
std::back_inserter(m_snap_ids),
ceph::mutex m_lock = ceph::make_mutex("CopyupRequest", false);
WriteRequests m_pending_requests;
unsigned m_pending_copyups = 0;
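+ // first error recorded across the in-flight copyup operations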
+ int m_copyup_ret_val = 0;
WriteRequests m_restart_requests;
bool m_append_request_permitted = true;
#include "librbd/io/ObjectDispatch.h"
#include "common/dout.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
-#include "librbd/asio/ContextWQ.h"
#include "librbd/io/ObjectRequest.h"
#define dout_subsys ceph_subsys_rbd
auto cct = m_image_ctx->cct;
ldout(cct, 5) << dendl;
- m_image_ctx->op_work_queue->queue(on_finish, 0);
+ m_image_ctx->asio_engine->post(on_finish, 0);
}
template <typename I>
#include "include/err.h"
#include "osd/osd_types.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ObjectMap.h"
#include "librbd/Utils.h"
-#include "librbd/asio/ContextWQ.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/CopyupRequest.h"
#include "librbd/io/ImageRequest.h"
template <typename I>
void ObjectRequest<I>::async_finish(int r) {
ldout(m_ictx->cct, 20) << "r=" << r << dendl;
- m_ictx->op_work_queue->queue(util::create_context_callback<
- ObjectRequest<I>, &ObjectRequest<I>::finish>(this), r);
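+ // bounce finish() through the ASIO engine so it executes outside
+ // the current call stack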
+ m_ictx->asio_engine->post([this, r]() { finish(r); });
}
template <typename I>
std::shared_lock image_locker{image_ctx->image_lock};
if (image_ctx->object_map != nullptr &&
!image_ctx->object_map->object_may_exist(this->m_object_no)) {
- image_ctx->op_work_queue->queue(new LambdaContext([this](int r) {
- read_parent();
- }), 0);
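+ // the object map indicates the object does not exist: defer
+ // read_parent() so it runs after image_lock is dropped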
+ image_ctx->asio_engine->post([this]() { read_parent(); });
return;
}
}
#include "librbd/io/QosImageDispatch.h"
#include "common/dout.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
-#include "librbd/asio/ContextWQ.h"
#include "librbd/io/FlushTracker.h"
#include <map>
if (set_throttle_flag(tag.image_dispatch_flags, flag)) {
// timer_lock is held -- so dispatch from outside the timer thread
- m_image_ctx->op_work_queue->queue(tag.on_dispatched, 0);
+ m_image_ctx->asio_engine->post(tag.on_dispatched, 0);
}
}
#include "librbd/io/QueueImageDispatch.h"
#include "common/dout.h"
#include "common/Cond.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
-#include "librbd/asio/ContextWQ.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageDispatchSpec.h"
}
*dispatch_result = DISPATCH_RESULT_CONTINUE;
- m_image_ctx->op_work_queue->queue(on_dispatched, 0);
+ m_image_ctx->asio_engine->post(on_dispatched, 0);
return true;
}
#include "librbd/io/SimpleSchedulerObjectDispatch.h"
#include "common/Timer.h"
#include "common/errno.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
-#include "librbd/asio/ContextWQ.h"
#include "librbd/io/ObjectDispatchSpec.h"
#include "librbd/io/ObjectDispatcher.h"
#include "librbd/io/Utils.h"
ldout(cct, 20) << "running timer task " << m_timer_task << dendl;
m_timer_task = nullptr;
- m_image_ctx->op_work_queue->queue(
- new LambdaContext(
- [this, object_no](int r) {
- std::lock_guard locker{m_lock};
- dispatch_delayed_requests(object_no);
- }), 0);
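+ // requeue via the ASIO engine so dispatch_delayed_requests() runs
+ // outside the timer thread while holding m_lock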
+ m_image_ctx->asio_engine->post(
+ [this, object_no]() {
+ std::lock_guard locker{m_lock};
+ dispatch_delayed_requests(object_no);
+ });
});
ldout(cct, 20) << "scheduling task " << m_timer_task << " at "