#endif // !defined(IOS_CROSS_COMPILE)
}
+// Mock FSWritableFile for testing io priority.
+// Only override the essential functions for testing compaction io priority.
+class MockTestWritableFile : public FSWritableFileOwnerWrapper {
+ public:
+ MockTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
+ Env::IOPriority io_priority)
+ : FSWritableFileOwnerWrapper(std::move(file)),
+ write_io_priority_(io_priority) {}
+ IOStatus Append(const Slice& data, const IOOptions& options,
+ IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+ return target()->Append(data, options, dbg);
+ }
+ IOStatus Append(const Slice& data, const IOOptions& options,
+ const DataVerificationInfo& verification_info,
+ IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+ return target()->Append(data, options, verification_info, dbg);
+ }
+ IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+ return target()->Close(options, dbg);
+ }
+ IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+ return target()->Flush(options, dbg);
+ }
+ IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+ return target()->Sync(options, dbg);
+ }
+ IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+ return target()->Fsync(options, dbg);
+ }
+ uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+ return target()->GetFileSize(options, dbg);
+ }
+ IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& options,
+ IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+ return target()->RangeSync(offset, nbytes, options, dbg);
+ }
+
+ void PrepareWrite(size_t offset, size_t len, const IOOptions& options,
+ IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+ target()->PrepareWrite(offset, len, options, dbg);
+ }
+
+ IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& options,
+ IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+ return target()->Allocate(offset, len, options, dbg);
+ }
+
+ private:
+ Env::IOPriority write_io_priority_;
+};
+
+// Mock FSRandomAccessFile for testing io priority.
+// Only override the essential functions for testing compaction io priority.
+class MockTestRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
+ public:
+ MockTestRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& file,
+ Env::IOPriority io_priority)
+ : FSRandomAccessFileOwnerWrapper(std::move(file)),
+ read_io_priority_(io_priority) {}
+
+ IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
+ Slice* result, char* scratch,
+ IODebugContext* dbg) const override {
+ EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
+ return target()->Read(offset, n, options, result, scratch, dbg);
+ }
+ IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
+ IODebugContext* dbg) override {
+ EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
+ return target()->Prefetch(offset, n, options, dbg);
+ }
+
+ private:
+ Env::IOPriority read_io_priority_;
+};
+
+// Mock FileSystem for testing io priority.
+class MockTestFileSystem : public FileSystemWrapper {
+ public:
+ explicit MockTestFileSystem(const std::shared_ptr<FileSystem>& base,
+ Env::IOPriority read_io_priority,
+ Env::IOPriority write_io_priority)
+ : FileSystemWrapper(base),
+ read_io_priority_(read_io_priority),
+ write_io_priority_(write_io_priority) {}
+
+ static const char* kClassName() { return "MockTestFileSystem"; }
+ const char* Name() const override { return kClassName(); }
+
+ IOStatus NewRandomAccessFile(const std::string& fname,
+ const FileOptions& file_opts,
+ std::unique_ptr<FSRandomAccessFile>* result,
+ IODebugContext* dbg) override {
+ IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
+ EXPECT_OK(s);
+ result->reset(
+ new MockTestRandomAccessFile(std::move(*result), read_io_priority_));
+ return s;
+ }
+ IOStatus NewWritableFile(const std::string& fname,
+ const FileOptions& file_opts,
+ std::unique_ptr<FSWritableFile>* result,
+ IODebugContext* dbg) override {
+ IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
+ EXPECT_OK(s);
+ result->reset(
+ new MockTestWritableFile(std::move(*result), write_io_priority_));
+ return s;
+ }
+
+ private:
+ Env::IOPriority read_io_priority_;
+ Env::IOPriority write_io_priority_;
+};
+
} // namespace
class CompactionJobTestBase : public testing::Test {
protected:
CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
- std::function<std::string(uint64_t)> encode_u64_ts)
+ std::function<std::string(uint64_t)> encode_u64_ts,
+ bool test_io_priority)
: dbname_(std::move(dbname)),
ucmp_(ucmp),
db_options_(),
shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()),
error_handler_(nullptr, db_options_, &mutex_),
- encode_u64_ts_(std::move(encode_u64_ts)) {
+ encode_u64_ts_(std::move(encode_u64_ts)),
+ test_io_priority_(test_io_priority) {
Env* base_env = Env::Default();
EXPECT_OK(
test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
cf_options_.comparator = ucmp_;
- cf_options_.table_factory = mock_table_factory_;
+ if (test_io_priority_) {
+ BlockBasedTableOptions table_options;
+ cf_options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ } else {
+ cf_options_.table_factory = mock_table_factory_;
+ }
}
std::string GenerateFileName(uint64_t file_number) {
return blob_index;
}
+ // Creates a table with the specificied key value pairs.
+ void CreateTable(const std::string& table_name,
+ const mock::KVVector& contents, uint64_t& file_size) {
+ std::unique_ptr<WritableFileWriter> file_writer;
+ Status s = WritableFileWriter::Create(fs_, table_name, FileOptions(),
+ &file_writer, nullptr);
+ ASSERT_OK(s);
+ std::unique_ptr<TableBuilder> table_builder(
+ cf_options_.table_factory->NewTableBuilder(
+ TableBuilderOptions(*cfd_->ioptions(), mutable_cf_options_,
+ cfd_->internal_comparator(),
+ cfd_->int_tbl_prop_collector_factories(),
+ CompressionType::kNoCompression,
+ CompressionOptions(), 0 /* column_family_id */,
+ kDefaultColumnFamilyName, -1 /* level */),
+ file_writer.get()));
+ // Build table.
+ for (auto kv : contents) {
+ std::string key;
+ std::string value;
+ std::tie(key, value) = kv;
+ table_builder->Add(key, value);
+ }
+ ASSERT_OK(table_builder->Finish());
+ file_size = table_builder->FileSize();
+ }
+
void AddMockFile(const mock::KVVector& contents, int level = 0) {
assert(contents.size() > 0);
}
uint64_t file_number = versions_->NewFileNumber();
- EXPECT_OK(mock_table_factory_->CreateMockTable(
- env_, GenerateFileName(file_number), std::move(contents)));
+
+ uint64_t file_size;
+ if (test_io_priority_) {
+ CreateTable(GenerateFileName(file_number), contents, file_size);
+ } else {
+ file_size = 10;
+ EXPECT_OK(mock_table_factory_->CreateMockTable(
+ env_, GenerateFileName(file_number), std::move(contents)));
+ }
VersionEdit edit;
- edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key,
+ edit.AddFile(level, file_number, 0, file_size, smallest_key, largest_key,
smallest_seqno, largest_seqno, false, Temperature::kUnknown,
oldest_blob_file_number, kUnknownOldestAncesterTime,
kUnknownFileCreationTime, kUnknownFileChecksum,
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
int output_level = 1, bool verify = true,
uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber,
- bool check_get_priority = false) {
+ bool check_get_priority = false,
+ Env::IOPriority read_io_priority = Env::IO_TOTAL,
+ Env::IOPriority write_io_priority = Env::IO_TOTAL) {
+ // For compaction, set fs as MockTestFileSystem to check the io_priority.
+ if (test_io_priority_) {
+ db_options_.fs.reset(
+ new MockTestFileSystem(fs_, read_io_priority, write_io_priority));
+ }
+
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
size_t num_input_files = 0;
ErrorHandler error_handler_;
std::string full_history_ts_low_;
const std::function<std::string(uint64_t)> encode_u64_ts_;
+ bool test_io_priority_;
};
// TODO(icanadi) Make it simpler once we mock out VersionSet
class CompactionJobTest : public CompactionJobTestBase {
public:
CompactionJobTest()
- : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_test"),
- BytewiseComparator(),
- [](uint64_t /*ts*/) { return ""; }) {}
+ : CompactionJobTestBase(
+ test::PerThreadDBPath("compaction_job_test"), BytewiseComparator(),
+ [](uint64_t /*ts*/) { return ""; }, false) {}
};
TEST_F(CompactionJobTest, Simple) {
}
}
-TEST_F(CompactionJobTest, GetRateLimiterPriority) {
- NewDB();
-
- auto expected_results = CreateTwoFiles(false);
- auto cfd = versions_->GetColumnFamilySet()->GetDefault();
- auto files = cfd->current()->storage_info()->LevelFiles(0);
- ASSERT_EQ(2U, files.size());
- RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, true,
- kInvalidBlobFileNumber, true);
-}
class CompactionJobTimestampTest : public CompactionJobTestBase {
public:
CompactionJobTimestampTest()
: CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
test::BytewiseComparatorWithU64TsWrapper(),
- test::EncodeInt) {}
+ test::EncodeInt, false) {}
};
TEST_F(CompactionJobTimestampTest, GCDisabled) {
RunCompaction({files}, expected_results);
}
+// The io priority of the compaction reads and writes are different from
+// other DB reads and writes. To prepare the compaction input files, use the
+// default filesystem from Env. To test the io priority of the compaction
+// reads and writes, db_options_.fs is set as MockTestFileSystem.
+class CompactionJobIOPriorityTest : public CompactionJobTestBase {
+ public:
+ CompactionJobIOPriorityTest()
+ : CompactionJobTestBase(
+ test::PerThreadDBPath("compaction_job_io_priority_test"),
+ BytewiseComparator(), [](uint64_t /*ts*/) { return ""; }, true) {}
+};
+
+TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) {
+ // When the state from WriteController is normal.
+ NewDB();
+ mock::KVVector expected_results = CreateTwoFiles(false);
+ auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+ auto files = cfd->current()->storage_info()->LevelFiles(0);
+ ASSERT_EQ(2U, files.size());
+ RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
+ kInvalidBlobFileNumber, false, Env::IO_LOW, Env::IO_LOW);
+}
+
+TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) {
+ // When the state from WriteController is Delayed.
+ NewDB();
+ mock::KVVector expected_results = CreateTwoFiles(false);
+ auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+ auto files = cfd->current()->storage_info()->LevelFiles(0);
+ ASSERT_EQ(2U, files.size());
+ {
+ std::unique_ptr<WriteControllerToken> delay_token =
+ write_controller_.GetDelayToken(1000000);
+ RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
+ kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER);
+ }
+}
+
+TEST_F(CompactionJobIOPriorityTest, WriteControllerStateStalled) {
+ // When the state from WriteController is Stalled.
+ NewDB();
+ mock::KVVector expected_results = CreateTwoFiles(false);
+ auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+ auto files = cfd->current()->storage_info()->LevelFiles(0);
+ ASSERT_EQ(2U, files.size());
+ {
+ std::unique_ptr<WriteControllerToken> stop_token =
+ write_controller_.GetStopToken();
+ RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
+ kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER);
+ }
+}
+
+TEST_F(CompactionJobIOPriorityTest, GetRateLimiterPriority) {
+ NewDB();
+ mock::KVVector expected_results = CreateTwoFiles(false);
+ auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+ auto files = cfd->current()->storage_info()->LevelFiles(0);
+ ASSERT_EQ(2U, files.size());
+ RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
+ kInvalidBlobFileNumber, true, Env::IO_LOW, Env::IO_LOW);
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {