Summary:
This fixes the following scenario we've hit:
- we reached max_total_wal_size, created a new wal and scheduled flushing all memtables corresponding to the old one,
- before the last of these flushes started its column family was dropped; the last background flush call was a no-op; no one removed the old wal from alive_logs_,
- hours have passed and no flushes happened even though lots of data was written; data is written to different column families, compactions are disabled; old column families are dropped before memtable grows big enough to trigger a flush; the old wal still sits in alive_logs_ preventing max_total_wal_size limit from kicking in,
- a few more hours pass and we run out disk space because of one huge .log file.
Test Plan: `make check`; backported the new test, checked that it fails without this diff
Reviewers: igor
Reviewed By: igor
Subscribers: dhruba
Differential Revision: https://reviews.facebook.net/D40893
(cherry picked from commit
218487d8dc45c4cb03dbb80bd4d7031b131b9f25)
versions_->GetObsoleteFiles(&job_context->sst_delete_files,
job_context->min_pending_output);
+ uint64_t min_log_number = versions_->MinLogNumber();
+ if (!alive_log_files_.empty()) {
+ // find newly obsoleted log files
+ while (alive_log_files_.begin()->number < min_log_number) {
+ auto& earliest = *alive_log_files_.begin();
+ job_context->log_delete_files.push_back(earliest.number);
+ total_log_size_ -= earliest.size;
+ alive_log_files_.pop_front();
+ // Current log should always stay alive since it can't have
+ // number < MinLogNumber().
+ assert(alive_log_files_.size());
+ }
+ }
+
+ // We're just cleaning up for DB::Write().
+ job_context->logs_to_free = logs_to_free_;
+ logs_to_free_.clear();
+
// store the current filenum, lognum, etc
job_context->manifest_file_number = versions_->manifest_file_number();
job_context->pending_manifest_file_number =
VersionStorageInfo::LevelSummaryStorage tmp;
LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp));
-
- if (disable_delete_obsolete_files_ == 0) {
- // add to deletion state
- while (alive_log_files_.size() &&
- alive_log_files_.begin()->number < versions_->MinLogNumber()) {
- const auto& earliest = *alive_log_files_.begin();
- job_context->log_delete_files.push_back(earliest.number);
- total_log_size_ -= earliest.size;
- alive_log_files_.pop_front();
- }
- }
}
if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks &&
void DBImpl::BGWorkFlush(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
+ TEST_SYNC_POINT("DBImpl::BGWorkFlush");
reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
+ TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
}
void DBImpl::BGWorkCompaction(void* db) {
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
- // We're just cleaning up for DB::Write()
- job_context.logs_to_free = logs_to_free_;
- logs_to_free_.clear();
-
// If flush failed, we want to delete all temporary files that we might have
// created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
- // We're just cleaning up for DB::Write()
- job_context.logs_to_free = logs_to_free_;
- logs_to_free_.clear();
-
// If compaction failed, we want to delete all temporary files that we might
// have created (they might not be all recorded in job_context in case of a
// failure). Thus, we force full scan in FindObsoleteFiles()
size_t TEST_LogsToFreeSize();
+ uint64_t TEST_LogfileNumber();
+
#endif // ROCKSDB_LITE
// Returns the list of live files in 'live' and the list
return logs_to_free_.size();
}
+uint64_t DBImpl::TEST_LogfileNumber() {
+ InstrumentedMutexLock l(&mutex_);
+ return logfile_number_;
+}
+
} // namespace rocksdb
#endif // ROCKSDB_LITE
} while (ChangeCompactOptions());
}
-#ifndef NDEBUG // sync point is not included with DNDEBUG build
TEST_F(DBTest, TransactionLogIteratorRace) {
static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
} while (ChangeCompactOptions());
}
}
-#endif
TEST_F(DBTest, TransactionLogIteratorStallAtLastRecord) {
do {
ASSERT_EQ("1", it->key().ToString());
}
+TEST_F(DBTest, DeletingOldWalAfterDrop) {
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ { { "Test:AllowFlushes", "DBImpl::BGWorkFlush" },
+ { "DBImpl::BGWorkFlush:done", "Test:WaitForFlush"} });
+ rocksdb::SyncPoint::GetInstance()->ClearTrace();
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ Options options = CurrentOptions();
+ options.max_total_wal_size = 8192;
+ options.compression = kNoCompression;
+ options.write_buffer_size = 1 << 20;
+ options.level0_file_num_compaction_trigger = (1<<30);
+ options.level0_slowdown_writes_trigger = (1<<30);
+ options.level0_stop_writes_trigger = (1<<30);
+ options.disable_auto_compactions = true;
+ DestroyAndReopen(options);
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ CreateColumnFamilies({"cf1", "cf2"}, options);
+ ASSERT_OK(Put(0, "key1", DummyString(8192)));
+ ASSERT_OK(Put(0, "key2", DummyString(8192)));
+ // the oldest wal should now be getting_flushed
+ ASSERT_OK(db_->DropColumnFamily(handles_[0]));
+ // all flushes should now do nothing because their CF is dropped
+ TEST_SYNC_POINT("Test:AllowFlushes");
+ TEST_SYNC_POINT("Test:WaitForFlush");
+ uint64_t lognum1 = dbfull()->TEST_LogfileNumber();
+ ASSERT_OK(Put(1, "key3", DummyString(8192)));
+ ASSERT_OK(Put(1, "key4", DummyString(8192)));
+ // new wal should have been created
+ uint64_t lognum2 = dbfull()->TEST_LogfileNumber();
+ EXPECT_GT(lognum2, lognum1);
+}
+
} // namespace rocksdb
int main(int argc, char** argv) {
uint64_t MinLogNumber() const {
uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
for (auto cfd : *column_family_set_) {
- if (min_log_num > cfd->GetLogNumber()) {
+ // It's safe to ignore dropped column families here:
+ // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
+ if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
min_log_num = cfd->GetLogNumber();
}
}