return new ImageDispatchSpec(image_ctx, aio_comp,
std::move(image_extents),
Read{std::move(read_result)},
- op_flags, parent_trace);
+ op_flags, parent_trace, 0);
}
static ImageDispatchSpec* create_discard_request(
ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
- uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace) {
+ uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace, uint64_t tid) {
return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}},
Discard{discard_granularity_bytes},
- 0, parent_trace);
+ 0, parent_trace, tid);
}
static ImageDispatchSpec* create_write_request(
ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
- bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
+ bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
return new ImageDispatchSpec(image_ctx, aio_comp, std::move(image_extents),
- Write{std::move(bl)}, op_flags, parent_trace);
+ Write{std::move(bl)}, op_flags, parent_trace, tid);
}
static ImageDispatchSpec* create_write_same_request(
ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
- bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
+ bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}},
WriteSame{std::move(bl)}, op_flags,
- parent_trace);
+ parent_trace, tid);
}
static ImageDispatchSpec* create_compare_and_write_request(
ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
bufferlist &&cmp_bl, bufferlist &&bl, uint64_t *mismatch_offset,
- int op_flags, const ZTracer::Trace &parent_trace) {
+ int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
return new ImageDispatchSpec(image_ctx, aio_comp,
std::move(image_extents),
CompareAndWrite{std::move(cmp_bl),
std::move(bl),
mismatch_offset},
- op_flags, parent_trace);
+ op_flags, parent_trace, tid);
}
static ImageDispatchSpec* create_flush_request(
ImageCtxT &image_ctx, AioCompletion *aio_comp,
FlushSource flush_source, const ZTracer::Trace &parent_trace) {
return new ImageDispatchSpec(image_ctx, aio_comp, {}, Flush{flush_source},
- 0, parent_trace);
+ 0, parent_trace, 0);
}
  bool were_all_throttled() {
return (m_throttled_flag & RBD_QOS_MASK) == RBD_QOS_MASK;
}
+ const Extents& get_image_extents() const;
+
+ AioCompletion* get_aio_completion() const {
+ return m_aio_comp;
+ }
+
+ uint64_t get_tid();
+
private:
typedef boost::variant<Read,
Discard,
ImageDispatchSpec(ImageCtxT& image_ctx, AioCompletion* aio_comp,
Extents&& image_extents, Request&& request,
- int op_flags, const ZTracer::Trace& parent_trace)
+ int op_flags, const ZTracer::Trace& parent_trace, uint64_t tid)
: m_image_ctx(image_ctx), m_aio_comp(aio_comp),
m_image_extents(std::move(image_extents)), m_request(std::move(request)),
- m_op_flags(op_flags), m_parent_trace(parent_trace) {
+ m_op_flags(op_flags), m_parent_trace(parent_trace), m_tid(tid) {
m_aio_comp->get();
}
Request m_request;
int m_op_flags;
ZTracer::Trace m_parent_trace;
+ uint64_t m_tid;
std::atomic<uint64_t> m_throttled_flag = 0;
uint64_t extents_length();
return;
}
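+  // assign a monotonically increasing tid and record it up front so that
+  // flushes issued later can wait for this write to drain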
+ auto tid = ++m_last_tid;
+
+ {
+ std::lock_guard locker{m_lock};
+ m_queued_or_blocked_io_tids.insert(tid);
+ }
+
+ ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_request(
+ m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace, tid);
+
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
- queue(ImageDispatchSpec<I>::create_write_request(
- m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
+ queue(req);
} else {
c->start_op();
- ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
- std::move(bl), op_flags, trace);
+ process_io(req, false);
finish_in_flight_io();
}
trace.event("finish");
return;
}
+ auto tid = ++m_last_tid;
+
+ {
+ std::lock_guard locker{m_lock};
+ m_queued_or_blocked_io_tids.insert(tid);
+ }
+
+ ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard_request(
+ m_image_ctx, c, off, len, discard_granularity_bytes, trace, tid);
+
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
- queue(ImageDispatchSpec<I>::create_discard_request(
- m_image_ctx, c, off, len, discard_granularity_bytes, trace));
+ queue(req);
} else {
c->start_op();
- ImageRequest<I>::aio_discard(&m_image_ctx, c, {{off, len}},
- discard_granularity_bytes, trace);
+ process_io(req, false);
finish_in_flight_io();
}
trace.event("finish");
return;
}
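+  // a user flush must not overtake writes submitted before it: if any
+  // earlier tid is still queued or blocked, park the flush until the
+  // outstanding writes retire (see unblock_flushes)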
+ auto tid = ++m_last_tid;
+
+ ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush_request(
+ m_image_ctx, c, FLUSH_SOURCE_USER, trace);
+
+ {
+ std::lock_guard locker{m_lock};
+    if (!m_queued_or_blocked_io_tids.empty()) {
+ ldout(cct, 20) << "queueing flush, tid: " << tid << dendl;
+ m_queued_flushes.emplace(tid, req);
+ --m_in_flight_ios;
+ return;
+ }
+ }
+
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
- queue(ImageDispatchSpec<I>::create_flush_request(
- m_image_ctx, c, FLUSH_SOURCE_USER, trace));
+ queue(req);
} else {
c->start_op();
- ImageRequest<I>::aio_flush(&m_image_ctx, c, FLUSH_SOURCE_USER, trace);
+ process_io(req, false);
finish_in_flight_io();
}
trace.event("finish");
return;
}
+ auto tid = ++m_last_tid;
+
+ {
+ std::lock_guard locker{m_lock};
+ m_queued_or_blocked_io_tids.insert(tid);
+ }
+
+ ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same_request(
+ m_image_ctx, c, off, len, std::move(bl), op_flags, trace, tid);
+
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
- queue(ImageDispatchSpec<I>::create_write_same_request(
- m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
+ queue(req);
} else {
c->start_op();
- ImageRequest<I>::aio_writesame(&m_image_ctx, c, {{off, len}}, std::move(bl),
- op_flags, trace);
+ process_io(req, false);
finish_in_flight_io();
}
trace.event("finish");
return;
}
+ auto tid = ++m_last_tid;
+
+ {
+ std::lock_guard locker{m_lock};
+ m_queued_or_blocked_io_tids.insert(tid);
+ }
+
+ ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write_request(
+ m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
+ mismatch_off, op_flags, trace, tid);
+
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
- queue(ImageDispatchSpec<I>::create_compare_and_write_request(
- m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
- mismatch_off, op_flags, trace));
+ queue(req);
} else {
c->start_op();
- ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}},
- std::move(cmp_bl), std::move(bl),
- mismatch_off, op_flags, trace);
+ process_io(req, false);
finish_in_flight_io();
}
trace.event("finish");
}
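+// Returns true when [off, off + len) overlaps an extent that is already in
+// flight, leaving the caller to park the IO; otherwise records the extent
+// as in flight and returns false. Zero-length IOs never block.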
+template <typename I>
+bool ImageRequestWQ<I>::block_overlapping_io(
+ ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "ictx=" << &m_image_ctx
+ << "off: " << off << " len: " << len <<dendl;
+
+ if(len == 0) {
+ return false;
+ }
+
+ if (in_flight_image_extents->empty() ||
+ !in_flight_image_extents->intersects(off, len)) {
+ in_flight_image_extents->insert(off, len);
+ return false;
+ }
+
+ return true;
+}
+
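+// Called once a write retires: drop its extent from the in-flight set and
+// re-dispatch parked IOs in submission order, stopping at the first one
+// that still overlaps.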
+template <typename I>
+void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
+ uint64_t tid) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
+
+ remove_in_flight_write_ios(offset, length, true, tid);
+
+ std::unique_lock locker{m_lock};
+ if (!m_blocked_ios.empty()) {
+    auto it = m_blocked_ios.begin();
+    while (it != m_blocked_ios.end()) {
+      auto blocked_io = *it;
+
+      // re-check the parked IO against the remaining in-flight extents,
+      // stopping at the first one that still overlaps to preserve order
+      const auto& extents = blocked_io->get_image_extents();
+      uint64_t off = extents.size() ? extents.front().first : 0;
+      uint64_t len = extents.size() ? extents.front().second : 0;
+      if (block_overlapping_io(&m_in_flight_extents, off, len)) {
+        break;
+      }
+      ldout(cct, 20) << "unblocking off: " << off << ", "
+                     << "len: " << len << dendl;
+      AioCompletion *aio_comp = blocked_io->get_aio_completion();
+
+      m_blocked_ios.erase(it);
+      locker.unlock();
+      queue_unblocked_io(aio_comp, blocked_io);
+      locker.lock();
+      // the vector may have changed while unlocked; restart the scan
+      it = m_blocked_ios.begin();
+ }
+ }
+}
+
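+// Dispatch parked flushes whose tid precedes every still-outstanding write
+// tid; m_queued_flushes is a map keyed by tid, so scanning from the front
+// visits flushes in submission order.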
+template <typename I>
+void ImageRequestWQ<I>::unblock_flushes(uint64_t tid) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
+ std::unique_lock locker{m_lock};
+ auto io_tid_it = m_queued_or_blocked_io_tids.begin();
+ while (true) {
+ auto it = m_queued_flushes.begin();
+ if (it == m_queued_flushes.end() ||
+ (io_tid_it != m_queued_or_blocked_io_tids.end() &&
+ *io_tid_it < it->first)) {
+ break;
+ }
+
+ auto blocked_flush = *it;
+ ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl;
+
+ AioCompletion *aio_comp = blocked_flush.second->get_aio_completion();
+
+ m_queued_flushes.erase(it);
+ locker.unlock();
+ queue_unblocked_io(aio_comp, blocked_flush.second);
+ locker.lock();
+ }
+}
+
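+// Re-queue an IO that was parked in m_blocked_ios or m_queued_flushes; it
+// re-enters through start_in_flight_io() because its in-flight slot was
+// given up when it was parked.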
+template <typename I>
+void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp,
+ ImageDispatchSpec<I> *req) {
+ if (!start_in_flight_io(comp)) {
+ return;
+ }
+
+ std::shared_lock owner_locker{m_image_ctx.owner_lock};
+ queue(req);
+}
+
template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
}
template <typename I>
-void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
+void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
+ bool non_blocking_io) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
<< "req=" << req << dendl;
+  // extents are invalidated once the request is sent, so gather
+  // everything needed for bookkeeping ahead of that
+ const auto& extents = req->get_image_extents();
+ bool write_op = req->is_write_op();
+ uint64_t tid = req->get_tid();
+ uint64_t offset;
+ uint64_t length;
+
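+  // only the front extent is consulted: write-type requests dispatched
+  // through this work queue carry a single image extent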
+ if (write_op) {
+ std::lock_guard locker{m_lock};
+ offset = extents.size() ? extents.front().first : 0;
+ length = extents.size() ? extents.front().second : 0;
+ bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
+ if (blocked) {
+ ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
+ << &m_image_ctx << ", "
+ << "off=" << offset << ", len=" << length << dendl;
+ m_blocked_ios.push_back(req);
+ --m_in_flight_ios;
+ return;
+ }
+ }
+
req->send();
- finish_queued_io(req);
- if (req->is_write_op()) {
- finish_in_flight_write();
+ if (write_op) {
+ if (non_blocking_io) {
+ finish_in_flight_write();
+ }
+ unblock_overlapping_io(offset, length, tid);
+ unblock_flushes(tid);
}
delete req;
+}
+template <typename I>
+void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+ << "req=" << req << dendl;
+
+ bool write_op = req->is_write_op();
+
+ process_io(req, true);
+
+ finish_queued_io(write_op);
finish_in_flight_io();
}
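+// Subtract a retired write's extent from the in-flight interval set and
+// erase its tid so pending flushes can make progress.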
template <typename I>
-void ImageRequestWQ<I>::finish_queued_io(ImageDispatchSpec<I> *req) {
+void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length,
+ bool write_op, uint64_t tid) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
+  std::lock_guard locker{m_lock};
+  if (write_op) {
+    if (length > 0 && !m_in_flight_extents.empty()) {
+      ldout(cct, 20) << "erasing in flight extents with tid: "
+                     << tid << dendl;
+      ImageExtentIntervals extents;
+      extents.insert(offset, length);
+      ImageExtentIntervals intersect;
+      intersect.intersection_of(extents, m_in_flight_extents);
+      m_in_flight_extents.subtract(intersect);
+    }
+    m_queued_or_blocked_io_tids.erase(tid);
+  }
+}
+
+template <typename I>
+void ImageRequestWQ<I>::finish_queued_io(bool write_op) {
std::shared_lock locker{m_lock};
- if (req->is_write_op()) {
+ if (write_op) {
ceph_assert(m_queued_writes > 0);
m_queued_writes--;
} else {
    ceph_assert(m_queued_reads > 0);
    m_queued_reads--;
}
}
-
if (writes_blocked) {
flush_image(m_image_ctx, new C_BlockedWrites(this));
}
int r, ImageDispatchSpec<I> *req) {
this->process_finish();
req->fail(r);
- finish_queued_io(req);
+
+ bool write_op = req->is_write_op();
+ uint64_t tid = req->get_tid();
+ const auto& extents = req->get_image_extents();
+ uint64_t offset = extents.size() ? extents.front().first : 0;
+ uint64_t length = extents.size() ? extents.front().second : 0;
+
+ finish_queued_io(write_op);
+ remove_in_flight_write_ios(offset, length, write_op, tid);
delete req;
finish_in_flight_io();
}
#include "common/Throttle.h"
#include "common/WorkQueue.h"
#include "librbd/io/Types.h"
-
+#include "include/interval_set.h"
#include <list>
#include <atomic>
+#include <map>
+#include <set>
+#include <vector>
namespace librbd {
void apply_qos_schedule_tick_min(uint64_t tick);
void apply_qos_limit(const uint64_t flag, uint64_t limit, uint64_t burst);
+
protected:
void *_void_dequeue() override;
void process(ImageDispatchSpec<ImageCtxT> *req) override;
std::atomic<unsigned> m_io_blockers { 0 };
std::atomic<unsigned> m_io_throttled { 0 };
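+  // write-ordering state: extents of writes currently in flight, IOs
+  // parked on an overlap, a monotonic tid source, tids of queued/blocked
+  // write IOs, and parked flushes keyed by tid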
+ typedef interval_set<uint64_t> ImageExtentIntervals;
+ ImageExtentIntervals m_in_flight_extents;
+
+ std::vector<ImageDispatchSpec<ImageCtxT>*> m_blocked_ios;
+  std::atomic<uint64_t> m_last_tid { 0 };
+ std::set<uint64_t> m_queued_or_blocked_io_tids;
+ std::map<uint64_t, ImageDispatchSpec<ImageCtxT>*> m_queued_flushes;
+
std::list<std::pair<uint64_t, TokenBucketThrottle*> > m_throttles;
uint64_t m_qos_enabled_flag = 0;
bool needs_throttle(ImageDispatchSpec<ImageCtxT> *item);
- void finish_queued_io(ImageDispatchSpec<ImageCtxT> *req);
+ void finish_queued_io(bool write_op);
+ void remove_in_flight_write_ios(uint64_t offset, uint64_t length,
+ bool write_op, uint64_t tid);
void finish_in_flight_write();
+ void unblock_flushes(uint64_t tid);
+  bool block_overlapping_io(ImageExtentIntervals* in_flight_image_extents,
+                            uint64_t off, uint64_t len);
+ void unblock_overlapping_io(uint64_t offset, uint64_t length, uint64_t tid);
int start_in_flight_io(AioCompletion *c);
void finish_in_flight_io();
void fail_in_flight_io(int r, ImageDispatchSpec<ImageCtxT> *req);
+ void process_io(ImageDispatchSpec<ImageCtxT> *req, bool non_blocking_io);
void queue(ImageDispatchSpec<ImageCtxT> *req);
+ void queue_unblocked_io(AioCompletion *comp,
+ ImageDispatchSpec<ImageCtxT> *req);
void handle_acquire_lock(int r, ImageDispatchSpec<ImageCtxT> *req);
void handle_refreshed(int r, ImageDispatchSpec<ImageCtxT> *req);