}
MemTable* ColumnFamilyData::ConstructNewMemtable(
- const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq,
- uint64_t log_number) {
+ const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
- write_buffer_manager_, earliest_seq, id_, log_number);
+ write_buffer_manager_, earliest_seq, id_);
}
void ColumnFamilyData::CreateNewMemtable(
- const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq,
- uint64_t log_number) {
+ const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
if (mem_ != nullptr) {
delete mem_->Unref();
}
- SetMemtable(
- ConstructNewMemtable(mutable_cf_options, earliest_seq, log_number));
+ SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
mem_->Ref();
}
// See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
- SequenceNumber earliest_seq,
- uint64_t log_number = 0);
+ SequenceNumber earliest_seq);
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options,
- SequenceNumber earliest_seq, uint64_t log_number = 0);
+ SequenceNumber earliest_seq);
TableCache* table_cache() const { return table_cache_.get(); }
BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); }
// Clear memtables if recovery failed
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
- kMaxSequenceNumber, cfd->GetLogNumber());
+ kMaxSequenceNumber);
}
}
}
flushed = true;
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
- *next_sequence, cfd->GetLogNumber());
+ *next_sequence);
}
}
}
flushed = true;
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
- versions_->LastSequence(),
- cfd->GetLogNumber());
+ versions_->LastSequence());
}
data_seen = true;
}
curr_log_num != log_number)) {
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
- MemTable* new_mem = cfd->ConstructNewMemtable(
- mutable_cf_options, seq_of_batch, log_number);
+ MemTable* new_mem =
+ cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
cfd->mem()->SetNextLogNumber(log_number);
cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
new_mem->Ref();
}
if (s.ok()) {
SequenceNumber seq = versions_->LastSequence();
- new_mem =
- cfd->ConstructNewMemtable(mutable_cf_options, seq, new_log_number);
+ new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
context->superversion_context.NewSuperVersion();
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
base_->Unref();
}
-uint64_t FlushJob::ExtractEarliestLogFileNumber() {
- uint64_t earliest_logno = 0;
- for (MemTable* m : mems_) {
- uint64_t logno = m->GetEarliestLogFileNumber();
- if (logno > 0 && (earliest_logno == 0 || logno < earliest_logno)) {
- earliest_logno = logno;
- }
- }
- return earliest_logno;
-}
-
Status FlushJob::MemPurge() {
Status s;
db_mutex_->AssertHeld();
NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(),
static_cast<int>(memtables.size()), &arena));
- uint64_t earliest_logno = ExtractEarliestLogFileNumber();
-
auto* ioptions = cfd_->ioptions();
// Place iterator at the First (meaning most recent) key node.
new_mem = new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()),
mutable_cf_options_, cfd_->write_buffer_mgr(),
- earliest_seqno, cfd_->GetID(), earliest_logno);
+ earliest_seqno, cfd_->GetID());
assert(new_mem != nullptr);
Env* env = db_options_.env;
// recommend all users not to set this flag as true given that the MemPurge
// process has not matured yet.
Status MemPurge();
- uint64_t ExtractEarliestLogFileNumber();
#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
#endif // !ROCKSDB_LITE
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
- SequenceNumber latest_seq, uint32_t column_family_id,
- uint64_t current_logfile_number)
+ SequenceNumber latest_seq, uint32_t column_family_id)
: comparator_(cmp),
moptions_(ioptions, mutable_cf_options),
refs_(0),
earliest_seqno_(latest_seq),
creation_seq_(latest_seq),
mem_next_logfile_number_(0),
- mem_min_logfile_number_(current_logfile_number),
min_prep_log_referenced_(0),
locks_(moptions_.inplace_update_support
? moptions_.inplace_update_num_locks
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
- SequenceNumber earliest_seq, uint32_t column_family_id,
- uint64_t current_logfile_number = 0);
+ SequenceNumber earliest_seq, uint32_t column_family_id);
// No copying allowed
MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;
// operations on the same MemTable.
void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
- // Set the earliest log file number that (possibly)
- // contains entries from this memtable.
- void SetEarliestLogFileNumber(uint64_t logno) {
- mem_min_logfile_number_ = logno;
- }
-
- // Return the earliest log file number that (possibly)
- // contains entries from this memtable.
- uint64_t GetEarliestLogFileNumber() { return mem_min_logfile_number_; }
-
// if this memtable contains data from a committed
// two phase transaction we must take note of the
// log which contains that data so we can know
// The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_;
- // The earliest log containing entries inserted into
- // this memtable.
- uint64_t mem_min_logfile_number_;
-
// the earliest log containing a prepared section
// which has been inserted into this memtable.
std::atomic<uint64_t> min_prep_log_referenced_;
// and don't commit anything to the manifest file.
RemoveMemTablesOrRestoreFlags(s, cfd, batch_count, log_buffer,
to_delete, mu);
+ // Note: cfd->SetLogNumber is only called when a VersionEdit
+ // is written to MANIFEST. When mempurge is succesful, we skip
+ // this step, therefore cfd->GetLogNumber is always is
+ // earliest log with data unflushed.
// Notify new head of manifest write queue.
// wake up all the waiting writers
- // TODO(bjlemaire): explain full reason needed or investigate more.
+ // TODO(bjlemaire): explain full reason WakeUpWaitingManifestWriters
+ // needed or investigate more.
vset->WakeUpWaitingManifestWriters();
*io_s = IOStatus::OK();
}
}
}
-// Returns the earliest log that possibly contain entries
-// from one of the memtables of this memtable_list.
-uint64_t MemTableList::EarliestLogContainingData() {
- uint64_t min_log = 0;
-
- for (auto& m : current_->memlist_) {
- uint64_t log = m->GetEarliestLogFileNumber();
- if (log > 0 && (min_log == 0 || log < min_log)) {
- min_log = log;
- }
- }
-
- return min_log;
-}
-
uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
const std::unordered_set<MemTable*>* memtables_to_flush) {
uint64_t min_log = 0;
size_t* current_memory_usage() { return ¤t_memory_usage_; }
- // Returns the earliest log that possibly contain entries
- // from one of the memtables of this memtable_list.
- uint64_t EarliestLogContainingData();
-
// Returns the min log containing the prep section after memtables listsed in
// `memtables_to_flush` are flushed and their status is persisted in manifest.
uint64_t PrecomputeMinLogContainingPrepSection(
// Initialize per-column family memtables
for (auto* cfd : *vset_.GetColumnFamilySet()) {
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
- kMaxSequenceNumber, cfd->GetLogNumber());
+ kMaxSequenceNumber);
}
auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());
"records NOT monotonically increasing");
} else {
cfd->SetLogNumber(edit.log_number_);
- if (version_set_->db_options()->experimental_allow_mempurge &&
- edit.log_number_ > 0 &&
- (cfd->mem()->GetEarliestLogFileNumber() == 0)) {
- cfd->mem()->SetEarliestLogFileNumber(edit.log_number_);
- }
version_edit_params_.SetLogNumber(edit.log_number_);
}
}
// GetLatestMutableCFOptions() is safe here without mutex since the
// cfd is not available to client
new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
- LastSequence(), edit->log_number_);
+ LastSequence());
new_cfd->SetLogNumber(edit->log_number_);
return new_cfd;
}
if (min_log_num > num && !cfd->IsDropped()) {
min_log_num = num;
}
- // If mempurge is activated, there may be an immutable memtable
- // that has data not flushed to any SST file.
- if (db_options_->experimental_allow_mempurge && !(cfd->IsEmpty()) &&
- !(cfd->IsDropped())) {
- num = cfd->imm()->EarliestLogContainingData();
- if ((num > 0) && (min_log_num > num)) {
- min_log_num = num;
- }
- }
}
return min_log_num;
}
if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
min_log_num = cfd->GetLogNumber();
}
- // If mempurge is activated, there may be an immutable memtable
- // that has data not flushed to any SST file.
- if (db_options_->experimental_allow_mempurge && !(cfd->IsEmpty()) &&
- !(cfd->IsDropped())) {
- uint64_t num = cfd->imm()->EarliestLogContainingData();
- if ((num > 0) && (min_log_num > num)) {
- min_log_num = num;
- }
- }
}
return min_log_num;
}