io/ObjectDispatchSpec.cc
io/ObjectDispatcher.cc
io/ObjectRequest.cc
+ io/QosImageDispatch.cc
io/ReadResult.cc
io/SimpleSchedulerObjectDispatch.cc
io/Utils.cc
#include "librbd/io/ImageDispatcher.h"
#include "librbd/io/ImageRequestWQ.h"
#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/QosImageDispatch.h"
#include "librbd/journal/StandardPolicy.h"
#include "osdc/Striper.h"
}
}
- io_work_queue->apply_qos_schedule_tick_min(
+ io_image_dispatcher->apply_qos_schedule_tick_min(
config.get_val<uint64_t>("rbd_qos_schedule_tick_min"));
- io_work_queue->apply_qos_limit(
- RBD_QOS_IOPS_THROTTLE,
+ io_image_dispatcher->apply_qos_limit(
+ io::IMAGE_DISPATCH_FLAG_QOS_IOPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_iops_limit"),
config.get_val<uint64_t>("rbd_qos_iops_burst"));
- io_work_queue->apply_qos_limit(
- RBD_QOS_BPS_THROTTLE,
+ io_image_dispatcher->apply_qos_limit(
+ io::IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_bps_limit"),
config.get_val<uint64_t>("rbd_qos_bps_burst"));
- io_work_queue->apply_qos_limit(
- RBD_QOS_READ_IOPS_THROTTLE,
+ io_image_dispatcher->apply_qos_limit(
+ io::IMAGE_DISPATCH_FLAG_QOS_READ_IOPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_read_iops_limit"),
config.get_val<uint64_t>("rbd_qos_read_iops_burst"));
- io_work_queue->apply_qos_limit(
- RBD_QOS_WRITE_IOPS_THROTTLE,
+ io_image_dispatcher->apply_qos_limit(
+ io::IMAGE_DISPATCH_FLAG_QOS_WRITE_IOPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_write_iops_limit"),
config.get_val<uint64_t>("rbd_qos_write_iops_burst"));
- io_work_queue->apply_qos_limit(
- RBD_QOS_READ_BPS_THROTTLE,
+ io_image_dispatcher->apply_qos_limit(
+ io::IMAGE_DISPATCH_FLAG_QOS_READ_BPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_read_bps_limit"),
config.get_val<uint64_t>("rbd_qos_read_bps_burst"));
- io_work_queue->apply_qos_limit(
- RBD_QOS_WRITE_BPS_THROTTLE,
+ io_image_dispatcher->apply_qos_limit(
+ io::IMAGE_DISPATCH_FLAG_QOS_WRITE_BPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_write_bps_limit"),
config.get_val<uint64_t>("rbd_qos_write_bps_burst"));
}
};
-template <typename I>
-struct ImageDispatchSpec<I>::TokenRequestedVisitor
- : public boost::static_visitor<uint64_t> {
- ImageDispatchSpec* spec;
- uint64_t flag;
- uint64_t *tokens;
-
- TokenRequestedVisitor(ImageDispatchSpec* spec, uint64_t _flag,
- uint64_t *tokens)
- : spec(spec), flag(_flag), tokens(tokens) {
- }
-
- uint64_t operator()(const Read&) const {
- if (flag & RBD_QOS_WRITE_MASK) {
- *tokens = 0;
- return false;
- }
-
- *tokens = (flag & RBD_QOS_BPS_MASK) ? spec->extents_length() : 1;
- return true;
- }
-
- uint64_t operator()(const Flush&) const {
- *tokens = 0;
- return true;
- }
-
- template <typename T>
- uint64_t operator()(const T&) const {
- if (flag & RBD_QOS_READ_MASK) {
- *tokens = 0;
- return false;
- }
-
- *tokens = (flag & RBD_QOS_BPS_MASK) ? spec->extents_length() : 1;
- return true;
- }
-};
-
template <typename I>
void ImageDispatchSpec<I>::send() {
boost::apply_visitor(SendVisitor{this}, request);
return boost::apply_visitor(IsWriteOpVisitor(), request);
}
-template <typename I>
-bool ImageDispatchSpec<I>::tokens_requested(uint64_t flag, uint64_t *tokens) {
- return boost::apply_visitor(TokenRequestedVisitor{this, flag, tokens},
- request);
-}
-
template <typename I>
void ImageDispatchSpec<I>::start_op() {
aio_comp->start_op();
int op_flags;
ZTracer::Trace parent_trace;
uint64_t tid;
- std::atomic<uint64_t> throttled_flag = 0;
static ImageDispatchSpec* create_read_request(
ImageCtxT &image_ctx, ImageDispatchLayer image_dispatch_layer,
void start_op();
- bool tokens_requested(uint64_t flag, uint64_t *tokens);
-
- bool was_throttled(uint64_t flag) {
- return throttled_flag & flag;
- }
-
- void set_throttled(uint64_t flag) {
- throttled_flag |= flag;
- }
-
- bool were_all_throttled() {
- return (throttled_flag & RBD_QOS_MASK) == RBD_QOS_MASK;
- }
-
const Extents& get_image_extents() const;
AioCompletion* get_aio_completion() const {
#include "librbd/io/ImageDispatch.h"
#include "librbd/io/ImageDispatchInterface.h"
#include "librbd/io/ImageDispatchSpec.h"
+#include "librbd/io/QosImageDispatch.h"
#include <boost/variant.hpp>
#define dout_subsys ceph_subsys_rbd
// configure the core image dispatch handler on startup
auto image_dispatch = new ImageDispatch(image_ctx);
this->register_dispatch(image_dispatch);
+
+ m_qos_image_dispatch = new QosImageDispatch<I>(image_ctx);
+ this->register_dispatch(m_qos_image_dispatch);
+}
+
+template <typename I>
+// Forwards the QoS scheduler tick minimum to the owned QoS dispatch layer.
+void ImageDispatcher<I>::apply_qos_schedule_tick_min(uint64_t tick) {
+  m_qos_image_dispatch->apply_qos_schedule_tick_min(tick);
+}
+
+template <typename I>
+// Forwards a QoS limit update (flag selects the throttle; limit/burst in
+// IOPS or bytes-per-second depending on the flag) to the QoS dispatch layer.
+void ImageDispatcher<I>::apply_qos_limit(uint64_t flag, uint64_t limit,
+                                         uint64_t burst) {
+  m_qos_image_dispatch->apply_qos_limit(flag, limit, burst);
+}
template <typename I>
namespace io {
+template <typename> struct QosImageDispatch;
+
template <typename ImageCtxT = ImageCtx>
class ImageDispatcher : public Dispatcher<ImageCtxT, ImageDispatcherInterface> {
public:
ImageDispatcher(ImageCtxT* image_ctx);
+ void apply_qos_schedule_tick_min(uint64_t tick) override;
+ void apply_qos_limit(uint64_t flag, uint64_t limit, uint64_t burst) override;
+
void finish(int r, ImageDispatchLayer image_dispatch_layer,
uint64_t tid) override;
private:
struct SendVisitor;
+ QosImageDispatch<ImageCtxT>* m_qos_image_dispatch = nullptr;
+
};
} // namespace io
struct ImageDispatcherInterface
: public DispatcherInterface<ImageDispatchInterface> {
public:
+ virtual void apply_qos_schedule_tick_min(uint64_t tick) = 0;
+ virtual void apply_qos_limit(uint64_t flag, uint64_t limit,
+ uint64_t burst) = 0;
virtual void finish(int r, ImageDispatchLayer image_dispatch_layer,
uint64_t tid) = 0;
-
};
} // namespace io
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"
#include "librbd/io/ImageDispatchSpec.h"
+#include "librbd/io/ImageDispatcher.h"
+#include "librbd/io/QosImageDispatch.h"
#include "common/EventTrace.h"
#define dout_subsys ceph_subsys_rbd
}
};
-static std::map<uint64_t, std::string> throttle_flags = {
- { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" },
- { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" },
- { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" },
- { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" },
- { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" },
- { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" }
-};
-
template <typename I>
ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
time_t ti, ThreadPool *tp)
CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << "ictx=" << image_ctx << dendl;
- SafeTimer *timer;
- ceph::mutex *timer_lock;
- ImageCtx::get_timer_instance(cct, &timer, &timer_lock);
-
- for (auto flag : throttle_flags) {
- m_throttles.push_back(make_pair(
- flag.first,
- new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock)));
- }
-
this->register_work_queue();
}
-template <typename I>
-ImageRequestWQ<I>::~ImageRequestWQ() {
- for (auto t : m_throttles) {
- delete t.second;
- }
-}
-
template <typename I>
ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
ReadResult &&read_result, int op_flags) {
}
}
-template <typename I>
-void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick){
- for (auto pair : m_throttles) {
- pair.second->set_schedule_tick_min(tick);
- }
-}
-
-template <typename I>
-void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag,
- uint64_t limit,
- uint64_t burst) {
- CephContext *cct = m_image_ctx.cct;
- TokenBucketThrottle *throttle = nullptr;
- for (auto pair : m_throttles) {
- if (flag == pair.first) {
- throttle = pair.second;
- break;
- }
- }
- ceph_assert(throttle != nullptr);
-
- int r = throttle->set_limit(limit, burst);
- if (r < 0) {
- lderr(cct) << throttle->get_name() << ": invalid qos parameter: "
- << "burst(" << burst << ") is less than "
- << "limit(" << limit << ")" << dendl;
- // if apply failed, we should at least make sure the limit works.
- throttle->set_limit(limit, 0);
- }
-
- if (limit)
- m_qos_enabled_flag |= flag;
- else
- m_qos_enabled_flag &= ~flag;
-}
-
-template <typename I>
-void ImageRequestWQ<I>::handle_throttle_ready(
- ImageDispatchSpec<I> *item, uint64_t flag) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 15) << "req=" << item << dendl;
-
- ceph_assert(m_io_throttled.load() > 0);
- item->set_throttled(flag);
- if (item->were_all_throttled()) {
- this->requeue_back(item);
- --m_io_throttled;
- this->signal();
- }
-}
-
-template <typename I>
-bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) {
- uint64_t tokens = 0;
- uint64_t flag = 0;
- bool blocked = false;
- TokenBucketThrottle* throttle = nullptr;
-
- for (auto t : m_throttles) {
- flag = t.first;
- if (item->was_throttled(flag))
- continue;
-
- if (!(m_qos_enabled_flag & flag)) {
- item->set_throttled(flag);
- continue;
- }
-
- throttle = t.second;
- if (item->tokens_requested(flag, &tokens) &&
- throttle->get(tokens, this, &ImageRequestWQ<I>::handle_throttle_ready,
- item, flag)) {
- blocked = true;
- } else {
- item->set_throttled(flag);
- }
- }
- return blocked;
-}
-
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
CephContext *cct = m_image_ctx.cct;
return nullptr;
}
- if (needs_throttle(peek_item)) {
- ldout(cct, 15) << "throttling IO " << peek_item << dendl;
-
- ++m_io_throttled;
- // dequeue the throttled item
- ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue();
- return nullptr;
- }
-
bool lock_required;
bool refresh_required = m_image_ctx.state->is_refresh_required();
{
#include "include/Context.h"
#include "common/ceph_mutex.h"
-#include "common/Throttle.h"
#include "common/WorkQueue.h"
#include "librbd/io/Types.h"
#include "include/interval_set.h"
public:
ImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti,
ThreadPool *tp);
- ~ImageRequestWQ();
ssize_t read(uint64_t off, uint64_t len, ReadResult &&read_result,
int op_flags);
void set_require_lock(Direction direction, bool enabled);
- void apply_qos_schedule_tick_min(uint64_t tick);
-
- void apply_qos_limit(const uint64_t flag, uint64_t limit, uint64_t burst);
-
protected:
void *_void_dequeue() override;
void process(ImageDispatchSpec<ImageCtxT> *req) override;
- bool _empty() override {
- return (ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT>>::_empty() &&
- m_io_throttled.load() == 0);
- }
-
private:
typedef std::list<Context *> Contexts;
std::atomic<unsigned> m_in_flight_ios { 0 };
std::atomic<unsigned> m_in_flight_writes { 0 };
std::atomic<unsigned> m_io_blockers { 0 };
- std::atomic<unsigned> m_io_throttled { 0 };
typedef interval_set<uint64_t> ImageExtentIntervals;
ImageExtentIntervals m_in_flight_extents;
std::set<uint64_t> m_queued_or_blocked_io_tids;
std::map<uint64_t, ImageDispatchSpec<ImageCtxT>*> m_queued_flushes;
- std::list<std::pair<uint64_t, TokenBucketThrottle*> > m_throttles;
- uint64_t m_qos_enabled_flag = 0;
-
bool m_shutdown = false;
Context *m_on_shutdown = nullptr;
return (m_queued_writes == 0);
}
- bool needs_throttle(ImageDispatchSpec<ImageCtxT> *item);
-
void finish_queued_io(bool write_op);
void remove_in_flight_write_ios(uint64_t offset, uint64_t length,
bool write_op, uint64_t tid);
void handle_acquire_lock(int r, ImageDispatchSpec<ImageCtxT> *req);
void handle_refreshed(int r, ImageDispatchSpec<ImageCtxT> *req);
void handle_blocked_writes(int r);
-
- void handle_throttle_ready(ImageDispatchSpec<ImageCtxT> *item, uint64_t flag);
};
} // namespace io
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/QosImageDispatch.h"
+#include "common/dout.h"
+#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
+#include <map>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::QosImageDispatch: " << this << " " \
+ << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+namespace {
+
+// Sums the byte lengths (second element of each extent pair) of all extents.
+uint64_t get_extent_length(const Extents& extents) {
+  uint64_t total = 0;
+  for (const auto& [offset, len] : extents) {
+    total += len;
+  }
+  return total;
+}
+
+// Computes the token cost of an IO against the throttle identified by 'flag':
+// zero when the throttle's direction excludes this op, otherwise the byte
+// length for BPS throttles or a single token for IOPS throttles.
+uint64_t calculate_tokens(bool read_op, uint64_t extent_length, uint64_t flag) {
+  uint64_t excluded_mask = read_op ? IMAGE_DISPATCH_FLAG_QOS_WRITE_MASK :
+                                     IMAGE_DISPATCH_FLAG_QOS_READ_MASK;
+  if ((flag & excluded_mask) != 0) {
+    return 0;
+  }
+
+  return (((flag & IMAGE_DISPATCH_FLAG_QOS_BPS_MASK) != 0) ? extent_length : 1);
+}
+
+// Maps each QoS flag bit to the name its TokenBucketThrottle registers with.
+// ('static' is redundant inside the anonymous namespace but harmless.)
+static std::map<uint64_t, std::string> throttle_flags = {
+  {IMAGE_DISPATCH_FLAG_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" },
+  {IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" },
+  {IMAGE_DISPATCH_FLAG_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" },
+  {IMAGE_DISPATCH_FLAG_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" },
+  {IMAGE_DISPATCH_FLAG_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" },
+  {IMAGE_DISPATCH_FLAG_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" }
+};
+
+} // anonymous namespace
+
+template <typename I>
+// Creates one TokenBucketThrottle per QoS flag, all initially disabled
+// (limit 0 / burst 0). The throttles share the image's global timer instance.
+QosImageDispatch<I>::QosImageDispatch(I* image_ctx)
+  : m_image_ctx(image_ctx) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 5) << "ictx=" << image_ctx << dendl;
+
+  SafeTimer *timer;
+  ceph::mutex *timer_lock;
+  ImageCtx::get_timer_instance(cct, &timer, &timer_lock);
+  for (auto flag : throttle_flags) {
+    m_throttles.push_back(make_pair(
+      flag.first,
+      new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock)));
+  }
+}
+
+template <typename I>
+// Releases the throttles allocated in the constructor.
+QosImageDispatch<I>::~QosImageDispatch() {
+  for (auto& [flag, throttle] : m_throttles) {
+    delete throttle;
+  }
+}
+
+template <typename I>
+// Nothing to tear down asynchronously; complete immediately with success.
+void QosImageDispatch<I>::shut_down(Context* on_finish) {
+  on_finish->complete(0);
+}
+
+template <typename I>
+// Applies the minimum scheduler tick to every throttle bucket.
+void QosImageDispatch<I>::apply_qos_schedule_tick_min(uint64_t tick) {
+  for (auto& [flag, throttle] : m_throttles) {
+    throttle->set_schedule_tick_min(tick);
+  }
+}
+
+template <typename I>
+// Applies (limit, burst) to the throttle selected by 'flag' and records
+// whether that throttle is active in m_qos_enabled_flag. A zero limit
+// disables the throttle.
+void QosImageDispatch<I>::apply_qos_limit(uint64_t flag, uint64_t limit,
+                                          uint64_t burst) {
+  auto cct = m_image_ctx->cct;
+  TokenBucketThrottle *throttle = nullptr;
+  // Linear scan is fine: m_throttles holds a small, fixed set (one per flag).
+  for (auto pair : m_throttles) {
+    if (flag == pair.first) {
+      throttle = pair.second;
+      break;
+    }
+  }
+  ceph_assert(throttle != nullptr);
+
+  int r = throttle->set_limit(limit, burst);
+  if (r < 0) {
+    lderr(cct) << throttle->get_name() << ": invalid qos parameter: "
+               << "burst(" << burst << ") is less than "
+               << "limit(" << limit << ")" << dendl;
+    // if apply failed, we should at least make sure the limit works.
+    throttle->set_limit(limit, 0);
+  }
+
+  if (limit) {
+    m_qos_enabled_flag |= flag;
+  } else {
+    m_qos_enabled_flag &= ~flag;
+  }
+}
+
+template <typename I>
+// Returns true (dispatch blocked) while QoS throttles hold this read back;
+// false lets the dispatcher continue down the layer chain.
+bool QosImageDispatch<I>::read(
+    AioCompletion* aio_comp, Extents &&image_extents, ReadResult &&read_result,
+    int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
+                 << dendl;
+
+  return needs_throttle(true, image_extents, image_dispatch_flags,
+                        dispatch_result, on_dispatched);
+}
+
+template <typename I>
+// Returns true (dispatch blocked) while QoS throttles hold this write back.
+bool QosImageDispatch<I>::write(
+    AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+    int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
+                 << dendl;
+
+  return needs_throttle(false, image_extents, image_dispatch_flags,
+                        dispatch_result, on_dispatched);
+}
+
+template <typename I>
+// Discards count as writes for QoS purposes (read_op == false).
+bool QosImageDispatch<I>::discard(
+    AioCompletion* aio_comp, Extents &&image_extents,
+    uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace,
+    uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
+                 << dendl;
+
+  return needs_throttle(false, image_extents, image_dispatch_flags,
+                        dispatch_result, on_dispatched);
+}
+
+template <typename I>
+// Write-same ops count as writes for QoS purposes (read_op == false).
+bool QosImageDispatch<I>::write_same(
+    AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+    int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
+                 << dendl;
+
+  return needs_throttle(false, image_extents, image_dispatch_flags,
+                        dispatch_result, on_dispatched);
+}
+
+template <typename I>
+// Compare-and-write ops count as writes for QoS purposes (read_op == false).
+bool QosImageDispatch<I>::compare_and_write(
+    AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&cmp_bl,
+    bufferlist &&bl, uint64_t *mismatch_offset, int op_flags,
+    const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << ", image_extents=" << image_extents
+                 << dendl;
+
+  return needs_throttle(false, image_extents, image_dispatch_flags,
+                        dispatch_result, on_dispatched);
+}
+
+template <typename I>
+// Flushes bypass QoS throttling entirely; always continue dispatch.
+bool QosImageDispatch<I>::flush(
+    AioCompletion* aio_comp, FlushSource flush_source,
+    const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << dendl;
+
+  return false;
+}
+
+template <typename I>
+// Atomically folds 'flag' into the per-IO dispatch state and reports whether
+// every QoS throttle bit is now set (i.e. the IO has cleared all throttles).
+bool QosImageDispatch<I>::set_throttle_flag(
+    std::atomic<uint32_t>* image_dispatch_flags, uint32_t flag) {
+  // fetch_or performs the same read-modify-write as the previous hand-rolled
+  // compare_exchange_weak loop in a single RMW operation; OR-ing 'flag' back
+  // into the returned prior value reconstructs the post-update state.
+  uint32_t desired = image_dispatch_flags->fetch_or(flag) | flag;
+
+  return ((desired & IMAGE_DISPATCH_FLAG_QOS_MASK) ==
+          IMAGE_DISPATCH_FLAG_QOS_MASK);
+}
+
+template <typename I>
+// Runs the IO through every enabled throttle. Returns true if at least one
+// throttle withheld tokens (IO must wait; the last handle_throttle_ready
+// callback re-dispatches it via on_dispatched), false if the IO may proceed
+// immediately.
+bool QosImageDispatch<I>::needs_throttle(
+    bool read_op, const Extents& image_extents,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  auto extent_length = get_extent_length(image_extents);
+  bool all_qos_flags_set = false;
+
+  *dispatch_result = DISPATCH_RESULT_CONTINUE;
+
+  // Snapshot the enabled mask so one consistent value is used for the loop.
+  auto qos_enabled_flag = m_qos_enabled_flag;
+  for (auto [flag, throttle] : m_throttles) {
+    if ((qos_enabled_flag & flag) == 0) {
+      // Disabled throttle: mark its flag as already satisfied.
+      all_qos_flags_set = set_throttle_flag(image_dispatch_flags, flag);
+      continue;
+    }
+
+    auto tokens = calculate_tokens(read_op, extent_length, flag);
+    if (tokens > 0 &&
+        throttle->get(tokens, this, &QosImageDispatch<I>::handle_throttle_ready,
+                      Tag{image_dispatch_flags, on_dispatched}, flag)) {
+      // Throttled: tokens unavailable; handle_throttle_ready fires later.
+      // NOTE(review): the callback may fire on the timer thread before this
+      // loop finishes — confirm set_throttle_flag's all-bits check cannot
+      // report completion on both paths for the same IO.
+      ldout(cct, 15) << "on_dispatched=" << on_dispatched << ", "
+                     << "flag=" << flag << dendl;
+      all_qos_flags_set = false;
+    } else {
+      all_qos_flags_set = set_throttle_flag(image_dispatch_flags, flag);
+    }
+  }
+  return !all_qos_flags_set;
+}
+
+template <typename I>
+// Throttle callback: tokens for 'flag' were granted. Once every QoS flag bit
+// is set, the deferred IO is re-dispatched via the saved on_dispatched.
+void QosImageDispatch<I>::handle_throttle_ready(Tag&& tag, uint64_t flag) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 15) << "on_dispatched=" << tag.on_dispatched << ", "
+                 << "flag=" << flag << dendl;
+
+  if (set_throttle_flag(tag.image_dispatch_flags, flag)) {
+    // This was the final outstanding throttle; resume the IO.
+    tag.on_dispatched->complete(0);
+  }
+}
+
+} // namespace io
+} // namespace librbd
+
+template class librbd::io::QosImageDispatch<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_QOS_IMAGE_DISPATCH_H
+#define CEPH_LIBRBD_IO_QOS_IMAGE_DISPATCH_H
+
+#include "librbd/io/ImageDispatchInterface.h"
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "common/zipkin_trace.h"
+#include "common/Throttle.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/io/Types.h"
+#include <list>
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace io {
+
+struct AioCompletion;
+
+template <typename ImageCtxT>
+// Image dispatch layer that applies token-bucket QoS throttling (IOPS/BPS,
+// read/write/combined) to image IOs before they continue down the chain.
+class QosImageDispatch : public ImageDispatchInterface {
+public:
+  // State carried into the asynchronous throttle-ready callback: the per-IO
+  // flag word plus the continuation to fire once all throttles release.
+  struct Tag {
+    std::atomic<uint32_t>* image_dispatch_flags;
+    Context* on_dispatched;
+
+    Tag(std::atomic<uint32_t>* image_dispatch_flags, Context* on_dispatched)
+      : image_dispatch_flags(image_dispatch_flags),
+        on_dispatched(on_dispatched) {
+    }
+  };
+
+  QosImageDispatch(ImageCtxT* image_ctx);
+  ~QosImageDispatch() override;
+
+  ImageDispatchLayer get_dispatch_layer() const override {
+    return IMAGE_DISPATCH_LAYER_QOS;
+  }
+
+  void shut_down(Context* on_finish) override;
+
+  // Runtime QoS configuration (driven by rbd_qos_* config options).
+  void apply_qos_schedule_tick_min(uint64_t tick);
+  void apply_qos_limit(uint64_t flag, uint64_t limit, uint64_t burst);
+
+  // Each IO entry point returns true while the IO is withheld by a throttle.
+  bool read(
+      AioCompletion* aio_comp, Extents &&image_extents,
+      ReadResult &&read_result, int op_flags,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool write(
+      AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+      int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool discard(
+      AioCompletion* aio_comp, Extents &&image_extents,
+      uint32_t discard_granularity_bytes,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool write_same(
+      AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+      int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool compare_and_write(
+      AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&cmp_bl,
+      bufferlist &&bl, uint64_t *mismatch_offset, int op_flags,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool flush(
+      AioCompletion* aio_comp, FlushSource flush_source,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+
+  void handle_finished(int r, uint64_t tid) override {}
+
+private:
+  ImageCtxT* m_image_ctx;
+
+  // (flag, throttle) pairs, one TokenBucketThrottle per QoS flag bit.
+  std::list<std::pair<uint64_t, TokenBucketThrottle*> > m_throttles;
+  // Bitmask of currently enabled throttles (non-zero limit applied).
+  uint64_t m_qos_enabled_flag = 0;
+
+  bool set_throttle_flag(std::atomic<uint32_t>* image_dispatch_flags,
+                         uint32_t flag);
+  bool needs_throttle(bool read_op, const Extents& image_extents,
+                      std::atomic<uint32_t>* image_dispatch_flags,
+                      DispatchResult* dispatch_result, Context* on_dispatched);
+  void handle_throttle_ready(Tag&& tag, uint64_t flag);
+
+};
+
+} // namespace io
+} // namespace librbd
+
+extern template class librbd::io::QosImageDispatch<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_IO_QOS_IMAGE_DISPATCH_H
namespace librbd {
namespace io {
-#define RBD_QOS_IOPS_THROTTLE 1 << 0
-#define RBD_QOS_BPS_THROTTLE 1 << 1
-#define RBD_QOS_READ_IOPS_THROTTLE 1 << 2
-#define RBD_QOS_WRITE_IOPS_THROTTLE 1 << 3
-#define RBD_QOS_READ_BPS_THROTTLE 1 << 4
-#define RBD_QOS_WRITE_BPS_THROTTLE 1 << 5
-
-#define RBD_QOS_BPS_MASK (RBD_QOS_BPS_THROTTLE | RBD_QOS_READ_BPS_THROTTLE | RBD_QOS_WRITE_BPS_THROTTLE)
-#define RBD_QOS_IOPS_MASK (RBD_QOS_IOPS_THROTTLE | RBD_QOS_READ_IOPS_THROTTLE | RBD_QOS_WRITE_IOPS_THROTTLE)
-#define RBD_QOS_READ_MASK (RBD_QOS_READ_BPS_THROTTLE | RBD_QOS_READ_IOPS_THROTTLE)
-#define RBD_QOS_WRITE_MASK (RBD_QOS_WRITE_BPS_THROTTLE | RBD_QOS_WRITE_IOPS_THROTTLE)
-
-#define RBD_QOS_MASK (RBD_QOS_BPS_MASK | RBD_QOS_IOPS_MASK)
-
typedef enum {
AIO_TYPE_NONE = 0,
AIO_TYPE_GENERIC,
IMAGE_DISPATCH_LAYER_LAST
};
+enum {
+  // Individual QoS throttle bits; each selects one TokenBucketThrottle.
+  IMAGE_DISPATCH_FLAG_QOS_IOPS_THROTTLE       = 1 << 0,
+  IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE        = 1 << 1,
+  IMAGE_DISPATCH_FLAG_QOS_READ_IOPS_THROTTLE  = 1 << 2,
+  IMAGE_DISPATCH_FLAG_QOS_WRITE_IOPS_THROTTLE = 1 << 3,
+  IMAGE_DISPATCH_FLAG_QOS_READ_BPS_THROTTLE   = 1 << 4,
+  IMAGE_DISPATCH_FLAG_QOS_WRITE_BPS_THROTTLE  = 1 << 5,
+  // Grouping masks: by unit (BPS vs IOPS) and by direction (read vs write).
+  IMAGE_DISPATCH_FLAG_QOS_BPS_MASK = (
+    IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE |
+    IMAGE_DISPATCH_FLAG_QOS_READ_BPS_THROTTLE |
+    IMAGE_DISPATCH_FLAG_QOS_WRITE_BPS_THROTTLE),
+  IMAGE_DISPATCH_FLAG_QOS_IOPS_MASK = (
+    IMAGE_DISPATCH_FLAG_QOS_IOPS_THROTTLE |
+    IMAGE_DISPATCH_FLAG_QOS_READ_IOPS_THROTTLE |
+    IMAGE_DISPATCH_FLAG_QOS_WRITE_IOPS_THROTTLE),
+  IMAGE_DISPATCH_FLAG_QOS_READ_MASK = (
+    IMAGE_DISPATCH_FLAG_QOS_READ_IOPS_THROTTLE |
+    IMAGE_DISPATCH_FLAG_QOS_READ_BPS_THROTTLE),
+  IMAGE_DISPATCH_FLAG_QOS_WRITE_MASK = (
+    IMAGE_DISPATCH_FLAG_QOS_WRITE_IOPS_THROTTLE |
+    IMAGE_DISPATCH_FLAG_QOS_WRITE_BPS_THROTTLE),
+  // All QoS bits; an IO proceeds once every bit is set in its flag word.
+  IMAGE_DISPATCH_FLAG_QOS_MASK = (
+    IMAGE_DISPATCH_FLAG_QOS_BPS_MASK |
+    IMAGE_DISPATCH_FLAG_QOS_IOPS_MASK),
+};
+
enum ObjectDispatchLayer {
OBJECT_DISPATCH_LAYER_NONE = 0,
OBJECT_DISPATCH_LAYER_CACHE,
#include "test/librbd/test_support.h"
#include "test/librbd/mock/MockImageCtx.h"
#include "test/librbd/mock/exclusive_lock/MockPolicy.h"
+#include "test/librbd/mock/io/MockQosImageDispatch.h"
#include "librbd/io/ImageDispatchSpec.h"
#include "librbd/io/ImageRequestWQ.h"
#include "librbd/io/ImageRequest.h"
AioCompletion *aio_comp = nullptr;
bool blocked = false;
+ std::atomic<uint32_t> image_dispatch_flags = {0};
+
static ImageDispatchSpec* create_write_request(
librbd::MockTestImageCtx &image_ctx, ImageDispatchLayer dispatch_layer,
AioCompletion *aio_comp, Extents &&image_extents, bufferlist &&bl,
MOCK_CONST_METHOD0(start_op, void());
MOCK_CONST_METHOD0(send, void());
MOCK_CONST_METHOD1(fail, void(int));
- MOCK_CONST_METHOD1(was_throttled, bool(uint64_t));
- MOCK_CONST_METHOD0(were_all_throttled, bool());
- MOCK_CONST_METHOD1(set_throttled, void(uint64_t));
- MOCK_CONST_METHOD2(tokens_requested, bool(uint64_t, uint64_t *));
MOCK_CONST_METHOD0(get_image_extents, Extents());
MOCK_CONST_METHOD0(get_aio_completion, AioCompletion*());
MOCK_CONST_METHOD0(get_tid, uint64_t());
}));
}
- void expect_set_throttled(MockImageDispatchSpec &mock_image_request) {
- EXPECT_CALL(mock_image_request, set_throttled(_)).Times(6);
- }
-
- void expect_was_throttled(MockImageDispatchSpec &mock_image_request, bool value) {
- EXPECT_CALL(mock_image_request, was_throttled(_)).Times(6).WillRepeatedly(Return(value));
- }
-
- void expect_tokens_requested(MockImageDispatchSpec &mock_image_request,
- uint64_t tokens, bool r) {
- EXPECT_CALL(mock_image_request, tokens_requested(_, _))
- .WillOnce(WithArg<1>(Invoke([tokens, r](uint64_t *t) {
- *t = tokens;
- return r;
- })));
- }
-
- void expect_all_throttled(MockImageDispatchSpec &mock_image_request, bool value) {
- EXPECT_CALL(mock_image_request, were_all_throttled()).WillOnce(Return(value));
- }
-
void expect_start_op(MockImageDispatchSpec &mock_image_request) {
EXPECT_CALL(mock_image_request, start_op()).Times(1);
}
void expect_get_image_extents(MockImageDispatchSpec &mock_image_request,
const Extents &extents) {
EXPECT_CALL(mock_image_request, get_image_extents())
- .WillOnce(Return(extents));
+ .WillRepeatedly(Return(extents));
}
void expect_get_tid(MockImageDispatchSpec &mock_image_request, uint64_t tid) {
mock_image_ctx.exclusive_lock = &mock_exclusive_lock;
auto mock_queued_image_request = new MockImageDispatchSpec();
- expect_was_throttled(*mock_queued_image_request, false);
- expect_set_throttled(*mock_queued_image_request);
expect_get_image_extents(*mock_queued_image_request, {});
expect_get_tid(*mock_queued_image_request, 0);
mock_image_ctx.exclusive_lock = &mock_exclusive_lock;
auto mock_queued_image_request = new MockImageDispatchSpec();
- expect_was_throttled(*mock_queued_image_request, false);
- expect_set_throttled(*mock_queued_image_request);
expect_get_image_extents(*mock_queued_image_request, {});
expect_get_tid(*mock_queued_image_request, 0);
MockTestImageCtx mock_image_ctx(*ictx);
auto mock_queued_image_request = new MockImageDispatchSpec();
- expect_was_throttled(*mock_queued_image_request, false);
- expect_set_throttled(*mock_queued_image_request);
expect_get_image_extents(*mock_queued_image_request, {});
expect_get_tid(*mock_queued_image_request, 0);
aio_comp->release();
}
-TEST_F(TestMockIoImageRequestWQ, QosNoLimit) {
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
-
- MockImageDispatchSpec mock_queued_image_request;
- expect_was_throttled(mock_queued_image_request, false);
- expect_set_throttled(mock_queued_image_request);
-
- InSequence seq;
- MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
-
- mock_image_request_wq.apply_qos_limit(RBD_QOS_BPS_THROTTLE, 0, 0);
-
- expect_front(mock_image_request_wq, &mock_queued_image_request);
- expect_is_refresh_request(mock_image_ctx, false);
- expect_is_write_op(mock_queued_image_request, true);
- expect_dequeue(mock_image_request_wq, &mock_queued_image_request);
- ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == &mock_queued_image_request);
-}
-
-TEST_F(TestMockIoImageRequestWQ, BPSQosNoBurst) {
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
-
- MockImageDispatchSpec mock_queued_image_request;
- expect_was_throttled(mock_queued_image_request, false);
- expect_set_throttled(mock_queued_image_request);
-
- InSequence seq;
- MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
-
- mock_image_request_wq.apply_qos_limit(RBD_QOS_BPS_THROTTLE, 1, 0);
-
- expect_front(mock_image_request_wq, &mock_queued_image_request);
- expect_tokens_requested(mock_queued_image_request, 2, true);
- expect_dequeue(mock_image_request_wq, &mock_queued_image_request);
- expect_all_throttled(mock_queued_image_request, true);
- expect_requeue_back(mock_image_request_wq);
- expect_signal(mock_image_request_wq);
- ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr);
-}
-
-TEST_F(TestMockIoImageRequestWQ, BPSQosWithBurst) {
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
-
- MockImageDispatchSpec mock_queued_image_request;
- expect_was_throttled(mock_queued_image_request, false);
- expect_set_throttled(mock_queued_image_request);
-
- InSequence seq;
- MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
-
- mock_image_request_wq.apply_qos_limit(RBD_QOS_BPS_THROTTLE, 1, 1);
-
- expect_front(mock_image_request_wq, &mock_queued_image_request);
- expect_tokens_requested(mock_queued_image_request, 2, true);
- expect_dequeue(mock_image_request_wq, &mock_queued_image_request);
- expect_all_throttled(mock_queued_image_request, true);
- expect_requeue_back(mock_image_request_wq);
- expect_signal(mock_image_request_wq);
- ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr);
-}
-
} // namespace io
} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/test_mock_fixture.h"
+#include "test/librbd/test_support.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/librbd/mock/exclusive_lock/MockPolicy.h"
+#include "librbd/io/ImageDispatchSpec.h"
+#include "librbd/io/ImageRequestWQ.h"
+#include "librbd/io/ImageRequest.h"
+
+namespace librbd {
+namespace io {
+
+// Verifies a request passes straight through when the BPS limit is zero.
+// NOTE(review): expect_was_throttled/expect_set_throttled and the WQ-level
+// apply_qos_limit appear to be removed elsewhere in this change, and no
+// fixture is defined in this file — confirm this test still compiles against
+// the new QosImageDispatch-based layering.
+TEST_F(TestMockIoImageRequestWQ, QosNoLimit) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockTestImageCtx mock_image_ctx(*ictx);
+
+  MockImageDispatchSpec mock_queued_image_request;
+  expect_was_throttled(mock_queued_image_request, false);
+  expect_set_throttled(mock_queued_image_request);
+
+  InSequence seq;
+  MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
+
+  mock_image_request_wq.apply_qos_limit(IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE, 0,
+                                        0);
+
+  expect_front(mock_image_request_wq, &mock_queued_image_request);
+  expect_is_refresh_request(mock_image_ctx, false);
+  expect_is_write_op(mock_queued_image_request, true);
+  expect_dequeue(mock_image_request_wq, &mock_queued_image_request);
+  ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == &mock_queued_image_request);
+}
+
+// Verifies a request larger than the BPS limit (no burst) is throttled and
+// requeued. NOTE(review): relies on mock helpers (expect_tokens_requested,
+// expect_all_throttled) whose mock methods are removed elsewhere in this
+// change — confirm against the new dispatch layering.
+TEST_F(TestMockIoImageRequestWQ, BPSQosNoBurst) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockTestImageCtx mock_image_ctx(*ictx);
+
+  MockImageDispatchSpec mock_queued_image_request;
+  expect_was_throttled(mock_queued_image_request, false);
+  expect_set_throttled(mock_queued_image_request);
+
+  InSequence seq;
+  MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
+
+  mock_image_request_wq.apply_qos_limit(IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE, 1,
+                                        0);
+
+  expect_front(mock_image_request_wq, &mock_queued_image_request);
+  expect_tokens_requested(mock_queued_image_request, 2, true);
+  expect_dequeue(mock_image_request_wq, &mock_queued_image_request);
+  expect_all_throttled(mock_queued_image_request, true);
+  expect_requeue_back(mock_image_request_wq);
+  expect_signal(mock_image_request_wq);
+  ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr);
+}
+
+// Same as BPSQosNoBurst but with burst capacity of 1 — still insufficient
+// for a 2-token request, so the IO is throttled and requeued.
+// NOTE(review): same mock-helper removal concern as the sibling tests.
+TEST_F(TestMockIoImageRequestWQ, BPSQosWithBurst) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockTestImageCtx mock_image_ctx(*ictx);
+
+  MockImageDispatchSpec mock_queued_image_request;
+  expect_was_throttled(mock_queued_image_request, false);
+  expect_set_throttled(mock_queued_image_request);
+
+  InSequence seq;
+  MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
+
+  mock_image_request_wq.apply_qos_limit(IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE, 1,
+                                        1);
+
+  expect_front(mock_image_request_wq, &mock_queued_image_request);
+  expect_tokens_requested(mock_queued_image_request, 2, true);
+  expect_dequeue(mock_image_request_wq, &mock_queued_image_request);
+  expect_all_throttled(mock_queued_image_request, true);
+  expect_requeue_back(mock_image_request_wq);
+  expect_signal(mock_image_request_wq);
+  ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr);
+}
+
+} // namespace io
+} // namespace librbd
MOCK_METHOD1(send, void(ImageDispatchSpec<>*));
MOCK_METHOD3(finish, void(int r, ImageDispatchLayer, uint64_t));
+
+ MOCK_METHOD1(apply_qos_schedule_tick_min, void(uint64_t));
+ MOCK_METHOD3(apply_qos_limit, void(uint64_t, uint64_t, uint64_t));
};
} // namespace io
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_TEST_LIBRBD_MOCK_IO_QOS_IMAGE_DISPATCH_H
+#define CEPH_TEST_LIBRBD_MOCK_IO_QOS_IMAGE_DISPATCH_H
+
+#include "gmock/gmock.h"
+#include "librbd/io/Types.h"
+#include <atomic>
+
+struct Context;
+
+namespace librbd {
+namespace io {
+
+// Google Mock stand-in for QosImageDispatch; only the throttling decision
+// (needs_throttle) is mocked for unit tests.
+struct MockQosImageDispatch {
+  MOCK_METHOD4(needs_throttle, bool(bool, const Extents&,
+                                    std::atomic<uint32_t>*, Context*));
+};
+
+} // namespace io
+} // namespace librbd
+
+#endif // CEPH_TEST_LIBRBD_MOCK_IO_QOS_IMAGE_DISPATCH_H