* Fixed a bug where a snapshot taken during SST file ingestion would be unstable.
* Fixed a bug for non-TransactionDB with avoid_flush_during_recovery = true and TransactionDB where in case of crash, min_log_number_to_keep may not change on recovery and persisting a new MANIFEST with advanced log_numbers for some column families, results in "column family inconsistency" error on second recovery. As a solution, RocksDB will persist the new MANIFEST after successfully syncing the new WAL. If a future recovery starts from the new MANIFEST, then it means the new WAL is successfully synced. Due to the sentinel empty write batch at the beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point. If future recovery starts from the old MANIFEST, it means the writing the new MANIFEST failed. We won't have the "SST ahead of WAL" error.
* Fixed a bug where RocksDB DB::Open() may creates and writes to two new MANIFEST files even before recovery succeeds. Now writes to MANIFEST are persisted only after recovery is successful.
+* Fix a race condition in WAL size tracking which is caused by an unsafe iterator access after container is changed.
+* Fix unprotected concurrent accesses to `WritableFileWriter::filesize_` by `DB::SyncWAL()` and `DB::Put()` in two write queue mode.
+* Fix a bug in WAL tracking. Before this PR (#10087), calling `SyncWAL()` on the only WAL file of the db will not log the event in MANIFEST, thus allowing a subsequent `DB::Open` even if the WAL file is missing or corrupted.
+
### Public API changes
* Add new API GetUnixTime in Snapshot class which returns the unix time at which Snapshot is taken.
### Behavior changes
* DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
-### Bug Fixes
-* Fix a race condition in WAL size tracking which is caused by an unsafe iterator access after container is changed.
-
## 7.3.0 (05/20/2022)
### Bug Fixes
* Fixed a bug where manual flush would block forever even though flush options had wait=false.
Reopen(options);
ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());
}
+
+TEST_F(DBBasicTest, ManualWalSync) {
+ Options options = CurrentOptions();
+ options.track_and_verify_wals_in_manifest = true;
+ options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("x", "y"));
+ // This does not create a new WAL.
+ ASSERT_OK(db_->SyncWAL());
+ EXPECT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
+
+ std::unique_ptr<LogFile> wal;
+ Status s = db_->GetCurrentWalFile(&wal);
+ ASSERT_OK(s);
+ Close();
+
+ EXPECT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber())));
+
+ ASSERT_TRUE(TryReopen(options).IsCorruption());
+}
#endif // !ROCKSDB_LITE
// A test class for intercepting random reads and injecting artificial
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& wal = *it;
assert(wal.getting_synced);
+ if (immutable_db_options_.track_and_verify_wals_in_manifest &&
+ wal.writer->file()->GetFileSize() > 0) {
+ synced_wals.AddWal(wal.number,
+ WalMetadata(wal.writer->file()->GetFileSize()));
+ }
+
if (logs_.size() > 1) {
- if (immutable_db_options_.track_and_verify_wals_in_manifest &&
- wal.writer->file()->GetFileSize() > 0) {
- synced_wals.AddWal(wal.number,
- WalMetadata(wal.writer->file()->GetFileSize()));
- }
logs_to_free_.push_back(wal.ReleaseWriter());
// To modify logs_ both mutex_ and log_write_mutex_ must be held
InstrumentedMutexLock l(&log_write_mutex_);
TEST_KILL_RANDOM("WritableFileWriter::Append:1");
if (s.ok()) {
- filesize_ += data.size();
+ uint64_t cur_size = filesize_.load(std::memory_order_acquire);
+ filesize_.store(cur_size + data.size(), std::memory_order_release);
}
return s;
}
cap = buf_.Capacity() - buf_.CurrentSize();
}
pending_sync_ = true;
- filesize_ += pad_bytes;
+ uint64_t cur_size = filesize_.load(std::memory_order_acquire);
+ filesize_.store(cur_size + pad_bytes, std::memory_order_release);
if (perform_data_verification_) {
buffered_data_crc32c_checksum_ =
crc32c::Extend(buffered_data_crc32c_checksum_,
start_ts = FileOperationInfo::StartNow();
}
#endif
- interim = writable_file_->Truncate(filesize_, io_options, nullptr);
+ uint64_t filesz = filesize_.load(std::memory_order_acquire);
+ interim = writable_file_->Truncate(filesz, io_options, nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
if (!interim.ok()) {
NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(),
- filesize_);
+ filesz);
}
}
#endif
const uint64_t kBytesNotSyncRange =
1024 * 1024; // recent 1MB is not synced.
const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB.
- if (filesize_ > kBytesNotSyncRange) {
- uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
+ uint64_t cur_size = filesize_.load(std::memory_order_acquire);
+ if (cur_size > kBytesNotSyncRange) {
+ uint64_t offset_sync_to = cur_size - kBytesNotSyncRange;
offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
assert(offset_sync_to >= last_sync_size_);
if (offset_sync_to > 0 &&
size_t max_buffer_size_;
// Actually written data size can be used for truncate
// not counting padding data
- uint64_t filesize_;
+ std::atomic<uint64_t> filesize_;
#ifndef ROCKSDB_LITE
// This is necessary when we use unbuffered access
// and writes must happen on aligned offsets
// returns NotSupported status.
IOStatus SyncWithoutFlush(bool use_fsync);
- uint64_t GetFileSize() const { return filesize_; }
+ uint64_t GetFileSize() const {
+ return filesize_.load(std::memory_order_acquire);
+ }
IOStatus InvalidateCache(size_t offset, size_t length) {
return writable_file_->InvalidateCache(offset, length);