IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
uint64_t* log_used, uint64_t* log_size,
Env::IOPriority rate_limiter_priority,
- bool with_db_mutex = false, bool with_log_mutex = false);
+ LogFileNumberSize& log_file_number_size);
IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
bool need_log_sync, bool need_log_dir_sync,
- SequenceNumber sequence);
+ SequenceNumber sequence,
+ LogFileNumberSize& log_file_number_size);
IOStatus ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
uint64_t* log_used,
// are protected by locking both mutex_ and log_write_mutex_, and reads must
// be under either mutex_ or log_write_mutex_.
std::deque<LogFileNumberSize> alive_log_files_;
- // Caching the result of `alive_log_files_.back()` so that we do not have to
- // call `alive_log_files_.back()` in the write thread (WriteToWAL()) which
- // requires locking db mutex if log_mutex_ is not already held in
- // two-write-queues mode.
- std::deque<LogFileNumberSize>::reverse_iterator alive_log_files_tail_;
+
// Log files that aren't fully synced, and the current log file.
// Synchronization:
// - push_back() is done from write_thread_ with locked mutex_ and
total_log_size_ += log.size;
alive_log_files_.push_back(log);
}
- alive_log_files_tail_ = alive_log_files_.rbegin();
if (two_write_queues_) {
log_write_mutex_.Unlock();
}
}
impl->alive_log_files_.push_back(
DBImpl::LogFileNumberSize(impl->logfile_number_));
- impl->alive_log_files_tail_ = impl->alive_log_files_.rbegin();
if (impl->two_write_queues_) {
impl->log_write_mutex_.Unlock();
}
WriteOptions write_options;
uint64_t log_used, log_size;
log::Writer* log_writer = impl->logs_.back().writer;
+ LogFileNumberSize& log_file_number_size = impl->alive_log_files_.back();
+
+ assert(log_writer->get_log_number() == log_file_number_size.number);
+ impl->mutex_.AssertHeld();
s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
- Env::IO_TOTAL, /*with_db_mutex==*/true);
+ Env::IO_TOTAL, log_file_number_size);
if (s.ok()) {
// Need to fsync, otherwise it might get lost after a power reset.
s = impl->FlushWAL(false);
PERF_TIMER_START(write_pre_and_post_process_time);
}
+
log::Writer* log_writer = logs_.back().writer;
+ LogFileNumberSize& log_file_number_size = alive_log_files_.back();
+
+ assert(log_writer->get_log_number() == log_file_number_size.number);
mutex_.Unlock();
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
- need_log_dir_sync, last_sequence + 1);
+ need_log_dir_sync, last_sequence + 1,
+ log_file_number_size);
}
} else {
if (status.ok() && !write_options.disableWAL) {
w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
PERF_TIMER_START(write_pre_and_post_process_time);
log::Writer* log_writer = logs_.back().writer;
+ LogFileNumberSize& log_file_number_size = alive_log_files_.back();
+
+ assert(log_writer->get_log_number() == log_file_number_size.number);
+
mutex_.Unlock();
// This can set non-OK status if callback fail.
wal_write_group.size - 1);
RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
}
- io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
- need_log_dir_sync, current_sequence);
+ io_s =
+ WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
+ need_log_dir_sync, current_sequence, log_file_number_size);
w.status = io_s;
}
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size,
Env::IOPriority rate_limiter_priority,
- bool with_db_mutex, bool with_log_mutex) {
+ LogFileNumberSize& log_file_number_size) {
assert(log_size != nullptr);
- // Assert mutex explicitly.
- if (with_db_mutex) {
- mutex_.AssertHeld();
- } else if (two_write_queues_) {
- log_write_mutex_.AssertHeld();
- assert(with_log_mutex);
- }
-
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
*log_size = log_entry.size();
// When two_write_queues_ WriteToWAL has to be protected from concurretn calls
*log_used = logfile_number_;
}
total_log_size_ += log_entry.size();
- if (with_db_mutex || with_log_mutex) {
- assert(alive_log_files_tail_ == alive_log_files_.rbegin());
- assert(alive_log_files_tail_ != alive_log_files_.rend());
- }
- LogFileNumberSize& last_alive_log = *alive_log_files_tail_;
- last_alive_log.AddSize(*log_size);
+ log_file_number_size.AddSize(*log_size);
log_empty_ = false;
return io_s;
}
IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
bool need_log_sync, bool need_log_dir_sync,
- SequenceNumber sequence) {
+ SequenceNumber sequence,
+ LogFileNumberSize& log_file_number_size) {
IOStatus io_s;
assert(!two_write_queues_);
assert(!write_group.leader->disable_wal);
uint64_t log_size;
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
- write_group.leader->rate_limiter_priority);
+ write_group.leader->rate_limiter_priority,
+ log_file_number_size);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
WriteBatchInternal::SetSequence(merged_batch, sequence);
log::Writer* log_writer = logs_.back().writer;
+ LogFileNumberSize& log_file_number_size = alive_log_files_.back();
+
+ assert(log_writer->get_log_number() == log_file_number_size.number);
+
uint64_t log_size;
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
write_group.leader->rate_limiter_priority,
- /*with_db_mutex=*/false, /*with_log_mutex=*/true);
+ log_file_number_size);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
log_dir_synced_ = false;
logs_.emplace_back(logfile_number_, new_log);
alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
- alive_log_files_tail_ = alive_log_files_.rbegin();
}
log_write_mutex_.Unlock();
}