]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Batch blob read IO for MultiGet (#8699)
authorYanqin Jin <yanqin@fb.com>
Sat, 18 Sep 2021 01:43:32 +0000 (18:43 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Sat, 18 Sep 2021 02:23:13 +0000 (19:23 -0700)
Summary:
In batched `MultiGet()`, RocksDB batches blob read IO and uses `RandomAccessFileReader::MultiRead()`
to read the blobs instead of issuing multiple `Read()`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8699

Test Plan:
```
make check
```

Reviewed By: ltamasi

Differential Revision: D31030861

Pulled By: riversand963

fbshipit-source-id: a0df6060cbfd54cff9515a4eee08807b1dbcb0c8

HISTORY.md
db/blob/blob_file_reader.cc
db/blob/blob_file_reader.h
db/blob/blob_file_reader_test.cc
db/blob/blob_index.h
db/blob/db_blob_basic_test.cc
db/db_impl/db_impl.cc
db/version_set.cc
db/version_set.h
table/multiget_context.h

index 6e7a45fc833832d40dca2b7cb5bbe472997a20ae..fa21fa5853145307bd99201adfdcfd8da0f9fe42 100644 (file)
@@ -23,6 +23,7 @@
 * Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp.
 * Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first.
 * Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
+* Batch blob read requests for `DB::MultiGet` using `MultiRead`.
 
 ### Public API change
 * Remove obsolete implementation details FullKey and ParseFullKey from public API
index 6f64da48bacb0acfd82da10a3a8b466f9e8f6de8..9ce0b5b89878004c2337734bb35f261ae15017f4 100644 (file)
@@ -351,6 +351,117 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
   return Status::OK();
 }
 
+void BlobFileReader::MultiGetBlob(
+    const ReadOptions& read_options,
+    const autovector<std::reference_wrapper<const Slice>>& user_keys,
+    const autovector<uint64_t>& offsets,
+    const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
+    autovector<PinnableSlice*>& values, uint64_t* bytes_read) const {
+  const size_t num_blobs = user_keys.size();
+  assert(num_blobs == offsets.size());
+  assert(num_blobs == value_sizes.size());
+  assert(num_blobs == statuses.size());
+  assert(num_blobs == values.size());
+
+  std::vector<FSReadRequest> read_reqs(num_blobs);
+  autovector<uint64_t> adjustments;
+  uint64_t total_len = 0;
+  for (size_t i = 0; i < num_blobs; ++i) {
+    const size_t key_size = user_keys[i].get().size();
+    assert(IsValidBlobOffset(offsets[i], key_size, value_sizes[i], file_size_));
+    const uint64_t adjustment =
+        read_options.verify_checksums
+            ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
+            : 0;
+    assert(offsets[i] >= adjustment);
+    adjustments.push_back(adjustment);
+    read_reqs[i].offset = offsets[i] - adjustment;
+    read_reqs[i].len = value_sizes[i] + adjustment;
+    total_len += read_reqs[i].len;
+  }
+
+  RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len);
+
+  Buffer buf;
+  AlignedBuf aligned_buf;
+
+  Status s;
+  bool direct_io = file_reader_->use_direct_io();
+  if (direct_io) {
+    for (size_t i = 0; i < read_reqs.size(); ++i) {
+      read_reqs[i].scratch = nullptr;
+    }
+  } else {
+    buf.reset(new char[total_len]);
+    std::ptrdiff_t pos = 0;
+    for (size_t i = 0; i < read_reqs.size(); ++i) {
+      read_reqs[i].scratch = buf.get() + pos;
+      pos += read_reqs[i].len;
+    }
+  }
+  TEST_SYNC_POINT("BlobFileReader::MultiGetBlob:ReadFromFile");
+  s = file_reader_->MultiRead(IOOptions(), read_reqs.data(), read_reqs.size(),
+                              direct_io ? &aligned_buf : nullptr);
+  if (!s.ok()) {
+    for (auto& req : read_reqs) {
+      req.status.PermitUncheckedError();
+    }
+    for (size_t i = 0; i < num_blobs; ++i) {
+      assert(statuses[i]);
+      *statuses[i] = s;
+    }
+    return;
+  }
+
+  assert(s.ok());
+  for (size_t i = 0; i < num_blobs; ++i) {
+    auto& req = read_reqs[i];
+    assert(statuses[i]);
+    if (req.status.ok() && req.result.size() != req.len) {
+      req.status = IOStatus::Corruption("Failed to read data from blob file");
+    }
+    *statuses[i] = req.status;
+  }
+
+  if (read_options.verify_checksums) {
+    for (size_t i = 0; i < num_blobs; ++i) {
+      assert(statuses[i]);
+      if (!statuses[i]->ok()) {
+        continue;
+      }
+      const Slice& record_slice = read_reqs[i].result;
+      s = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
+      if (!s.ok()) {
+        assert(statuses[i]);
+        *statuses[i] = s;
+      }
+    }
+  }
+
+  for (size_t i = 0; i < num_blobs; ++i) {
+    assert(statuses[i]);
+    if (!statuses[i]->ok()) {
+      continue;
+    }
+    const Slice& record_slice = read_reqs[i].result;
+    const Slice value_slice(record_slice.data() + adjustments[i],
+                            value_sizes[i]);
+    s = UncompressBlobIfNeeded(value_slice, compression_type_, clock_,
+                               statistics_, values[i]);
+    if (!s.ok()) {
+      *statuses[i] = s;
+    }
+  }
+
+  if (bytes_read) {
+    uint64_t total_bytes = 0;
+    for (const auto& req : read_reqs) {
+      total_bytes += req.result.size();
+    }
+    *bytes_read = total_bytes;
+  }
+}
+
 Status BlobFileReader::VerifyBlob(const Slice& record_slice,
                                   const Slice& user_key, uint64_t value_size) {
   BlobLogRecord record;
index 3ab0d52c2f256003199a4d3e3622507f2170497a..06087c3a005c6d65d4371483d2a5d2738d775020 100644 (file)
@@ -11,6 +11,7 @@
 #include "file/random_access_file_reader.h"
 #include "rocksdb/compression_type.h"
 #include "rocksdb/rocksdb_namespace.h"
+#include "util/autovector.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -43,6 +44,17 @@ class BlobFileReader {
                  CompressionType compression_type, PinnableSlice* value,
                  uint64_t* bytes_read) const;
 
