]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add unit test to verify that the dynamic priority can be passed from compaction to...
authorgitbw95 <95719937+gitbw95@users.noreply.github.com>
Tue, 7 Jun 2022 18:57:12 +0000 (11:57 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Tue, 7 Jun 2022 18:57:12 +0000 (11:57 -0700)
Summary:
**Summary:**
Add unit tests to verify that the dynamic priority can be passed from compaction to FS. Compaction reads&writes and other DB reads&writes share the same read&write paths to FSRandomAccessFile or FSWritableFile, so a MockTestFileSystem is added to replace the default filesystem from Env to intercept and verify the io_priority. To prepare the compaction input files, use the default filesystem from Env. To test the io priority of the compaction reads and writes, db_options_.fs is set as MockTestFileSystem.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10088

Test Plan: Add unit tests.

Reviewed By: anand1976

Differential Revision: D36882528

Pulled By: gitbw95

fbshipit-source-id: 120adc15801966f2b8c9fc45285f590a3fff96d1

db/compaction/compaction_job_test.cc

index a704226d4e4121eacb63a358b0320dc76bd203c0..daf5b67a6b94098d748240b1bddc2d34a6cac3e6 100644 (file)
@@ -69,12 +69,138 @@ void VerifyInitializationOfCompactionJobStats(
 #endif  // !defined(IOS_CROSS_COMPILE)
 }
 
+// Mock FSWritableFile for testing io priority.
+// Only override the essential functions for testing compaction io priority.
+class MockTestWritableFile : public FSWritableFileOwnerWrapper {
+ public:
+  MockTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
+                       Env::IOPriority io_priority)
+      : FSWritableFileOwnerWrapper(std::move(file)),
+        write_io_priority_(io_priority) {}
+  IOStatus Append(const Slice& data, const IOOptions& options,
+                  IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Append(data, options, dbg);
+  }
+  IOStatus Append(const Slice& data, const IOOptions& options,
+                  const DataVerificationInfo& verification_info,
+                  IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Append(data, options, verification_info, dbg);
+  }
+  IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Close(options, dbg);
+  }
+  IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Flush(options, dbg);
+  }
+  IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Sync(options, dbg);
+  }
+  IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Fsync(options, dbg);
+  }
+  uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->GetFileSize(options, dbg);
+  }
+  IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& options,
+                     IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->RangeSync(offset, nbytes, options, dbg);
+  }
+
+  void PrepareWrite(size_t offset, size_t len, const IOOptions& options,
+                    IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    target()->PrepareWrite(offset, len, options, dbg);
+  }
+
+  IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& options,
+                    IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Allocate(offset, len, options, dbg);
+  }
+
+ private:
+  Env::IOPriority write_io_priority_;
+};
+
+// Mock FSRandomAccessFile for testing io priority.
+// Only override the essential functions for testing compaction io priority.
+class MockTestRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
+ public:
+  MockTestRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& file,
+                           Env::IOPriority io_priority)
+      : FSRandomAccessFileOwnerWrapper(std::move(file)),
+        read_io_priority_(io_priority) {}
+
+  IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
+                Slice* result, char* scratch,
+                IODebugContext* dbg) const override {
+    EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
+    return target()->Read(offset, n, options, result, scratch, dbg);
+  }
+  IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
+                    IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
+    return target()->Prefetch(offset, n, options, dbg);
+  }
+
+ private:
+  Env::IOPriority read_io_priority_;
+};
+
+// Mock FileSystem for testing io priority.
+class MockTestFileSystem : public FileSystemWrapper {
+ public:
+  explicit MockTestFileSystem(const std::shared_ptr<FileSystem>& base,
+                              Env::IOPriority read_io_priority,
+                              Env::IOPriority write_io_priority)
+      : FileSystemWrapper(base),
+        read_io_priority_(read_io_priority),
+        write_io_priority_(write_io_priority) {}
+
+  static const char* kClassName() { return "MockTestFileSystem"; }
+  const char* Name() const override { return kClassName(); }
+
+  IOStatus NewRandomAccessFile(const std::string& fname,
+                               const FileOptions& file_opts,
+                               std::unique_ptr<FSRandomAccessFile>* result,
+                               IODebugContext* dbg) override {
+    IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
+    EXPECT_OK(s);
+    result->reset(
+        new MockTestRandomAccessFile(std::move(*result), read_io_priority_));
+    return s;
+  }
+  IOStatus NewWritableFile(const std::string& fname,
+                           const FileOptions& file_opts,
+                           std::unique_ptr<FSWritableFile>* result,
+                           IODebugContext* dbg) override {
+    IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
+    EXPECT_OK(s);
+    result->reset(
+        new MockTestWritableFile(std::move(*result), write_io_priority_));
+    return s;
+  }
+
+ private:
+  Env::IOPriority read_io_priority_;
+  Env::IOPriority write_io_priority_;
+};
+
 }  // namespace
 
 class CompactionJobTestBase : public testing::Test {
  protected:
   CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
-                        std::function<std::string(uint64_t)> encode_u64_ts)
+                        std::function<std::string(uint64_t)> encode_u64_ts,
+                        bool test_io_priority)
       : dbname_(std::move(dbname)),
         ucmp_(ucmp),
         db_options_(),
