]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Follow ups for TimedPut and write time property (#12455)
author Yu Zhang <yuzhangyu@fb.com>
Thu, 21 Mar 2024 17:00:15 +0000 (10:00 -0700)
committer Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Thu, 21 Mar 2024 17:00:15 +0000 (10:00 -0700)
Summary:
This PR contains a few follow ups from https://github.com/facebook/rocksdb/issues/12419 and https://github.com/facebook/rocksdb/issues/12428 including:

1) Handle a special case for `WriteBatch::TimedPut`. When the user specified write time is `std::numeric_limits<uint64_t>::max()`, it's not treated as an error, but it instead creates and writes a regular `Put` entry.

2) Update the `InternalIterator::write_unix_time` APIs to handle `kTypeValuePreferredSeqno` entries.

3) FlushJob is updated to use the seqno to time mapping copy in `SuperVersion`. FlushJob currently copies the DB's seqno to time mapping while holding the db mutex and only copies the part of interest, i.e., the part that only goes back to the earliest sequence number of the to-be-flushed memtables. While updating FlushJob to use the mapping copy in `SuperVersion`, it's given access to the full mapping to help cover the need to convert `kTypeValuePreferredSeqno`'s write time to preferred seqno as much as possible.

Test plans:
Added unit tests

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12455

Reviewed By: pdillinger

Differential Revision: D55165422

Pulled By: jowlyzhang

fbshipit-source-id: dc022653077f678c24661de5743146a74cce4b47

21 files changed:
db/builder.cc
db/builder.h
db/column_family.h
db/compaction/tiered_compaction_test.cc
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_open.cc
db/db_iter.h
db/db_test_util.cc
db/db_test_util.h
db/dbformat.h
db/flush_job.cc
db/flush_job.h
db/flush_job_test.cc
db/memtable.cc
db/repair.cc
db/seqno_to_time_mapping.cc
db/seqno_to_time_mapping.h
db/write_batch.cc
db/write_batch_test.cc
table/block_based/block_based_table_iterator.h
unreleased_history/new_features/write_time_and_timed_put.md [new file with mode: 0644]