+  void MultiGetBlob(
+      const ReadOptions& read_options,
+      const autovector<std::reference_wrapper<const Slice>>& user_keys,
+      const autovector<uint64_t>& offsets,
+      const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
+      autovector<PinnableSlice*>& values, uint64_t* bytes_read) const;
+
+  CompressionType GetCompressionType() const { return compression_type_; }
+
+  uint64_t GetFileSize() const { return file_size_; }
+
  private:
   BlobFileReader(std::unique_ptr<RandomAccessFileReader>&& file_reader,
                  uint64_t file_size, CompressionType compression_type,
index e08a4bab83665237cddd237fc7ae7e31e7fa3398..8544b53d4cce7f05c305451e07f02633a4d699e9 100644 (file)
@@ -27,23 +27,23 @@ namespace ROCKSDB_NAMESPACE {
 
 namespace {
 
-// Creates a test blob file with a single blob in it. Note: this method
-// makes it possible to test various corner cases by allowing the caller
-// to specify the contents of various blob file header/footer fields.
+// Creates a test blob file with `num` blobs in it.
 void WriteBlobFile(const ImmutableOptions& immutable_options,
                    uint32_t column_family_id, bool has_ttl,
                    const ExpirationRange& expiration_range_header,
                    const ExpirationRange& expiration_range_footer,
-                   uint64_t blob_file_number, const Slice& key,
-                   const Slice& blob, CompressionType compression_type,
-                   uint64_t* blob_offset, uint64_t* blob_size) {
+                   uint64_t blob_file_number, const std::vector<Slice>& keys,
+                   const std::vector<Slice>& blobs, CompressionType compression,
+                   std::vector<uint64_t>& blob_offsets,
+                   std::vector<uint64_t>& blob_sizes) {
   assert(!immutable_options.cf_paths.empty());
-  assert(blob_offset);
-  assert(blob_size);
+  size_t num = keys.size();
+  assert(num == blobs.size());
+  assert(num == blob_offsets.size());
+  assert(num == blob_sizes.size());
 
   const std::string blob_file_path =
       BlobFileName(immutable_options.cf_paths.front().path, blob_file_number);
-
   std::unique_ptr<FSWritableFile> file;
   ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file,
                             FileOptions()));
@@ -59,50 +59,77 @@ void WriteBlobFile(const ImmutableOptions& immutable_options,
                                 statistics, blob_file_number, use_fsync,
                                 do_flush);
 
-  BlobLogHeader header(column_family_id, compression_type, has_ttl,
+  BlobLogHeader header(column_family_id, compression, has_ttl,
                        expiration_range_header);
 
   ASSERT_OK(blob_log_writer.WriteHeader(header));
 
-  std::string compressed_blob;
-  Slice blob_to_write;
-
-  if (compression_type == kNoCompression) {
-    blob_to_write = blob;
-    *blob_size = blob.size();
+  std::vector<std::string> compressed_blobs(num);
+  std::vector<Slice> blobs_to_write(num);
+  if (kNoCompression == compression) {
+    for (size_t i = 0; i < num; ++i) {
+      blobs_to_write[i] = blobs[i];
+      blob_sizes[i] = blobs[i].size();
+    }
   } else {
     CompressionOptions opts;
-    CompressionContext context(compression_type);
+    CompressionContext context(compression);
     constexpr uint64_t sample_for_compression = 0;
-
     CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
-                         compression_type, sample_for_compression);
+                         compression, sample_for_compression);
 
     constexpr uint32_t compression_format_version = 2;
 
-    ASSERT_TRUE(
-        CompressData(blob, info, compression_format_version, &compressed_blob));
-
-    blob_to_write = compressed_blob;
-    *blob_size = compressed_blob.size();
+    for (size_t i = 0; i < num; ++i) {
+      ASSERT_TRUE(CompressData(blobs[i], info, compression_format_version,
+                               &compressed_blobs[i]));
+      blobs_to_write[i] = compressed_blobs[i];
+      blob_sizes[i] = compressed_blobs[i].size();
+    }
   }
 
-  uint64_t key_offset = 0;
-
-  ASSERT_OK(
-      blob_log_writer.AddRecord(key, blob_to_write, &key_offset, blob_offset));
+  for (size_t i = 0; i < num; ++i) {
+    uint64_t key_offset = 0;
+    ASSERT_OK(blob_log_writer.AddRecord(keys[i], blobs_to_write[i], &key_offset,
+                                        &blob_offsets[i]));
+  }
 
   BlobLogFooter footer;
-  footer.blob_count = 1;
+  footer.blob_count = num;
   footer.expiration_range = expiration_range_footer;
 
   std::string checksum_method;
   std::string checksum_value;
-
   ASSERT_OK(
       blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value));
 }
 
