* Fix a bug that could return wrong results with `index_type=kHashSearch` and using `SetOptions` to change the `prefix_extractor`.
* Fixed a bug in WAL tracking with wal_compression. WAL compression writes a kSetCompressionType record which is not associated with any sequence number. As result, WalManager::GetSortedWalsOfType() will skip these WALs and not return them to caller, e.g. Checkpoint, Backup, causing the operations to fail.
* Avoid a crash if the IDENTITY file is accidentally truncated to empty. A new DB ID will be written and generated on Open.
+* Fixed a possible corruption for users of `manual_wal_flush` and/or `FlushWAL(true /* sync */)`, together with `track_and_verify_wals_in_manifest == true`. For those users, losing unsynced data (e.g., due to power loss) could make future DB opens fail with a `Status::Corruption` complaining about missing WAL data.
### Public API changes
* Add new API GetUnixTime in Snapshot class which returns the unix time at which Snapshot is taken.
}
Status DBImpl::SyncWAL() {
+ TEST_SYNC_POINT("DBImpl::SyncWAL:Begin");
autovector<log::Writer*, 1> logs_to_sync;
bool need_log_dir_sync;
uint64_t current_log_number;
current_log_number = logfile_number_;
while (logs_.front().number <= current_log_number &&
- logs_.front().getting_synced) {
+ logs_.front().IsSyncing()) {
log_sync_cv_.Wait();
}
// First check that logs are safe to sync in background.
for (auto it = logs_.begin();
it != logs_.end() && it->number <= current_log_number; ++it) {
auto& log = *it;
- assert(!log.getting_synced);
- log.getting_synced = true;
+ log.PrepareForSync();
logs_to_sync.push_back(log.writer);
}
VersionEdit synced_wals;
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& wal = *it;
- assert(wal.getting_synced);
+ assert(wal.IsSyncing());
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
- wal.writer->file()->GetFileSize() > 0) {
- synced_wals.AddWal(wal.number,
- WalMetadata(wal.writer->file()->GetFileSize()));
+ wal.GetPreSyncSize() > 0) {
+ synced_wals.AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
}
if (logs_.size() > 1) {
InstrumentedMutexLock l(&log_write_mutex_);
it = logs_.erase(it);
} else {
- wal.getting_synced = false;
+ wal.FinishSync();
++it;
}
}
assert(logs_.empty() || logs_[0].number > up_to ||
- (logs_.size() == 1 && !logs_[0].getting_synced));
+ (logs_.size() == 1 && !logs_[0].IsSyncing()));
Status s;
if (synced_wals.IsWalAddition()) {
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
++it) {
auto& wal = *it;
- assert(wal.getting_synced);
- wal.getting_synced = false;
+ wal.FinishSync();
}
log_sync_cv_.SignalAll();
}
return s;
}
+ bool IsSyncing() { return getting_synced; }
+
+ uint64_t GetPreSyncSize() {
+ assert(getting_synced);
+ return pre_sync_size;
+ }
+
+ void PrepareForSync() {
+ assert(!getting_synced);
+ // Size is expected to be monotonically increasing.
+ assert(writer->file()->GetFlushedSize() >= pre_sync_size);
+ getting_synced = true;
+ pre_sync_size = writer->file()->GetFlushedSize();
+ }
+
+ void FinishSync() {
+ assert(getting_synced);
+ getting_synced = false;
+ }
+
uint64_t number;
// Visual Studio doesn't support deque's member to be noncopyable because
// of a std::unique_ptr as a member.
log::Writer* writer; // own
+
+ private:
// true for some prefix of logs_
bool getting_synced = false;
+ // The size of the file before the sync happens. This amount is guaranteed
+ // to be persisted even if appends happen during sync so it can be used for
+ // tracking the synced size in MANIFEST.
+ uint64_t pre_sync_size = 0;
};
// PurgeFileInfo is a structure to hold information of files to be deleted in
autovector<log::Writer*, 1> logs_to_sync;
uint64_t current_log_number = logfile_number_;
while (logs_.front().number < current_log_number &&
- logs_.front().getting_synced) {
+ logs_.front().IsSyncing()) {
log_sync_cv_.Wait();
}
for (auto it = logs_.begin();
it != logs_.end() && it->number < current_log_number; ++it) {
auto& log = *it;
- assert(!log.getting_synced);
- log.getting_synced = true;
+ log.PrepareForSync();
logs_to_sync.push_back(log.writer);
}
}
while (!logs_.empty() && logs_.front().number < min_log_number) {
auto& log = logs_.front();
- if (log.getting_synced) {
+ if (log.IsSyncing()) {
log_sync_cv_.Wait();
// logs_ could have changed while we were waiting.
continue;
// Note: there does not seem to be a reason to wait for parallel sync at
// this early step but it is not important since parallel sync (SyncWAL) and
// need_log_sync are usually not used together.
- while (logs_.front().getting_synced) {
+ while (logs_.front().IsSyncing()) {
log_sync_cv_.Wait();
}
for (auto& log : logs_) {
- assert(!log.getting_synced);
// This is just to prevent the logs to be synced by a parallel SyncWAL
// call. We will do the actual syncing later after we will write to the
// WAL.
// Note: there does not seem to be a reason to set this early before we
// actually write to the WAL
- log.getting_synced = true;
+ log.PrepareForSync();
}
} else {
*need_log_sync = false;
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
}
+TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
+ // Repro race condition bug where unflushed WAL data extended the synced size
+ // recorded to MANIFEST despite being unrecoverable.
+ Options options = GetOptions();
+ std::unique_ptr<FaultInjectionTestEnv> fault_env(
+ new FaultInjectionTestEnv(env_));
+ options.env = fault_env.get();
+ options.manual_wal_flush = true;
+ options.track_and_verify_wals_in_manifest = true;
+ Reopen(options);
+
+ ASSERT_OK(Put("key1", "val1"));
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::SyncWAL:Begin",
+ [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(db_->FlushWAL(true /* sync */));
+
+ // Ensure callback ran.
+ ASSERT_EQ("val2", Get("key2"));
+
+ Close();
+
+ // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
+ // DB WAL.
+ fault_env->DropUnsyncedFileData();
+
+ Reopen(options);
+
+ // Need to close before `fault_env` goes out of scope.
+ Close();
+}
+
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
std::unique_ptr<FaultInjectionTestEnv> mock_env(
new FaultInjectionTestEnv(env_));
left -= allowed;
src += allowed;
+ uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+ flushed_size_.store(cur_size + allowed, std::memory_order_release);
}
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
// the corresponding checksum value
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
+ uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+ flushed_size_.store(cur_size + left, std::memory_order_release);
return s;
}
left -= size;
src += size;
write_offset += size;
+ uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+ flushed_size_.store(cur_size + size, std::memory_order_release);
assert((next_write_offset_ % alignment) == 0);
}
IOSTATS_ADD(bytes_written, left);
assert((next_write_offset_ % alignment) == 0);
+ uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+ flushed_size_.store(cur_size + left, std::memory_order_release);
if (s.ok()) {
// Move the tail to the beginning of the buffer
// Actually written data size can be used for truncate
// not counting padding data
std::atomic<uint64_t> filesize_;
+ std::atomic<uint64_t> flushed_size_;
#ifndef ROCKSDB_LITE
// This is necessary when we use unbuffered access
// and writes must happen on aligned offsets
buf_(),
max_buffer_size_(options.writable_file_max_buffer_size),
filesize_(0),
+ flushed_size_(0),
#ifndef ROCKSDB_LITE
next_write_offset_(0),
#endif // ROCKSDB_LITE
return filesize_.load(std::memory_order_acquire);
}
+ // Returns the size of data flushed to the underlying `FSWritableFile`.
+ // Expected to match `writable_file()->GetFileSize()`.
+ // The return value can serve as a lower-bound for the amount of data synced
+ // by a future call to `SyncWithoutFlush()`.
+ uint64_t GetFlushedSize() const {
+ return flushed_size_.load(std::memory_order_acquire);
+ }
+
IOStatus InvalidateCache(size_t offset, size_t length) {
return writable_file_->InvalidateCache(offset, length);
}
delete checkpoint;
}
+TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) {
+ // Repro for a race condition where a user write comes in after the checkpoint
+ // syncs WAL for `track_and_verify_wals_in_manifest` but before the
+ // corresponding MANIFEST update. With the bug, that scenario resulted in an
+ // unopenable DB with error "Corruption: Size mismatch: WAL ...".
+ Options options = CurrentOptions();
+ std::unique_ptr<FaultInjectionTestEnv> fault_env(
+ new FaultInjectionTestEnv(env_));
+ options.env = fault_env.get();
+ options.track_and_verify_wals_in_manifest = true;
+ Reopen(options);
+
+ ASSERT_OK(Put("key1", "val1"));
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::SyncWAL:BeforeMarkLogsSynced:1",
+ [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::unique_ptr<Checkpoint> checkpoint;
+ {
+ Checkpoint* checkpoint_ptr;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
+ checkpoint.reset(checkpoint_ptr);
+ }
+
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+
+ // Ensure callback ran.
+ ASSERT_EQ("val2", Get("key2"));
+
+ Close();
+
+ // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
+ // DB WAL.
+ fault_env->DropUnsyncedFileData();
+
+ // Before the bug fix, reopening the DB would fail because the MANIFEST's
+ // AddWal entry indicated the WAL should be synced through "key2" -> "val2".
+ Reopen(options);
+
+ // Need to close before `fault_env` goes out of scope.
+ Close();
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {