* Fix a race in BackupEngine if RateLimiter is reconfigured during concurrent Restore operations.
* Fix a bug on POSIX in which failure to create a lock file (e.g. out of space) can prevent future LockFile attempts in the same process on the same file from succeeding.
* Fix a bug that backup_rate_limiter and restore_rate_limiter in BackupEngine could not limit read rates.
+* Fix WAL log data corruption when using DBOptions.manual_wal_flush(true) and WriteOptions.sync(true) together. The sync WAL should work with locked log_write_mutex_.
### New Features
* RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.
// writer thread, so no one will push to logs_,
// - as long as other threads don't modify it, it's safe to read
// from std::deque from multiple threads concurrently.
+ //
+ // Sync operation should work with locked log_write_mutex_, because:
+ // when DBOptions.manual_wal_flush_ is set,
+ // FlushWAL function will be invoked by another thread.
+ // if without locked log_write_mutex_, the log file may get data
+ // corruption
+
+ const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
+ if (UNLIKELY(needs_locking)) {
+ log_write_mutex_.Lock();
+ }
+
for (auto& log : logs_) {
io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);
if (!io_s.ok()) {
}
}
+ if (UNLIKELY(needs_locking)) {
+ log_write_mutex_.Unlock();
+ }
+
if (io_s.ok() && need_log_dir_sync) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
}
}
+// This test failure will be caught with a probability
+TEST_F(DBTest, ManualFlushWalAndWriteRace) {
+ Options options;
+ options.env = env_;
+ options.manual_wal_flush = true;
+ options.create_if_missing = true;
+
+ DestroyAndReopen(options);
+
+ WriteOptions wopts;
+ wopts.sync = true;
+
+ port::Thread writeThread([&]() {
+ for (int i = 0; i < 100; i++) {
+ auto istr = ToString(i);
+ dbfull()->Put(wopts, "key_" + istr, "value_" + istr);
+ }
+ });
+ port::Thread flushThread([&]() {
+ for (int i = 0; i < 100; i++) {
+ ASSERT_OK(dbfull()->FlushWAL(false));
+ }
+ });
+
+ writeThread.join();
+ flushThread.join();
+ ASSERT_OK(dbfull()->Put(wopts, "foo1", "value1"));
+ ASSERT_OK(dbfull()->Put(wopts, "foo2", "value2"));
+ Reopen(options);
+ ASSERT_EQ("value1", Get("foo1"));
+ ASSERT_EQ("value2", Get("foo2"));
+}
+
#ifndef ROCKSDB_LITE
TEST_F(DBTest, DynamicMemtableOptions) {
const uint64_t k64KB = 1 << 16;