+// Creates a test blob file with a single blob in it. Note: this method
+// makes it possible to test various corner cases by allowing the caller
+// to specify the contents of various blob file header/footer fields.
+void WriteBlobFile(const ImmutableOptions& immutable_options,
+                   uint32_t column_family_id, bool has_ttl,
+                   const ExpirationRange& expiration_range_header,
+                   const ExpirationRange& expiration_range_footer,
+                   uint64_t blob_file_number, const Slice& key,
+                   const Slice& blob, CompressionType compression,
+                   uint64_t* blob_offset, uint64_t* blob_size) {
+  std::vector<Slice> keys{key};
+  std::vector<Slice> blobs{blob};
+  std::vector<uint64_t> blob_offsets{0};
+  std::vector<uint64_t> blob_sizes{0};
+  WriteBlobFile(immutable_options, column_family_id, has_ttl,
+                expiration_range_header, expiration_range_footer,
+                blob_file_number, keys, blobs, compression, blob_offsets,
+                blob_sizes);
+  if (blob_offset) {
+    *blob_offset = blob_offsets[0];
+  }
+  if (blob_size) {
+    *blob_size = blob_sizes[0];
+  }
+}
+
 }  // anonymous namespace
 
 class BlobFileReaderTest : public testing::Test {
@@ -127,15 +154,19 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
   constexpr bool has_ttl = false;
   constexpr ExpirationRange expiration_range;
   constexpr uint64_t blob_file_number = 1;
-  constexpr char key[] = "key";
-  constexpr char blob[] = "blob";
+  constexpr size_t num_blobs = 3;
+  const std::vector<std::string> key_strs = {"key1", "key2", "key3"};
+  const std::vector<std::string> blob_strs = {"blob1", "blob2", "blob3"};
 
-  uint64_t blob_offset = 0;
-  uint64_t blob_size = 0;
+  const std::vector<Slice> keys = {key_strs[0], key_strs[1], key_strs[2]};
+  const std::vector<Slice> blobs = {blob_strs[0], blob_strs[1], blob_strs[2]};
+
+  std::vector<uint64_t> blob_offsets(keys.size());
+  std::vector<uint64_t> blob_sizes(keys.size());
 
   WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
-                expiration_range, blob_file_number, key, blob, kNoCompression,
-                &blob_offset, &blob_size);
+                expiration_range, blob_file_number, keys, blobs, kNoCompression,
+                blob_offsets, blob_sizes);
 
   constexpr HistogramImpl* blob_file_read_hist = nullptr;
 
@@ -153,10 +184,36 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
     PinnableSlice value;
     uint64_t bytes_read = 0;
 
-    ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
-                              kNoCompression, &value, &bytes_read));
-    ASSERT_EQ(value, blob);
-    ASSERT_EQ(bytes_read, blob_size);
+    ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0],
+                              blob_sizes[0], kNoCompression, &value,
+                              &bytes_read));
+    ASSERT_EQ(value, blobs[0]);
+    ASSERT_EQ(bytes_read, blob_sizes[0]);
+
+    // MultiGetBlob
+    bytes_read = 0;
+    size_t total_size = 0;
+    autovector<std::reference_wrapper<const Slice>> key_refs;
+    for (const auto& key_ref : keys) {
+      key_refs.emplace_back(std::cref(key_ref));
+    }
+    autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
+                                 blob_offsets[2]};
+    autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
+    std::array<Status, num_blobs> statuses_buf;
+    autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
+                                 &statuses_buf[2]};
+    std::array<PinnableSlice, num_blobs> value_buf;
+    autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
+                                      &value_buf[2]};
+    reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
+                         values, &bytes_read);
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_OK(statuses_buf[i]);
+      ASSERT_EQ(value_buf[i], blobs[i]);
+      total_size += blob_sizes[i];
+    }
+    ASSERT_EQ(bytes_read, total_size);
   }
 
   read_options.verify_checksums = true;
@@ -165,14 +222,15 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
     PinnableSlice value;
     uint64_t bytes_read = 0;
 
-    ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
-                              kNoCompression, &value, &bytes_read));
-    ASSERT_EQ(value, blob);
+    ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1],
+                              blob_sizes[1], kNoCompression, &value,
+                              &bytes_read));
+    ASSERT_EQ(value, blobs[1]);
 
