// vim: ts=8 sw=2 smarttab
#include "librbd/exclusive_lock/ImageDispatch.h"
+#include "include/Context.h"
#include "common/dout.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/exclusive_lock/Policy.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/FlushTracker.h"
+#include "librbd/io/ImageDispatchSpec.h"
#include "librbd/io/ImageDispatcherInterface.h"
#include "librbd/io/ImageRequestWQ.h"
: m_image_ctx(image_ctx),
m_lock(ceph::make_shared_mutex(
util::unique_lock_name("librbd::exclusive_lock::ImageDispatch::m_lock",
- this))) {
+ this))),
+ m_flush_tracker(new io::FlushTracker<I>(image_ctx)) {
+}
+
+template <typename I>
+ImageDispatch<I>::~ImageDispatch() {
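+ // FlushTracker is only forward-declared in the header, so it is held by
+ // raw pointer and freed out-of-line here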
+ delete m_flush_tracker;
}
template <typename I>
void ImageDispatch<I>::shut_down(Context* on_finish) {
- // TODO
+ // TODO remove when ImageRequestWQ is removed
unset_require_lock(io::DIRECTION_BOTH);
+ // release any IO waiting on exclusive lock
+ Contexts on_dispatches;
+ {
+ std::unique_lock locker{m_lock};
+ std::swap(on_dispatches, m_on_dispatches);
+ }
+
+ for (auto ctx : on_dispatches) {
+ ctx->complete(0);
+ }
+
+ // ensure we don't have any pending flushes before deleting the layer
+ m_flush_tracker->shut_down();
on_finish->complete(0);
}
template <typename I>
void ImageDispatch<I>::set_require_lock(io::Direction direction,
Context* on_finish) {
- bool blocked = set_require_lock(direction, true);
- // TODO
- if (blocked) {
- std::shared_lock owner_locker{m_image_ctx->owner_lock};
- m_image_ctx->io_image_dispatcher->block_writes(on_finish);
- } else {
- on_finish->complete(0);
+ auto cct = m_image_ctx->cct;
+ // pause any matching IO from proceeding past this layer
+ set_require_lock(direction, true);
+
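+ // gather: on_finish fires only after all sub-contexts complete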
+ auto ctx = new C_Gather(cct, on_finish);
+
+ // passive wait for in-flight IO to complete
+ m_flush_tracker->flush(ctx->new_sub());
+
+ if (direction != io::DIRECTION_READ) {
+ // push through a flush for any in-flight writes at lower levels
+ auto aio_comp = io::AioCompletion::create_and_start(
+ ctx->new_sub(), util::get_image_ctx(m_image_ctx), io::AIO_TYPE_FLUSH);
+ auto req = io::ImageDispatchSpec<I>::create_flush_request(
+ *m_image_ctx, io::IMAGE_DISPATCH_LAYER_EXCLUSIVE_LOCK, aio_comp,
+ io::FLUSH_SOURCE_INTERNAL, {});
+ req->send();
}
+
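+ // all sub-contexts registered, allow the gather to complete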
+ ctx->activate();
}
template <typename I>
void ImageDispatch<I>::unset_require_lock(io::Direction direction) {
- bool unblocked = set_require_lock(direction, false);
-
- // TODO
- if (unblocked) {
- m_image_ctx->io_image_dispatcher->unblock_writes();
- }
+ set_require_lock(direction, false);
}
template <typename I>
auto cct = m_image_ctx->cct;
ldout(cct, 20) << "image_extents=" << image_extents << dendl;
- if (needs_exclusive_lock(true, dispatch_result, on_dispatched)) {
+ if (needs_exclusive_lock(true, tid, dispatch_result, on_dispatched)) {
return true;
}
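+ // lock not required (or already acquired): track the IO until handle_finished()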
+ m_flush_tracker->start_io(tid);
return false;
}
ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
<< dendl;
- if (needs_exclusive_lock(false, dispatch_result, on_dispatched)) {
+ if (needs_exclusive_lock(false, tid, dispatch_result, on_dispatched)) {
return true;
}
+ m_flush_tracker->start_io(tid);
return false;
}
ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
<< dendl;
- if (needs_exclusive_lock(false, dispatch_result, on_dispatched)) {
+ if (needs_exclusive_lock(false, tid, dispatch_result, on_dispatched)) {
return true;
}
+ m_flush_tracker->start_io(tid);
return false;
}
ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
<< dendl;
- if (needs_exclusive_lock(false, dispatch_result, on_dispatched)) {
+ if (needs_exclusive_lock(false, tid, dispatch_result, on_dispatched)) {
return true;
}
+ m_flush_tracker->start_io(tid);
return false;
}
ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
<< dendl;
- if (needs_exclusive_lock(false, dispatch_result, on_dispatched)) {
+ if (needs_exclusive_lock(false, tid, dispatch_result, on_dispatched)) {
return true;
}
+ m_flush_tracker->start_io(tid);
return false;
}
return false;
}
- if (needs_exclusive_lock(false, dispatch_result, on_dispatched)) {
+ if (needs_exclusive_lock(false, tid, dispatch_result, on_dispatched)) {
return true;
}
- return false;
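+ // delay the flush until all in-flight tracked IO has completed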
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ m_flush_tracker->flush(on_dispatched);
+ return true;
+}
+
+template <typename I>
+void ImageDispatch<I>::handle_finished(int r, uint64_t tid) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "tid=" << tid << dendl;
+
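+ // completing the IO may allow queued flushes to fire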
+ m_flush_tracker->finish_io(tid);
}
template <typename I>
}
template <typename I>
-bool ImageDispatch<I>::needs_exclusive_lock(bool read_op,
+bool ImageDispatch<I>::needs_exclusive_lock(bool read_op, uint64_t tid,
io::DispatchResult* dispatch_result,
Context* on_dispatched) {
auto cct = m_image_ctx->cct;
#include "common/zipkin_trace.h"
#include "librbd/io/ReadResult.h"
#include "librbd/io/Types.h"
+#include <atomic>
#include <list>
+#include <unordered_set>
struct Context;
struct ImageCtx;
-namespace io { struct AioCompletion; }
+namespace io {
+struct AioCompletion;
+template <typename> struct FlushTracker;
+}
namespace exclusive_lock {
}
ImageDispatch(ImageCtxT* image_ctx);
+ ~ImageDispatch() override;
io::ImageDispatchLayer get_dispatch_layer() const override {
return io::IMAGE_DISPATCH_LAYER_EXCLUSIVE_LOCK;
std::atomic<uint32_t>* image_dispatch_flags,
io::DispatchResult* dispatch_result, Context* on_dispatched) override;
- void handle_finished(int r, uint64_t tid) override {}
+ void handle_finished(int r, uint64_t tid) override;
private:
typedef std::list<Context*> Contexts;
+ typedef std::unordered_set<uint64_t> Tids;
ImageCtxT* m_image_ctx;
mutable ceph::shared_mutex m_lock;
bool m_require_lock_on_read = false;
bool m_require_lock_on_write = false;
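+ // tracks in-flight IO so flushes and lock transitions can drain it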
+ io::FlushTracker<ImageCtxT>* m_flush_tracker = nullptr;
Contexts m_on_dispatches;
bool set_require_lock(io::Direction direction, bool enabled);
bool is_lock_required(bool read_op) const;
- bool needs_exclusive_lock(bool read_op, io::DispatchResult* dispatch_result,
+ bool needs_exclusive_lock(bool read_op, uint64_t tid,
+ io::DispatchResult* dispatch_result,
Context* on_dispatched);
void handle_acquire_lock(int r);
-
};
} // namespace exclusive_lock