@@ -90,7 +216,8 @@ class CompactionJobTestBase : public testing::Test {
         shutting_down_(false),
         mock_table_factory_(new mock::MockTableFactory()),
         error_handler_(nullptr, db_options_, &mutex_),
-        encode_u64_ts_(std::move(encode_u64_ts)) {
+        encode_u64_ts_(std::move(encode_u64_ts)),
+        test_io_priority_(test_io_priority) {
     Env* base_env = Env::Default();
     EXPECT_OK(
         test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
@@ -105,7 +232,12 @@ class CompactionJobTestBase : public testing::Test {
     db_options_.db_paths.emplace_back(dbname_,
                                       std::numeric_limits<uint64_t>::max());
     cf_options_.comparator = ucmp_;
-    cf_options_.table_factory = mock_table_factory_;
+    if (test_io_priority_) {
+      BlockBasedTableOptions table_options;
+      cf_options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
+    } else {
+      cf_options_.table_factory = mock_table_factory_;
+    }
   }
 
   std::string GenerateFileName(uint64_t file_number) {
@@ -145,6 +277,33 @@ class CompactionJobTestBase : public testing::Test {
     return blob_index;
   }
 
+  // Creates a table with the specificied key value pairs.
+  void CreateTable(const std::string& table_name,
+                   const mock::KVVector& contents, uint64_t& file_size) {
+    std::unique_ptr<WritableFileWriter> file_writer;
+    Status s = WritableFileWriter::Create(fs_, table_name, FileOptions(),
+                                          &file_writer, nullptr);
+    ASSERT_OK(s);
+    std::unique_ptr<TableBuilder> table_builder(
+        cf_options_.table_factory->NewTableBuilder(
+            TableBuilderOptions(*cfd_->ioptions(), mutable_cf_options_,
+                                cfd_->internal_comparator(),
+                                cfd_->int_tbl_prop_collector_factories(),
+                                CompressionType::kNoCompression,
+                                CompressionOptions(), 0 /* column_family_id */,
+                                kDefaultColumnFamilyName, -1 /* level */),
+            file_writer.get()));
+    // Build table.
+    for (auto kv : contents) {
+      std::string key;
+      std::string value;
+      std::tie(key, value) = kv;
+      table_builder->Add(key, value);
+    }
+    ASSERT_OK(table_builder->Finish());
+    file_size = table_builder->FileSize();
+  }
+
   void AddMockFile(const mock::KVVector& contents, int level = 0) {
     assert(contents.size() > 0);
 
@@ -198,11 +357,18 @@ class CompactionJobTestBase : public testing::Test {
     }
 
     uint64_t file_number = versions_->NewFileNumber();
-    EXPECT_OK(mock_table_factory_->CreateMockTable(
-        env_, GenerateFileName(file_number), std::move(contents)));
+
+    uint64_t file_size;
+    if (test_io_priority_) {
+      CreateTable(GenerateFileName(file_number), contents, file_size);
+    } else {
+      file_size = 10;
+      EXPECT_OK(mock_table_factory_->CreateMockTable(
+          env_, GenerateFileName(file_number), std::move(contents)));
+    }
 
     VersionEdit edit;
-    edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key,
+    edit.AddFile(level, file_number, 0, file_size, smallest_key, largest_key,
                  smallest_seqno, largest_seqno, false, Temperature::kUnknown,
                  oldest_blob_file_number, kUnknownOldestAncesterTime,
                  kUnknownFileCreationTime, kUnknownFileChecksum,
@@ -323,7 +489,15 @@ class CompactionJobTestBase : public testing::Test {
       SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
       int output_level = 1, bool verify = true,
       uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber,
-      bool check_get_priority = false) {
+      bool check_get_priority = false,
+      Env::IOPriority read_io_priority = Env::IO_TOTAL,
+      Env::IOPriority write_io_priority = Env::IO_TOTAL) {
+    // For compaction, set fs as MockTestFileSystem to check the io_priority.
+    if (test_io_priority_) {
+      db_options_.fs.reset(
+          new MockTestFileSystem(fs_, read_io_priority, write_io_priority));
+    }
+
     auto cfd = versions_->GetColumnFamilySet()->GetDefault();
 
     size_t num_input_files = 0;
@@ -445,15 +619,16 @@ class CompactionJobTestBase : public testing::Test {
   ErrorHandler error_handler_;
   std::string full_history_ts_low_;
   const std::function<std::string(uint64_t)> encode_u64_ts_;
+  bool test_io_priority_;
 };
 
 // TODO(icanadi) Make it simpler once we mock out VersionSet
 class CompactionJobTest : public CompactionJobTestBase {
  public:
   CompactionJobTest()
-      : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_test"),
-                              BytewiseComparator(),
-                              [](uint64_t /*ts*/) { return ""; }) {}
+      : CompactionJobTestBase(
+            test::PerThreadDBPath("compaction_job_test"), BytewiseComparator(),
+            [](uint64_t /*ts*/) { return ""; }, false) {}
 };
 
 TEST_F(CompactionJobTest, Simple) {
@@ -1343,23 +1518,13 @@ TEST_F(CompactionJobTest, ResultSerialization) {
   }
 }
 
-TEST_F(CompactionJobTest, GetRateLimiterPriority) {
-  NewDB();
-
-  auto expected_results = CreateTwoFiles(false);
-  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
-  auto files = cfd->current()->storage_info()->LevelFiles(0);
-  ASSERT_EQ(2U, files.size());
-  RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, true,
-                kInvalidBlobFileNumber, true);
-}
 
 class CompactionJobTimestampTest : public CompactionJobTestBase {
  public:
   CompactionJobTimestampTest()
       : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
                               test::BytewiseComparatorWithU64TsWrapper(),
-                              test::EncodeInt) {}
+                              test::EncodeInt, false) {}
 };
 
 TEST_F(CompactionJobTimestampTest, GCDisabled) {
@@ -1475,6 +1640,69 @@ TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
   RunCompaction({files}, expected_results);
 }
 
+// The io priority of the compaction reads and writes are different from
+// other DB reads and writes. To prepare the compaction input files, use the
+// default filesystem from Env. To test the io priority of the compaction
+// reads and writes, db_options_.fs is set as MockTestFileSystem.
+class CompactionJobIOPriorityTest : public CompactionJobTestBase {
+ public:
+  CompactionJobIOPriorityTest()
+      : CompactionJobTestBase(
+            test::PerThreadDBPath("compaction_job_io_priority_test"),
+            BytewiseComparator(), [](uint64_t /*ts*/) { return ""; }, true) {}
+};
+
+TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) {
+  // When the state from WriteController is normal.
+  NewDB();
+  mock::KVVector expected_results = CreateTwoFiles(false);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  auto files = cfd->current()->storage_info()->LevelFiles(0);
+  ASSERT_EQ(2U, files.size());
+  RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
+                kInvalidBlobFileNumber, false, Env::IO_LOW, Env::IO_LOW);
+}
+
+TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) {
+  // When the state from WriteController is Delayed.
+  NewDB();
+  mock::KVVector expected_results = CreateTwoFiles(false);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  auto files = cfd->current()->storage_info()->LevelFiles(0);
+  ASSERT_EQ(2U, files.size());
+  {
+    std::unique_ptr<WriteControllerToken> delay_token =
+        write_controller_.GetDelayToken(1000000);
+    RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
+                  kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER);
+  }
+}
+
+TEST_F(CompactionJobIOPriorityTest, WriteControllerStateStalled) {
+  // When the state from WriteController is Stalled.
+  NewDB();
+  mock::KVVector expected_results = CreateTwoFiles(false);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  auto files = cfd->current()->storage_info()->LevelFiles(0);
+  ASSERT_EQ(2U, files.size());
+  {
+    std::unique_ptr<WriteControllerToken> stop_token =
+        write_controller_.GetStopToken();
+    RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
+                  kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER);
+  }
+}
+
+TEST_F(CompactionJobIOPriorityTest, GetRateLimiterPriority) {
+  NewDB();
+  mock::KVVector expected_results = CreateTwoFiles(false);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  auto files = cfd->current()->storage_info()->LevelFiles(0);
+  ASSERT_EQ(2U, files.size());
+  RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
+                kInvalidBlobFileNumber, true, Env::IO_LOW, Env::IO_LOW);
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {