bool need_log_sync = write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
- if (!two_write_queues_ || !disable_memtable) {
+ assert(!two_write_queues_ || !disable_memtable);
+ {
// With concurrent writes we do preprocess only in the write thread that
// also does write to memtable to avoid sync issue on shared data structure
// with the other thread
return b->is_latest_persistent_state_;
}
-void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
+void WriteBatchInternal::SetAsLatestPersistentState(WriteBatch* b) {
b->is_latest_persistent_state_ = true;
}
// This write batch includes the latest state that should be persisted. Such
// state meant to be used only during recovery.
- static void SetAsLastestPersistentState(WriteBatch* b);
+ static void SetAsLatestPersistentState(WriteBatch* b);
static bool IsLatestPersistentState(const WriteBatch* b);
};
// When not writing to memtable, we can still cache the latest write batch.
// The cached batch will be written to memtable in WriteRecoverableState
// during FlushMemTable
- WriteBatchInternal::SetAsLastestPersistentState(working_batch);
+ WriteBatchInternal::SetAsLatestPersistentState(working_batch);
}
auto prepare_seq = GetId();
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_aux_batch);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
- if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
- if (s.ok()) {
- // Note: RemovePrepared should be called after WriteImpl that publishsed
- // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
- wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
- }
- wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
- } // else RemovePrepared is called from within PreReleaseCallback
return s;
}
prev_max, max_evicted_seq);
AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
}
- // After each eviction from commit cache, check if the commit entry should
- // be kept around because it overlaps with a live snapshot.
- CheckAgainstSnapshots(evicted);
if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
WriteLock wl(&prepared_mutex_);
- for (auto dp : delayed_prepared_) {
- if (dp == evicted.prep_seq) {
- // This is a rare case that txn is committed but prepared_txns_ is not
- // cleaned up yet. Refer to delayed_prepared_commits_ definition for
- // why it should be kept updated.
- delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
- ROCKS_LOG_DEBUG(info_log_,
- "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
- evicted.prep_seq, evicted.commit_seq);
- break;
- }
+ auto dp_iter = delayed_prepared_.find(evicted.prep_seq);
+ if (dp_iter != delayed_prepared_.end()) {
+ // This is a rare case that txn is committed but prepared_txns_ is not
+ // cleaned up yet. Refer to delayed_prepared_commits_ definition for
+ // why it should be kept updated.
+ delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
+ ROCKS_LOG_DEBUG(info_log_,
+ "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
+ evicted.prep_seq, evicted.commit_seq);
}
}
+ // After each eviction from commit cache, check if the commit entry should
+ // be kept around because it overlaps with a live snapshot.
+ CheckAgainstSnapshots(evicted);
}
bool succ =
ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
*min =
static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
*max = static_cast_with_check<const SnapshotImpl>(snapshot)->number_;
+ // A duplicate of the check in EnhanceSnapshot().
+ assert(*min <= *max + 1);
return kBackedByDBSnapshot;
} else {
*min = SmallestUnCommittedSeq();
// When not writing to memtable, we can still cache the latest write batch.
// The cached batch will be written to memtable in WriteRecoverableState
// during FlushMemTable
- WriteBatchInternal::SetAsLastestPersistentState(working_batch);
+ WriteBatchInternal::SetAsLatestPersistentState(working_batch);
}
const bool includes_data = !empty && !for_recovery;