-    constexpr uint64_t key_size = sizeof(key) - 1;
+    const uint64_t key_size = keys[1].size();
     ASSERT_EQ(bytes_read,
               BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
-                  blob_size);
+                  blob_sizes[1]);
   }
 
   // Invalid offset (too close to start of file)
@@ -181,8 +239,9 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
     uint64_t bytes_read = 0;
 
     ASSERT_TRUE(reader
-                    ->GetBlob(read_options, key, blob_offset - 1, blob_size,
-                              kNoCompression, &value, &bytes_read)
+                    ->GetBlob(read_options, keys[0], blob_offsets[0] - 1,
+                              blob_sizes[0], kNoCompression, &value,
+                              &bytes_read)
                     .IsCorruption());
     ASSERT_EQ(bytes_read, 0);
   }
@@ -193,8 +252,9 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
     uint64_t bytes_read = 0;
 
     ASSERT_TRUE(reader
-                    ->GetBlob(read_options, key, blob_offset + 1, blob_size,
-                              kNoCompression, &value, &bytes_read)
+                    ->GetBlob(read_options, keys[2], blob_offsets[2] + 1,
+                              blob_sizes[2], kNoCompression, &value,
+                              &bytes_read)
                     .IsCorruption());
     ASSERT_EQ(bytes_read, 0);
   }
@@ -205,8 +265,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
     uint64_t bytes_read = 0;
 
     ASSERT_TRUE(reader
-                    ->GetBlob(read_options, key, blob_offset, blob_size, kZSTD,
-                              &value, &bytes_read)
+                    ->GetBlob(read_options, keys[0], blob_offsets[0],
+                              blob_sizes[0], kZSTD, &value, &bytes_read)
                     .IsCorruption());
     ASSERT_EQ(bytes_read, 0);
   }
@@ -219,23 +279,82 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
 
     ASSERT_TRUE(reader
                     ->GetBlob(read_options, shorter_key,
-                              blob_offset - (sizeof(key) - sizeof(shorter_key)),
-                              blob_size, kNoCompression, &value, &bytes_read)
+                              blob_offsets[0] -
+                                  (keys[0].size() - sizeof(shorter_key) + 1),
+                              blob_sizes[0], kNoCompression, &value,
+                              &bytes_read)
                     .IsCorruption());
     ASSERT_EQ(bytes_read, 0);
+
+    // MultiGetBlob
+    autovector<std::reference_wrapper<const Slice>> key_refs;
+    for (const auto& key_ref : keys) {
+      key_refs.emplace_back(std::cref(key_ref));
+    }
+    Slice shorter_key_slice(shorter_key, sizeof(shorter_key) - 1);
+    key_refs[1] = std::cref(shorter_key_slice);
+
+    autovector<uint64_t> offsets{
+        blob_offsets[0],
+        blob_offsets[1] - (keys[1].size() - key_refs[1].get().size()),
+        blob_offsets[2]};
+    autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
+    std::array<Status, num_blobs> statuses_buf;
+    autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
+                                 &statuses_buf[2]};
+    std::array<PinnableSlice, num_blobs> value_buf;
+    autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
+                                      &value_buf[2]};
+    reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
+                         values, &bytes_read);
+    for (size_t i = 0; i < num_blobs; ++i) {
+      if (i == 1) {
+        ASSERT_TRUE(statuses_buf[i].IsCorruption());
+      } else {
+        ASSERT_OK(statuses_buf[i]);
+      }
+    }
   }
 
   // Incorrect key
   {
-    constexpr char incorrect_key[] = "foo";
+    constexpr char incorrect_key[] = "foo1";
     PinnableSlice value;
     uint64_t bytes_read = 0;
 
     ASSERT_TRUE(reader
-                    ->GetBlob(read_options, incorrect_key, blob_offset,
-                              blob_size, kNoCompression, &value, &bytes_read)
+                    ->GetBlob(read_options, incorrect_key, blob_offsets[0],
+                              blob_sizes[0], kNoCompression, &value,
+                              &bytes_read)
                     .IsCorruption());
     ASSERT_EQ(bytes_read, 0);
+
+    // MultiGetBlob
+    autovector<std::reference_wrapper<const Slice>> key_refs;
+    for (const auto& key_ref : keys) {
+      key_refs.emplace_back(std::cref(key_ref));
+    }
+    Slice wrong_key_slice(incorrect_key, sizeof(incorrect_key) - 1);
+    key_refs[2] = std::cref(wrong_key_slice);
+
+    autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
+                                 blob_offsets[2]};
+    autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
+    std::array<Status, num_blobs> statuses_buf;
+    autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
+                                 &statuses_buf[2]};
+    std::array<PinnableSlice, num_blobs> value_buf;
+    autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
+                                      &value_buf[2]};
+    reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
+                         values, &bytes_read);
+    for (size_t i = 0; i < num_blobs; ++i) {
+      if (i == num_blobs - 1) {
+        ASSERT_TRUE(statuses_buf[i].IsCorruption());
+      } else {
+        ASSERT_OK(statuses_buf[i]);
+      }
+    }
   }
 
   // Incorrect value size
