arena_.~Arena();
new (&arena_) Arena();
- SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
+ SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
if (read_callback_) {
read_callback_->Refresh(latest_seq);
}
return result;
}
-SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
- InstrumentedMutex* db_mutex) {
- SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
+SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
+ SuperVersion* sv = GetThreadLocalSuperVersion(db);
sv->Ref();
if (!ReturnThreadLocalSuperVersion(sv)) {
// This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
return sv;
}
-SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
- InstrumentedMutex* db_mutex) {
+SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
// The SuperVersion is cached in thread local storage to avoid acquiring
// mutex when SuperVersion does not change since the last use. When a new
// SuperVersion is installed, the compaction or flush thread cleans up
if (sv && sv->Unref()) {
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
- db_mutex->Lock();
+ db->mutex()->Lock();
// NOTE: underlying resources held by superversion (sst files) might
// not be released until the next background job.
sv->Cleanup();
- sv_to_delete = sv;
+ if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
+ db->AddSuperVersionsToFreeQueue(sv);
+ db->SchedulePurge();
+ } else {
+ sv_to_delete = sv;
+ }
} else {
- db_mutex->Lock();
+ db->mutex()->Lock();
}
sv = super_version_->Ref();
- db_mutex->Unlock();
+ db->mutex()->Unlock();
delete sv_to_delete;
}
SuperVersion* GetSuperVersion() { return super_version_; }
// thread-safe
// Return an already referenced SuperVersion to be used safely.
- SuperVersion* GetReferencedSuperVersion(InstrumentedMutex* db_mutex);
+ SuperVersion* GetReferencedSuperVersion(DBImpl* db);
// thread-safe
// Get SuperVersion stored in thread local storage. If it does not exist,
// get a reference from a current SuperVersion.
- SuperVersion* GetThreadLocalSuperVersion(InstrumentedMutex* db_mutex);
+ SuperVersion* GetThreadLocalSuperVersion(DBImpl* db);
// Try to return SuperVersion back to thread local storage. Return true on
// success and false on failure. It fails when the thread local storage
// contains anything other than SuperVersion::kSVInUse flag.
column_family);
ColumnFamilyData* cfd = cfh->cfd();
- SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+ SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
Version* version = super_version->current;
Status s =
AddToLogsToFreeQueue(l);
}
job_context->logs_to_free.clear();
- SchedulePurge();
}
}
delete log_writer;
mutex_.Lock();
}
+ while (!superversions_to_free_queue_.empty()) {
+ assert(!superversions_to_free_queue_.empty());
+ SuperVersion* sv = superversions_to_free_queue_.front();
+ superversions_to_free_queue_.pop_front();
+ mutex_.Unlock();
+ delete sv;
+ mutex_.Lock();
+ }
for (const auto& file : purge_files_) {
const PurgeFileInfo& purge_file = file.second;
const std::string& fname = purge_file.fname;
state->db->FindObsoleteFiles(&job_context, false, true);
if (state->background_purge) {
state->db->ScheduleBgLogWriterClose(&job_context);
+ state->db->AddSuperVersionsToFreeQueue(state->super_version);
+ state->db->SchedulePurge();
}
state->mu->Unlock();
- delete state->super_version;
+ if (!state->background_purge) {
+ delete state->super_version;
+ }
if (job_context.HaveSomethingToDelete()) {
if (state->background_purge) {
// PurgeObsoleteFiles here does not delete files. Instead, it adds the
result = nullptr;
#else
- SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
+ SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
auto iter = new ForwardIterator(this, read_options, cfd, sv);
result = NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
ReadCallback* read_callback,
bool allow_blob,
bool allow_refresh) {
- SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
+ SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
// Try to generate a DB iterator tree in continuous memory area to be
// cache friendly. Here is an example of result:
#else
for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
- SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
+ SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
auto iter = new ForwardIterator(this, read_options, cfd, sv);
iterators->push_back(NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
// TODO(ljin): consider using GetReferencedSuperVersion() directly
- return cfd->GetThreadLocalSuperVersion(&mutex_);
+ return cfd->GetThreadLocalSuperVersion(this);  // whole DBImpl (not just its mutex): the callee also reads immutable_db_options() and may call AddSuperVersionsToFreeQueue()/SchedulePurge()
}
// REQUIRED: this function should only be called on the write thread or if the
void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
// Release SuperVersion
if (sv->Unref()) {
+ bool defer_purge =
+ immutable_db_options().avoid_unnecessary_blocking_io;
{
InstrumentedMutexLock l(&mutex_);
sv->Cleanup();
+ if (defer_purge) {
+ AddSuperVersionsToFreeQueue(sv);
+ SchedulePurge();
+ }
+ }
+ if (!defer_purge) {
+ delete sv;
}
- delete sv;
RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
}
RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
start_file_number += args[i - 1].external_files.size();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
- SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+ SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
exec_results[i].second = ingestion_jobs[i].Prepare(
args[i].external_files, start_file_number, super_version);
exec_results[i].first = true;
{
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
- SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+ SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
exec_results[0].second = ingestion_jobs[0].Prepare(
args[0].external_files, next_file_number, super_version);
exec_results[0].first = true;
dummy_sv_ctx.Clean();
if (status.ok()) {
- SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
+ SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
status = import_job.Prepare(next_file_number, sv);
CleanupSuperVersion(sv);
}
}
std::vector<SuperVersion*> sv_list;
for (auto cfd : cfd_list) {
- sv_list.push_back(cfd->GetReferencedSuperVersion(&mutex_));
+ sv_list.push_back(cfd->GetReferencedSuperVersion(this));
}
for (auto& sv : sv_list) {
VersionStorageInfo* vstorage = sv->current->storage_info();
break;
}
}
+ bool defer_purge =
+ immutable_db_options().avoid_unnecessary_blocking_io;
{
InstrumentedMutexLock l(&mutex_);
for (auto sv : sv_list) {
if (sv && sv->Unref()) {
sv->Cleanup();
- delete sv;
+ if (defer_purge) {
+ AddSuperVersionsToFreeQueue(sv);
+ } else {
+ delete sv;
+ }
}
}
+ if (defer_purge) {
+ SchedulePurge();
+ }
for (auto cfd : cfd_list) {
cfd->Unref();
}
logs_to_free_queue_.push_back(log_writer);
}
+ void AddSuperVersionsToFreeQueue(SuperVersion* sv) {  // Defers `delete sv`: appends sv to superversions_to_free_queue_ instead of freeing it here.
+ superversions_to_free_queue_.push_back(sv);  // NOTE(review): no locking here; the visible call sites invoke this before releasing the DB mutex -- confirm that is a hard requirement
+ }
+
void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
// Fill JobContext with snapshot information needed by flush and compaction.
// A queue to store log writers to close
std::deque<log::Writer*> logs_to_free_queue_;
+ std::deque<SuperVersion*> superversions_to_free_queue_;
int unscheduled_flushes_;
int unscheduled_compactions_;
// one/both sides of the interval are unbounded. But it requires more
// changes to RangesOverlapWithMemtables.
Range range(*begin, *end);
- SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+ SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
CleanupSuperVersion(super_version);
}
const ReadOptions& read_options, ColumnFamilyData* cfd,
SequenceNumber snapshot, ReadCallback* read_callback) {
assert(nullptr != cfd);
- SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+ SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
snapshot,
}
db_->ReleaseSnapshot(s1);
}
+
+TEST_F(DBTest2, BackgroundPurgeTest) {  // With avoid_unnecessary_blocking_io, write-buffer memory must stay charged after `delete iter` and only return to baseline once HIGH-pool work has run.
+ Options options = CurrentOptions();
+ options.write_buffer_manager = std::make_shared<rocksdb::WriteBufferManager>(1 << 20);
+ options.avoid_unnecessary_blocking_io = true;
+ DestroyAndReopen(options);
+ size_t base_value = options.write_buffer_manager->memory_usage();  // baseline, sampled before any write
+
+ ASSERT_OK(Put("a", "a"));
+ Iterator* iter = db_->NewIterator(ReadOptions());  // created before Flush(); presumably keeps the pre-flush memtable referenced -- verify
+ ASSERT_OK(Flush());
+ size_t value = options.write_buffer_manager->memory_usage();
+ ASSERT_GT(value, base_value);  // usage is still above baseline after the flush while the iterator is alive
+
+ db_->GetEnv()->SetBackgroundThreads(1, Env::Priority::HIGH);  // one HIGH thread, so the sleeping task below occupies the whole HIGH pool
+ test::SleepingBackgroundTask sleeping_task_after;
+ db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+ &sleeping_task_after, Env::Priority::HIGH);
+ delete iter;  // must not free the memory inline; presumably the deferred purge is queued on the HIGH pool behind the sleeping task -- verify
+
+ Env::Default()->SleepForMicroseconds(100000);  // 100 ms grace period; NOTE(review): timing-based check
+ value = options.write_buffer_manager->memory_usage();
+ ASSERT_GT(value, base_value);  // still not released while the HIGH pool is blocked
+
+ sleeping_task_after.WakeUp();  // unblock the HIGH pool
+ sleeping_task_after.WaitUntilDone();
+
+ test::SleepingBackgroundTask sleeping_task_after2;  // second task used as a barrier: it is scheduled after `delete iter`, so earlier HIGH-pool work should have run once it is done
+ db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+ &sleeping_task_after2, Env::Priority::HIGH);
+ sleeping_task_after2.WakeUp();
+ sleeping_task_after2.WaitUntilDone();
+
+ value = options.write_buffer_manager->memory_usage();
+ ASSERT_EQ(base_value, value);  // usage is back to the baseline sampled at the start
+}
} // namespace rocksdb
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
db->FindObsoleteFiles(&job_context, false, true);
if (background_purge_on_iterator_cleanup) {
db->ScheduleBgLogWriterClose(&job_context);
+ db->AddSuperVersionsToFreeQueue(sv);
+ db->SchedulePurge();
}
db->mutex_.Unlock();
- delete sv;
+ if (!background_purge_on_iterator_cleanup) {
+ delete sv;
+ }
if (job_context.HaveSomethingToDelete()) {
db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
}
Cleanup(refresh_sv);
if (refresh_sv) {
// New
- sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
+ sv_ = cfd_->GetReferencedSuperVersion(db_);
}
ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
kMaxSequenceNumber /* upper_bound */);
void ForwardIterator::RenewIterators() {
SuperVersion* svnew;
assert(sv_);
- svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
+ svnew = cfd_->GetReferencedSuperVersion(db_);
if (mutable_iter_ != nullptr) {
DeleteIterator(mutable_iter_, true /* is_arena */);
// independently if the process crashes later and tries to recover.
bool atomic_flush = false;
- // If true, ColumnFamilyHandle's and Iterator's destructors won't delete
- // obsolete files directly and will instead schedule a background job
- // to do it. Use it if you're destroying iterators or ColumnFamilyHandle-s
- // from latency-sensitive threads.
+ // If true, working threads may avoid doing unnecessary and long-latency
+ // operations (such as deleting obsolete files directly or deleting
+ // memtables) and will instead schedule a background job to do them.
+ // Use it if your threads are latency-sensitive.
// If set to true, takes precedence over
// ReadOptions::background_purge_on_iterator_cleanup.
bool avoid_unnecessary_blocking_io = false;