count_--;
}
- // retrieve all snapshot numbers. They are sorted in ascending order.
+ // retrieve all snapshot numbers up until max_seq. They are sorted in
+ // ascending order.
std::vector<SequenceNumber> GetAll(
- SequenceNumber* oldest_write_conflict_snapshot = nullptr) const {
+ SequenceNumber* oldest_write_conflict_snapshot = nullptr,
+ const SequenceNumber& max_seq = kMaxSequenceNumber) const {
std::vector<SequenceNumber> ret;
if (oldest_write_conflict_snapshot != nullptr) {
}
const SnapshotImpl* s = &list_;
while (s->next_ != &list_) {
+ if (s->next_->number_ > max_seq) {
+ break;
+ }
ret.push_back(s->next_->number_);
if (oldest_write_conflict_snapshot != nullptr &&
enum TxnDBWritePolicy {
WRITE_COMMITTED = 0, // write only the committed data
- WRITE_PREPARED, // write data after the prepare phase of 2pc
- WRITE_UNPREPARED // write data before the prepare phase of 2pc
+ // TODO(myabandeh): Not implemented yet
+ WRITE_PREPARED, // write data after the prepare phase of 2pc
+ // TODO(myabandeh): Not implemented yet
+ WRITE_UNPREPARED // write data before the prepare phase of 2pc
};
const uint32_t kInitialMaxDeadlocks = 5;
#ifndef ROCKSDB_LITE
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
#include "utilities/transactions/pessimistic_transaction_db.h"
+#include <inttypes.h>
#include <string>
#include <unordered_set>
#include <vector>
: std::shared_ptr<TransactionDBMutexFactory>(
new TransactionDBMutexFactoryImpl())) {
assert(db_impl_ != nullptr);
+ info_log_ = db_impl_->GetDBOptions().info_log;
}
// Support initiliazing PessimisticTransactionDB from a stackable db
return false;
}
-void WritePreparedTxnDB::AddPrepared(uint64_t seq) { prepared_txns_.push(seq); }
+void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
+ ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq);
+ WriteLock wl(&prepared_mutex_);
+ prepared_txns_.push(seq);
+}
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
uint64_t commit_seq) {
+ ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
+ prepare_seq, commit_seq);
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
CommitEntry evicted;
bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted);
if (to_be_evicted) {
auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
if (prev_max < evicted.commit_seq) {
+ // TODO(myabandeh) inc max in larger steps to avoid frequent updates
auto max_evicted_seq = evicted.commit_seq;
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
delayed_prepared_empty_.store(false, std::memory_order_release);
}
}
+
// With each change to max_evicted_seq_ fetch the live snapshots behind it
+ SequenceNumber curr_seq;
+ std::vector<SequenceNumber> all_snapshots;
+ bool update_snapshots = false;
{
- WriteLock wl(&snapshots_mutex_);
InstrumentedMutex(db_impl_->mutex());
- snapshots_ = db_impl_->snapshots().GetAll();
+ // We use this to identify how fresh are the snapshot list. Since this
+ // is done atomically with obtaining the snapshot list, the one with
+ // the larger seq is more fresh. If the seq is equal the full snapshot
+ // list could be different since taking snapshots does not increase
+ // the db seq. However since we only care about snapshots before the
+ // new max, such recent snapshots would not be included the in the
+ // list anyway.
+ curr_seq = db_impl_->GetLatestSequenceNumber();
+ if (curr_seq > snapshots_version_) {
+ // This is to avoid updating the snapshots_ if it already updated
+ // with a more recent vesion by a concrrent thread
+ update_snapshots = true;
+ // We only care about snapshots lower then max
+ all_snapshots =
+ db_impl_->snapshots().GetAll(nullptr, max_evicted_seq);
+ }
+ }
+ if (update_snapshots) {
+ WriteLock wl(&snapshots_mutex_);
+ snapshots_version_ = curr_seq;
+ // We update the list concurrently with the readers.
+ // Both new and old lists are sorted and the new list is subset of the
+ // previous list plus some new items. Thus if a snapshot repeats in
+ // both new and old lists, it will appear upper in the new list. So if
+ // we simply insert the new snapshots in order, if an overwritten item
+ // is still valid in the new list is either written to the same place in
+ // the array or it is written in a higher palce before it gets
+ // overwritten by another item. This guarantess a reader that reads the
+ // list bottom-up will eventaully see a snapshot that repeats in the
+ // update, either before it gets overwritten by the writer or
+ // afterwards.
+ size_t i = 0;
+ auto it = all_snapshots.begin();
+ for (; it != all_snapshots.end() && i < SNAPSHOT_CACHE_SIZE;
+ it++, i++) {
+ snapshot_cache_[i].store(*it, std::memory_order_release);
+ }
+ snapshots_.clear();
+ for (; it != all_snapshots.end(); it++) {
+ // Insert them to a vector that is less efficient to access
+ // concurrently
+ snapshots_.push_back(*it);
+ }
+ // Update the size at the end. Otherwise a parallel reader might read
+ // items that are not set yet.
+ snapshots_total_.store(all_snapshots.size(), std::memory_order_release);
}
while (prev_max < max_evicted_seq &&
!max_evicted_seq_.compare_exchange_weak(
}
// After each eviction from commit cache, check if the commit entry should
// be kept around because it overlaps with a live snapshot.
- {
+ // First check the snapshot cache that is efficient for concurrent access
+ auto cnt = snapshots_total_.load(std::memory_order_acquire);
+ // The list might get updated concurrently as we are reading from it. The
+ // reader should be able to read all the snapshots that are still valid
+ // after the update. Since the survived snapshots are written in a higher
+ // place before gets overwritten the reader that reads bottom-up will
+ // eventully see it.
+ const bool next_is_larger = true;
+ SequenceNumber snapshot_seq = kMaxSequenceNumber;
+ size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
+ for (; 0 < ip1; ip1--) {
+ snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
+ if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+ snapshot_seq, !next_is_larger)) {
+ break;
+ }
+ }
+ if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
+ snapshot_seq < evicted.prep_seq)) {
+ // Then access the less efficient list of snapshots_
ReadLock rl(&snapshots_mutex_);
- for (auto snapshot_seq : snapshots_) {
- if (evicted.commit_seq <= snapshot_seq) {
+ // Items could have moved from the snapshots_ to snapshot_cache_ before
+ // accquiring the lock. To make sure that we do not miss a valid snapshot,
+ // read snapshot_cache_ again while holding the lock.
+ for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
+ snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
+ if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+ snapshot_seq, next_is_larger)) {
break;
}
- // then snapshot_seq < evicted.commit_seq
- if (evicted.prep_seq <= snapshot_seq) { // overlapping range
- WriteLock wl(&old_commit_map_mutex_);
- old_commit_map_empty_.store(false, std::memory_order_release);
- old_commit_map_[evicted.prep_seq] = evicted.commit_seq;
+ }
+ for (auto snapshot_seq_2 : snapshots_) {
+ if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+ snapshot_seq_2, next_is_larger)) {
+ break;
}
}
}
}
// 10m entry, 80MB size
-uint64_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE =
- static_cast<uint64_t>(1 << 21);
+size_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = static_cast<size_t>(1 << 21);
+size_t WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE =
+ static_cast<size_t>(1 << 7);
+
+bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
+ const uint64_t& prep_seq, const uint64_t& commit_seq,
+ const uint64_t& snapshot_seq, const bool next_is_larger = true) {
+ // If we do not store an entry in old_commit_map we assume it is committed in
+ // all snapshots. if commit_seq <= snapshot_seq, it is considered already in
+ // the snapshot so we need not to keep the entry around for this snapshot.
+ if (commit_seq <= snapshot_seq) {
+ // continue the search if the next snapshot could be smaller than commit_seq
+ return !next_is_larger;
+ }
+ // then snapshot_seq < commit_seq
+ if (prep_seq <= snapshot_seq) { // overlapping range
+ WriteLock wl(&old_commit_map_mutex_);
+ old_commit_map_empty_.store(false, std::memory_order_release);
+ old_commit_map_[prep_seq] = commit_seq;
+ // Storing once is enough. No need to check it for other snapshots.
+ return false;
+ }
+ // continue the search if the next snapshot could be larger than prep_seq
+ return next_is_larger;
+}
+
} // namespace rocksdb
#endif // ROCKSDB_LITE
struct CommitEntry {
uint64_t prep_seq;
uint64_t commit_seq;
+ CommitEntry() : prep_seq(0), commit_seq(0) {}
+ CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
};
protected:
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions());
DBImpl* db_impl_;
+ std::shared_ptr<Logger> info_log_;
private:
+ friend class WritePreparedTxnDB;
const TransactionDBOptions txn_db_options_;
TransactionLockMgr lock_mgr_;
explicit WritePreparedTxnDB(DB* db,
const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options),
+ SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE),
COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) {
init(txn_db_options);
}
explicit WritePreparedTxnDB(StackableDB* db,
const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options),
+ SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE),
COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) {
init(txn_db_options);
}
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
void init(const TransactionDBOptions& /* unused */) {
+ snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
+ new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
commit_cache_ =
unique_ptr<CommitEntry[]>(new CommitEntry[COMMIT_CACHE_SIZE]{});
}
// A heap with the amortized O(1) complexity for erase. It uses one extra heap
// to keep track of erased entries that are not yet on top of the main heap.
class PreparedHeap {
- std::priority_queue<uint64_t> heap_;
- std::priority_queue<uint64_t> erased_heap_;
+ std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
+ heap_;
+ std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
+ erased_heap_;
public:
bool empty() { return heap_.empty(); }
}
void erase(uint64_t seq) {
if (!heap_.empty()) {
- if (heap_.top() < seq) {
+ if (seq < heap_.top()) {
// Already popped, ignore it.
} else if (heap_.top() == seq) {
heap_.pop();
bool ExchangeCommitEntry(uint64_t indexed_seq, CommitEntry& expected_entry,
CommitEntry new_entry);
+ // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
+ // commit_seq. Return false if checking the next snapshot(s) is not needed.
+ // This is the case if the entry already added to old_commit_map_ or none of
+ // the next snapshots could satisfy the condition. next_is_larger: the next
+ // snapshot will be a larger value
+ bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
+ const uint64_t& commit_seq,
+ const uint64_t& snapshot_seq,
+ const bool next_is_larger);
+
// The list of live snapshots at the last time that max_evicted_seq_ advanced.
- // The list sorted in ascending order. Thread-safety is provided with
- // snapshots_mutex_.
+ // The list stored into two data structures: in snapshot_cache_ that is
+ // efficient for concurrent reads, and in snapshots_ if the data does not fit
+ // into snapshot_cache_. The total number of snapshots in the two lists
+ std::atomic<size_t> snapshots_total_ = {};
+ // The list sorted in ascending order. Thread-safety for writes is provided
+ // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
+ // each entry. In x86_64 architecture such reads are compiled to simple read
+ // instructions. 128 entries
+ // TODO(myabandeh): avoid non-const static variables
+ static size_t DEF_SNAPSHOT_CACHE_SIZE;
+ const size_t SNAPSHOT_CACHE_SIZE;
+ unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
+ // 2nd list for storing snapshots. The list sorted in ascending order.
+ // Thread-safety is provided with snapshots_mutex_.
std::vector<SequenceNumber> snapshots_;
+ // The version of the latest list of snapshots. This can be used to avoid
+ // rewrittiing a list that is concurrently updated with a more recent version.
+ SequenceNumber snapshots_version_ = 0;
+
// A heap of prepared transactions. Thread-safety is provided with
// prepared_mutex_.
PreparedHeap prepared_txns_;
- static uint64_t DEF_COMMIT_CACHE_SIZE;
- const uint64_t COMMIT_CACHE_SIZE;
+ // TODO(myabandeh): avoid non-const static variables
+ static size_t DEF_COMMIT_CACHE_SIZE;
+ const size_t COMMIT_CACHE_SIZE;
// commit_cache_ must be initialized to zero to tell apart an empty index from
// a filled one. Thread-safety is provided with commit_cache_mutex_.
unique_ptr<CommitEntry[]> commit_cache_;
#ifndef ROCKSDB_LITE
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
#include <algorithm>
#include <functional>
#include <string>
WriteOptions wo;
// Use small commit cache to trigger lots of eviction and fast advance of
// max_evicted_seq_
- WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE =
- 8; // will take effect after ReOpen
+ // will take effect after ReOpen
+ WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8;
+ // Same for snapshot cache size
+ WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5;
// Take some preliminary snapshots first. This is to stress the data structure
// that holds the old snapshots as it will be designed to be efficient when
uint64_t cur_txn = 0;
// Number of snapshots taken so far
int num_snapshots = 0;
+ std::vector<const Snapshot*> to_be_released;
// Number of gaps applied so far
int gap_cnt = 0;
// The final snapshot that we will inspect
if (num_snapshots < max_snapshots - 1) {
// Take preliminary snapshots
- db->GetSnapshot();
+ auto tmp_snapshot = db->GetSnapshot();
+ to_be_released.push_back(tmp_snapshot);
num_snapshots++;
} else if (gap_cnt < max_gap) {
// Wait for some gap before taking the final snapshot
gap_cnt++;
} else if (!snapshot) {
// Take the final snapshot if it is not already taken
- snapshot = db->GetSnapshot()->GetSequenceNumber();
+ auto tmp_snapshot = db->GetSnapshot();
+ to_be_released.push_back(tmp_snapshot);
+ snapshot = tmp_snapshot->GetSequenceNumber();
// We increase the db seq artificailly by a dummy Put. Check that this
// technique is effective and db seq is that same as ours.
ASSERT_EQ(snapshot, seq);
(committed_before.find(s) != committed_before.end());
bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
if (was_committed != is_in_snapshot) {
- printf(
- "max_snapshots %d max_gap %d seq %lu max %lu snapshot %lu "
- "gap_cnt %d num_snapshots %d\n",
- max_snapshots, max_gap, seq, wp_db->max_evicted_seq_.load(),
- snapshot, gap_cnt, num_snapshots);
+ printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
+ " snapshot %" PRIu64
+ " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
+ max_snapshots, max_gap, seq,
+ wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
+ num_snapshots, s);
}
ASSERT_EQ(was_committed, is_in_snapshot);
found_committed = found_committed || is_in_snapshot;
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
+ for (auto s : to_be_released) {
+ db->ReleaseSnapshot(s);
+ }
}
}
}