index ce7b88d5dc1331c5a7452173ecfe401247169bfb..a3c15ad11e5a70eb6f60670d176d4b906e3f2144 100644 (file)
@@ -70,8 +70,8 @@ Status BuildTable(
     bool paranoid_file_checks, InternalStats* internal_stats,
     IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
     BlobFileCreationReason blob_creation_reason,
-    const SeqnoToTimeMapping& seqno_to_time_mapping, EventLogger* event_logger,
-    int job_id, TableProperties* table_properties,
+    UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping,
+    EventLogger* event_logger, int job_id, TableProperties* table_properties,
     Env::WriteLifeTimeHint write_hint, const std::string* full_history_ts_low,
     BlobFileCompletionCallback* blob_callback, Version* version,
     uint64_t* num_input_entries, uint64_t* memtable_payload_bytes,
@@ -235,7 +235,10 @@ Status BuildTable(
         auto [unpacked_value, unix_write_time] =
             ParsePackedValueWithWriteTime(value);
         SequenceNumber preferred_seqno =
-            seqno_to_time_mapping.GetProximalSeqnoBeforeTime(unix_write_time);
+            seqno_to_time_mapping
+                ? seqno_to_time_mapping->GetProximalSeqnoBeforeTime(
+                      unix_write_time)
+                : kMaxSequenceNumber;
         if (preferred_seqno < ikey.sequence) {
           value_after_flush =
               PackValueAndSeqno(unpacked_value, preferred_seqno, &value_buf);
@@ -322,11 +325,13 @@ Status BuildTable(
       builder->Abandon();
     } else {
       SeqnoToTimeMapping relevant_mapping;
-      relevant_mapping.CopyFromSeqnoRange(seqno_to_time_mapping,
-                                          meta->fd.smallest_seqno,
-                                          meta->fd.largest_seqno);
-      relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
-      relevant_mapping.Enforce(tboptions.file_creation_time);
+      if (seqno_to_time_mapping) {
+        relevant_mapping.CopyFromSeqnoRange(*seqno_to_time_mapping,
+                                            meta->fd.smallest_seqno,
+                                            meta->fd.largest_seqno);
+        relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
+        relevant_mapping.Enforce(tboptions.file_creation_time);
+      }
       builder->SetSeqnoTimeTableProperties(
           relevant_mapping,
           ioptions.compaction_style == CompactionStyle::kCompactionStyleFIFO
index e78e7bbc1c905ce4a17cd75c32dc6a2922328af4..6e6bc63c51093d855f8f0b47492a87465c6e6059 100644 (file)
@@ -64,7 +64,7 @@ Status BuildTable(
     bool paranoid_file_checks, InternalStats* internal_stats,
     IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
     BlobFileCreationReason blob_creation_reason,
-    const SeqnoToTimeMapping& seqno_to_time_mapping,
+    UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping,
     EventLogger* event_logger = nullptr, int job_id = 0,
     TableProperties* table_properties = nullptr,
     Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET,
index e76ceb5d46d677047378a0b9defa719a6e75e4b2..aed8201fa741e00b342dfd11ed37483524dd632e 100644 (file)
@@ -244,7 +244,7 @@ struct SuperVersion {
   // Share the ownership of the seqno to time mapping object referred to in this
   // SuperVersion. To be used by the new SuperVersion to be installed after this
   // one if seqno to time mapping does not change in between these two
-  // SuperVersions.
+  // SuperVersions. Or to share the ownership of the mapping with a FlushJob.
   std::shared_ptr<const SeqnoToTimeMapping> ShareSeqnoToTimeMapping() {
     return seqno_to_time_mapping;
   }
index 3fe800c43ccade9e3b91c07b3bfef106db742504..8e72dce9d033427366b4315eb90f2be876d13799 100644 (file)
@@ -2296,6 +2296,9 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) {
   const int kNumLevels = 7;
   const int kNumKeys = 100;
   const int kSecondsPerRecording = 101;
+  const int kKeyWithWriteTime = 25;
+  const uint64_t kUserSpecifiedWriteTime =
+      kMockStartTime + kSecondsPerRecording * 15;
 
   Options options = CurrentOptions();
   options.compaction_style = kCompactionStyleUniversal;
@@ -2309,7 +2312,12 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) {
   for (int i = 0; i < kNumKeys; i++) {
     dbfull()->TEST_WaitForPeriodicTaskRun(
         [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
-    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
+    if (i == kKeyWithWriteTime) {
+      ASSERT_OK(
+          TimedPut(Key(i), rnd.RandomString(100), kUserSpecifiedWriteTime));
+    } else {
+      ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
+    }
   }
 
   ReadOptions ropts;
@@ -2323,6 +2331,8 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) {
     for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) {
       if (start_time == 0) {
         start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i));
+      } else if (i == kKeyWithWriteTime) {
+        VerifyKeyAndWriteTime(iter.get(), Key(i), kUserSpecifiedWriteTime);
       } else {
         VerifyKeyAndWriteTime(iter.get(), Key(i),
                               start_time + kSecondsPerRecording * (i + 1));
@@ -2339,6 +2349,8 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) {
          iter->Prev(), i--) {
       if (i == 0) {
         VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
+      } else if (i == kKeyWithWriteTime) {
+        VerifyKeyAndWriteTime(iter.get(), Key(i), kUserSpecifiedWriteTime);
       } else {
         VerifyKeyAndWriteTime(iter.get(), Key(i),
                               start_time + kSecondsPerRecording * (i + 1));
@@ -2346,6 +2358,30 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) {
     }
     ASSERT_OK(iter->status());
   }
+
+  // Reopen the DB and disable the seqno to time recording, data with user
+  // specified write time can still get a write time before it's flushed.
+  options.preserve_internal_time_seconds = 0;
+  DestroyAndReopen(options);
+  ASSERT_OK(TimedPut(Key(kKeyWithWriteTime), rnd.RandomString(100),
+                     kUserSpecifiedWriteTime));
+  {
+    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
+    iter->Seek(Key(kKeyWithWriteTime));
+    VerifyKeyAndWriteTime(iter.get(), Key(kKeyWithWriteTime),
+                          kUserSpecifiedWriteTime);
+    ASSERT_OK(iter->status());
+  }
+
+  ASSERT_OK(Flush());
+  {
+    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
+    iter->Seek(Key(kKeyWithWriteTime));
+    VerifyKeyAndWriteTime(iter.get(), Key(kKeyWithWriteTime),
+                          std::numeric_limits<uint64_t>::max());
+    ASSERT_OK(iter->status());
+  }
+
   Close();
 }
 
@@ -2354,6 +2390,9 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) {
   const int kNumLevels = 7;
   const int kNumKeys = 100;
   const int kSecondsPerRecording = 101;
+  const int kKeyWithWriteTime = 25;
+  const uint64_t kUserSpecifiedWriteTime =
+      kMockStartTime + kSecondsPerRecording * 15;
 
   Options options = CurrentOptions();
   options.compaction_style = kCompactionStyleUniversal;
@@ -2367,7 +2406,12 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) {
   for (int i = 0; i < kNumKeys; i++) {
     dbfull()->TEST_WaitForPeriodicTaskRun(
         [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
-    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
+    if (i == kKeyWithWriteTime) {
+      ASSERT_OK(
+          TimedPut(Key(i), rnd.RandomString(100), kUserSpecifiedWriteTime));
+    } else {
+      ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
+    }
   }
 
   ASSERT_OK(Flush());
@@ -2383,6 +2427,12 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) {
     for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) {
       if (start_time == 0) {
         start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i));
+      } else if (i == kKeyWithWriteTime) {
+        // It's not precisely kUserSpecifiedWriteTime, instead it has a margin
+        // of error that is one recording apart while we convert write time to
+        // sequence number, and then back to write time.
+        VerifyKeyAndWriteTime(iter.get(), Key(i),
+                              kUserSpecifiedWriteTime - kSecondsPerRecording);
       } else {
         VerifyKeyAndWriteTime(iter.get(), Key(i),
                               start_time + kSecondsPerRecording * (i + 1));
@@ -2399,6 +2449,9 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) {
          iter->Prev(), i--) {
       if (i == 0) {
         VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
+      } else if (i == kKeyWithWriteTime) {
+        VerifyKeyAndWriteTime(iter.get(), Key(i),
+                              kUserSpecifiedWriteTime - kSecondsPerRecording);
       } else {
         VerifyKeyAndWriteTime(iter.get(), Key(i),
                               start_time + kSecondsPerRecording * (i + 1));
@@ -2428,6 +2481,9 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) {
     for (iter->Next(), i = 0; iter->Valid(); iter->Next(), i++) {
       if (i == 0) {
         VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
+      } else if (i == kKeyWithWriteTime) {
+        VerifyKeyAndWriteTime(iter.get(), Key(i),
+                              kUserSpecifiedWriteTime - kSecondsPerRecording);
       } else {
         VerifyKeyAndWriteTime(iter.get(), Key(i),
                               start_time + kSecondsPerRecording * (i + 1));
index f97b955dbc515bef19a858698614f67f5aa43520..8845a2a4d7e60dbe22abecefce311c157869f528 100644 (file)
@@ -270,8 +270,8 @@ Status DBImpl::FlushMemTableToOutputFile(
       GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
       &event_logger_, mutable_cf_options.report_bg_io_stats,
       true /* sync_output_directory */, true /* write_manifest */, thread_pri,
-      io_tracer_, seqno_to_time_mapping_, db_id_, db_session_id_,
-      cfd->GetFullHistoryTsLow(), &blob_callback_);
+      io_tracer_, cfd->GetSuperVersion()->ShareSeqnoToTimeMapping(), db_id_,
+      db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_);
   FileMetaData file_meta;
 
   Status s;
@@ -545,8 +545,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
         GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
         &event_logger_, mutable_cf_options.report_bg_io_stats,
         false /* sync_output_directory */, false /* write_manifest */,
-        thread_pri, io_tracer_, seqno_to_time_mapping_, db_id_, db_session_id_,
-        cfd->GetFullHistoryTsLow(), &blob_callback_));
+        thread_pri, io_tracer_,
+        cfd->GetSuperVersion()->ShareSeqnoToTimeMapping(), db_id_,
+        db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_));
   }
 
   std::vector<FileMetaData> file_meta(num_cfs);
index 7990561757fc87216dbf0a8ad74800b3d464cccb..d2591b6e92cdb6397fdafa2c5c6df96b05bac327 100644 (file)
@@ -1684,7 +1684,6 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
           TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
           0 /* file_creation_time */, db_id_, db_session_id_,
           0 /* target_file_size */, meta.fd.GetNumber());
-      SeqnoToTimeMapping empty_seqno_to_time_mapping;
       Version* version = cfd->current();
       version->Ref();
       uint64_t num_input_entries = 0;
@@ -1695,7 +1694,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
           snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber,
           snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s,
           io_tracer_, BlobFileCreationReason::kRecovery,
-          empty_seqno_to_time_mapping, &event_logger_, job_id,
+          nullptr /* seqno_to_time_mapping */, &event_logger_, job_id,
           nullptr /* table_properties */, write_hint,
           nullptr /*full_history_ts_low*/, &blob_callback_, version,
           &num_input_entries);
index d3c6db4966421bf106b935c69c21b4609a51ef3b..e277919239b85c40df604ff4f907c56bf0a3f266 100644 (file)
@@ -368,10 +368,10 @@ class DBIter final : public Iterator {
   // overhead of calling construction of the function if creating it each time.
   ParsedInternalKey ikey_;
 
-  // TODO(yuzhangyu): update this documentation for kTypeValuePreferredSeqno
-  // types.
   // The approximate write time for the entry. It is deduced from the entry's
-  // sequence number if the seqno to time mapping is available.
+  // sequence number if the seqno to time mapping is available. For a
+  // kTypeValuePreferredSeqno entry, this is the write time specified by the
+  // user.
   uint64_t saved_write_unix_time_;
   std::string saved_value_;
   Slice pinned_value_;
index 60c3cb0322918140597122694326074c625ec017..fd751782d1404708b78e6947ccbde95327edf99e 100644 (file)
@@ -759,6 +759,11 @@ Status DBTestBase::Put(int cf, const Slice& k, const Slice& v,
   }
 }
 
+Status DBTestBase::TimedPut(const Slice& k, const Slice& v,
+                            uint64_t write_unix_time, WriteOptions wo) {
+  return TimedPut(0, k, v, write_unix_time, wo);
+}
+
 Status DBTestBase::TimedPut(int cf, const Slice& k, const Slice& v,
                             uint64_t write_unix_time, WriteOptions wo) {
   WriteBatch wb;
index 959f75d101245263fa1a46d38b5af7e793b39c91..2a671e1e8cff5a552928d05ea201ab4f7a726dbe 100644 (file)
@@ -1176,6 +1176,9 @@ class DBTestBase : public testing::Test {
   Status Put(int cf, const Slice& k, const Slice& v,
              WriteOptions wo = WriteOptions());
 
+  Status TimedPut(const Slice& k, const Slice& v, uint64_t write_unix_time,
+                  WriteOptions wo = WriteOptions());
+
   Status TimedPut(int cf, const Slice& k, const Slice& v,
                   uint64_t write_unix_time, WriteOptions wo = WriteOptions());
 
index fdde564582f734c411a47fb0559f9038d32a2e7d..5b16726693e1de46c1549a12fae1ba346cfa3b16 100644 (file)
@@ -374,13 +374,6 @@ inline ValueType ExtractValueType(const Slice& internal_key) {
   return static_cast<ValueType>(c);
 }
 
-// input [internal key]: <user_provided_key | ts | seqno + type>
-// output:                                        <seqno>
-inline SequenceNumber ExtractSequenceNumber(const Slice& internal_key) {
-  uint64_t num = ExtractInternalKeyFooter(internal_key);
-  return num >> 8;
-}
-
 // A comparator for internal keys that uses a specified comparator for
 // the user key portion and breaks ties by decreasing sequence number.
 class InternalKeyComparator
index 78a73c12f7b3ea16475593be6530c1f4a8e25287..7995ea786e58b7921f3476b57cfcf214491c365c 100644 (file)
@@ -100,9 +100,9 @@ FlushJob::FlushJob(
     Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
     const bool sync_output_directory, const bool write_manifest,
     Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
-    const SeqnoToTimeMapping& seqno_to_time_mapping, const std::string& db_id,
-    const std::string& db_session_id, std::string full_history_ts_low,
-    BlobFileCompletionCallback* blob_callback)
+    std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping,
+    const std::string& db_id, const std::string& db_session_id,
+    std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
     : dbname_(dbname),
       db_id_(db_id),
       db_session_id_(db_session_id),
@@ -136,7 +136,7 @@ FlushJob::FlushJob(
       clock_(db_options_.clock),
       full_history_ts_low_(std::move(full_history_ts_low)),
       blob_callback_(blob_callback),
-      db_impl_seqno_to_time_mapping_(seqno_to_time_mapping) {
+      seqno_to_time_mapping_(std::move(seqno_to_time_mapping)) {
   // Update the thread status to indicate flush.
   ReportStartedFlush();
   TEST_SYNC_POINT("FlushJob::FlushJob()");
@@ -851,15 +851,6 @@ Status FlushJob::WriteLevel0Table() {
   const uint64_t start_cpu_micros = clock_->CPUMicros();
   Status s;
 
-  // TODO(yuzhangyu): extend the copied seqno to time mapping range here so
-  // it can try to cover the earliest write unix time as much as possible. We
-  // need this mapping to get a more precise preferred seqno.
-  SequenceNumber smallest_seqno = mems_.front()->GetEarliestSequenceNumber();
-  if (!db_impl_seqno_to_time_mapping_.Empty()) {
-    // make a local copy to use while not holding the db_mutex.
-    seqno_to_time_mapping_.CopyFromSeqnoRange(db_impl_seqno_to_time_mapping_,
-                                              smallest_seqno);
-  }
   meta_.temperature = mutable_cf_options_.default_write_temperature;
   file_options_.temperature = meta_.temperature;
 
@@ -988,8 +979,8 @@ Status FlushJob::WriteLevel0Table() {
           earliest_write_conflict_snapshot_, job_snapshot_seq,
           snapshot_checker_, mutable_cf_options_.paranoid_file_checks,
           cfd_->internal_stats(), &io_s, io_tracer_,
-          BlobFileCreationReason::kFlush, seqno_to_time_mapping_, event_logger_,
-          job_context_->job_id, &table_properties_, write_hint,
+          BlobFileCreationReason::kFlush, seqno_to_time_mapping_.get(),
+          event_logger_, job_context_->job_id, &table_properties_, write_hint,
           full_history_ts_low, blob_callback_, base_, &num_input_entries,
           &memtable_payload_bytes, &memtable_garbage_bytes);
       TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:s", &s);
index 0667a09db37ac16db29a07ef1c43f05b2a89c7de..9e636e6a85766db537bf8208829356cd62b29639 100644 (file)
@@ -73,7 +73,7 @@ class FlushJob {
            EventLogger* event_logger, bool measure_io_stats,
            const bool sync_output_directory, const bool write_manifest,
            Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
-           const SeqnoToTimeMapping& seq_time_mapping,
+           std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping,
            const std::string& db_id = "", const std::string& db_session_id = "",
            std::string full_history_ts_low = "",
            BlobFileCompletionCallback* blob_callback = nullptr);
@@ -210,10 +210,14 @@ class FlushJob {
   const std::string full_history_ts_low_;
   BlobFileCompletionCallback* blob_callback_;
 
-  // reference to the seqno_to_time_mapping_ in db_impl.h, not safe to read
-  // without db mutex
-  const SeqnoToTimeMapping& db_impl_seqno_to_time_mapping_;
-  SeqnoToTimeMapping seqno_to_time_mapping_;
+  // Shared copy of DB's seqno to time mapping stored in SuperVersion. The
+  // ownership is shared with this FlushJob when it's created.
+  // FlushJob accesses and ref counts immutable MemTables directly via
+  // `MemTableListVersion` instead of ref `SuperVersion`, so we need to give
+  // the flush job shared ownership of the mapping.
+  // Note this is only installed when the seqno to time recording feature is
+  // enabled, so it could be nullptr.
+  std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping_;
 
   // Keeps track of the newest user-defined timestamp for this flush job if
   // `persist_user_defined_timestamps` flag is false.
index 82edcc5e0c3148eafbdc5a6fa4e3e49595083a10..3ffb77d537829aa24fbbcd80a34a2fab063cba64 100644 (file)
@@ -169,7 +169,7 @@ class FlushJobTestBase : public testing::Test {
   bool persist_udt_ = true;
   bool paranoid_file_checks_ = false;
 
-  SeqnoToTimeMapping empty_seqno_to_time_mapping_;
+  std::shared_ptr<SeqnoToTimeMapping> empty_seqno_to_time_mapping_;
 };
 
 class FlushJobTest : public FlushJobTestBase {
@@ -627,13 +627,14 @@ TEST_F(FlushJobTest, ReplaceTimedPutWriteTimeWithPreferredSeqno) {
   auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                            kMaxSequenceNumber);
   new_mem->Ref();
-  SeqnoToTimeMapping seqno_to_time_mapping;
+  std::shared_ptr<SeqnoToTimeMapping> seqno_to_time_mapping =
+      std::make_shared<SeqnoToTimeMapping>();
   // Seqno: 10,       11, ... 20,
   // Time:  ...  500      ...      600
   // GetProximalSeqnoBeforeTime(500) -> 10
   // GetProximalSeqnoBeforeTime(600) -> 20
-  seqno_to_time_mapping.Append(10, 500);
-  seqno_to_time_mapping.Append(20, 600);
+  seqno_to_time_mapping->Append(10, 500);
+  seqno_to_time_mapping->Append(20, 600);
 
   ASSERT_OK(new_mem->Add(SequenceNumber(15), kTypeValuePreferredSeqno, "bar",
                          ValueWithWriteTime("bval", 500),
index 4197dd9f476559ec2390f28682aa585150eeff95..ba4f0da824a46b4715e2572662b85924785f3811 100644 (file)
@@ -504,13 +504,16 @@ class MemTableIterator : public InternalIterator {
 
   uint64_t write_unix_time() const override {
     assert(Valid());
-    // TODO(yuzhangyu): if value type is kTypeValuePreferredSeqno,
-    // parse its unix write time out of packed value.
-    if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) {
+    ParsedInternalKey pikey;
+    Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
+    if (!s.ok()) {
+      return std::numeric_limits<uint64_t>::max();
+    } else if (kTypeValuePreferredSeqno == pikey.type) {
+      return ParsePackedValueForWriteTime(value());
+    } else if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) {
       return std::numeric_limits<uint64_t>::max();
     }
-    SequenceNumber seqno = ExtractSequenceNumber(key());
-    return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(seqno);
+    return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(pikey.sequence);
   }
 
   Slice value() const override {
index eddafe1332493e9dd91199d44a307d443a89e895..34d79c5df5514ff031363235034345f211fa05f8 100644 (file)
@@ -484,7 +484,7 @@ class Repairer {
           {}, kMaxSequenceNumber, kMaxSequenceNumber, snapshot_checker,
           false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s,
           nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery,
-          empty_seqno_to_time_mapping, nullptr /* event_logger */,
+          nullptr /* seqno_to_time_mapping */, nullptr /* event_logger */,
           0 /* job_id */, nullptr /* table_properties */, write_hint);
       ROCKS_LOG_INFO(db_options_.info_log,
                      "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
index 07f28906b068c94eab53a699ecba7b0261816cd6..6355570196827b2fd9947de5ce52250b0aeb73df 100644 (file)
@@ -504,27 +504,36 @@ Slice PackValueAndSeqno(const Slice& value, SequenceNumber seqno,
   return Slice(*buf);
 }
 
-std::tuple<Slice, uint64_t> ParsePackedValueWithWriteTime(const Slice& value) {
+uint64_t ParsePackedValueForWriteTime(const Slice& value) {
   assert(value.size() >= sizeof(uint64_t));
   Slice write_time_slice(value.data() + value.size() - sizeof(uint64_t),
                          sizeof(uint64_t));
   uint64_t write_time;
   [[maybe_unused]] auto res = GetFixed64(&write_time_slice, &write_time);
   assert(res);
+  return write_time;
+}
+
+std::tuple<Slice, uint64_t> ParsePackedValueWithWriteTime(const Slice& value) {
   return std::make_tuple(Slice(value.data(), value.size() - sizeof(uint64_t)),
-                         write_time);
+                         ParsePackedValueForWriteTime(value));
 }
 
-std::tuple<Slice, SequenceNumber> ParsePackedValueWithSeqno(
-    const Slice& value) {
+SequenceNumber ParsePackedValueForSeqno(const Slice& value) {
   assert(value.size() >= sizeof(SequenceNumber));
   Slice seqno_slice(value.data() + value.size() - sizeof(uint64_t),
                     sizeof(uint64_t));
   SequenceNumber seqno;
   [[maybe_unused]] auto res = GetFixed64(&seqno_slice, &seqno);
   assert(res);
+  return seqno;
+}
+
+std::tuple<Slice, SequenceNumber> ParsePackedValueWithSeqno(
+    const Slice& value) {
   return std::make_tuple(
-      Slice(value.data(), value.size() - sizeof(SequenceNumber)), seqno);
+      Slice(value.data(), value.size() - sizeof(SequenceNumber)),
+      ParsePackedValueForSeqno(value));
 }
 
 Slice ParsePackedValueForValue(const Slice& value) {
index 2aa7bc2aa85d3269febc47857c7bafcd092f0df9..a9255a806fe443b6e17cad9ade18d39843a5fddc 100644 (file)
@@ -277,10 +277,16 @@ Slice PackValueAndWriteTime(const Slice& value, uint64_t unix_write_time,
 Slice PackValueAndSeqno(const Slice& value, SequenceNumber seqno,
                         std::string* buf);
 
+// Parse a packed value to get the write time.
+uint64_t ParsePackedValueForWriteTime(const Slice& value);
+
 // Parse a packed value to get the value and the write time. The unpacked value
 // Slice is backed up by the same memory backing up `value`.
 std::tuple<Slice, uint64_t> ParsePackedValueWithWriteTime(const Slice& value);
 
+// Parse a packed value to get the sequence number.
+SequenceNumber ParsePackedValueForSeqno(const Slice& value);
+
 // Parse a packed value to get the value and the sequence number. The unpacked
 // value Slice is backed up by the same memory backing up `value`.
 std::tuple<Slice, SequenceNumber> ParsePackedValueWithSeqno(const Slice& value);
index 05a592e94a9ee8a70cbfd42e1958f690bbd322d4..4adba1de84b1e18f34f42e1d452547174f07a1fc 100644 (file)
@@ -871,6 +871,9 @@ Status WriteBatchInternal::TimedPut(WriteBatch* b, uint32_t column_family_id,
   if (value.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
     return Status::InvalidArgument("value is too large");
   }
+  if (std::numeric_limits<uint64_t>::max() == write_unix_time) {
+    return WriteBatchInternal::Put(b, column_family_id, key, value);
+  }
   LocalSavePoint save(b);
 
   WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
index 3fe84927a20de49c7d2fa4d6f74071cb8cf6b445..05aec899a561c905d3755aa99f883d031c338947 100644 (file)
@@ -406,12 +406,19 @@ TEST_F(WriteBatchTest, PutNotImplemented) {
 TEST_F(WriteBatchTest, TimedPutNotImplemented) {
   WriteBatch batch;
   ASSERT_OK(
-      batch.TimedPut(0, Slice("k1"), Slice("v1"), /*unix_write_time=*/30));
+      batch.TimedPut(0, Slice("k1"), Slice("v1"), /*write_unix_time=*/30));
   ASSERT_EQ(1u, batch.Count());
   ASSERT_EQ("TimedPut(k1, v1, 30)@0", PrintContents(&batch));
 
   WriteBatch::Handler handler;
   ASSERT_TRUE(batch.Iterate(&handler).IsInvalidArgument());
+
+  batch.Clear();
+  ASSERT_OK(
+      batch.TimedPut(0, Slice("k1"), Slice("v1"),
+                     /*write_unix_time=*/std::numeric_limits<uint64_t>::max()));
+  ASSERT_EQ(1u, batch.Count());
+  ASSERT_EQ("Put(k1, v1)@0", PrintContents(&batch));
 }
 
 TEST_F(WriteBatchTest, DeleteNotImplemented) {
index 0ba0e3e2894acb30727343c45f7aa44e65b315d5..d9e29a75f3b378558b0f8e7e5c1c46463f6471c4 100644 (file)
@@ -96,15 +96,21 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
 
   uint64_t write_unix_time() const override {
     assert(Valid());
-    // TODO(yuzhangyu): if value type is kTypeValuePreferredSeqno,
-    // parse its unix write time out of packed value.
+    ParsedInternalKey pikey;
+    SequenceNumber seqno;
     const SeqnoToTimeMapping& seqno_to_time_mapping =
         table_->GetSeqnoToTimeMapping();
-    SequenceNumber seqno = ExtractSequenceNumber(key());
-    if (kUnknownSeqnoBeforeAll == seqno) {
+    Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
+    if (!s.ok()) {
+      return std::numeric_limits<uint64_t>::max();
+    } else if (kUnknownSeqnoBeforeAll == pikey.sequence) {
       return kUnknownTimeBeforeAll;
     } else if (seqno_to_time_mapping.Empty()) {
       return std::numeric_limits<uint64_t>::max();
+    } else if (kTypeValuePreferredSeqno == pikey.type) {
+      seqno = ParsePackedValueForSeqno(value());
+    } else {
+      seqno = pikey.sequence;
     }
     return seqno_to_time_mapping.GetProximalTimeBeforeSeqno(seqno);
   }
diff --git a/unreleased_history/new_features/write_time_and_timed_put.md b/unreleased_history/new_features/write_time_and_timed_put.md
new file mode 100644 (file)
index 0000000..fd6654d
--- /dev/null
@@ -0,0 +1 @@
+*Implement experimental features: API `Iterator::GetProperty("rocksdb.iterator.write-time")` to allow users to get data's approximate write unix time and write data with a specific write time via `WriteBatch::TimedPut` API.
\ No newline at end of file