--- /dev/null
+tasks:
+- rbd_fsx:
+ clients: [client.0]
+ ops: 6000
+overrides:
+ ceph:
+ conf:
+ client:
+ rbd qos iops limit: 50
Option("rbd_journal_max_concurrent_object_sets", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description("maximum number of object sets a journal client can be behind before it is automatically unregistered"),
+
+ Option("rbd_qos_iops_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(0)
+ .set_description("the desired limit of IO operations per second"),
});
}
"rbd_journal_max_concurrent_object_sets", false)(
"rbd_mirroring_resync_after_disconnect", false)(
"rbd_mirroring_replay_delay", false)(
- "rbd_skip_partial_discard", false);
+ "rbd_skip_partial_discard", false)(
+ "rbd_qos_iops_limit", false);
md_config_t local_config_t;
std::map<std::string, bufferlist> res;
ASSIGN_OPTION(mirroring_replay_delay, int64_t);
ASSIGN_OPTION(skip_partial_discard, bool);
ASSIGN_OPTION(blkin_trace_all, bool);
+ ASSIGN_OPTION(qos_iops_limit, uint64_t);
if (thread_safe) {
ASSIGN_OPTION(journal_pool, std::string);
if (sparse_read_threshold_bytes == 0) {
sparse_read_threshold_bytes = get_object_size();
}
+
+ io_work_queue->apply_qos_iops_limit(qos_iops_limit);
}
ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {
int mirroring_replay_delay;
bool skip_partial_discard;
bool blkin_trace_all;
+ uint64_t qos_iops_limit;
LibrbdAdminSocketHook *asok_hook;
return m_trace;
}
+ bool was_throttled() {
+ return m_throttled;
+ }
+
+ void set_throttled() {
+ m_throttled = true;
+ }
+
protected:
typedef std::list<ObjectRequestHandle *> ObjectRequests;
Extents m_image_extents;
ZTracer::Trace m_trace;
bool m_bypass_image_cache = false;
+ bool m_throttled = false;
ImageRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
Extents &&image_extents, const char *trace_name,
m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << "ictx=" << image_ctx << dendl;
+
+ SafeTimer *timer;
+ Mutex *timer_lock;
+ ImageCtx::get_timer_instance(cct, &timer, &timer_lock);
+
+ iops_throttle = new TokenBucketThrottle(
+ cct, 0, 0, timer, timer_lock);
this->register_work_queue();
}
+template <typename I>
+ImageRequestWQ<I>::~ImageRequestWQ() {
+ delete iops_throttle;
+}
+
template <typename I>
ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
ReadResult &&read_result, int op_flags) {
}
}
+template <typename I>
+void ImageRequestWQ<I>::apply_qos_iops_limit(uint64_t limit) {
+ iops_throttle->set_max(limit);
+ iops_throttle->set_average(limit);
+}
+
+template <typename I>
+void ImageRequestWQ<I>::handle_iops_throttle_ready(int r, ImageRequest<I> *item) {
+ assert(m_io_blockers.load() > 0);
+ --m_io_blockers;
+ item->set_throttled();
+ this->requeue(item);
+ this->signal();
+}
+
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
CephContext *cct = m_image_ctx.cct;
ThreadPool::PointerWQ<ImageRequest<I> >::_void_dequeue());
assert(peek_item == item);
+ if (!item->was_throttled() &&
+ iops_throttle->get<ImageRequestWQ<I>, ImageRequest<I>,
+ &ImageRequestWQ<I>::handle_iops_throttle_ready>(1, this, item)) {
+ // io was queued into blockers list and wait for tokens.
+ ++m_io_blockers;
+ return nullptr;
+ }
+
if (lock_required) {
this->get_pool_lock().Unlock();
m_image_ctx.owner_lock.get_read();
#include "include/Context.h"
#include "common/RWLock.h"
+#include "common/Throttle.h"
#include "common/WorkQueue.h"
#include "librbd/io/Types.h"
public:
ImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti,
ThreadPool *tp);
+ ~ImageRequestWQ();
ssize_t read(uint64_t off, uint64_t len, ReadResult &&read_result,
int op_flags);
void set_require_lock(Direction direction, bool enabled);
+ void apply_qos_iops_limit(uint64_t limit);
+
protected:
void *_void_dequeue() override;
void process(ImageRequest<ImageCtxT> *req) override;
std::atomic<unsigned> m_in_flight_writes { 0 };
std::atomic<unsigned> m_io_blockers { 0 };
+ TokenBucketThrottle *iops_throttle;
+
bool m_shutdown = false;
Context *m_on_shutdown = nullptr;
void handle_acquire_lock(int r, ImageRequest<ImageCtxT> *req);
void handle_refreshed(int r, ImageRequest<ImageCtxT> *req);
void handle_blocked_writes(int r);
+
+ void handle_iops_throttle_ready(int r, ImageRequest<ImageCtxT> *item);
};
} // namespace io
MOCK_CONST_METHOD0(start_op, void());
MOCK_CONST_METHOD0(send, void());
MOCK_CONST_METHOD1(fail, void(int));
+ MOCK_CONST_METHOD0(was_throttled, bool());
+ MOCK_CONST_METHOD0(set_throttled, void());
ImageRequest() {
s_instance = this;