* Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads.
* Users can pass a cache object to the write buffer manager, so that memory usage of memtables and the block cache can be capped by a single limit.
* A flush will be triggered when memory usage reaches 7/8 of the limit set by write_buffer_manager or db_write_buffer_size, so that the hard threshold is rarely hit.
+* Introduce WriteOptions.low_pri. If it is true, the write is treated as low priority and will be throttled when compaction falls behind.
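As a rough usage sketch of how these options fit together (the DB path and sizes below are illustrative, and it assumes the `WriteBufferManager` constructor that accepts a `Cache`, added in this release):

```cpp
#include <memory>

#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "rocksdb/write_buffer_manager.h"

int main() {
  // Share one LRU cache between the block cache and the write buffer manager,
  // so memtables and the block cache are capped by a single memory limit.
  std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(128 << 20);

  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::BlockBasedTableOptions table_options;
  table_options.block_cache = cache;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
  // Memtable memory is charged to `cache`; a flush is triggered once usage
  // reaches 7/8 of the 64MB buffer limit (about 56MB).
  options.write_buffer_manager =
      std::make_shared<rocksdb::WriteBufferManager>(64 << 20, cache);

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/low_pri_example", &db);
  if (!s.ok()) {
    return 1;
  }

  // Mark a bulk-load style write as low priority: it will be rate limited
  // when compaction falls behind, so foreground writes see minimal impact.
  rocksdb::WriteOptions wo;
  wo.low_pri = true;
  s = db->Put(wo, "key", "value");

  delete db;
  return s.ok() ? 0 : 1;
}
```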
## 5.5.0 (05/17/2017)
### New Features
uint64_t write_rate = write_controller->delayed_write_rate();
write_controller->set_delayed_write_rate(static_cast<uint64_t>(
static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
+ // Set the low-pri limit to 1/4 of the delayed write rate.
+ // Note we don't reset this value even after the delay condition is
+ // released; the low-pri rate continues to apply as long as there is
+ // compaction pressure.
+ write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
+ 4);
}
}
prev_compaction_needed_bytes_ = compaction_needed_bytes;
ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %d",
crc32c::IsFastCrc32Supported());
}
+
+int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024;
} // namespace
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
write_thread_(immutable_db_options_),
write_controller_(mutable_db_options_.delayed_write_rate),
+ // Use delayed_write_rate as a baseline to determine the initial
+ // low-pri write rate limit. It may be adjusted later.
+ low_pri_write_rate_limiter_(NewGenericRateLimiter(std::min(
+ static_cast<int64_t>(mutable_db_options_.delayed_write_rate / 8),
+ kDefaultLowPriThrottledRate))),
last_batch_group_size_(0),
unscheduled_flushes_(0),
unscheduled_compactions_(0),
// `num_bytes` going through.
Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
+ Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
+ WriteBatch* my_batch);
+
Status ScheduleFlushes(WriteContext* context);
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
WriteController write_controller_;
+ unique_ptr<RateLimiter> low_pri_write_rate_limiter_;
+
// Size of the last batch group. In slowdown mode, the next write needs to
// sleep if it uses up the quota.
uint64_t last_batch_group_size_;
return Status::Corruption("Batch is nullptr!");
}
+ Status status;
+ if (write_options.low_pri) {
+ status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
+ if (!status.ok()) {
+ return status;
+ }
+ }
+
if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable);
}
- Status status;
-
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable);
return bg_error_;
}
+Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
+ WriteBatch* my_batch) {
+ assert(write_options.low_pri);
+ // This is called outside the DB mutex. The call itself is safe, but the
+ // check below may observe slightly stale state, which is acceptable here.
+ // If compaction needs to be sped up, it means compaction is falling behind,
+ // so we start rate-limiting low-pri writes.
+ if (write_controller_.NeedSpeedupCompaction()) {
+ if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
+ // For 2PC, we only rate limit prepare, not commit.
+ return Status::OK();
+ }
+ if (write_options.no_slowdown) {
+ return Status::Incomplete();
+ } else {
+ assert(my_batch != nullptr);
+ // Rate-limit these writes instead of blocking them completely. If
+ // high-priority write traffic stays heavy, fully blocked low-pri writes
+ // might never get a chance to run; throttling guarantees they still make
+ // slow progress.
+ write_controller_.low_pri_rate_limiter()->Request(my_batch->GetDataSize(),
+ Env::IO_HIGH, nullptr);
+ }
+ }
+ return Status::OK();
+}
+
Status DBImpl::ScheduleFlushes(WriteContext* context) {
ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
// now it's done
ASSERT_TRUE(done.load());
}
-
} // namespace rocksdb
int main(int argc, char** argv) {
ASSERT_EQ(1, count);
delete it;
}
+
+TEST_F(DBTest2, LowPriWrite) {
+ Options options = CurrentOptions();
+ // Compaction pressure should trigger after the 6 L0 files written below
+ // (level0_file_num_compaction_trigger is 4).
+ options.level0_file_num_compaction_trigger = 4;
+ options.level0_slowdown_writes_trigger = 12;
+ options.level0_stop_writes_trigger = 30;
+ options.delayed_write_rate = 8 * 1024 * 1024;
+ Reopen(options);
+
+ std::atomic<int> rate_limit_count(0);
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "GenericRateLimiter::Request:1", [&](void* arg) {
+ rate_limit_count.fetch_add(1);
+ int64_t* rate_bytes_per_sec = static_cast<int64_t*>(arg);
+ ASSERT_EQ(1024 * 1024, *rate_bytes_per_sec);
+ });
+ // Block compaction
+ rocksdb::SyncPoint::GetInstance()->LoadDependency({
+ {"DBTest.LowPriWrite:0", "DBImpl::BGWorkCompaction"},
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ WriteOptions wo;
+ for (int i = 0; i < 6; i++) {
+ wo.low_pri = false;
+ Put("", "", wo);
+ wo.low_pri = true;
+ Put("", "", wo);
+ Flush();
+ }
+ ASSERT_EQ(0, rate_limit_count.load());
+ wo.low_pri = true;
+ Put("", "", wo);
+ ASSERT_EQ(1, rate_limit_count.load());
+ wo.low_pri = false;
+ Put("", "", wo);
+ ASSERT_EQ(1, rate_limit_count.load());
+
+ TEST_SYNC_POINT("DBTest.LowPriWrite:0");
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+ dbfull()->TEST_WaitForCompact();
+ wo.low_pri = true;
+ Put("", "", wo);
+ ASSERT_EQ(1, rate_limit_count.load());
+ wo.low_pri = false;
+ Put("", "", wo);
+ ASSERT_EQ(1, rate_limit_count.load());
+}
} // namespace rocksdb
int main(int argc, char** argv) {
new CompactionPressureToken(this));
}
-bool WriteController::IsStopped() const { return total_stopped_ > 0; }
+bool WriteController::IsStopped() const {
+ return total_stopped_.load(std::memory_order_relaxed) > 0;
+}
// This is inside the DB mutex, so we can't sleep and need to minimize
// how frequently we query the time.
// If it turns out to be a performance issue, we can redesign the thread
// synchronization model here.
// The function trusts that the caller will sleep for the number of
// microseconds returned.
uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
- if (total_stopped_ > 0) {
+ if (total_stopped_.load(std::memory_order_relaxed) > 0) {
return 0;
}
- if (total_delayed_.load() == 0) {
+ if (total_delayed_.load(std::memory_order_relaxed) == 0) {
return 0;
}
#include <atomic>
#include <memory>
+#include "rocksdb/rate_limiter.h"
namespace rocksdb {
// to be called while holding DB mutex
class WriteController {
public:
- explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u)
+ explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
+ int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
: total_stopped_(0),
total_delayed_(0),
total_compaction_pressure_(0),
bytes_left_(0),
- last_refill_time_(0) {
+ last_refill_time_(0),
+ low_pri_rate_limiter_(
+ NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
set_max_delayed_write_rate(_delayed_write_rate);
}
~WriteController() = default;
uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
+ RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
+
private:
uint64_t NowMicrosMonotonic(Env* env);
friend class DelayWriteToken;
friend class CompactionPressureToken;
- int total_stopped_;
+ std::atomic<int> total_stopped_;
std::atomic<int> total_delayed_;
- int total_compaction_pressure_;
+ std::atomic<int> total_compaction_pressure_;
uint64_t bytes_left_;
uint64_t last_refill_time_;
// write rate set during initialization or by `DBImpl::SetDBOptions`
uint64_t max_delayed_write_rate_;
// current write rate
uint64_t delayed_write_rate_;
+
+ std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
};
class WriteControllerToken {
// immediately with Status::Incomplete().
bool no_slowdown;
+ // If true, this write request is of lower priority if compaction is
+ // behind. In this case, if no_slowdown is also true, the request will be
+ // canceled immediately and Status::Incomplete() will be returned.
+ // Otherwise, it will be slowed down. The slowdown is determined by RocksDB
+ // so that it has minimal impact on high-priority writes.
+ //
+ // Default: false
+ bool low_pri;
+
WriteOptions()
: sync(false),
disableWAL(false),
ignore_missing_column_families(false),
- no_slowdown(false) {}
+ no_slowdown(false),
+ low_pri(false) {}
};
// Options that control flush operations
Statistics* stats) {
assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
TEST_SYNC_POINT("GenericRateLimiter::Request");
+ TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
+ &rate_bytes_per_sec_);
MutexLock g(&request_mutex_);
if (stop_) {
return;