bool paranoid_file_checks, InternalStats* internal_stats,
IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCreationReason blob_creation_reason,
- const SeqnoToTimeMapping& seqno_to_time_mapping, EventLogger* event_logger,
- int job_id, TableProperties* table_properties,
+ UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping,
+ EventLogger* event_logger, int job_id, TableProperties* table_properties,
Env::WriteLifeTimeHint write_hint, const std::string* full_history_ts_low,
BlobFileCompletionCallback* blob_callback, Version* version,
uint64_t* num_input_entries, uint64_t* memtable_payload_bytes,
auto [unpacked_value, unix_write_time] =
ParsePackedValueWithWriteTime(value);
SequenceNumber preferred_seqno =
- seqno_to_time_mapping.GetProximalSeqnoBeforeTime(unix_write_time);
+ seqno_to_time_mapping
+ ? seqno_to_time_mapping->GetProximalSeqnoBeforeTime(
+ unix_write_time)
+ : kMaxSequenceNumber;
if (preferred_seqno < ikey.sequence) {
value_after_flush =
PackValueAndSeqno(unpacked_value, preferred_seqno, &value_buf);
builder->Abandon();
} else {
SeqnoToTimeMapping relevant_mapping;
- relevant_mapping.CopyFromSeqnoRange(seqno_to_time_mapping,
- meta->fd.smallest_seqno,
- meta->fd.largest_seqno);
- relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
- relevant_mapping.Enforce(tboptions.file_creation_time);
+ if (seqno_to_time_mapping) {
+ relevant_mapping.CopyFromSeqnoRange(*seqno_to_time_mapping,
+ meta->fd.smallest_seqno,
+ meta->fd.largest_seqno);
+ relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
+ relevant_mapping.Enforce(tboptions.file_creation_time);
+ }
builder->SetSeqnoTimeTableProperties(
relevant_mapping,
ioptions.compaction_style == CompactionStyle::kCompactionStyleFIFO
bool paranoid_file_checks, InternalStats* internal_stats,
IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCreationReason blob_creation_reason,
- const SeqnoToTimeMapping& seqno_to_time_mapping,
+ UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping,
EventLogger* event_logger = nullptr, int job_id = 0,
TableProperties* table_properties = nullptr,
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET,
// Share the ownership of the seqno to time mapping object referred to in this
// SuperVersion. To be used by the new SuperVersion to be installed after this
// one if seqno to time mapping does not change in between these two
- // SuperVersions.
+ // SuperVersions. Or to share the ownership of the mapping with a FlushJob.
std::shared_ptr<const SeqnoToTimeMapping> ShareSeqnoToTimeMapping() {
return seqno_to_time_mapping;
}
const int kNumLevels = 7;
const int kNumKeys = 100;
const int kSecondsPerRecording = 101;
+ const int kKeyWithWriteTime = 25;
+ const uint64_t kUserSpecifiedWriteTime =
+ kMockStartTime + kSecondsPerRecording * 15;
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
for (int i = 0; i < kNumKeys; i++) {
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
- ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
+ if (i == kKeyWithWriteTime) {
+ ASSERT_OK(
+ TimedPut(Key(i), rnd.RandomString(100), kUserSpecifiedWriteTime));
+ } else {
+ ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
+ }
}
ReadOptions ropts;
for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) {
if (start_time == 0) {
start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i));
+ } else if (i == kKeyWithWriteTime) {
+ VerifyKeyAndWriteTime(iter.get(), Key(i), kUserSpecifiedWriteTime);
} else {
VerifyKeyAndWriteTime(iter.get(), Key(i),
start_time + kSecondsPerRecording * (i + 1));
iter->Prev(), i--) {
if (i == 0) {
VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
+ } else if (i == kKeyWithWriteTime) {
+ VerifyKeyAndWriteTime(iter.get(), Key(i), kUserSpecifiedWriteTime);
} else {
VerifyKeyAndWriteTime(iter.get(), Key(i),
start_time + kSecondsPerRecording * (i + 1));
}
ASSERT_OK(iter->status());
}
+
+ // Reopen the DB and disable the seqno to time recording, data with user
+ // specified write time can still get a write time before it's flushed.
+ options.preserve_internal_time_seconds = 0;
+ DestroyAndReopen(options);
+ ASSERT_OK(TimedPut(Key(kKeyWithWriteTime), rnd.RandomString(100),
+ kUserSpecifiedWriteTime));
+ {
+ std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
+ iter->Seek(Key(kKeyWithWriteTime));
+ VerifyKeyAndWriteTime(iter.get(), Key(kKeyWithWriteTime),
+ kUserSpecifiedWriteTime);
+ ASSERT_OK(iter->status());
+ }
+
+ ASSERT_OK(Flush());
+ {
+ std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
+ iter->Seek(Key(kKeyWithWriteTime));
+ VerifyKeyAndWriteTime(iter.get(), Key(kKeyWithWriteTime),
+ std::numeric_limits<uint64_t>::max());
+ ASSERT_OK(iter->status());
+ }
+
Close();
}
const int kNumLevels = 7;
const int kNumKeys = 100;
const int kSecondsPerRecording = 101;
+ const int kKeyWithWriteTime = 25;
+ const uint64_t kUserSpecifiedWriteTime =
+ kMockStartTime + kSecondsPerRecording * 15;
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
for (int i = 0; i < kNumKeys; i++) {
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
- ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
+ if (i == kKeyWithWriteTime) {
+ ASSERT_OK(
+ TimedPut(Key(i), rnd.RandomString(100), kUserSpecifiedWriteTime));
+ } else {
+ ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
+ }
}
ASSERT_OK(Flush());
for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) {
if (start_time == 0) {
start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i));
+ } else if (i == kKeyWithWriteTime) {
+ // It's not precisely kUserSpecifiedWriteTime, instead it has a margin
+ // of error that is one recording apart while we convert write time to
+ // sequence number, and then back to write time.
+ VerifyKeyAndWriteTime(iter.get(), Key(i),
+ kUserSpecifiedWriteTime - kSecondsPerRecording);
} else {
VerifyKeyAndWriteTime(iter.get(), Key(i),
start_time + kSecondsPerRecording * (i + 1));
iter->Prev(), i--) {
if (i == 0) {
VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
+ } else if (i == kKeyWithWriteTime) {
+ VerifyKeyAndWriteTime(iter.get(), Key(i),
+ kUserSpecifiedWriteTime - kSecondsPerRecording);
} else {
VerifyKeyAndWriteTime(iter.get(), Key(i),
start_time + kSecondsPerRecording * (i + 1));
for (iter->Next(), i = 0; iter->Valid(); iter->Next(), i++) {
if (i == 0) {
VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
+ } else if (i == kKeyWithWriteTime) {
+ VerifyKeyAndWriteTime(iter.get(), Key(i),
+ kUserSpecifiedWriteTime - kSecondsPerRecording);
} else {
VerifyKeyAndWriteTime(iter.get(), Key(i),
start_time + kSecondsPerRecording * (i + 1));
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, thread_pri,
- io_tracer_, seqno_to_time_mapping_, db_id_, db_session_id_,
- cfd->GetFullHistoryTsLow(), &blob_callback_);
+ io_tracer_, cfd->GetSuperVersion()->ShareSeqnoToTimeMapping(), db_id_,
+ db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_);
FileMetaData file_meta;
Status s;
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */,
- thread_pri, io_tracer_, seqno_to_time_mapping_, db_id_, db_session_id_,
- cfd->GetFullHistoryTsLow(), &blob_callback_));
+ thread_pri, io_tracer_,
+ cfd->GetSuperVersion()->ShareSeqnoToTimeMapping(), db_id_,
+ db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_));
}
std::vector<FileMetaData> file_meta(num_cfs);
TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
0 /* file_creation_time */, db_id_, db_session_id_,
0 /* target_file_size */, meta.fd.GetNumber());
- SeqnoToTimeMapping empty_seqno_to_time_mapping;
Version* version = cfd->current();
version->Ref();
uint64_t num_input_entries = 0;
snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber,
snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s,
io_tracer_, BlobFileCreationReason::kRecovery,
- empty_seqno_to_time_mapping, &event_logger_, job_id,
+ nullptr /* seqno_to_time_mapping */, &event_logger_, job_id,
nullptr /* table_properties */, write_hint,
nullptr /*full_history_ts_low*/, &blob_callback_, version,
&num_input_entries);
// overhead of calling construction of the function if creating it each time.
ParsedInternalKey ikey_;
- // TODO(yuzhangyu): update this documentation for kTypeValuePreferredSeqno
- // types.
// The approximate write time for the entry. It is deduced from the entry's
- // sequence number if the seqno to time mapping is available.
+ // sequence number if the seqno to time mapping is available. For a
+ // kTypeValuePreferredSeqno entry, this is the write time specified by the
+ // user.
uint64_t saved_write_unix_time_;
std::string saved_value_;
Slice pinned_value_;
}
}
+Status DBTestBase::TimedPut(const Slice& k, const Slice& v,
+ uint64_t write_unix_time, WriteOptions wo) {
+ return TimedPut(0, k, v, write_unix_time, wo);
+}
+
Status DBTestBase::TimedPut(int cf, const Slice& k, const Slice& v,
uint64_t write_unix_time, WriteOptions wo) {
WriteBatch wb;
Status Put(int cf, const Slice& k, const Slice& v,
WriteOptions wo = WriteOptions());
+ Status TimedPut(const Slice& k, const Slice& v, uint64_t write_unix_time,
+ WriteOptions wo = WriteOptions());
+
Status TimedPut(int cf, const Slice& k, const Slice& v,
uint64_t write_unix_time, WriteOptions wo = WriteOptions());
return static_cast<ValueType>(c);
}
-// input [internal key]: <user_provided_key | ts | seqno + type>
-// output: <seqno>
-inline SequenceNumber ExtractSequenceNumber(const Slice& internal_key) {
- uint64_t num = ExtractInternalKeyFooter(internal_key);
- return num >> 8;
-}
-
// A comparator for internal keys that uses a specified comparator for
// the user key portion and breaks ties by decreasing sequence number.
class InternalKeyComparator
Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
- const SeqnoToTimeMapping& seqno_to_time_mapping, const std::string& db_id,
- const std::string& db_session_id, std::string full_history_ts_low,
- BlobFileCompletionCallback* blob_callback)
+ std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping,
+ const std::string& db_id, const std::string& db_session_id,
+ std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
: dbname_(dbname),
db_id_(db_id),
db_session_id_(db_session_id),
clock_(db_options_.clock),
full_history_ts_low_(std::move(full_history_ts_low)),
blob_callback_(blob_callback),
- db_impl_seqno_to_time_mapping_(seqno_to_time_mapping) {
+ seqno_to_time_mapping_(std::move(seqno_to_time_mapping)) {
// Update the thread status to indicate flush.
ReportStartedFlush();
TEST_SYNC_POINT("FlushJob::FlushJob()");
const uint64_t start_cpu_micros = clock_->CPUMicros();
Status s;
- // TODO(yuzhangyu): extend the copied seqno to time mapping range here so
- // it can try to cover the earliest write unix time as much as possible. We
- // need this mapping to get a more precise preferred seqno.
- SequenceNumber smallest_seqno = mems_.front()->GetEarliestSequenceNumber();
- if (!db_impl_seqno_to_time_mapping_.Empty()) {
- // make a local copy to use while not holding the db_mutex.
- seqno_to_time_mapping_.CopyFromSeqnoRange(db_impl_seqno_to_time_mapping_,
- smallest_seqno);
- }
meta_.temperature = mutable_cf_options_.default_write_temperature;
file_options_.temperature = meta_.temperature;
earliest_write_conflict_snapshot_, job_snapshot_seq,
snapshot_checker_, mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), &io_s, io_tracer_,
- BlobFileCreationReason::kFlush, seqno_to_time_mapping_, event_logger_,
- job_context_->job_id, &table_properties_, write_hint,
+ BlobFileCreationReason::kFlush, seqno_to_time_mapping_.get(),
+ event_logger_, job_context_->job_id, &table_properties_, write_hint,
full_history_ts_low, blob_callback_, base_, &num_input_entries,
&memtable_payload_bytes, &memtable_garbage_bytes);
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:s", &s);
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
- const SeqnoToTimeMapping& seq_time_mapping,
+ std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping,
const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "",
BlobFileCompletionCallback* blob_callback = nullptr);
const std::string full_history_ts_low_;
BlobFileCompletionCallback* blob_callback_;
- // reference to the seqno_to_time_mapping_ in db_impl.h, not safe to read
- // without db mutex
- const SeqnoToTimeMapping& db_impl_seqno_to_time_mapping_;
- SeqnoToTimeMapping seqno_to_time_mapping_;
+ // Shared copy of DB's seqno to time mapping stored in SuperVersion. The
+ // ownership is shared with this FlushJob when it's created.
+ // FlushJob accesses and ref counts immutable MemTables directly via
+ // `MemTableListVersion` instead of ref `SuperVersion`, so we need to give
+ // the flush job shared ownership of the mapping.
+ // Note this is only installed when seqno to time recording feature is
+ // enables, so it could be nullptr.
+ std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping_;
// Keeps track of the newest user-defined timestamp for this flush job if
// `persist_user_defined_timestamps` flag is false.
bool persist_udt_ = true;
bool paranoid_file_checks_ = false;
- SeqnoToTimeMapping empty_seqno_to_time_mapping_;
+ std::shared_ptr<SeqnoToTimeMapping> empty_seqno_to_time_mapping_;
};
class FlushJobTest : public FlushJobTestBase {
auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
kMaxSequenceNumber);
new_mem->Ref();
- SeqnoToTimeMapping seqno_to_time_mapping;
+ std::shared_ptr<SeqnoToTimeMapping> seqno_to_time_mapping =
+ std::make_shared<SeqnoToTimeMapping>();
// Seqno: 10, 11, ... 20,
// Time: ... 500 ... 600
// GetProximalSeqnoBeforeTime(500) -> 10
// GetProximalSeqnoBeforeTime(600) -> 20
- seqno_to_time_mapping.Append(10, 500);
- seqno_to_time_mapping.Append(20, 600);
+ seqno_to_time_mapping->Append(10, 500);
+ seqno_to_time_mapping->Append(20, 600);
ASSERT_OK(new_mem->Add(SequenceNumber(15), kTypeValuePreferredSeqno, "bar",
ValueWithWriteTime("bval", 500),
uint64_t write_unix_time() const override {
assert(Valid());
- // TODO(yuzhangyu): if value type is kTypeValuePreferredSeqno,
- // parse its unix write time out of packed value.
- if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) {
+ ParsedInternalKey pikey;
+ Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
+ if (!s.ok()) {
+ return std::numeric_limits<uint64_t>::max();
+ } else if (kTypeValuePreferredSeqno == pikey.type) {
+ return ParsePackedValueForWriteTime(value());
+ } else if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) {
return std::numeric_limits<uint64_t>::max();
}
- SequenceNumber seqno = ExtractSequenceNumber(key());
- return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(seqno);
+ return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(pikey.sequence);
}
Slice value() const override {
{}, kMaxSequenceNumber, kMaxSequenceNumber, snapshot_checker,
false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s,
nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery,
- empty_seqno_to_time_mapping, nullptr /* event_logger */,
+ nullptr /* seqno_to_time_mapping */, nullptr /* event_logger */,
0 /* job_id */, nullptr /* table_properties */, write_hint);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
return Slice(*buf);
}
-std::tuple<Slice, uint64_t> ParsePackedValueWithWriteTime(const Slice& value) {
+uint64_t ParsePackedValueForWriteTime(const Slice& value) {
assert(value.size() >= sizeof(uint64_t));
Slice write_time_slice(value.data() + value.size() - sizeof(uint64_t),
sizeof(uint64_t));
uint64_t write_time;
[[maybe_unused]] auto res = GetFixed64(&write_time_slice, &write_time);
assert(res);
+ return write_time;
+}
+
+std::tuple<Slice, uint64_t> ParsePackedValueWithWriteTime(const Slice& value) {
return std::make_tuple(Slice(value.data(), value.size() - sizeof(uint64_t)),
- write_time);
+ ParsePackedValueForWriteTime(value));
}
-std::tuple<Slice, SequenceNumber> ParsePackedValueWithSeqno(
- const Slice& value) {
+SequenceNumber ParsePackedValueForSeqno(const Slice& value) {
assert(value.size() >= sizeof(SequenceNumber));
Slice seqno_slice(value.data() + value.size() - sizeof(uint64_t),
sizeof(uint64_t));
SequenceNumber seqno;
[[maybe_unused]] auto res = GetFixed64(&seqno_slice, &seqno);
assert(res);
+ return seqno;
+}
+
+std::tuple<Slice, SequenceNumber> ParsePackedValueWithSeqno(
+ const Slice& value) {
return std::make_tuple(
- Slice(value.data(), value.size() - sizeof(SequenceNumber)), seqno);
+ Slice(value.data(), value.size() - sizeof(SequenceNumber)),
+ ParsePackedValueForSeqno(value));
}
Slice ParsePackedValueForValue(const Slice& value) {
Slice PackValueAndSeqno(const Slice& value, SequenceNumber seqno,
std::string* buf);
+// Parse a packed value to get the write time.
+uint64_t ParsePackedValueForWriteTime(const Slice& value);
+
// Parse a packed value to get the value and the write time. The unpacked value
// Slice is backed up by the same memory backing up `value`.
std::tuple<Slice, uint64_t> ParsePackedValueWithWriteTime(const Slice& value);
+// Parse a packed value to get the sequence number.
+SequenceNumber ParsePackedValueForSeqno(const Slice& value);
+
// Parse a packed value to get the value and the sequence number. The unpacked
// value Slice is backed up by the same memory backing up `value`.
std::tuple<Slice, SequenceNumber> ParsePackedValueWithSeqno(const Slice& value);
if (value.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
return Status::InvalidArgument("value is too large");
}
+ if (std::numeric_limits<uint64_t>::max() == write_unix_time) {
+ return WriteBatchInternal::Put(b, column_family_id, key, value);
+ }
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
TEST_F(WriteBatchTest, TimedPutNotImplemented) {
WriteBatch batch;
ASSERT_OK(
- batch.TimedPut(0, Slice("k1"), Slice("v1"), /*unix_write_time=*/30));
+ batch.TimedPut(0, Slice("k1"), Slice("v1"), /*write_unix_time=*/30));
ASSERT_EQ(1u, batch.Count());
ASSERT_EQ("TimedPut(k1, v1, 30)@0", PrintContents(&batch));
WriteBatch::Handler handler;
ASSERT_TRUE(batch.Iterate(&handler).IsInvalidArgument());
+
+ batch.Clear();
+ ASSERT_OK(
+ batch.TimedPut(0, Slice("k1"), Slice("v1"),
+ /*write_unix_time=*/std::numeric_limits<uint64_t>::max()));
+ ASSERT_EQ(1u, batch.Count());
+ ASSERT_EQ("Put(k1, v1)@0", PrintContents(&batch));
}
TEST_F(WriteBatchTest, DeleteNotImplemented) {
uint64_t write_unix_time() const override {
assert(Valid());
- // TODO(yuzhangyu): if value type is kTypeValuePreferredSeqno,
- // parse its unix write time out of packed value.
+ ParsedInternalKey pikey;
+ SequenceNumber seqno;
const SeqnoToTimeMapping& seqno_to_time_mapping =
table_->GetSeqnoToTimeMapping();
- SequenceNumber seqno = ExtractSequenceNumber(key());
- if (kUnknownSeqnoBeforeAll == seqno) {
+ Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
+ if (!s.ok()) {
+ return std::numeric_limits<uint64_t>::max();
+ } else if (kUnknownSeqnoBeforeAll == pikey.sequence) {
return kUnknownTimeBeforeAll;
} else if (seqno_to_time_mapping.Empty()) {
return std::numeric_limits<uint64_t>::max();
+ } else if (kTypeValuePreferredSeqno == pikey.type) {
+ seqno = ParsePackedValueForSeqno(value());
+ } else {
+ seqno = pikey.sequence;
}
return seqno_to_time_mapping.GetProximalTimeBeforeSeqno(seqno);
}
--- /dev/null
+*Implement experimental features: API `Iterator::GetProperty("rocksdb.iterator.write-time")` to allow users to get data's approximate write unix time and write data with a specific write time via `WriteBatch::TimedPut` API.
\ No newline at end of file