@@ -244,10 +363,35 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
     uint64_t bytes_read = 0;
 
     ASSERT_TRUE(reader
-                    ->GetBlob(read_options, key, blob_offset, blob_size + 1,
-                              kNoCompression, &value, &bytes_read)
+                    ->GetBlob(read_options, keys[1], blob_offsets[1],
+                              blob_sizes[1] + 1, kNoCompression, &value,
+                              &bytes_read)
                     .IsCorruption());
     ASSERT_EQ(bytes_read, 0);
+
+    // MultiGetBlob
+    autovector<std::reference_wrapper<const Slice>> key_refs;
+    for (const auto& key_ref : keys) {
+      key_refs.emplace_back(std::cref(key_ref));
+    }
+    autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
+                                 blob_offsets[2]};
+    autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1] + 1, blob_sizes[2]};
+    std::array<Status, num_blobs> statuses_buf;
+    autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
+                                 &statuses_buf[2]};
+    std::array<PinnableSlice, num_blobs> value_buf;
+    autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
+                                      &value_buf[2]};
+    reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
+                         values, &bytes_read);
+    for (size_t i = 0; i < num_blobs; ++i) {
+      if (i != 1) {
+        ASSERT_OK(statuses_buf[i]);
+      } else {
+        ASSERT_TRUE(statuses_buf[i].IsCorruption());
+      }
+    }
   }
 }
 
index 27927cb385be055e76d9df8c40248e5942830965..5bac36627783a4ce0fc2d61f161effde53d04b6c 100644 (file)
@@ -52,6 +52,9 @@ class BlobIndex {
 
   BlobIndex() : type_(Type::kUnknown) {}
 
+  BlobIndex(const BlobIndex&) = default;
+  BlobIndex& operator=(const BlobIndex&) = default;
+
   bool IsInlined() const { return type_ == Type::kInlinedTTL; }
 
   bool HasTTL() const {
index 23a54cad3227f7d8e2c82021567eff0a4a4e1a62..9a909f67dae7667b76f80d7139c064289ea49456 100644 (file)
@@ -127,6 +127,46 @@ TEST_F(DBBlobBasicTest, MultiGetBlobs) {
   }
 }
 
+TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr size_t kNumBlobFiles = 3;
+  constexpr size_t kNumBlobsPerFile = 3;
+  constexpr size_t kNumKeys = kNumBlobsPerFile * kNumBlobFiles;
+
+  std::vector<std::string> key_strs;
+  std::vector<std::string> value_strs;
+  for (size_t i = 0; i < kNumBlobFiles; ++i) {
+    for (size_t j = 0; j < kNumBlobsPerFile; ++j) {
+      std::string key = "key" + std::to_string(i) + "_" + std::to_string(j);
+      std::string value =
+          "value_as_blob" + std::to_string(i) + "_" + std::to_string(j);
+      ASSERT_OK(Put(key, value));
+      key_strs.push_back(key);
+      value_strs.push_back(value);
+    }
+    ASSERT_OK(Flush());
+  }
+  assert(key_strs.size() == kNumKeys);
+  std::array<Slice, kNumKeys> keys;
+  for (size_t i = 0; i < keys.size(); ++i) {
+    keys[i] = key_strs[i];
+  }
+  std::array<PinnableSlice, kNumKeys> values;
+  std::array<Status, kNumKeys> statuses;
+  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), kNumKeys, &keys[0],
+                &values[0], &statuses[0]);
+
+  for (size_t i = 0; i < kNumKeys; ++i) {
+    ASSERT_OK(statuses[i]);
+    ASSERT_EQ(value_strs[i], values[i]);
+  }
+}
+
 TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
   Options options = GetDefaultOptions();
   options.enable_blob_files = true;
@@ -150,6 +190,83 @@ TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
                   .IsCorruption());
 }
 
