:Default: ``0``
+``rbd qos iops burst seconds``
+
+:Description: The desired burst duration in seconds of IO operations.
+:Type: Unsigned Integer
+:Required: No
+:Default: ``1``
+
+
+``rbd qos bps burst seconds``
+
+:Description: The desired burst duration in seconds of IO bytes.
+:Type: Unsigned Integer
+:Required: No
+:Default: ``1``
+
+
+``rbd qos read iops burst seconds``
+
+:Description: The desired burst duration in seconds of read operations.
+:Type: Unsigned Integer
+:Required: No
+:Default: ``1``
+
+
+``rbd qos write iops burst seconds``
+
+:Description: The desired burst duration in seconds of write operations.
+:Type: Unsigned Integer
+:Required: No
+:Default: ``1``
+
+
+``rbd qos read bps burst seconds``
+
+:Description: The desired burst duration in seconds of read bytes.
+:Type: Unsigned Integer
+:Required: No
+:Default: ``1``
+
+
+``rbd qos write bps burst seconds``
+
+:Description: The desired burst duration in seconds of write bytes.
+:Type: Unsigned Integer
+:Required: No
+:Default: ``1``
+
+
``rbd qos schedule tick min``
:Description: The minimum schedule tick (in milliseconds) for QoS.
}
uint64_t got = 0;
- if (remain >= c) {
+ if (available >= c) {
// There is enough token in bucket, take c.
got = c;
+ available -= c;
remain -= c;
} else {
- // There is not enough, take all remain.
- got = remain;
- remain = 0;
+ // There is not enough, take all available.
+ got = available;
+ remain -= available;
+ available = 0;
}
return got;
}
-uint64_t TokenBucketThrottle::Bucket::put(uint64_t c) {
+uint64_t TokenBucketThrottle::Bucket::put(uint64_t tokens, double burst_ratio) {
if (0 == max) {
return 0;
}
- if (c) {
- // put c tokens into bucket
+ if (tokens) {
+ // put tokens into bucket
uint64_t current = remain;
- if ((current + c) <= max) {
- remain += c;
+ if ((current + tokens) <= capacity) {
+ remain += tokens;
} else {
- remain = max;
+ remain = capacity;
+ }
+
+ // available tokens increase at burst speed
+ uint64_t available_inc = tokens;
+ if (burst_ratio > 1) {
+ available_inc = (uint64_t)(tokens * burst_ratio);
+ }
+ uint64_t inc_upper_limit = remain > max ? max : remain;
+ if ((available + available_inc) <= inc_upper_limit ){
+ available += available_inc;
+ }else{
+ available = inc_upper_limit;
}
+
}
return remain;
}
-void TokenBucketThrottle::Bucket::set_max(uint64_t m) {
- if (remain > m || 0 == m) {
- remain = m;
+void TokenBucketThrottle::Bucket::set_max(uint64_t max, uint64_t burst_seconds) {
+ // the capacity of bucket should not be less than max
+ if (burst_seconds < 1){
+ burst_seconds = 1;
}
- max = m;
+ uint64_t new_capacity = max*burst_seconds;
+ if (capacity != new_capacity){
+ capacity = new_capacity;
+ remain = capacity;
+ }
+ if (available > max || 0 == max) {
+ available = max;
+ }
+ this->max = max;
}
TokenBucketThrottle::TokenBucketThrottle(
CephContext *cct,
const std::string &name,
- uint64_t capacity,
+ uint64_t burst,
uint64_t avg,
SafeTimer *timer,
ceph::mutex *timer_lock)
: m_cct(cct), m_name(name),
- m_throttle(m_cct, name + "_bucket", capacity),
- m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
+ m_throttle(m_cct, name + "_bucket", burst),
+ m_burst(burst), m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
m_lock(ceph::make_mutex(name + "_lock"))
{}
}
}
-int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst) {
+int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst, uint64_t burst_seconds) {
{
std::lock_guard lock{m_lock};
m_current_tick = 0;
// for the default configuration of burst.
- m_throttle.set_max(0 == burst ? average : burst);
+ m_throttle.set_max(0 == burst ? average : burst, burst_seconds);
}
// turn millisecond to second
m_schedule_tick = m_tick / 1000.0;
{
std::lock_guard lock(m_lock);
// put tokens into bucket.
- m_throttle.put(tokens_this_tick());
+ double burst_ratio = 1.0;
+ if (m_throttle.max > m_avg && m_avg > 0){
+ burst_ratio = (double)m_throttle.max/m_avg;
+ }
+ m_throttle.put(tokens_this_tick(), burst_ratio);
if (0 == m_avg || 0 == m_throttle.max)
tmp_blockers.swap(m_blockers);
// check the m_blockers from head to tail, if blocker can get
uint64_t remain;
uint64_t max;
+ uint64_t capacity;
+ uint64_t available;
Bucket(CephContext *cct, const std::string &name, uint64_t m)
- : cct(cct), name(name), remain(m), max(m) {}
+ : cct(cct), name(name), remain(m), max(m), capacity(m), available(m) {}
uint64_t get(uint64_t c);
- uint64_t put(uint64_t c);
- void set_max(uint64_t m);
+ uint64_t put(uint64_t tokens, double burst_ratio);
+ void set_max(uint64_t max, uint64_t burst_seconds);
};
struct Blocker {
CephContext *m_cct;
const std::string m_name;
Bucket m_throttle;
- uint64_t m_avg = 0;
uint64_t m_burst = 0;
+ uint64_t m_avg = 0;
SafeTimer *m_timer;
ceph::mutex *m_timer_lock;
Context *m_token_ctx = nullptr;
public:
TokenBucketThrottle(CephContext *cct, const std::string &name,
- uint64_t capacity, uint64_t avg,
+ uint64_t burst, uint64_t avg,
SafeTimer *timer, ceph::mutex *timer_lock);
~TokenBucketThrottle();
return wait;
}
- int set_limit(uint64_t average, uint64_t burst);
+ int set_limit(uint64_t average, uint64_t burst, uint64_t burst_seconds);
void set_schedule_tick_min(uint64_t tick);
private:
.set_default(0)
.set_description("the desired burst limit of write bytes"),
+ Option("rbd_qos_iops_burst_seconds", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_min(1)
+ .set_description("the desired burst duration in seconds of IO operations"),
+
+ Option("rbd_qos_bps_burst_seconds", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_min(1)
+ .set_description("the desired burst duration in seconds of IO bytes"),
+
+ Option("rbd_qos_read_iops_burst_seconds", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_min(1)
+ .set_description("the desired burst duration in seconds of read operations"),
+
+ Option("rbd_qos_write_iops_burst_seconds", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_min(1)
+ .set_description("the desired burst duration in seconds of write operations"),
+
+ Option("rbd_qos_read_bps_burst_seconds", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_min(1)
+ .set_description("the desired burst duration in seconds of read bytes"),
+
+ Option("rbd_qos_write_bps_burst_seconds", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1)
+ .set_min(1)
+ .set_description("the desired burst duration in seconds of write bytes"),
+
Option("rbd_qos_schedule_tick_min", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(50)
.set_min(1)
io_image_dispatcher->apply_qos_limit(
io::IMAGE_DISPATCH_FLAG_QOS_IOPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_iops_limit"),
- config.get_val<uint64_t>("rbd_qos_iops_burst"));
+ config.get_val<uint64_t>("rbd_qos_iops_burst"),
+ config.get_val<uint64_t>("rbd_qos_iops_burst_seconds"));
io_image_dispatcher->apply_qos_limit(
io::IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_bps_limit"),
- config.get_val<uint64_t>("rbd_qos_bps_burst"));
+ config.get_val<uint64_t>("rbd_qos_bps_burst"),
+ config.get_val<uint64_t>("rbd_qos_bps_burst_seconds"));
io_image_dispatcher->apply_qos_limit(
io::IMAGE_DISPATCH_FLAG_QOS_READ_IOPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_read_iops_limit"),
- config.get_val<uint64_t>("rbd_qos_read_iops_burst"));
+ config.get_val<uint64_t>("rbd_qos_read_iops_burst"),
+ config.get_val<uint64_t>("rbd_qos_read_iops_burst_seconds"));
io_image_dispatcher->apply_qos_limit(
io::IMAGE_DISPATCH_FLAG_QOS_WRITE_IOPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_write_iops_limit"),
- config.get_val<uint64_t>("rbd_qos_write_iops_burst"));
+ config.get_val<uint64_t>("rbd_qos_write_iops_burst"),
+ config.get_val<uint64_t>("rbd_qos_write_iops_burst_seconds"));
io_image_dispatcher->apply_qos_limit(
io::IMAGE_DISPATCH_FLAG_QOS_READ_BPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_read_bps_limit"),
- config.get_val<uint64_t>("rbd_qos_read_bps_burst"));
+ config.get_val<uint64_t>("rbd_qos_read_bps_burst"),
+ config.get_val<uint64_t>("rbd_qos_read_bps_burst_seconds"));
io_image_dispatcher->apply_qos_limit(
io::IMAGE_DISPATCH_FLAG_QOS_WRITE_BPS_THROTTLE,
config.get_val<uint64_t>("rbd_qos_write_bps_limit"),
- config.get_val<uint64_t>("rbd_qos_write_bps_burst"));
+ config.get_val<uint64_t>("rbd_qos_write_bps_burst"),
+ config.get_val<uint64_t>("rbd_qos_write_bps_burst_seconds"));
if (!disable_zero_copy &&
config.get_val<bool>("rbd_disable_zero_copy_writes")) {
template <typename I>
void ImageDispatcher<I>::apply_qos_limit(uint64_t flag, uint64_t limit,
- uint64_t burst) {
- m_qos_image_dispatch->apply_qos_limit(flag, limit, burst);
+ uint64_t burst, uint64_t burst_seconds) {
+ m_qos_image_dispatch->apply_qos_limit(flag, limit, burst, burst_seconds);
}
template <typename I>
void shut_down(Context* on_finish) override;
void apply_qos_schedule_tick_min(uint64_t tick) override;
- void apply_qos_limit(uint64_t flag, uint64_t limit, uint64_t burst) override;
+ void apply_qos_limit(uint64_t flag, uint64_t limit, uint64_t burst,
+ uint64_t burst_seconds) override;
bool writes_blocked() const override;
int block_writes() override;
public:
virtual void apply_qos_schedule_tick_min(uint64_t tick) = 0;
virtual void apply_qos_limit(uint64_t flag, uint64_t limit,
- uint64_t burst) = 0;
+ uint64_t burst, uint64_t burst_seconds) = 0;
virtual bool writes_blocked() const = 0;
virtual int block_writes() = 0;
template <typename I>
void QosImageDispatch<I>::apply_qos_limit(uint64_t flag, uint64_t limit,
- uint64_t burst) {
+ uint64_t burst, uint64_t burst_seconds) {
auto cct = m_image_ctx->cct;
TokenBucketThrottle *throttle = nullptr;
for (auto pair : m_throttles) {
}
ceph_assert(throttle != nullptr);
- int r = throttle->set_limit(limit, burst);
+ int r = throttle->set_limit(limit, burst, burst_seconds);
if (r < 0) {
lderr(cct) << throttle->get_name() << ": invalid qos parameter: "
<< "burst(" << burst << ") is less than "
<< "limit(" << limit << ")" << dendl;
// if apply failed, we should at least make sure the limit works.
- throttle->set_limit(limit, 0);
+ throttle->set_limit(limit, 0, 1);
}
if (limit) {
void shut_down(Context* on_finish) override;
void apply_qos_schedule_tick_min(uint64_t tick);
- void apply_qos_limit(uint64_t flag, uint64_t limit, uint64_t burst);
+ void apply_qos_limit(uint64_t flag, uint64_t limit, uint64_t burst,
+ uint64_t burst_seconds);
bool read(
AioCompletion* aio_comp, Extents &&image_extents,
MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
mock_image_request_wq.apply_qos_limit(IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE, 0,
- 0);
+ 0, 1);
expect_front(mock_image_request_wq, &mock_queued_image_request);
expect_is_refresh_request(mock_image_ctx, false);
MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
mock_image_request_wq.apply_qos_limit(IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE, 1,
- 0);
+ 0, 1);
expect_front(mock_image_request_wq, &mock_queued_image_request);
expect_tokens_requested(mock_queued_image_request, 2, true);
MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
mock_image_request_wq.apply_qos_limit(IMAGE_DISPATCH_FLAG_QOS_BPS_THROTTLE, 1,
- 1);
+ 1, 1);
expect_front(mock_image_request_wq, &mock_queued_image_request);
expect_tokens_requested(mock_queued_image_request, 2, true);
MOCK_METHOD3(finish, void(int r, ImageDispatchLayer, uint64_t));
MOCK_METHOD1(apply_qos_schedule_tick_min, void(uint64_t));
- MOCK_METHOD3(apply_qos_limit, void(uint64_t, uint64_t, uint64_t));
+ MOCK_METHOD4(apply_qos_limit, void(uint64_t, uint64_t, uint64_t, uint64_t));
MOCK_CONST_METHOD0(writes_blocked, bool());
MOCK_METHOD0(block_writes, int());