--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/QueueImageDispatch.h"
+#include "common/dout.h"
+#include "common/Cond.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ImageDispatchSpec.h"
+#include "librbd/io/ImageRequestWQ.h"
+
+// Debug-output subsystem used by the logging statements in this file.
+#define dout_subsys ceph_subsys_rbd
+// Replace any previously defined prefix with one that identifies this class,
+// the object address (this) and the name of the calling function.
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::QueueImageDispatch: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+// Construct the queue dispatch layer for an image.
+//
+// Stores the image context pointer and creates m_lock with a name produced by
+// util::unique_lock_name() from the class lock name and this object.
+template <typename I>
+QueueImageDispatch<I>::QueueImageDispatch(I* image_ctx)
+  : m_image_ctx(image_ctx),
+    m_lock(ceph::make_shared_mutex(
+      util::unique_lock_name("librbd::io::QueueImageDispatch::m_lock",
+                             this))) {
+  // m_image_ctx was initialized from image_ctx above, so both name the same
+  // pointer
+  ldout(m_image_ctx->cct, 5) << "ictx=" << m_image_ctx << dendl;
+}
+
+// Shut down this dispatch layer. No teardown work is performed here:
+// on_finish is completed immediately with a result of 0.
+template <typename I>
+void QueueImageDispatch<I>::shut_down(Context* on_finish) {
+  constexpr int r = 0;
+  on_finish->complete(r);
+}
+
+// Synchronous variant of block_writes(Context*): issues the asynchronous
+// request with a C_SaferCond as its completion and returns the value produced
+// by waiting on it.
+template <typename I>
+int QueueImageDispatch<I>::block_writes() {
+  C_SaferCond blocked_cond;
+  block_writes(&blocked_cond);
+
+  const int r = blocked_cond.wait();
+  return r;
+}
+
+// Asynchronously register a write blocker.
+//
+// Increments m_write_blockers and completes on_blocked through a C_Gather
+// once every sub-context created here has completed:
+//   - one handed to io_work_queue->block_writes() (marked temporary), and
+//   - either one parked in m_write_blocker_contexts (when write tids are
+//     still in flight or earlier blockers are already parked), or one used as
+//     the completion of an internal image flush.
+//
+// The caller must hold m_image_ctx->owner_lock (asserted).
+template <typename I>
+void QueueImageDispatch<I>::block_writes(Context *on_blocked) {
+  ceph_assert(ceph_mutex_is_locked(m_image_ctx->owner_lock));
+  auto cct = m_image_ctx->cct;
+
+  // TODO temp
+  auto ctx = new C_Gather(cct, on_blocked);
+  m_image_ctx->io_work_queue->block_writes(ctx->new_sub());
+
+  {
+    std::unique_lock locker{m_lock};
+    ++m_write_blockers;
+    ldout(cct, 5) << m_image_ctx << ", "
+                  << "num=" << m_write_blockers << dendl;
+    if (!m_write_blocker_contexts.empty() || !m_in_flight_write_tids.empty()) {
+      ldout(cct, 5) << "waiting for in-flight writes to complete: "
+                    << "write_tids=" << m_in_flight_write_tids << dendl;
+      // parked sub-context is completed by handle_finished() once the set of
+      // in-flight write tids drains; the gather is activated while m_lock is
+      // still held
+      m_write_blocker_contexts.push_back(ctx->new_sub());
+      ctx->activate();
+      return;
+    }
+  }
+
+  // ensure that all in-flight IO is flushed
+  flush_image(ctx->new_sub());
+  ctx->activate();
+}
+
+// Release one write blocker.
+//
+// When the blocker count drops to zero, the contexts waiting for writes to be
+// unblocked and the deferred dispatch contexts are detached under m_lock and
+// then completed with 0 (waiters first, dispatches second) after the lock has
+// been released. io_work_queue->unblock_writes() is always invoked last.
+template <typename I>
+void QueueImageDispatch<I>::unblock_writes() {
+  Contexts unblocked_waiters;
+  Contexts deferred_dispatches;
+  {
+    std::unique_lock locker{m_lock};
+    ceph_assert(m_write_blockers > 0);
+    --m_write_blockers;
+
+    ldout(m_image_ctx->cct, 5) << m_image_ctx << ", "
+                               << "num=" << m_write_blockers << dendl;
+    if (m_write_blockers == 0) {
+      unblocked_waiters.swap(m_unblocked_write_waiter_contexts);
+      deferred_dispatches.swap(m_on_dispatches);
+    }
+  }
+
+  // callbacks are fired without m_lock held
+  auto complete_all = [](const Contexts& contexts) {
+    for (auto* context : contexts) {
+      context->complete(0);
+    }
+  };
+  complete_all(unblocked_waiters);
+  complete_all(deferred_dispatches);
+
+  // TODO temp
+  m_image_ctx->io_work_queue->unblock_writes();
+}
+
+// Complete on_unblocked once writes are no longer blocked.
+//
+// A C_Gather combines the (temporary) io_work_queue wait with, when write
+// blockers exist or other waiters are already parked, a sub-context parked in
+// m_unblocked_write_waiter_contexts. The caller must hold
+// m_image_ctx->owner_lock (asserted).
+template <typename I>
+void QueueImageDispatch<I>::wait_on_writes_unblocked(Context *on_unblocked) {
+  ceph_assert(ceph_mutex_is_locked(m_image_ctx->owner_lock));
+  auto cct = m_image_ctx->cct;
+
+  // TODO temp
+  auto gather_ctx = new C_Gather(cct, on_unblocked);
+  m_image_ctx->io_work_queue->wait_on_writes_unblocked(gather_ctx->new_sub());
+
+  std::unique_lock locker{m_lock};
+  ldout(cct, 20) << m_image_ctx << ", "
+                 << "write_blockers=" << m_write_blockers << dendl;
+
+  const bool must_wait = (!m_unblocked_write_waiter_contexts.empty() ||
+                          m_write_blockers > 0);
+  if (must_wait) {
+    // park a sub-context and activate while m_lock is still held
+    m_unblocked_write_waiter_contexts.push_back(gather_ctx->new_sub());
+    gather_ctx->activate();
+    return;
+  }
+
+  // nothing to wait for locally: activate only after releasing m_lock
+  locker.unlock();
+  gather_ctx->activate();
+}
+
+// Read dispatch hook: logs the tid and forwards to enqueue() flagged as a
+// read op. The remaining request arguments are not used by this layer.
+// Returns the value returned by enqueue().
+template <typename I>
+bool QueueImageDispatch<I>::read(
+    AioCompletion* aio_comp, Extents &&image_extents, ReadResult &&read_result,
+    int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  ldout(m_image_ctx->cct, 20) << "tid=" << tid << dendl;
+
+  constexpr bool read_op = true;
+  return enqueue(read_op, tid, dispatch_result, on_dispatched);
+}
+
+// Write dispatch hook: logs the tid and forwards to enqueue() flagged as a
+// non-read op. The remaining request arguments are not used by this layer.
+// Returns the value returned by enqueue().
+template <typename I>
+bool QueueImageDispatch<I>::write(
+    AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+    int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  ldout(m_image_ctx->cct, 20) << "tid=" << tid << dendl;
+
+  constexpr bool read_op = false;
+  return enqueue(read_op, tid, dispatch_result, on_dispatched);
+}
+
+// Discard dispatch hook: logs the tid and forwards to enqueue() flagged as a
+// non-read op. The remaining request arguments are not used by this layer.
+// Returns the value returned by enqueue().
+template <typename I>
+bool QueueImageDispatch<I>::discard(
+    AioCompletion* aio_comp, Extents &&image_extents,
+    uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace,
+    uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  ldout(m_image_ctx->cct, 20) << "tid=" << tid << dendl;
+
+  constexpr bool read_op = false;
+  return enqueue(read_op, tid, dispatch_result, on_dispatched);
+}
+
+// Write-same dispatch hook: logs the tid and forwards to enqueue() flagged as
+// a non-read op. The remaining request arguments are not used by this layer.
+// Returns the value returned by enqueue().
+template <typename I>
+bool QueueImageDispatch<I>::write_same(
+    AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+    int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  ldout(m_image_ctx->cct, 20) << "tid=" << tid << dendl;
+
+  constexpr bool read_op = false;
+  return enqueue(read_op, tid, dispatch_result, on_dispatched);
+}
+
+// Compare-and-write dispatch hook: logs the tid and forwards to enqueue()
+// flagged as a non-read op. The remaining request arguments are not used by
+// this layer. Returns the value returned by enqueue().
+template <typename I>
+bool QueueImageDispatch<I>::compare_and_write(
+    AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&cmp_bl,
+    bufferlist &&bl, uint64_t *mismatch_offset, int op_flags,
+    const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  ldout(m_image_ctx->cct, 20) << "tid=" << tid << dendl;
+
+  constexpr bool read_op = false;
+  return enqueue(read_op, tid, dispatch_result, on_dispatched);
+}
+
+// Flush dispatch hook. Only flushes whose source is FLUSH_SOURCE_USER are
+// passed to enqueue() (as a non-read op); for every other source this layer
+// returns false without touching dispatch_result or on_dispatched.
+template <typename I>
+bool QueueImageDispatch<I>::flush(
+    AioCompletion* aio_comp, FlushSource flush_source,
+    const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  ldout(m_image_ctx->cct, 20) << "tid=" << tid << dendl;
+
+  if (flush_source == FLUSH_SOURCE_USER) {
+    constexpr bool read_op = false;
+    return enqueue(read_op, tid, dispatch_result, on_dispatched);
+  }
+
+  return false;
+}
+
+// Completion hook for a request identified by tid.
+//
+// If tid is tracked in m_in_flight_write_tids it is removed; when that leaves
+// the set empty, every context parked in m_write_blocker_contexts is detached
+// under m_lock and completed with 0 after the lock is released. A tid that is
+// not tracked is ignored.
+template <typename I>
+void QueueImageDispatch<I>::handle_finished(int r, uint64_t tid) {
+  ldout(m_image_ctx->cct, 20) << "r=" << r << ", tid=" << tid << dendl;
+
+  Contexts ready_blockers;
+  {
+    std::unique_lock locker{m_lock};
+    if (m_in_flight_write_tids.erase(tid) == 0) {
+      // assumed to be a read op
+      return;
+    }
+
+    if (m_in_flight_write_tids.empty()) {
+      ready_blockers.swap(m_write_blocker_contexts);
+    }
+  }
+
+  for (auto* blocker : ready_blockers) {
+    blocker->complete(0);
+  }
+}
+
+// Common entry point for all request types handled by this layer.
+//
+// Non-read ops: while write blockers exist, or earlier requests are already
+// deferred, the request is deferred by storing on_dispatched in
+// m_on_dispatches with a RESTART result; otherwise its tid is recorded as an
+// in-flight write.
+//
+// Afterwards, if the image uses non-blocking AIO, on_dispatched is queued on
+// op_work_queue with a CONTINUE result.
+//
+// Returns true when the request was deferred or queued here, false when this
+// layer took no dispatch action.
+template <typename I>
+bool QueueImageDispatch<I>::enqueue(
+    bool read_op, uint64_t tid, DispatchResult* dispatch_result,
+    Context* on_dispatched) {
+  {
+    std::unique_lock locker{m_lock};
+    if (!read_op) {
+      const bool must_defer = (m_write_blockers > 0 ||
+                               !m_on_dispatches.empty());
+      if (must_defer) {
+        *dispatch_result = DISPATCH_RESULT_RESTART;
+        m_on_dispatches.push_back(on_dispatched);
+        return true;
+      }
+
+      m_in_flight_write_tids.insert(tid);
+    }
+  }
+
+  if (m_image_ctx->non_blocking_aio) {
+    *dispatch_result = DISPATCH_RESULT_CONTINUE;
+    m_image_ctx->op_work_queue->queue(on_dispatched, 0);
+    return true;
+  }
+
+  return false;
+}
+
+// Issue an internal-source flush request for the image, starting at the
+// queue dispatch layer; on_finish is the completion attached to the
+// AioCompletion created for the flush.
+template <typename I>
+void QueueImageDispatch<I>::flush_image(Context* on_finish) {
+  auto flush_comp = AioCompletion::create_and_start(
+    on_finish, util::get_image_ctx(m_image_ctx), librbd::io::AIO_TYPE_FLUSH);
+
+  auto flush_req = ImageDispatchSpec<I>::create_flush_request(
+    *m_image_ctx, IMAGE_DISPATCH_LAYER_QUEUE, flush_comp,
+    FLUSH_SOURCE_INTERNAL, {});
+  flush_req->send();
+}
+
+} // namespace io
+} // namespace librbd
+
+// Explicitly instantiate the template for the concrete librbd::ImageCtx type.
+template class librbd::io::QueueImageDispatch<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_QUEUE_IMAGE_DISPATCH_H
+#define CEPH_LIBRBD_IO_QUEUE_IMAGE_DISPATCH_H
+
+#include "librbd/io/ImageDispatchInterface.h"
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "common/ceph_mutex.h"
+#include "common/zipkin_trace.h"
+#include "common/Throttle.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/io/Types.h"
+#include <list>
+#include <set>
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace io {
+
+struct AioCompletion;
+
+// Image dispatch layer (IMAGE_DISPATCH_LAYER_QUEUE) that tracks in-flight
+// write-type requests and supports blocking/unblocking of writes.
+//
+// While one or more write blockers are registered, write-type requests are
+// deferred and re-dispatched once the last blocker is released.
+template <typename ImageCtxT>
+class QueueImageDispatch : public ImageDispatchInterface {
+public:
+  // explicit: an image context pointer must never silently convert into a
+  // dispatch layer
+  explicit QueueImageDispatch(ImageCtxT* image_ctx);
+
+  ImageDispatchLayer get_dispatch_layer() const override {
+    return IMAGE_DISPATCH_LAYER_QUEUE;
+  }
+
+  void shut_down(Context* on_finish) override;
+
+  // Register a write blocker; the no-argument overload waits for completion
+  // and returns the result, the other completes on_blocked asynchronously.
+  int block_writes();
+  void block_writes(Context *on_blocked);
+  // Release one previously registered write blocker.
+  void unblock_writes();
+
+  // True while at least one write blocker is registered.
+  bool writes_blocked() const {
+    std::shared_lock locker{m_lock};
+    return (m_write_blockers > 0);
+  }
+
+  // Complete on_unblocked once no write blockers remain.
+  void wait_on_writes_unblocked(Context *on_unblocked);
+
+  bool read(
+      AioCompletion* aio_comp, Extents &&image_extents,
+      ReadResult &&read_result, int op_flags,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool write(
+      AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+      int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool discard(
+      AioCompletion* aio_comp, Extents &&image_extents,
+      uint32_t discard_granularity_bytes,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool write_same(
+      AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+      int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool compare_and_write(
+      AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&cmp_bl,
+      bufferlist &&bl, uint64_t *mismatch_offset, int op_flags,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool flush(
+      AioCompletion* aio_comp, FlushSource flush_source,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+
+  void handle_finished(int r, uint64_t tid) override;
+
+private:
+  using Contexts = std::list<Context*>;
+  using Tids = std::set<uint64_t>;
+
+  ImageCtxT* m_image_ctx;
+
+  // guards every member declared below
+  mutable ceph::shared_mutex m_lock;
+  // dispatch callbacks of write-type requests deferred while writes are
+  // blocked
+  Contexts m_on_dispatches;
+  // tids of write-type requests that have been let through and not yet
+  // reported via handle_finished()
+  Tids m_in_flight_write_tids;
+
+  // number of outstanding block_writes() calls
+  uint32_t m_write_blockers = 0;
+  // completed once the set of in-flight write tids becomes empty
+  Contexts m_write_blocker_contexts;
+  // completed once the last write blocker is released
+  Contexts m_unblocked_write_waiter_contexts;
+
+  bool enqueue(bool read_op, uint64_t tid, DispatchResult* dispatch_result,
+               Context* on_dispatched);
+  void flush_image(Context* on_finish);
+
+};
+
+} // namespace io
+} // namespace librbd
+
+// Explicit instantiation declaration: tells including translation units not
+// to implicitly instantiate the template for librbd::ImageCtx.
+extern template class librbd::io::QueueImageDispatch<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_IO_QUEUE_IMAGE_DISPATCH_H