+TEST_F(DBBlobBasicTest, MultiGetBlob_CorruptIndex) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.create_if_missing = true;
+
+  DestroyAndReopen(options);
+
+  constexpr size_t kNumOfKeys = 3;
+  std::array<std::string, kNumOfKeys> key_strs;
+  std::array<std::string, kNumOfKeys> value_strs;
+  std::array<Slice, kNumOfKeys + 1> keys;
+  for (size_t i = 0; i < kNumOfKeys; ++i) {
+    key_strs[i] = "foo" + std::to_string(i);
+    value_strs[i] = "blob_value" + std::to_string(i);
+    ASSERT_OK(Put(key_strs[i], value_strs[i]));
+    keys[i] = key_strs[i];
+  }
+
+  constexpr char key[] = "key";
+  {
+    // Fake a corrupt blob index.
+    const std::string blob_index("foobar");
+    WriteBatch batch;
+    ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
+    ASSERT_OK(db_->Write(WriteOptions(), &batch));
+    keys[kNumOfKeys] = Slice(static_cast<const char*>(key), sizeof(key) - 1);
+  }
+
+  ASSERT_OK(Flush());
+
+  std::array<PinnableSlice, kNumOfKeys + 1> values;
+  std::array<Status, kNumOfKeys + 1> statuses;
+  db_->MultiGet(ReadOptions(), dbfull()->DefaultColumnFamily(), kNumOfKeys + 1,
+                keys.data(), values.data(), statuses.data(),
+                /*sorted_input=*/false);
+  for (size_t i = 0; i < kNumOfKeys + 1; ++i) {
+    if (i != kNumOfKeys) {
+      ASSERT_OK(statuses[i]);
+      ASSERT_EQ("blob_value" + std::to_string(i), values[i]);
+    } else {
+      ASSERT_TRUE(statuses[i].IsCorruption());
+    }
+  }
+}
+
+TEST_F(DBBlobBasicTest, MultiGetBlob_ExceedSoftLimit) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr size_t kNumOfKeys = 3;
+  std::array<std::string, kNumOfKeys> key_bufs;
+  std::array<std::string, kNumOfKeys> value_bufs;
+  std::array<Slice, kNumOfKeys> keys;
+  for (size_t i = 0; i < kNumOfKeys; ++i) {
+    key_bufs[i] = "foo" + std::to_string(i);
+    value_bufs[i] = "blob_value" + std::to_string(i);
+    ASSERT_OK(Put(key_bufs[i], value_bufs[i]));
+    keys[i] = key_bufs[i];
+  }
+  ASSERT_OK(Flush());
+
+  std::array<PinnableSlice, kNumOfKeys> values;
+  std::array<Status, kNumOfKeys> statuses;
+  ReadOptions read_opts;
+  read_opts.value_size_soft_limit = 1;
+  db_->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), kNumOfKeys,
+                keys.data(), values.data(), statuses.data(),
+                /*sorted_input=*/true);
+  for (const auto& s : statuses) {
+    ASSERT_TRUE(s.IsAborted());
+  }
+}
+
 TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) {
   constexpr uint64_t min_blob_size = 10;
 
@@ -522,11 +639,21 @@ class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
   std::string sync_point_;
 };
 
+class DBBlobBasicIOErrorMultiGetTest : public DBBlobBasicIOErrorTest {
+ public:
+  DBBlobBasicIOErrorMultiGetTest() : DBBlobBasicIOErrorTest() {}
+};
+
 INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorTest,
                         ::testing::ValuesIn(std::vector<std::string>{
                             "BlobFileReader::OpenFile:NewRandomAccessFile",
                             "BlobFileReader::GetBlob:ReadFromFile"}));
 
+INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorMultiGetTest,
+                        ::testing::ValuesIn(std::vector<std::string>{
+                            "BlobFileReader::OpenFile:NewRandomAccessFile",
+                            "BlobFileReader::MultiGetBlob:ReadFromFile"}));
+
 TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
   Options options;
   options.env = fault_injection_env_.get();
