]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Support timestamps in SstFileWriter (#8899)
authorLevi Tamasi <ltamasi@fb.com>
Fri, 10 Sep 2021 01:57:01 +0000 (18:57 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 10 Sep 2021 01:58:01 +0000 (18:58 -0700)
Summary:
As a first step of supporting user-defined timestamps with ingestion, the
patch adds timestamp support to `SstFileWriter`; namely, it adds new
versions of the `Put` and `Delete` APIs that take timestamps. (`Merge`
and `DeleteRange` are currently not supported with user-defined timestamps
in general but once those features are implemented, we can handle them
in `SstFileWriter` in a similar fashion.) The new APIs validate the size of
the timestamp provided by the client. Similarly, calls to the pre-existing
timestamp-less APIs are now disallowed when user-defined timestamps are
in use according to the comparator.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8899

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D30850699

Pulled By: ltamasi

fbshipit-source-id: 779154373618f19b8f0797976bb7286783c57b67

include/rocksdb/sst_file_writer.h
table/block_based/block.cc
table/sst_file_reader_test.cc
table/sst_file_writer.cc

index 3bcc8a693160d4260621a5f003ccbc7252e1401c..a6430eaa937148d2e89d0f22a68fec340dd76522 100644 (file)
@@ -112,21 +112,40 @@ class SstFileWriter {
 
   // Add a Put key with value to currently opened file (deprecated)
   // REQUIRES: key is after any previously added key according to comparator.
+  // REQUIRES: comparator is *not* timestamp-aware.
   ROCKSDB_DEPRECATED_FUNC Status Add(const Slice& user_key, const Slice& value);
 
   // Add a Put key with value to currently opened file
   // REQUIRES: key is after any previously added key according to comparator.
+  // REQUIRES: comparator is *not* timestamp-aware.
   Status Put(const Slice& user_key, const Slice& value);
 
+  // Add a Put (key with timestamp, value) to the currently opened file
+  // REQUIRES: key is after any previously added key according to the
+  // comparator.
+  // REQUIRES: the timestamp's size is equal to what is expected by
+  // the comparator.
+  Status Put(const Slice& user_key, const Slice& timestamp, const Slice& value);
+
   // Add a Merge key with value to currently opened file
   // REQUIRES: key is after any previously added key according to comparator.
+  // REQUIRES: comparator is *not* timestamp-aware.
   Status Merge(const Slice& user_key, const Slice& value);
 
   // Add a deletion key to currently opened file
   // REQUIRES: key is after any previously added key according to comparator.
+  // REQUIRES: comparator is *not* timestamp-aware.
   Status Delete(const Slice& user_key);
 
+  // Add a deletion key with timestamp to the currently opened file
+  // REQUIRES: key is after any previously added key according to the
+  // comparator.
+  // REQUIRES: the timestamp's size is equal to what is expected by
+  // the comparator.
+  Status Delete(const Slice& user_key, const Slice& timestamp);
+
   // Add a range deletion tombstone to currently opened file
+  // REQUIRES: comparator is *not* timestamp-aware.
   Status DeleteRange(const Slice& begin_key, const Slice& end_key);
 
   // Finalize writing to sst file and close file.
index 2d32ebcb492a21bc5e306575aa21c8c89cc7b79a..9c2a60844d8f2ad569b018c0909edc91974bbb5b 100644 (file)
@@ -522,7 +522,8 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
     if (global_seqno_ != kDisableGlobalSequenceNumber) {
       // If we are reading a file with a global sequence number we should
       // expect that all encoded sequence numbers are zeros and any value
-      // type is kTypeValue, kTypeMerge, kTypeDeletion, or kTypeRangeDeletion.
+      // type is kTypeValue, kTypeMerge, kTypeDeletion,
+      // kTypeDeletionWithTimestamp, or kTypeRangeDeletion.
       uint64_t packed = ExtractInternalKeyFooter(raw_key_.GetKey());
       SequenceNumber seqno;
       ValueType value_type;
@@ -530,6 +531,7 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
       assert(value_type == ValueType::kTypeValue ||
              value_type == ValueType::kTypeMerge ||
              value_type == ValueType::kTypeDeletion ||
+             value_type == ValueType::kTypeDeletionWithTimestamp ||
              value_type == ValueType::kTypeRangeDeletion);
       assert(seqno == 0);
     }
index 52cab2ab36e17723f1e489869233d9353dc7386e..d1394b9382e0e83f8d2e13b5e26fbbf64390764c 100644 (file)
@@ -195,6 +195,224 @@ TEST_F(SstFileReaderTest, ReadFileWithGlobalSeqno) {
   ASSERT_OK(DestroyDB(db_name, options));
 }
 
+TEST_F(SstFileReaderTest, TimestampSizeMismatch) {
+  SstFileWriter writer(soptions_, options_);
+
+  ASSERT_OK(writer.Open(sst_name_));
+
+  // Comparator is not timestamp-aware; calls to APIs taking timestamps should
+  // fail.
+  ASSERT_NOK(writer.Put("key", EncodeAsUint64(100), "value"));
+  ASSERT_NOK(writer.Delete("another_key", EncodeAsUint64(200)));
+}
+
+class SstFileReaderTimestampTest : public testing::Test {
+ public:
+  SstFileReaderTimestampTest() {
+    Env* env = Env::Default();
+    EXPECT_OK(test::CreateEnvFromSystem(ConfigOptions(), &env, &env_guard_));
+    EXPECT_NE(nullptr, env);
+
+    options_.env = env;
+
+    options_.comparator = test::ComparatorWithU64Ts();
+
+    sst_name_ = test::PerThreadDBPath("sst_file_ts");
+  }
+
+  ~SstFileReaderTimestampTest() {
+    EXPECT_OK(options_.env->DeleteFile(sst_name_));
+  }
+
+  struct KeyValueDesc {
+    KeyValueDesc(std::string k, std::string ts, std::string v)
+        : key(std::move(k)), timestamp(std::move(ts)), value(std::move(v)) {}
+
+    std::string key;
+    std::string timestamp;
+    std::string value;
+  };
+
+  struct InputKeyValueDesc : public KeyValueDesc {
+    InputKeyValueDesc(std::string k, std::string ts, std::string v, bool is_del,
+                      bool use_contig_buf)
+        : KeyValueDesc(std::move(k), std::move(ts), std::move(v)),
+          is_delete(is_del),
+          use_contiguous_buffer(use_contig_buf) {}
+
+    bool is_delete = false;
+    bool use_contiguous_buffer = false;
+  };
+
+  struct OutputKeyValueDesc : public KeyValueDesc {
+    OutputKeyValueDesc(std::string k, std::string ts, std::string v)
+        : KeyValueDesc(std::move(k), std::string(ts), std::string(v)) {}
+  };
+
+  void CreateFile(const std::vector<InputKeyValueDesc>& descs) {
+    SstFileWriter writer(soptions_, options_);
+
+    ASSERT_OK(writer.Open(sst_name_));
+
+    for (const auto& desc : descs) {
+      if (desc.is_delete) {
+        if (desc.use_contiguous_buffer) {
+          std::string key_with_ts(desc.key + desc.timestamp);
+          ASSERT_OK(writer.Delete(Slice(key_with_ts.data(), desc.key.size()),
+                                  Slice(key_with_ts.data() + desc.key.size(),
+                                        desc.timestamp.size())));
+        } else {
+          ASSERT_OK(writer.Delete(desc.key, desc.timestamp));
+        }
+      } else {
+        if (desc.use_contiguous_buffer) {
+          std::string key_with_ts(desc.key + desc.timestamp);
+          ASSERT_OK(writer.Put(Slice(key_with_ts.data(), desc.key.size()),
+                               Slice(key_with_ts.data() + desc.key.size(),
+                                     desc.timestamp.size()),
+                               desc.value));
+        } else {
+          ASSERT_OK(writer.Put(desc.key, desc.timestamp, desc.value));
+        }
+      }
+    }
+
+    ASSERT_OK(writer.Finish());
+  }
+
+  void CheckFile(const std::string& timestamp,
+                 const std::vector<OutputKeyValueDesc>& descs) {
+    SstFileReader reader(options_);
+
+    ASSERT_OK(reader.Open(sst_name_));
+    ASSERT_OK(reader.VerifyChecksum());
+
+    Slice ts_slice(timestamp);
+
+    ReadOptions read_options;
+    read_options.timestamp = &ts_slice;
+
+    std::unique_ptr<Iterator> iter(reader.NewIterator(read_options));
+    iter->SeekToFirst();
+
+    for (const auto& desc : descs) {
+      ASSERT_TRUE(iter->Valid());
+      ASSERT_EQ(iter->key(), desc.key);
+      ASSERT_EQ(iter->timestamp(), desc.timestamp);
+      ASSERT_EQ(iter->value(), desc.value);
+
+      iter->Next();
+    }
+
+    ASSERT_FALSE(iter->Valid());
+  }
+
+ protected:
+  std::shared_ptr<Env> env_guard_;
+  Options options_;
+  EnvOptions soptions_;
+  std::string sst_name_;
+};
+
+TEST_F(SstFileReaderTimestampTest, Basic) {
+  std::vector<InputKeyValueDesc> input_descs;
+
+  for (uint64_t k = 0; k < kNumKeys; k += 4) {
+    // A Put with key k, timestamp k that gets overwritten by a subsequent Put
+    // with timestamp (k + 1). Note that the comparator uses descending order
+    // for the timestamp part, so we add the later Put first.
+    input_descs.emplace_back(
+        /* key */ EncodeAsString(k), /* timestamp */ EncodeAsUint64(k + 1),
+        /* value */ EncodeAsString(k * 2), /* is_delete */ false,
+        /* use_contiguous_buffer */ false);
+    input_descs.emplace_back(
+        /* key */ EncodeAsString(k), /* timestamp */ EncodeAsUint64(k),
+        /* value */ EncodeAsString(k * 3), /* is_delete */ false,
+        /* use_contiguous_buffer */ true);
+
+    // A Put with key (k + 2), timestamp (k + 2) that gets cancelled out by a
+    // Delete with timestamp (k + 3).  Note that the comparator uses descending
+    // order for the timestamp part, so we add the Delete first.
+    input_descs.emplace_back(/* key */ EncodeAsString(k + 2),
+                             /* timestamp */ EncodeAsUint64(k + 3),
+                             /* value */ std::string(), /* is_delete */ true,
+                             /* use_contiguous_buffer */ (k % 8) == 0);
+    input_descs.emplace_back(
+        /* key */ EncodeAsString(k + 2), /* timestamp */ EncodeAsUint64(k + 2),
+        /* value */ EncodeAsString(k * 5), /* is_delete */ false,
+        /* use_contiguous_buffer */ (k % 8) != 0);
+  }
+
+  CreateFile(input_descs);
+
+  // Note: below, we check the results as of each timestamp in the range,
+  // updating the expected result as needed.
+  std::vector<OutputKeyValueDesc> output_descs;
+
+  for (uint64_t ts = 0; ts < kNumKeys; ++ts) {
+    const uint64_t k = ts - (ts % 4);
+
+    switch (ts % 4) {
+      case 0:  // Initial Put for key k
+        output_descs.emplace_back(/* key */ EncodeAsString(k),
+                                  /* timestamp */ EncodeAsUint64(ts),
+                                  /* value */ EncodeAsString(k * 3));
+        break;
+
+      case 1:  // Second Put for key k
+        assert(output_descs.back().key == EncodeAsString(k));
+        assert(output_descs.back().timestamp == EncodeAsUint64(ts - 1));
+        assert(output_descs.back().value == EncodeAsString(k * 3));
+        output_descs.back().timestamp = EncodeAsUint64(ts);
+        output_descs.back().value = EncodeAsString(k * 2);
+        break;
+
+      case 2:  // Put for key (k + 2)
+        output_descs.emplace_back(/* key */ EncodeAsString(k + 2),
+                                  /* timestamp */ EncodeAsUint64(ts),
+                                  /* value */ EncodeAsString(k * 5));
+        break;
+
+      case 3:  // Delete for key (k + 2)
+        assert(output_descs.back().key == EncodeAsString(k + 2));
+        assert(output_descs.back().timestamp == EncodeAsUint64(ts - 1));
+        assert(output_descs.back().value == EncodeAsString(k * 5));
+        output_descs.pop_back();
+        break;
+    }
+
+    CheckFile(EncodeAsUint64(ts), output_descs);
+  }
+}
+
+TEST_F(SstFileReaderTimestampTest, TimestampsOutOfOrder) {
+  SstFileWriter writer(soptions_, options_);
+
+  ASSERT_OK(writer.Open(sst_name_));
+
+  // Note: KVs that have the same user key disregarding timestamps should be in
+  // descending order of timestamps.
+  ASSERT_OK(writer.Put("key", EncodeAsUint64(1), "value1"));
+  ASSERT_NOK(writer.Put("key", EncodeAsUint64(2), "value2"));
+}
+
+TEST_F(SstFileReaderTimestampTest, TimestampSizeMismatch) {
+  SstFileWriter writer(soptions_, options_);
+
+  ASSERT_OK(writer.Open(sst_name_));
+
+  // Comparator expects 64-bit timestamps; timestamps with other sizes as well
+  // as calls to the timestamp-less APIs should be rejected.
+  ASSERT_NOK(writer.Put("key", "not_an_actual_64_bit_timestamp", "value"));
+  ASSERT_NOK(writer.Delete("another_key", "timestamp_of_unexpected_size"));
+
+  ASSERT_NOK(writer.Put("key_without_timestamp", "value"));
+  ASSERT_NOK(writer.Merge("another_key_missing_a_timestamp", "merge_operand"));
+  ASSERT_NOK(writer.Delete("yet_another_key_still_no_timestamp"));
+  ASSERT_NOK(writer.DeleteRange("begin_key_timestamp_absent",
+                                "end_key_with_a_complete_lack_of_timestamps"));
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
index fa367e69b4a4daa7bed2057937c57475b87b7ae8..5a057477624cea20d3b2acca74e66d2f6c3e30a3 100644 (file)
@@ -63,8 +63,8 @@ struct SstFileWriter::Rep {
   std::string db_session_id;
   uint64_t next_file_number = 1;
 
-  Status Add(const Slice& user_key, const Slice& value,
-             const ValueType value_type) {
+  Status AddImpl(const Slice& user_key, const Slice& value,
+                 ValueType value_type) {
     if (!builder) {
       return Status::InvalidArgument("File is not opened");
     }
@@ -80,23 +80,14 @@ struct SstFileWriter::Rep {
       }
     }
 
-    // TODO(tec) : For external SST files we could omit the seqno and type.
-    switch (value_type) {
-      case ValueType::kTypeValue:
-        ikey.Set(user_key, 0 /* Sequence Number */,
-                 ValueType::kTypeValue /* Put */);
-        break;
-      case ValueType::kTypeMerge:
-        ikey.Set(user_key, 0 /* Sequence Number */,
-                 ValueType::kTypeMerge /* Merge */);
-        break;
-      case ValueType::kTypeDeletion:
-        ikey.Set(user_key, 0 /* Sequence Number */,
-                 ValueType::kTypeDeletion /* Delete */);
-        break;
-      default:
-        return Status::InvalidArgument("Value type is not supported");
-    }
+    assert(value_type == kTypeValue || value_type == kTypeMerge ||
+           value_type == kTypeDeletion ||
+           value_type == kTypeDeletionWithTimestamp);
+
+    constexpr SequenceNumber sequence_number = 0;
+
+    ikey.Set(user_key, sequence_number, value_type);
+
     builder->Add(ikey.Encode(), value);
 
     // update file info
@@ -108,7 +99,42 @@ struct SstFileWriter::Rep {
     return Status::OK();
   }
 
+  Status Add(const Slice& user_key, const Slice& value, ValueType value_type) {
+    if (internal_comparator.timestamp_size() != 0) {
+      return Status::InvalidArgument("Timestamp size mismatch");
+    }
+
+    return AddImpl(user_key, value, value_type);
+  }
+
+  Status Add(const Slice& user_key, const Slice& timestamp, const Slice& value,
+             ValueType value_type) {
+    const size_t timestamp_size = timestamp.size();
+
+    if (internal_comparator.timestamp_size() != timestamp_size) {
+      return Status::InvalidArgument("Timestamp size mismatch");
+    }
+
+    const size_t user_key_size = user_key.size();
+
+    if (user_key.data() + user_key_size == timestamp.data()) {
+      Slice user_key_with_ts(user_key.data(), user_key_size + timestamp_size);
+      return AddImpl(user_key_with_ts, value, value_type);
+    }
+
+    std::string user_key_with_ts;
+    user_key_with_ts.reserve(user_key_size + timestamp_size);
+    user_key_with_ts.append(user_key.data(), user_key_size);
+    user_key_with_ts.append(timestamp.data(), timestamp_size);
+
+    return AddImpl(user_key_with_ts, value, value_type);
+  }
+
   Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
+    if (internal_comparator.timestamp_size() != 0) {
+      return Status::InvalidArgument("Timestamp size mismatch");
+    }
+
     if (!builder) {
       return Status::InvalidArgument("File is not opened");
     }
@@ -294,6 +320,11 @@ Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
   return rep_->Add(user_key, value, ValueType::kTypeValue);
 }
 
+Status SstFileWriter::Put(const Slice& user_key, const Slice& timestamp,
+                          const Slice& value) {
+  return rep_->Add(user_key, timestamp, value, ValueType::kTypeValue);
+}
+
 Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
   return rep_->Add(user_key, value, ValueType::kTypeMerge);
 }
@@ -302,6 +333,11 @@ Status SstFileWriter::Delete(const Slice& user_key) {
   return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
 }
 
+Status SstFileWriter::Delete(const Slice& user_key, const Slice& timestamp) {
+  return rep_->Add(user_key, timestamp, Slice(),
+                   ValueType::kTypeDeletionWithTimestamp);
+}
+
 Status SstFileWriter::DeleteRange(const Slice& begin_key,
                                   const Slice& end_key) {
   return rep_->DeleteRange(begin_key, end_key);