opt->rep.WAL_ttl_seconds = ttl;
}
+void leveldb_options_set_WAL_size_limit_MB(
+ leveldb_options_t* opt, uint64_t limit) {
+ opt->rep.WAL_size_limit_MB = limit;
+}
+
leveldb_comparator_t* leveldb_comparator_create(
void* state,
void (*destructor)(void*),
" a compaction run that compacts Level-K with Level-(K+1) (for"
" K >= 1)");
-DEFINE_uint64(wal_ttl, 0, "Set the TTL for the WAL Files in seconds.");
+DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
+DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
+ " in MB.");
DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer,
"Allow buffered io using OS buffers");
options.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger;
options.compression = FLAGS_compression_type_e;
- options.WAL_ttl_seconds = FLAGS_wal_ttl;
+ options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
+ options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
if (FLAGS_min_level_to_compress >= 0) {
assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
options.compression_per_level.resize(FLAGS_num_levels);
delete_obsolete_files_last_run_(0),
purge_wal_files_last_run_(0),
last_stats_dump_time_microsec_(0),
+ default_interval_to_delete_obsolete_WAL_(600),
stall_level0_slowdown_(0),
stall_memtable_compaction_(0),
stall_level0_num_files_(0),
}
const Status DBImpl::CreateArchivalDirectory() {
- if (options_.WAL_ttl_seconds > 0) {
+ if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) {
std::string archivalPath = ArchivalDirectory(options_.wal_dir);
return env_->CreateDirIfMissing(archivalPath);
}
Status DBImpl::DeleteLogFile(uint64_t number) {
Status s;
auto filename = LogFileName(options_.wal_dir, number);
- if (options_.WAL_ttl_seconds > 0) {
+ if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) {
s = env_->RenameFile(filename,
ArchivedLogFileName(options_.wal_dir, number));
EvictObsoleteFiles(deletion_state);
}
+// 1. Go through all archived files and
+// a. if ttl is enabled, delete outdated files
+// b. if archive size limit is enabled, delete empty files,
+// compute file number and size.
+// 2. If size limit is enabled:
+// a. compute how many files should be deleted
+// b. get sorted non-empty archived logs
+// c. delete what should be deleted
void DBImpl::PurgeObsoleteWALFiles() {
+ bool const ttl_enabled = options_.WAL_ttl_seconds > 0;
+ bool const size_limit_enabled = options_.WAL_size_limit_MB > 0;
+ if (!ttl_enabled && !size_limit_enabled) {
+ return;
+ }
+
int64_t current_time;
Status s = env_->GetCurrentTime(¤t_time);
- uint64_t now_seconds = static_cast<uint64_t>(current_time);
- assert(s.ok());
+ if (!s.ok()) {
+ Log(options_.info_log, "Can't get current time: %s", s.ToString().c_str());
+ assert(false);
+ return;
+ }
+ uint64_t const now_seconds = static_cast<uint64_t>(current_time);
+ uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) ?
+ options_.WAL_ttl_seconds / 2 : default_interval_to_delete_obsolete_WAL_;
- if (options_.WAL_ttl_seconds != ULONG_MAX && options_.WAL_ttl_seconds > 0) {
- if (purge_wal_files_last_run_ + options_.WAL_ttl_seconds > now_seconds) {
- return;
- }
- std::vector<std::string> wal_files;
- std::string archival_dir = ArchivalDirectory(options_.wal_dir);
- env_->GetChildren(archival_dir, &wal_files);
- for (const auto& f : wal_files) {
- uint64_t file_m_time;
- const std::string file_path = archival_dir + "/" + f;
- const Status s = env_->GetFileModificationTime(file_path, &file_m_time);
- if (s.ok() && (now_seconds - file_m_time > options_.WAL_ttl_seconds)) {
- Status status = env_->DeleteFile(file_path);
- if (!status.ok()) {
- Log(options_.info_log,
- "Failed Deleting a WAL file Error : i%s",
- status.ToString().c_str());
+ if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
+ return;
+ }
+
+ purge_wal_files_last_run_ = now_seconds;
+
+ std::string archival_dir = ArchivalDirectory(options_.wal_dir);
+ std::vector<std::string> files;
+ s = env_->GetChildren(archival_dir, &files);
+ if (!s.ok()) {
+ Log(options_.info_log, "Can't get archive files: %s", s.ToString().c_str());
+ assert(false);
+ return;
+ }
+
+ size_t log_files_num = 0;
+ uint64_t log_file_size = 0;
+
+ for (auto& f : files) {
+ uint64_t number;
+ FileType type;
+ if (ParseFileName(f, &number, &type) && type == kLogFile) {
+ std::string const file_path = archival_dir + "/" + f;
+ if (ttl_enabled) {
+ uint64_t file_m_time;
+ Status const s = env_->GetFileModificationTime(file_path,
+ &file_m_time);
+ if (!s.ok()) {
+ Log(options_.info_log, "Can't get file mod time: %s: %s",
+ file_path.c_str(), s.ToString().c_str());
+ continue;
}
- } // Ignore errors.
+ if (now_seconds - file_m_time > options_.WAL_ttl_seconds) {
+ Status const s = env_->DeleteFile(file_path);
+ if (!s.ok()) {
+ Log(options_.info_log, "Can't delete file: %s: %s",
+ file_path.c_str(), s.ToString().c_str());
+ continue;
+ }
+ continue;
+ }
+ }
+
+ if (size_limit_enabled) {
+ uint64_t file_size;
+ Status const s = env_->GetFileSize(file_path, &file_size);
+ if (!s.ok()) {
+ Log(options_.info_log, "Can't get file size: %s: %s",
+ file_path.c_str(), s.ToString().c_str());
+ return;
+ } else {
+ if (file_size > 0) {
+ log_file_size = std::max(log_file_size, file_size);
+ ++log_files_num;
+ } else {
+ Status s = env_->DeleteFile(file_path);
+ if (!s.ok()) {
+ Log(options_.info_log, "Can't delete file: %s: %s",
+ file_path.c_str(), s.ToString().c_str());
+ continue;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if (0 == log_files_num || !size_limit_enabled) {
+ return;
+ }
+
+ size_t const files_keep_num = options_.WAL_size_limit_MB *
+ 1024 * 1024 / log_file_size;
+ if (log_files_num <= files_keep_num) {
+ return;
+ }
+
+ size_t files_del_num = log_files_num - files_keep_num;
+ VectorLogPtr archived_logs;
+ AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
+
+ if (files_del_num > archived_logs.size()) {
+ Log(options_.info_log, "Trying to delete more archived log files than "
+ "exist. Deleting all");
+ files_del_num = archived_logs.size();
+ }
+
+ for (size_t i = 0; i < files_del_num; ++i) {
+ std::string const file_path = archived_logs[i]->PathName();
+ Status const s = DeleteFile(file_path);
+ if (!s.ok()) {
+ Log(options_.info_log, "Can't delete file: %s: %s",
+ file_path.c_str(), s.ToString().c_str());
+ continue;
}
}
- purge_wal_files_last_run_ = now_seconds;
}
// If externalTable is set, then apply recovered transactions
// get total level0 file size. Only for testing.
uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0);}
+ void TEST_SetDefaultTimeToCheck(uint64_t default_interval_to_delete_obsolete_WAL)
+ {
+ default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL;
+ }
+
protected:
Env* const env_;
const std::string dbname_;
// last time stats were dumped to LOG
std::atomic<uint64_t> last_stats_dump_time_microsec_;
+ // obsolete files will be deleted every this seconds if ttl deletion is
+ // enabled and archive size_limit is disabled.
+ uint64_t default_interval_to_delete_obsolete_WAL_;
+
// These count the number of microseconds for which MakeRoomForWrite stalls.
uint64_t stall_level0_slowdown_;
uint64_t stall_memtable_compaction_;
return std::move(log_files);
}
-TEST(DBTest, WALArchival) {
+TEST(DBTest, WALArchivalTtl) {
do {
- std::string value(1024, '1');
Options options = CurrentOptions();
options.create_if_missing = true;
options.WAL_ttl_seconds = 1000;
DestroyAndReopen(&options);
-
- // TEST : Create DB with a ttl.
+ // TEST : Create DB with a ttl and no size limit.
// Put some keys. Count the log files present in the DB just after insert.
// Re-open db. Causes deletion/archival to take place.
// Assert that the files moved under "/archive".
+ // Reopen db with small ttl.
+ // Assert that archive was removed.
std::string archiveDir = ArchivalDirectory(dbname_);
for (int i = 0; i < 10; ++i) {
for (int j = 0; j < 10; ++j) {
- ASSERT_OK(Put(Key(10*i+j), value));
+ ASSERT_OK(Put(Key(10 * i + j), DummyString(1024)));
}
- std::vector<uint64_t> logFiles = ListLogFiles(env_, dbname_);
+ std::vector<uint64_t> log_files = ListLogFiles(env_, dbname_);
options.create_if_missing = false;
Reopen(&options);
std::vector<uint64_t> logs = ListLogFiles(env_, archiveDir);
std::set<uint64_t> archivedFiles(logs.begin(), logs.end());
- for (auto& log : logFiles) {
+ for (auto& log : log_files) {
ASSERT_TRUE(archivedFiles.find(log) != archivedFiles.end());
}
}
- std::vector<uint64_t> logFiles = ListLogFiles(env_, archiveDir);
- ASSERT_TRUE(logFiles.size() > 0);
+ std::vector<uint64_t> log_files = ListLogFiles(env_, archiveDir);
+ ASSERT_TRUE(log_files.size() > 0);
+
options.WAL_ttl_seconds = 1;
- env_->SleepForMicroseconds(2*1000*1000);
+ env_->SleepForMicroseconds(2 * 1000 * 1000);
Reopen(&options);
- logFiles = ListLogFiles(env_, archiveDir);
- ASSERT_TRUE(logFiles.size() == 0);
+ log_files = ListLogFiles(env_, archiveDir);
+ ASSERT_TRUE(log_files.empty());
} while (ChangeCompactOptions());
}
-TEST(DBTest, WALClear) {
+uint64_t GetLogDirSize(std::string dir_path, SpecialEnv* env) {
+ uint64_t dir_size = 0;
+ std::vector<std::string> files;
+ env->GetChildren(dir_path, &files);
+ for (auto& f : files) {
+ uint64_t number;
+ FileType type;
+ if (ParseFileName(f, &number, &type) && type == kLogFile) {
+ std::string const file_path = dir_path + "/" + f;
+ uint64_t file_size;
+ env->GetFileSize(file_path, &file_size);
+ dir_size += file_size;
+ }
+ }
+ return dir_size;
+}
+
+TEST(DBTest, WALArchivalSizeLimit) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
- options.WAL_ttl_seconds = 1;
+ options.WAL_ttl_seconds = 0;
+ options.WAL_size_limit_MB = 1000;
+
+ // TEST : Create DB with huge size limit and no ttl.
+ // Put some keys. Count the archived log files present in the DB
+ // just after insert. Assert that there are many enough.
+ // Change size limit. Re-open db.
+ // Assert that archive is not greater than WAL_size_limit_MB.
+ // Set ttl and time_to_check_ to small values. Re-open db.
+ // Assert that there are no archived logs left.
- for (int j = 0; j < 10; ++j)
- for (int i = 0; i < 10; ++i)
- ASSERT_OK(Put(Key(10*i+j), DummyString(1024)));
+ DestroyAndReopen(&options);
+ for (int i = 0; i < 128 * 128; ++i) {
+ ASSERT_OK(Put(Key(i), DummyString(1024)));
+ }
Reopen(&options);
+
std::string archive_dir = ArchivalDirectory(dbname_);
std::vector<std::uint64_t> log_files = ListLogFiles(env_, archive_dir);
- ASSERT_TRUE(!log_files.empty());
+ ASSERT_TRUE(log_files.size() > 2);
+
+ options.WAL_size_limit_MB = 8;
+ Reopen(&options);
+ dbfull()->TEST_PurgeObsoleteteWAL();
+
+ uint64_t archive_size = GetLogDirSize(archive_dir, env_);
+ ASSERT_TRUE(archive_size <= options.WAL_size_limit_MB * 1024 * 1024);
+
+ options.WAL_ttl_seconds = 1;
+ dbfull()->TEST_SetDefaultTimeToCheck(1);
env_->SleepForMicroseconds(2 * 1000 * 1000);
+ Reopen(&options);
dbfull()->TEST_PurgeObsoleteteWAL();
+
log_files = ListLogFiles(env_, archive_dir);
ASSERT_TRUE(log_files.empty());
} while (ChangeCompactOptions());
options_.target_file_size_base = 1024*1024*1000;
options_.max_bytes_for_level_base = 1024*1024*1000;
options_.WAL_ttl_seconds = 300; // Used to test log files
+ options_.WAL_size_limit_MB = 1024; // Used to test log files
dbname_ = test::TmpDir() + "/deletefile_test";
DestroyDB(dbname_, options_);
numlevels_ = 7;
// seq_number. If the sequence number is non existent, it returns an iterator
// at the first available seq_no after the requested seq_no
// Returns Status::Ok if iterator is valid
- // Must set WAL_ttl_seconds to a large value to use this api, else the WAL
- // files will get cleared aggressively and the iterator might keep getting
- // invalid before an update is read.
+ // Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to
+ // use this api, else the WAL files will get
+ // cleared aggressively and the iterator might keep getting invalid before
+ // an update is read.
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter) = 0;
// be issued on this database.
bool disable_auto_compactions;
- // The number of seconds a WAL(write ahead log) should be kept after it has
- // been marked as Not Live. If the value is set. The WAL files are moved to
- // the archive directory and deleted after the given TTL.
- // If set to 0, WAL files are deleted as soon as they are not required by
- // the database.
- // If set to std::numeric_limits<uint64_t>::max() the WAL files will never be
- // deleted.
- // Default : 0
+ // The following two fields affect how archived logs will be deleted.
+ // 1. If both set to 0, logs will be deleted asap and will not get into
+ // the archive.
+ // 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0,
+ // WAL files will be checked every 10 min and if total size is greater
+ // then WAL_size_limit_MB, they will be deleted starting with the
+ // earliest until size_limit is met. All empty files will be deleted.
+ // 3. If WAL_ttl_seconds is not 0 and WAL_size_limit_MB is 0, then
+ // WAL files will be checked every WAL_ttl_secondsi / 2 and those which
+ // are older than WAL_ttl_seconds will be deleted.
+ // 4. If both are not 0, WAL files will be checked every 10 min and both
+ // checks will be performed with ttl being first.
uint64_t WAL_ttl_seconds;
+ uint64_t WAL_size_limit_MB;
// Number of bytes to preallocate (via fallocate) the manifest
// files. Default is 4mb, which is reasonable to reduce random IO
enum WalFileType {
/* Indicates that WAL file is in archive directory. WAL files are moved from
* the main db directory to archive directory once they are not live and stay
- * there for a duration of WAL_ttl_seconds which can be set in Options
+ * there until cleaned up. Files are cleaned depending on archive size
+ * (Options::WAL_size_limit_MB) and time since last cleaning
+ * (Options::WAL_ttl_seconds).
*/
kArchivedLogFile = 0,
DEFINE_uint64(num_inserts, 1000, "the num of inserts the first thread should"
" perform.");
-DEFINE_uint64(wal_ttl, 1000, "the wal ttl for the run(in seconds)");
+DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
+DEFINE_uint64(wal_size_limit_MB, 10, "the wal size limit for the run"
+ "(in MB)");
int main(int argc, const char** argv) {
google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
- " --num_inserts=<num_inserts> --wal_ttl=<WAL_ttl_seconds>");
+ " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
+ " --wal_size_limit_MB=<WAL_size_limit_MB>");
google::ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
Env* env = Env::Default();
default_db_path += "db_repl_stress";
Options options;
options.create_if_missing = true;
- options.WAL_ttl_seconds = FLAGS_wal_ttl;
+ options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
+ options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
DB* db;
DestroyDB(default_db_path, options);
arena_block_size(0),
disable_auto_compactions(false),
WAL_ttl_seconds(0),
+ WAL_size_limit_MB(0),
manifest_preallocation_size(4 * 1024 * 1024),
purge_redundant_kvs_while_flush(true),
allow_os_buffer(true),
disable_auto_compactions);
Log(log," Options.WAL_ttl_seconds: %ld",
WAL_ttl_seconds);
+ Log(log," Options.WAL_size_limit_MB: %ld",
+ WAL_size_limit_MB);
Log(log," Options.manifest_preallocation_size: %ld",
manifest_preallocation_size);
Log(log," Options.purge_redundant_kvs_while_flush: %d",