]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix bug in WAL streaming uncompression (#11198)
authoranand76 <anand76@devvm4702.ftw0.facebook.com>
Wed, 8 Feb 2023 20:05:49 +0000 (12:05 -0800)
committeranand76 <anand76@devvm6332.prn0.facebook.com>
Fri, 10 Feb 2023 22:58:13 +0000 (14:58 -0800)
Summary:
Fix a bug in the calculation of the input buffer address/offset in log_reader.cc. The bug is when consecutive fragments of a compressed record are located at the same offset in the log reader buffer, the second fragment input buffer is treated as a leftover from the previous input buffer. As a result, the offset in the `ZSTD_inBuffer` is not reset.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11198

Test Plan: Add a unit test in log_test.cc that fails without the fix and passes with it.

Reviewed By: ajkr, cbi42

Differential Revision: D43102692

Pulled By: anand1976

fbshipit-source-id: aa2648f4802c33991b76a3233c5a58d4cc9e77fd

HISTORY.md
db/log_reader.cc
db/log_test.cc
util/compression.cc
util/compression.h

index 904f93c79a128a1e47c416e354ca37215e3ce432..ad91ac2c32386d09737d339f3565b9d6a0239b47 100644 (file)
@@ -5,6 +5,7 @@
 * Fixed `DisableManualCompaction()` and `CompactRangeOptions::canceled` to cancel compactions even when they are waiting on conflicting compactions to finish
 * Fixed a bug in which a successful `GetMergeOperands()` could transiently return `Status::MergeInProgress()`
 * Return the correct error (Status::NotSupported()) to MultiGet caller when ReadOptions::async_io flag is true and IO uring is not enabled. Previously, Status::Corruption() was being returned when the actual failure was lack of async IO support.
+* Fixed a bug in DB open/recovery from a compressed WAL that was caused due to incorrect handling of certain record fragments with the same offset within a WAL block.
 
 ## 7.10.0 (01/23/2023)
 ### Behavior changes
index a21868776141f6ec780c62bb56e55f8515839fff..575a7d758910cd6aec9a55e52dd635796d721a60 100644 (file)
@@ -515,10 +515,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
 
       size_t uncompressed_size = 0;
       int remaining = 0;
+      const char* input = header + header_size;
       do {
-        remaining = uncompress_->Uncompress(header + header_size, length,
-                                            uncompressed_buffer_.get(),
-                                            &uncompressed_size);
+        remaining = uncompress_->Uncompress(
+            input, length, uncompressed_buffer_.get(), &uncompressed_size);
+        input = nullptr;
         if (remaining < 0) {
           buffer_.clear();
           return kBadRecord;
@@ -830,10 +831,11 @@ bool FragmentBufferedReader::TryReadFragment(
     uncompressed_record_.clear();
     size_t uncompressed_size = 0;
     int remaining = 0;
+    const char* input = header + header_size;
     do {
-      remaining = uncompress_->Uncompress(header + header_size, length,
-                                          uncompressed_buffer_.get(),
-                                          &uncompressed_size);
+      remaining = uncompress_->Uncompress(
+          input, length, uncompressed_buffer_.get(), &uncompressed_size);
+      input = nullptr;
       if (remaining < 0) {
         buffer_.clear();
         *fragment_type_or_err = kBadRecord;
index 2a43dc152d96fd57695099de30c51e5497aa1e7c..f4d388f41b0577df081a39e9feb4b64dc8e7bb53 100644 (file)
@@ -979,6 +979,38 @@ TEST_P(CompressionLogTest, Fragmentation) {
   ASSERT_EQ("EOF", Read());
 }
 
+TEST_P(CompressionLogTest, AlignedFragmentation) {
+  CompressionType compression_type = std::get<2>(GetParam());
+  if (!StreamingCompressionTypeSupported(compression_type)) {
+    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
+    return;
+  }
+  ASSERT_OK(SetupTestEnv());
+  Random rnd(301);
+  int num_filler_records = 0;
+  // Keep writing small records until the next record will be aligned at the
+  // beginning of the block.
+  while ((WrittenBytes() & (kBlockSize - 1)) >= kHeaderSize) {
+    char entry = 'a';
+    ASSERT_OK(writer_->AddRecord(Slice(&entry, 1)));
+    num_filler_records++;
+  }
+  const std::vector<std::string> wal_entries = {
+      rnd.RandomBinaryString(3 * kBlockSize),
+  };
+  for (const std::string& wal_entry : wal_entries) {
+    Write(wal_entry);
+  }
+
+  for (int i = 0; i < num_filler_records; ++i) {
+    ASSERT_EQ("a", Read());
+  }
+  for (const std::string& wal_entry : wal_entries) {
+    ASSERT_EQ(wal_entry, Read());
+  }
+  ASSERT_EQ("EOF", Read());
+}
+
 INSTANTIATE_TEST_CASE_P(
     Compression, CompressionLogTest,
     ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
@@ -1026,10 +1058,11 @@ TEST_P(StreamingCompressionTest, Basic) {
   for (int i = 0; i < (int)compressed_buffers.size(); i++) {
     // Call uncompress till either the entire input is consumed or the output
     // buffer size is equal to the allocated output buffer size.
+    const char* input = compressed_buffers[i].c_str();
     do {
-      ret_val = uncompress->Uncompress(compressed_buffers[i].c_str(),
-                                       compressed_buffers[i].size(),
+      ret_val = uncompress->Uncompress(input, compressed_buffers[i].size(),
                                        uncompressed_output_buffer, &output_pos);
+      input = nullptr;
       if (output_pos > 0) {
         std::string uncompressed_fragment;
         uncompressed_fragment.assign(uncompressed_output_buffer, output_pos);
index 8e2f01b12507423ca098add67594f997a6b78fc6..712d333ee633279109d2db95b28b11f4fa78daaa 100644 (file)
@@ -85,14 +85,14 @@ void ZSTDStreamingCompress::Reset() {
 
 int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
                                         char* output, size_t* output_pos) {
-  assert(input != nullptr && output != nullptr && output_pos != nullptr);
+  assert(output != nullptr && output_pos != nullptr);
   *output_pos = 0;
   // Don't need to uncompress an empty input
   if (input_size == 0) {
     return 0;
   }
 #ifdef ZSTD_STREAMING
-  if (input_buffer_.src != input) {
+  if (input) {
     // New input
     input_buffer_ = {input, input_size, /*pos=*/0};
   }
index 2185d52134ee3ef64aaca70756901c08f86e430e..31ff5a7554a304cadd506f9ae22cae7d8a24d8dc 100644 (file)
@@ -1711,8 +1711,11 @@ class StreamingUncompress {
         compress_format_version_(compress_format_version),
         max_output_len_(max_output_len) {}
   virtual ~StreamingUncompress() = default;
-  // uncompress should be called again with the same input if output_size is
-  // equal to max_output_len or with the next input fragment.
+  // Uncompress can be called repeatedly to progressively process the same
+  // input buffer, or can be called with a new input buffer. When the input
+  // buffer is not fully consumed, the return value is > 0 or output_size
+  // == max_output_len. When calling uncompress to continue processing the
+  // same input buffer, the input argument should be nullptr.
   // Parameters:
   // input - buffer to uncompress
   // input_size - size of input buffer