From: Yu Zhang Date: Thu, 21 Mar 2024 17:00:15 +0000 (-0700) Subject: Follow ups for TimedPut and write time property (#12455) X-Git-Tag: v9.1.0~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=13e1c32a187832936814bedc63b56e9944ba8a8e;p=rocksdb.git Follow ups for TimedPut and write time property (#12455) Summary: This PR contains a few follow ups from https://github.com/facebook/rocksdb/issues/12419 and https://github.com/facebook/rocksdb/issues/12428 including: 1) Handle a special case for `WriteBatch::TimedPut`. When the user specified write time is `std::numeric_limits<uint64_t>::max()`, it's not treated as an error, but it instead creates and writes a regular `Put` entry. 2) Update the `InternalIterator::write_unix_time` APIs to handle `kTypeValuePreferredSeqno` entries. 3) FlushJob is updated to use the seqno to time mapping copy in `SuperVersion`. FlushJob currently copies the DB's seqno to time mapping while holding db mutex and only copies the part of interest, i.e., the part that only goes back to the earliest sequence number of the to-be-flushed memtables. While updating FlushJob to use the mapping copy in `SuperVersion`, it's given access to the full mapping to help cover the need to convert `kTypeValuePreferredSeqno`'s write time to preferred seqno as much as possible. 
Test plans: Added unit tests Pull Request resolved: https://github.com/facebook/rocksdb/pull/12455 Reviewed By: pdillinger Differential Revision: D55165422 Pulled By: jowlyzhang fbshipit-source-id: dc022653077f678c24661de5743146a74cce4b47 --- diff --git a/db/builder.cc b/db/builder.cc index ce7b88d5d..a3c15ad11 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -70,8 +70,8 @@ Status BuildTable( bool paranoid_file_checks, InternalStats* internal_stats, IOStatus* io_status, const std::shared_ptr& io_tracer, BlobFileCreationReason blob_creation_reason, - const SeqnoToTimeMapping& seqno_to_time_mapping, EventLogger* event_logger, - int job_id, TableProperties* table_properties, + UnownedPtr seqno_to_time_mapping, + EventLogger* event_logger, int job_id, TableProperties* table_properties, Env::WriteLifeTimeHint write_hint, const std::string* full_history_ts_low, BlobFileCompletionCallback* blob_callback, Version* version, uint64_t* num_input_entries, uint64_t* memtable_payload_bytes, @@ -235,7 +235,10 @@ Status BuildTable( auto [unpacked_value, unix_write_time] = ParsePackedValueWithWriteTime(value); SequenceNumber preferred_seqno = - seqno_to_time_mapping.GetProximalSeqnoBeforeTime(unix_write_time); + seqno_to_time_mapping + ? 
seqno_to_time_mapping->GetProximalSeqnoBeforeTime( + unix_write_time) + : kMaxSequenceNumber; if (preferred_seqno < ikey.sequence) { value_after_flush = PackValueAndSeqno(unpacked_value, preferred_seqno, &value_buf); @@ -322,11 +325,13 @@ Status BuildTable( builder->Abandon(); } else { SeqnoToTimeMapping relevant_mapping; - relevant_mapping.CopyFromSeqnoRange(seqno_to_time_mapping, - meta->fd.smallest_seqno, - meta->fd.largest_seqno); - relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST); - relevant_mapping.Enforce(tboptions.file_creation_time); + if (seqno_to_time_mapping) { + relevant_mapping.CopyFromSeqnoRange(*seqno_to_time_mapping, + meta->fd.smallest_seqno, + meta->fd.largest_seqno); + relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST); + relevant_mapping.Enforce(tboptions.file_creation_time); + } builder->SetSeqnoTimeTableProperties( relevant_mapping, ioptions.compaction_style == CompactionStyle::kCompactionStyleFIFO diff --git a/db/builder.h b/db/builder.h index e78e7bbc1..6e6bc63c5 100644 --- a/db/builder.h +++ b/db/builder.h @@ -64,7 +64,7 @@ Status BuildTable( bool paranoid_file_checks, InternalStats* internal_stats, IOStatus* io_status, const std::shared_ptr& io_tracer, BlobFileCreationReason blob_creation_reason, - const SeqnoToTimeMapping& seqno_to_time_mapping, + UnownedPtr seqno_to_time_mapping, EventLogger* event_logger = nullptr, int job_id = 0, TableProperties* table_properties = nullptr, Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET, diff --git a/db/column_family.h b/db/column_family.h index e76ceb5d4..aed8201fa 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -244,7 +244,7 @@ struct SuperVersion { // Share the ownership of the seqno to time mapping object referred to in this // SuperVersion. To be used by the new SuperVersion to be installed after this // one if seqno to time mapping does not change in between these two - // SuperVersions. + // SuperVersions. Or to share the ownership of the mapping with a FlushJob. 
std::shared_ptr ShareSeqnoToTimeMapping() { return seqno_to_time_mapping; } diff --git a/db/compaction/tiered_compaction_test.cc b/db/compaction/tiered_compaction_test.cc index 3fe800c43..8e72dce9d 100644 --- a/db/compaction/tiered_compaction_test.cc +++ b/db/compaction/tiered_compaction_test.cc @@ -2296,6 +2296,9 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) { const int kNumLevels = 7; const int kNumKeys = 100; const int kSecondsPerRecording = 101; + const int kKeyWithWriteTime = 25; + const uint64_t kUserSpecifiedWriteTime = + kMockStartTime + kSecondsPerRecording * 15; Options options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; @@ -2309,7 +2312,12 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) { for (int i = 0; i < kNumKeys; i++) { dbfull()->TEST_WaitForPeriodicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); }); - ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + if (i == kKeyWithWriteTime) { + ASSERT_OK( + TimedPut(Key(i), rnd.RandomString(100), kUserSpecifiedWriteTime)); + } else { + ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + } } ReadOptions ropts; @@ -2323,6 +2331,8 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) { for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) { if (start_time == 0) { start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i)); + } else if (i == kKeyWithWriteTime) { + VerifyKeyAndWriteTime(iter.get(), Key(i), kUserSpecifiedWriteTime); } else { VerifyKeyAndWriteTime(iter.get(), Key(i), start_time + kSecondsPerRecording * (i + 1)); @@ -2339,6 +2349,8 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) { iter->Prev(), i--) { if (i == 0) { VerifyKeyAndWriteTime(iter.get(), Key(i), start_time); + } else if (i == kKeyWithWriteTime) { + VerifyKeyAndWriteTime(iter.get(), Key(i), kUserSpecifiedWriteTime); } else { VerifyKeyAndWriteTime(iter.get(), Key(i), start_time + kSecondsPerRecording * (i + 1)); @@ -2346,6 +2358,30 @@ TEST_P(IteratorWriteTimeTest, 
ReadFromMemtables) { } ASSERT_OK(iter->status()); } + + // Reopen the DB and disable the seqno to time recording, data with user + // specified write time can still get a write time before it's flushed. + options.preserve_internal_time_seconds = 0; + DestroyAndReopen(options); + ASSERT_OK(TimedPut(Key(kKeyWithWriteTime), rnd.RandomString(100), + kUserSpecifiedWriteTime)); + { + std::unique_ptr iter(dbfull()->NewIterator(ropts)); + iter->Seek(Key(kKeyWithWriteTime)); + VerifyKeyAndWriteTime(iter.get(), Key(kKeyWithWriteTime), + kUserSpecifiedWriteTime); + ASSERT_OK(iter->status()); + } + + ASSERT_OK(Flush()); + { + std::unique_ptr iter(dbfull()->NewIterator(ropts)); + iter->Seek(Key(kKeyWithWriteTime)); + VerifyKeyAndWriteTime(iter.get(), Key(kKeyWithWriteTime), + std::numeric_limits::max()); + ASSERT_OK(iter->status()); + } + Close(); } @@ -2354,6 +2390,9 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) { const int kNumLevels = 7; const int kNumKeys = 100; const int kSecondsPerRecording = 101; + const int kKeyWithWriteTime = 25; + const uint64_t kUserSpecifiedWriteTime = + kMockStartTime + kSecondsPerRecording * 15; Options options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; @@ -2367,7 +2406,12 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) { for (int i = 0; i < kNumKeys; i++) { dbfull()->TEST_WaitForPeriodicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); }); - ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + if (i == kKeyWithWriteTime) { + ASSERT_OK( + TimedPut(Key(i), rnd.RandomString(100), kUserSpecifiedWriteTime)); + } else { + ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + } } ASSERT_OK(Flush()); @@ -2383,6 +2427,12 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) { for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) { if (start_time == 0) { start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i)); + } else if (i == kKeyWithWriteTime) { + // It's not precisely kUserSpecifiedWriteTime, 
instead it has a margin + // of error that is one recording apart while we convert write time to + // sequence number, and then back to write time. + VerifyKeyAndWriteTime(iter.get(), Key(i), + kUserSpecifiedWriteTime - kSecondsPerRecording); } else { VerifyKeyAndWriteTime(iter.get(), Key(i), start_time + kSecondsPerRecording * (i + 1)); @@ -2399,6 +2449,9 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) { iter->Prev(), i--) { if (i == 0) { VerifyKeyAndWriteTime(iter.get(), Key(i), start_time); + } else if (i == kKeyWithWriteTime) { + VerifyKeyAndWriteTime(iter.get(), Key(i), + kUserSpecifiedWriteTime - kSecondsPerRecording); } else { VerifyKeyAndWriteTime(iter.get(), Key(i), start_time + kSecondsPerRecording * (i + 1)); @@ -2428,6 +2481,9 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) { for (iter->Next(), i = 0; iter->Valid(); iter->Next(), i++) { if (i == 0) { VerifyKeyAndWriteTime(iter.get(), Key(i), start_time); + } else if (i == kKeyWithWriteTime) { + VerifyKeyAndWriteTime(iter.get(), Key(i), + kUserSpecifiedWriteTime - kSecondsPerRecording); } else { VerifyKeyAndWriteTime(iter.get(), Key(i), start_time + kSecondsPerRecording * (i + 1)); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index f97b955db..8845a2a4d 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -270,8 +270,8 @@ Status DBImpl::FlushMemTableToOutputFile( GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, true /* sync_output_directory */, true /* write_manifest */, thread_pri, - io_tracer_, seqno_to_time_mapping_, db_id_, db_session_id_, - cfd->GetFullHistoryTsLow(), &blob_callback_); + io_tracer_, cfd->GetSuperVersion()->ShareSeqnoToTimeMapping(), db_id_, + db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_); FileMetaData file_meta; Status s; @@ -545,8 +545,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( 
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, - thread_pri, io_tracer_, seqno_to_time_mapping_, db_id_, db_session_id_, - cfd->GetFullHistoryTsLow(), &blob_callback_)); + thread_pri, io_tracer_, + cfd->GetSuperVersion()->ShareSeqnoToTimeMapping(), db_id_, + db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_)); } std::vector file_meta(num_cfs); diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 799056175..d2591b6e9 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1684,7 +1684,6 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, TableFileCreationReason::kRecovery, 0 /* oldest_key_time */, 0 /* file_creation_time */, db_id_, db_session_id_, 0 /* target_file_size */, meta.fd.GetNumber()); - SeqnoToTimeMapping empty_seqno_to_time_mapping; Version* version = cfd->current(); version->Ref(); uint64_t num_input_entries = 0; @@ -1695,7 +1694,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber, snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_, BlobFileCreationReason::kRecovery, - empty_seqno_to_time_mapping, &event_logger_, job_id, + nullptr /* seqno_to_time_mapping */, &event_logger_, job_id, nullptr /* table_properties */, write_hint, nullptr /*full_history_ts_low*/, &blob_callback_, version, &num_input_entries); diff --git a/db/db_iter.h b/db/db_iter.h index d3c6db496..e27791923 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -368,10 +368,10 @@ class DBIter final : public Iterator { // overhead of calling construction of the function if creating it each time. ParsedInternalKey ikey_; - // TODO(yuzhangyu): update this documentation for kTypeValuePreferredSeqno - // types. // The approximate write time for the entry. 
It is deduced from the entry's - // sequence number if the seqno to time mapping is available. + // sequence number if the seqno to time mapping is available. For a + // kTypeValuePreferredSeqno entry, this is the write time specified by the + // user. uint64_t saved_write_unix_time_; std::string saved_value_; Slice pinned_value_; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 60c3cb032..fd751782d 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -759,6 +759,11 @@ Status DBTestBase::Put(int cf, const Slice& k, const Slice& v, } } +Status DBTestBase::TimedPut(const Slice& k, const Slice& v, + uint64_t write_unix_time, WriteOptions wo) { + return TimedPut(0, k, v, write_unix_time, wo); +} + Status DBTestBase::TimedPut(int cf, const Slice& k, const Slice& v, uint64_t write_unix_time, WriteOptions wo) { WriteBatch wb; diff --git a/db/db_test_util.h b/db/db_test_util.h index 959f75d10..2a671e1e8 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -1176,6 +1176,9 @@ class DBTestBase : public testing::Test { Status Put(int cf, const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()); + Status TimedPut(const Slice& k, const Slice& v, uint64_t write_unix_time, + WriteOptions wo = WriteOptions()); + Status TimedPut(int cf, const Slice& k, const Slice& v, uint64_t write_unix_time, WriteOptions wo = WriteOptions()); diff --git a/db/dbformat.h b/db/dbformat.h index fdde56458..5b1672669 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -374,13 +374,6 @@ inline ValueType ExtractValueType(const Slice& internal_key) { return static_cast(c); } -// input [internal key]: -// output: -inline SequenceNumber ExtractSequenceNumber(const Slice& internal_key) { - uint64_t num = ExtractInternalKeyFooter(internal_key); - return num >> 8; -} - // A comparator for internal keys that uses a specified comparator for // the user key portion and breaks ties by decreasing sequence number. 
class InternalKeyComparator diff --git a/db/flush_job.cc b/db/flush_job.cc index 78a73c12f..7995ea786 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -100,9 +100,9 @@ FlushJob::FlushJob( Statistics* stats, EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, Env::Priority thread_pri, const std::shared_ptr& io_tracer, - const SeqnoToTimeMapping& seqno_to_time_mapping, const std::string& db_id, - const std::string& db_session_id, std::string full_history_ts_low, - BlobFileCompletionCallback* blob_callback) + std::shared_ptr seqno_to_time_mapping, + const std::string& db_id, const std::string& db_session_id, + std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback) : dbname_(dbname), db_id_(db_id), db_session_id_(db_session_id), @@ -136,7 +136,7 @@ FlushJob::FlushJob( clock_(db_options_.clock), full_history_ts_low_(std::move(full_history_ts_low)), blob_callback_(blob_callback), - db_impl_seqno_to_time_mapping_(seqno_to_time_mapping) { + seqno_to_time_mapping_(std::move(seqno_to_time_mapping)) { // Update the thread status to indicate flush. ReportStartedFlush(); TEST_SYNC_POINT("FlushJob::FlushJob()"); @@ -851,15 +851,6 @@ Status FlushJob::WriteLevel0Table() { const uint64_t start_cpu_micros = clock_->CPUMicros(); Status s; - // TODO(yuzhangyu): extend the copied seqno to time mapping range here so - // it can try to cover the earliest write unix time as much as possible. We - // need this mapping to get a more precise preferred seqno. - SequenceNumber smallest_seqno = mems_.front()->GetEarliestSequenceNumber(); - if (!db_impl_seqno_to_time_mapping_.Empty()) { - // make a local copy to use while not holding the db_mutex. 
- seqno_to_time_mapping_.CopyFromSeqnoRange(db_impl_seqno_to_time_mapping_, - smallest_seqno); - } meta_.temperature = mutable_cf_options_.default_write_temperature; file_options_.temperature = meta_.temperature; @@ -988,8 +979,8 @@ Status FlushJob::WriteLevel0Table() { earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), &io_s, io_tracer_, - BlobFileCreationReason::kFlush, seqno_to_time_mapping_, event_logger_, - job_context_->job_id, &table_properties_, write_hint, + BlobFileCreationReason::kFlush, seqno_to_time_mapping_.get(), + event_logger_, job_context_->job_id, &table_properties_, write_hint, full_history_ts_low, blob_callback_, base_, &num_input_entries, &memtable_payload_bytes, &memtable_garbage_bytes); TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:s", &s); diff --git a/db/flush_job.h b/db/flush_job.h index 0667a09db..9e636e6a8 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -73,7 +73,7 @@ class FlushJob { EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, Env::Priority thread_pri, const std::shared_ptr& io_tracer, - const SeqnoToTimeMapping& seq_time_mapping, + std::shared_ptr seqno_to_time_mapping, const std::string& db_id = "", const std::string& db_session_id = "", std::string full_history_ts_low = "", BlobFileCompletionCallback* blob_callback = nullptr); @@ -210,10 +210,14 @@ class FlushJob { const std::string full_history_ts_low_; BlobFileCompletionCallback* blob_callback_; - // reference to the seqno_to_time_mapping_ in db_impl.h, not safe to read - // without db mutex - const SeqnoToTimeMapping& db_impl_seqno_to_time_mapping_; - SeqnoToTimeMapping seqno_to_time_mapping_; + // Shared copy of DB's seqno to time mapping stored in SuperVersion. The + // ownership is shared with this FlushJob when it's created. 
+ // FlushJob accesses and ref counts immutable MemTables directly via + // `MemTableListVersion` instead of ref `SuperVersion`, so we need to give + // the flush job shared ownership of the mapping. + // Note this is only installed when seqno to time recording feature is + // enabled, so it could be nullptr. + std::shared_ptr seqno_to_time_mapping_; // Keeps track of the newest user-defined timestamp for this flush job if // `persist_user_defined_timestamps` flag is false. diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 82edcc5e0..3ffb77d53 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -169,7 +169,7 @@ class FlushJobTestBase : public testing::Test { bool persist_udt_ = true; bool paranoid_file_checks_ = false; - SeqnoToTimeMapping empty_seqno_to_time_mapping_; + std::shared_ptr empty_seqno_to_time_mapping_; }; class FlushJobTest : public FlushJobTestBase { @@ -627,13 +627,14 @@ TEST_F(FlushJobTest, ReplaceTimedPutWriteTimeWithPreferredSeqno) { auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); new_mem->Ref(); - SeqnoToTimeMapping seqno_to_time_mapping; + std::shared_ptr seqno_to_time_mapping = + std::make_shared(); // Seqno: 10, 11, ... 20, // Time: ... 500 ... 600 // GetProximalSeqnoBeforeTime(500) -> 10 // GetProximalSeqnoBeforeTime(600) -> 20 - seqno_to_time_mapping.Append(10, 500); - seqno_to_time_mapping.Append(20, 600); + seqno_to_time_mapping->Append(10, 500); + seqno_to_time_mapping->Append(20, 600); ASSERT_OK(new_mem->Add(SequenceNumber(15), kTypeValuePreferredSeqno, "bar", ValueWithWriteTime("bval", 500), diff --git a/db/memtable.cc b/db/memtable.cc index 4197dd9f4..ba4f0da82 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -504,13 +504,16 @@ class MemTableIterator : public InternalIterator { uint64_t write_unix_time() const override { assert(Valid()); - // TODO(yuzhangyu): if value type is kTypeValuePreferredSeqno, - // parse its unix write time out of packed value. 
- if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) { + ParsedInternalKey pikey; + Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false); + if (!s.ok()) { + return std::numeric_limits::max(); + } else if (kTypeValuePreferredSeqno == pikey.type) { + return ParsePackedValueForWriteTime(value()); + } else if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) { return std::numeric_limits::max(); } - SequenceNumber seqno = ExtractSequenceNumber(key()); - return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(seqno); + return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(pikey.sequence); } Slice value() const override { diff --git a/db/repair.cc b/db/repair.cc index eddafe133..34d79c5df 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -484,7 +484,7 @@ class Repairer { {}, kMaxSequenceNumber, kMaxSequenceNumber, snapshot_checker, false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s, nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery, - empty_seqno_to_time_mapping, nullptr /* event_logger */, + nullptr /* seqno_to_time_mapping */, nullptr /* event_logger */, 0 /* job_id */, nullptr /* table_properties */, write_hint); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", diff --git a/db/seqno_to_time_mapping.cc b/db/seqno_to_time_mapping.cc index 07f28906b..635557019 100644 --- a/db/seqno_to_time_mapping.cc +++ b/db/seqno_to_time_mapping.cc @@ -504,27 +504,36 @@ Slice PackValueAndSeqno(const Slice& value, SequenceNumber seqno, return Slice(*buf); } -std::tuple ParsePackedValueWithWriteTime(const Slice& value) { +uint64_t ParsePackedValueForWriteTime(const Slice& value) { assert(value.size() >= sizeof(uint64_t)); Slice write_time_slice(value.data() + value.size() - sizeof(uint64_t), sizeof(uint64_t)); uint64_t write_time; [[maybe_unused]] auto res = GetFixed64(&write_time_slice, &write_time); assert(res); + return write_time; +} + +std::tuple 
ParsePackedValueWithWriteTime(const Slice& value) { return std::make_tuple(Slice(value.data(), value.size() - sizeof(uint64_t)), - write_time); + ParsePackedValueForWriteTime(value)); } -std::tuple ParsePackedValueWithSeqno( - const Slice& value) { +SequenceNumber ParsePackedValueForSeqno(const Slice& value) { assert(value.size() >= sizeof(SequenceNumber)); Slice seqno_slice(value.data() + value.size() - sizeof(uint64_t), sizeof(uint64_t)); SequenceNumber seqno; [[maybe_unused]] auto res = GetFixed64(&seqno_slice, &seqno); assert(res); + return seqno; +} + +std::tuple ParsePackedValueWithSeqno( + const Slice& value) { return std::make_tuple( - Slice(value.data(), value.size() - sizeof(SequenceNumber)), seqno); + Slice(value.data(), value.size() - sizeof(SequenceNumber)), + ParsePackedValueForSeqno(value)); } Slice ParsePackedValueForValue(const Slice& value) { diff --git a/db/seqno_to_time_mapping.h b/db/seqno_to_time_mapping.h index 2aa7bc2aa..a9255a806 100644 --- a/db/seqno_to_time_mapping.h +++ b/db/seqno_to_time_mapping.h @@ -277,10 +277,16 @@ Slice PackValueAndWriteTime(const Slice& value, uint64_t unix_write_time, Slice PackValueAndSeqno(const Slice& value, SequenceNumber seqno, std::string* buf); +// Parse a packed value to get the write time. +uint64_t ParsePackedValueForWriteTime(const Slice& value); + // Parse a packed value to get the value and the write time. The unpacked value // Slice is backed up by the same memory backing up `value`. std::tuple ParsePackedValueWithWriteTime(const Slice& value); +// Parse a packed value to get the sequence number. +SequenceNumber ParsePackedValueForSeqno(const Slice& value); + // Parse a packed value to get the value and the sequence number. The unpacked // value Slice is backed up by the same memory backing up `value`. 
std::tuple ParsePackedValueWithSeqno(const Slice& value); diff --git a/db/write_batch.cc b/db/write_batch.cc index 05a592e94..4adba1de8 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -871,6 +871,9 @@ Status WriteBatchInternal::TimedPut(WriteBatch* b, uint32_t column_family_id, if (value.size() > size_t{std::numeric_limits::max()}) { return Status::InvalidArgument("value is too large"); } + if (std::numeric_limits::max() == write_unix_time) { + return WriteBatchInternal::Put(b, column_family_id, key, value); + } LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 3fe84927a..05aec899a 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -406,12 +406,19 @@ TEST_F(WriteBatchTest, PutNotImplemented) { TEST_F(WriteBatchTest, TimedPutNotImplemented) { WriteBatch batch; ASSERT_OK( - batch.TimedPut(0, Slice("k1"), Slice("v1"), /*unix_write_time=*/30)); + batch.TimedPut(0, Slice("k1"), Slice("v1"), /*write_unix_time=*/30)); ASSERT_EQ(1u, batch.Count()); ASSERT_EQ("TimedPut(k1, v1, 30)@0", PrintContents(&batch)); WriteBatch::Handler handler; ASSERT_TRUE(batch.Iterate(&handler).IsInvalidArgument()); + + batch.Clear(); + ASSERT_OK( + batch.TimedPut(0, Slice("k1"), Slice("v1"), + /*write_unix_time=*/std::numeric_limits::max())); + ASSERT_EQ(1u, batch.Count()); + ASSERT_EQ("Put(k1, v1)@0", PrintContents(&batch)); } TEST_F(WriteBatchTest, DeleteNotImplemented) { diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index 0ba0e3e28..d9e29a75f 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -96,15 +96,21 @@ class BlockBasedTableIterator : public InternalIteratorBase { uint64_t write_unix_time() const override { assert(Valid()); - // TODO(yuzhangyu): if value type is kTypeValuePreferredSeqno, - // parse its unix write time out of 
packed value. + ParsedInternalKey pikey; + SequenceNumber seqno; const SeqnoToTimeMapping& seqno_to_time_mapping = table_->GetSeqnoToTimeMapping(); - SequenceNumber seqno = ExtractSequenceNumber(key()); - if (kUnknownSeqnoBeforeAll == seqno) { + Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false); + if (!s.ok()) { + return std::numeric_limits::max(); + } else if (kUnknownSeqnoBeforeAll == pikey.sequence) { return kUnknownTimeBeforeAll; } else if (seqno_to_time_mapping.Empty()) { return std::numeric_limits::max(); + } else if (kTypeValuePreferredSeqno == pikey.type) { + seqno = ParsePackedValueForSeqno(value()); + } else { + seqno = pikey.sequence; } return seqno_to_time_mapping.GetProximalTimeBeforeSeqno(seqno); } diff --git a/unreleased_history/new_features/write_time_and_timed_put.md b/unreleased_history/new_features/write_time_and_timed_put.md new file mode 100644 index 000000000..fd6654d14 --- /dev/null +++ b/unreleased_history/new_features/write_time_and_timed_put.md @@ -0,0 +1 @@ +*Implement experimental features: API `Iterator::GetProperty("rocksdb.iterator.write-time")` to allow users to get data's approximate write unix time and write data with a specific write time via `WriteBatch::TimedPut` API. \ No newline at end of file