@@ -556,7 +683,7 @@ TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
-TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) {
+TEST_P(DBBlobBasicIOErrorMultiGetTest, MultiGetBlobs_IOError) {
   Options options = GetDefaultOptions();
   options.env = fault_injection_env_.get();
   options.enable_blob_files = true;
@@ -598,6 +725,53 @@ TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) {
   ASSERT_TRUE(statuses[1].IsIOError());
 }
 
+TEST_P(DBBlobBasicIOErrorMultiGetTest, MultipleBlobFiles) {
+  Options options = GetDefaultOptions();
+  options.env = fault_injection_env_.get();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr size_t num_keys = 2;
+
+  constexpr char key1[] = "key1";
+  constexpr char value1[] = "blob1";
+
+  ASSERT_OK(Put(key1, value1));
+  ASSERT_OK(Flush());
+
+  constexpr char key2[] = "key2";
+  constexpr char value2[] = "blob2";
+
+  ASSERT_OK(Put(key2, value2));
+  ASSERT_OK(Flush());
+
+  std::array<Slice, num_keys> keys{{key1, key2}};
+  std::array<PinnableSlice, num_keys> values;
+  std::array<Status, num_keys> statuses;
+
+  bool first_blob_file = true;
+  SyncPoint::GetInstance()->SetCallBack(
+      sync_point_, [&first_blob_file, this](void* /* arg */) {
+        if (first_blob_file) {
+          first_blob_file = false;
+          return;
+        }
+        fault_injection_env_->SetFilesystemActive(false,
+                                                  Status::IOError(sync_point_));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
+                keys.data(), values.data(), statuses.data());
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  ASSERT_OK(statuses[0]);
+  ASSERT_EQ(value1, values[0]);
+  ASSERT_TRUE(statuses[1].IsIOError());
+}
+
 namespace {
 
 class ReadBlobCompactionFilter : public CompactionFilter {
index a3d5bb19bab9d5af9323ab8f7ccd9e6a937abfdc..bac5e6c7fc871ed8046627009969e4bcd2291abc 100644 (file)
@@ -2162,7 +2162,7 @@ bool DBImpl::MultiCFSnapshot(
     // consecutive retries, it means the write rate is very high. In that case
     // its probably ok to take the mutex on the 3rd try so we can succeed for
     // sure
-    static const int num_retries = 3;
+    constexpr int num_retries = 3;
     for (int i = 0; i < num_retries; ++i) {
       last_try = (i == num_retries - 1);
       bool retry = false;
@@ -2192,8 +2192,9 @@ bool DBImpl::MultiCFSnapshot(
           *snapshot = versions_->LastPublishedSequence();
         }
       } else {
-        *snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
-                        ->number_;
+        *snapshot =
+            static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
+                ->number_;
       }
       for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
            ++cf_iter) {
@@ -2394,17 +2395,9 @@ void DBImpl::PrepareMultiGetKeys(
     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
   if (sorted_input) {
 #ifndef NDEBUG
-    CompareKeyContext key_context_less;
-
-    for (size_t index = 1; index < sorted_keys->size(); ++index) {
-      const KeyContext* const lhs = (*sorted_keys)[index - 1];
-      const KeyContext* const rhs = (*sorted_keys)[index];
-
-      // lhs should be <= rhs, or in other words, rhs should NOT be < lhs
-      assert(!key_context_less(rhs, lhs));
-    }
+    assert(std::is_sorted(sorted_keys->begin(), sorted_keys->end(),
+                          CompareKeyContext()));
 #endif
-
     return;
   }
 
index 63f3082aa69cb2d90bb2554d9b7448c9bf084d98..d8d34fd48f3c0cdd90ef7b91ef38a7367730145c 100644 (file)
@@ -25,6 +25,7 @@
 #include "db/blob/blob_file_cache.h"
 #include "db/blob/blob_file_reader.h"
 #include "db/blob/blob_index.h"
+#include "db/blob/blob_log_format.h"
 #include "db/internal_stats.h"
 #include "db/log_reader.h"
 #include "db/log_writer.h"
@@ -1863,6 +1864,112 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
   return s;
 }
 
+void Version::MultiGetBlob(
+    const ReadOptions& read_options, MultiGetRange& range,
+    const std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs) {
+  if (read_options.read_tier == kBlockCacheTier) {
+    Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
+    for (const auto& elem : blob_rqs) {
+      for (const auto& blob_rq : elem.second) {
+        const KeyContext& key_context = blob_rq.second;
+        assert(key_context.s);
+        assert(key_context.s->ok());
+        *(key_context.s) = s;
+        assert(key_context.get_context);
+        auto& get_context = *(key_context.get_context);
+        get_context.MarkKeyMayExist();
+      }
+    }
+    return;
+  }
+
+  assert(!blob_rqs.empty());
+  Status status;
+  const auto& blob_files = storage_info_.GetBlobFiles();
+  for (auto& elem : blob_rqs) {
+    uint64_t blob_file_number = elem.first;
+    if (blob_files.find(blob_file_number) == blob_files.end()) {
+      auto& blobs_in_file = elem.second;
+      for (const auto& blob : blobs_in_file) {
+        const KeyContext& key_context = blob.second;
+        *(key_context.s) = Status::Corruption("Invalid blob file number");
+      }
+      continue;
+    }
+    CacheHandleGuard<BlobFileReader> blob_file_reader;
+    assert(blob_file_cache_);
+    status = blob_file_cache_->GetBlobFileReader(blob_file_number,
+                                                 &blob_file_reader);
+    assert(!status.ok() || blob_file_reader.GetValue());
+
+    auto& blobs_in_file = elem.second;
+    if (!status.ok()) {
+      for (const auto& blob : blobs_in_file) {
+        const KeyContext& key_context = blob.second;
+        *(key_context.s) = status;
+      }
+      continue;
+    }
+
+    assert(blob_file_reader.GetValue());
+    const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
+    const CompressionType compression =
+        blob_file_reader.GetValue()->GetCompressionType();
+
+    // TODO: sort blobs_in_file by file offset.
+    autovector<std::reference_wrapper<const KeyContext>> blob_read_key_contexts;
+    autovector<std::reference_wrapper<const Slice>> user_keys;
+    autovector<uint64_t> offsets;
+    autovector<uint64_t> value_sizes;
+    autovector<Status*> statuses;
+    autovector<PinnableSlice*> values;
+    for (const auto& blob : blobs_in_file) {
+      const auto& blob_index = blob.first;
+      const KeyContext& key_context = blob.second;
+      if (blob_index.HasTTL() || blob_index.IsInlined()) {
+        *(key_context.s) =
+            Status::Corruption("Unexpected TTL/inlined blob index");
+        continue;
+      }
+      const uint64_t key_size = key_context.ukey_with_ts.size();
+      const uint64_t offset = blob_index.offset();
+      const uint64_t value_size = blob_index.size();
+      if (!IsValidBlobOffset(offset, key_size, value_size, file_size)) {
+        *(key_context.s) = Status::Corruption("Invalid blob offset");
+        continue;
+      }
+      if (blob_index.compression() != compression) {
+        *(key_context.s) =
+            Status::Corruption("Compression type mismatch when reading a blob");
+        continue;
+      }
+      blob_read_key_contexts.emplace_back(std::cref(key_context));
+      user_keys.emplace_back(std::cref(key_context.ukey_with_ts));
+      offsets.push_back(blob_index.offset());
+      value_sizes.push_back(blob_index.size());
+      statuses.push_back(key_context.s);
+      values.push_back(key_context.value);
+    }
+    blob_file_reader.GetValue()->MultiGetBlob(read_options, user_keys, offsets,
+                                              value_sizes, statuses, values,
+                                              /*bytes_read=*/nullptr);
+    size_t num = blob_read_key_contexts.size();
+    assert(num == user_keys.size());
+    assert(num == offsets.size());
+    assert(num == value_sizes.size());
+    assert(num == statuses.size());
+    assert(num == values.size());
+    for (size_t i = 0; i < num; ++i) {
+      if (statuses[i]->ok()) {
+        range.AddValueSize(blob_read_key_contexts[i].get().value->size());
+        if (range.GetValueSize() > read_options.value_size_soft_limit) {
+          *(blob_read_key_contexts[i].get().s) = Status::Aborted();
+        }
+      }
+    }
+  }
+}
+
 void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                   PinnableSlice* value, std::string* timestamp, Status* status,
                   MergeContext* merge_context,
@@ -2085,6 +2192,10 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
   uint64_t num_data_read = 0;
   uint64_t num_sst_read = 0;
 
+  MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
+  // blob_file => [[blob_idx, it], ...]
+  std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;
+
   while (f != nullptr) {
     MultiGetRange file_range = fp.CurrentFileRange();
     bool timer_enabled =
@@ -2170,24 +2281,24 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
 
           if (iter->is_blob_index) {
             if (iter->value) {
-              constexpr uint64_t* bytes_read = nullptr;
-
-              *status = GetBlob(read_options, iter->ukey_with_ts, *iter->value,
-                                iter->value, bytes_read);
-              if (!status->ok()) {
-                if (status->IsIncomplete()) {
-                  get_context.MarkKeyMayExist();
-                }
-
-                continue;
+              const Slice& blob_index_slice = *(iter->value);
+              BlobIndex blob_index;
+              Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
+              if (tmp_s.ok()) {
+                const uint64_t blob_file_num = blob_index.file_number();
+                blob_rqs[blob_file_num].emplace_back(
+                    std::make_pair(blob_index, std::cref(*iter)));
+              } else {
+                *(iter->s) = tmp_s;
               }
             }
-          }
-
-          file_range.AddValueSize(iter->value->size());
-          if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
-            s = Status::Aborted();
-            break;
+          } else {
+            file_range.AddValueSize(iter->value->size());
+            if (file_range.GetValueSize() >
+                read_options.value_size_soft_limit) {
+              s = Status::Aborted();
+              break;
+            }
           }
           continue;
         case GetContext::kDeleted:
@@ -2233,6 +2344,10 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
     f = fp.GetNextFile();
   }
 
+  if (s.ok() && !blob_rqs.empty()) {
+    MultiGetBlob(read_options, keys_with_blobs_range, blob_rqs);
+  }
+
   // Process any left over keys
   for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
     GetContext& get_context = *iter->get_context;
index c15bbd8f35f9ea822b2c874dc72a5033dcc31cd0..86948a042d6f72c3128d90d4a102dee664a46d01 100644 (file)
@@ -713,6 +713,12 @@ class Version {
                  const BlobIndex& blob_index, PinnableSlice* value,
                  uint64_t* bytes_read) const;
 
+  using BlobReadRequests = std::vector<
+      std::pair<BlobIndex, std::reference_wrapper<const KeyContext>>>;
+  void MultiGetBlob(
+      const ReadOptions& read_options, MultiGetRange& range,
+      const std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs);
+
   // Loads some stats information from files. Call without mutex held. It needs
   // to be called before applying the version to the version set.
   void PrepareApply(const MutableCFOptions& mutable_cf_options,
index 3872353d2ff2e28d6f878ec6e4a5ec6b0b50422d..3d1ce72bc23dc26dea2d594848cba287c9d7ee0d 100644 (file)
@@ -97,6 +97,8 @@ class MultiGetContext {
   // that need to be performed
   static const int MAX_BATCH_SIZE = 32;
 
+  static_assert(MAX_BATCH_SIZE < 64, "MAX_BATCH_SIZE cannot exceed 63");
+
   MultiGetContext(autovector<KeyContext*, MAX_BATCH_SIZE>* sorted_keys,
                   size_t begin, size_t num_keys, SequenceNumber snapshot,
                   const ReadOptions& read_opts)
@@ -104,6 +106,7 @@ class MultiGetContext {
         value_mask_(0),
         value_size_(0),
         lookup_key_ptr_(reinterpret_cast<LookupKey*>(lookup_key_stack_buf)) {
+    assert(num_keys <= MAX_BATCH_SIZE);
     if (num_keys > MAX_LOOKUP_KEYS_ON_STACK) {
       lookup_key_heap_buf.reset(new char[sizeof(LookupKey) * num_keys]);
       lookup_key_ptr_ = reinterpret_cast<LookupKey*>(
@@ -236,6 +239,10 @@ class MultiGetContext {
       skip_mask_ |= uint64_t{1} << iter.index_;
     }
 
+    bool IsKeySkipped(const Iterator& iter) const {
+      return skip_mask_ & (uint64_t{1} << iter.index_);
+    }
+
     // Update the value_mask_ in MultiGetContext so its
     // immediately reflected in all the Range Iterators
     void MarkKeyDone(Iterator& iter) {