]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
WriteAtPrepare: Efficient read from snapshot list
authorMaysam Yabandeh <myabandeh@fb.com>
Sat, 26 Aug 2017 07:53:13 +0000 (00:53 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Sat, 26 Aug 2017 08:00:38 +0000 (01:00 -0700)
Summary:
Divide the old snapshots to two lists: a few that fit into a cached array and the rest in a vector, which is expected to be empty in normal cases. The former is to optimize concurrent reads from snapshots without requiring locks. It is done by an array of std::atomic, from which std::memory_order_acquire reads are compiled to simple read instructions in most of the x86_64 architectures.
Closes https://github.com/facebook/rocksdb/pull/2758

Differential Revision: D5660504

Pulled By: maysamyabandeh

fbshipit-source-id: 524fcf9a8e7f90a92324536456912a99aaa6740c

db/snapshot_impl.h
include/rocksdb/utilities/transaction_db.h
utilities/transactions/pessimistic_transaction_db.cc
utilities/transactions/pessimistic_transaction_db.h
utilities/transactions/transaction_test.cc

index 8441050fd2cd26d1eac78610b95a790bf2b03a5f..ad9c1a9fbccb77dc007488b5554598109bbf704a 100644 (file)
@@ -74,9 +74,11 @@ class SnapshotList {
     count_--;
   }
 
-  // retrieve all snapshot numbers. They are sorted in ascending order.
+  // retrieve all snapshot numbers up until max_seq. They are sorted in
+  // ascending order.
   std::vector<SequenceNumber> GetAll(
-      SequenceNumber* oldest_write_conflict_snapshot = nullptr) const {
+      SequenceNumber* oldest_write_conflict_snapshot = nullptr,
+      const SequenceNumber& max_seq = kMaxSequenceNumber) const {
     std::vector<SequenceNumber> ret;
 
     if (oldest_write_conflict_snapshot != nullptr) {
@@ -88,6 +90,9 @@ class SnapshotList {
     }
     const SnapshotImpl* s = &list_;
     while (s->next_ != &list_) {
+      if (s->next_->number_ > max_seq) {
+        break;
+      }
       ret.push_back(s->next_->number_);
 
       if (oldest_write_conflict_snapshot != nullptr &&
index 7a592c4f6cf6bd169ce0d48b376945a248f90922..77043897a70cd64efd2958ab35be7c77de3b8423 100644 (file)
@@ -25,8 +25,10 @@ class TransactionDBMutexFactory;
 
 enum TxnDBWritePolicy {
   WRITE_COMMITTED = 0,  // write only the committed data
-  WRITE_PREPARED,       // write data after the prepare phase of 2pc
-  WRITE_UNPREPARED      // write data before the prepare phase of 2pc
+  // TODO(myabandeh): Not implemented yet
+  WRITE_PREPARED,  // write data after the prepare phase of 2pc
+  // TODO(myabandeh): Not implemented yet
+  WRITE_UNPREPARED  // write data before the prepare phase of 2pc
 };
 
 const uint32_t kInitialMaxDeadlocks = 5;
index 07c3eeeeb04f830d666a9daed84c17c97060835b..8fa9575e43032945a9a46a4aa391c286da0d670e 100644 (file)
@@ -5,8 +5,13 @@
 
 #ifndef ROCKSDB_LITE
 
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
 #include "utilities/transactions/pessimistic_transaction_db.h"
 
+#include <inttypes.h>
 #include <string>
 #include <unordered_set>
 #include <vector>
@@ -34,6 +39,7 @@ PessimisticTransactionDB::PessimisticTransactionDB(
                     : std::shared_ptr<TransactionDBMutexFactory>(
                           new TransactionDBMutexFactoryImpl())) {
   assert(db_impl_ != nullptr);
+  info_log_ = db_impl_->GetDBOptions().info_log;
 }
 
 // Support initiliazing PessimisticTransactionDB from a stackable db
@@ -581,16 +587,23 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
   return false;
 }
 
-void WritePreparedTxnDB::AddPrepared(uint64_t seq) { prepared_txns_.push(seq); }
+void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
+  ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq);
+  WriteLock wl(&prepared_mutex_);
+  prepared_txns_.push(seq);
+}
 
 void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
                                       uint64_t commit_seq) {
+  ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
+                  prepare_seq, commit_seq);
   auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
   CommitEntry evicted;
   bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted);
   if (to_be_evicted) {
     auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
     if (prev_max < evicted.commit_seq) {
+      // TODO(myabandeh) inc max in larger steps to avoid frequent updates
       auto max_evicted_seq = evicted.commit_seq;
       // When max_evicted_seq_ advances, move older entries from prepared_txns_
       // to delayed_prepared_. This guarantees that if a seq is lower than max,
@@ -607,11 +620,59 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
           delayed_prepared_empty_.store(false, std::memory_order_release);
         }
       }
+
       // With each change to max_evicted_seq_ fetch the live snapshots behind it
+      SequenceNumber curr_seq;
+      std::vector<SequenceNumber> all_snapshots;
+      bool update_snapshots = false;
       {
-        WriteLock wl(&snapshots_mutex_);
         InstrumentedMutex(db_impl_->mutex());
-        snapshots_ = db_impl_->snapshots().GetAll();
+        // We use this to identify how fresh are the snapshot list. Since this
+        // is done atomically with obtaining the snapshot list, the one with
+        // the larger seq is more fresh. If the seq is equal the full snapshot
+        // list could be different since taking snapshots does not increase
+        // the db seq. However since we only care about snapshots before the
+        // new max, such recent snapshots would not be included the in the
+        // list anyway.
+        curr_seq = db_impl_->GetLatestSequenceNumber();
+        if (curr_seq > snapshots_version_) {
+          // This is to avoid updating the snapshots_ if it already updated
+          // with a more recent vesion by a concrrent thread
+          update_snapshots = true;
+          // We only care about snapshots lower then max
+          all_snapshots =
+              db_impl_->snapshots().GetAll(nullptr, max_evicted_seq);
+        }
+      }
+      if (update_snapshots) {
+        WriteLock wl(&snapshots_mutex_);
+        snapshots_version_ = curr_seq;
+        // We update the list concurrently with the readers.
+        // Both new and old lists are sorted and the new list is subset of the
+        // previous list plus some new items. Thus if a snapshot repeats in
+        // both new and old lists, it will appear upper in the new list. So if
+        // we simply insert the new snapshots in order, if an overwritten item
+        // is still valid in the new list is either written to the same place in
+        // the array or it is written in a higher palce before it gets
+        // overwritten by another item. This guarantess a reader that reads the
+        // list bottom-up will eventaully see a snapshot that repeats in the
+        // update, either before it gets overwritten by the writer or
+        // afterwards.
+        size_t i = 0;
+        auto it = all_snapshots.begin();
+        for (; it != all_snapshots.end() && i < SNAPSHOT_CACHE_SIZE;
+             it++, i++) {
+          snapshot_cache_[i].store(*it, std::memory_order_release);
+        }
+        snapshots_.clear();
+        for (; it != all_snapshots.end(); it++) {
+          // Insert them to a vector that is less efficient to access
+          // concurrently
+          snapshots_.push_back(*it);
+        }
+        // Update the size at the end. Otherwise a parallel reader might read
+        // items that are not set yet.
+        snapshots_total_.store(all_snapshots.size(), std::memory_order_release);
       }
       while (prev_max < max_evicted_seq &&
              !max_evicted_seq_.compare_exchange_weak(
@@ -621,17 +682,41 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
     }
     // After each eviction from commit cache, check if the commit entry should
     // be kept around because it overlaps with a live snapshot.
-    {
+    // First check the snapshot cache that is efficient for concurrent access
+    auto cnt = snapshots_total_.load(std::memory_order_acquire);
+    // The list might get updated concurrently as we are reading from it. The
+    // reader should be able to read all the snapshots that are still valid
+    // after the update. Since the survived snapshots are written in a higher
+    // place before gets overwritten the reader that reads bottom-up will
+    // eventully see it.
+    const bool next_is_larger = true;
+    SequenceNumber snapshot_seq = kMaxSequenceNumber;
+    size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
+    for (; 0 < ip1; ip1--) {
+      snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
+      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+                                   snapshot_seq, !next_is_larger)) {
+        break;
+      }
+    }
+    if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
+                 snapshot_seq < evicted.prep_seq)) {
+      // Then access the less efficient list of snapshots_
       ReadLock rl(&snapshots_mutex_);
-      for (auto snapshot_seq : snapshots_) {
-        if (evicted.commit_seq <= snapshot_seq) {
+      // Items could have moved from the snapshots_ to snapshot_cache_ before
+      // accquiring the lock. To make sure that we do not miss a valid snapshot,
+      // read snapshot_cache_ again while holding the lock.
+      for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
+        snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
+        if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+                                     snapshot_seq, next_is_larger)) {
           break;
         }
-        // then snapshot_seq < evicted.commit_seq
-        if (evicted.prep_seq <= snapshot_seq) {  // overlapping range
-          WriteLock wl(&old_commit_map_mutex_);
-          old_commit_map_empty_.store(false, std::memory_order_release);
-          old_commit_map_[evicted.prep_seq] = evicted.commit_seq;
+      }
+      for (auto snapshot_seq_2 : snapshots_) {
+        if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+                                     snapshot_seq_2, next_is_larger)) {
+          break;
         }
       }
     }
@@ -691,7 +776,31 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq,
 }
 
 // 10m entry, 80MB size
-uint64_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE =
-    static_cast<uint64_t>(1 << 21);
+size_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = static_cast<size_t>(1 << 21);
+size_t WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE =
+    static_cast<size_t>(1 << 7);
+
+bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
+    const uint64_t& prep_seq, const uint64_t& commit_seq,
+    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
+  // If we do not store an entry in old_commit_map we assume it is committed in
+  // all snapshots. if commit_seq <= snapshot_seq, it is considered already in
+  // the snapshot so we need not to keep the entry around for this snapshot.
+  if (commit_seq <= snapshot_seq) {
+    // continue the search if the next snapshot could be smaller than commit_seq
+    return !next_is_larger;
+  }
+  // then snapshot_seq < commit_seq
+  if (prep_seq <= snapshot_seq) {  // overlapping range
+    WriteLock wl(&old_commit_map_mutex_);
+    old_commit_map_empty_.store(false, std::memory_order_release);
+    old_commit_map_[prep_seq] = commit_seq;
+    // Storing once is enough. No need to check it for other snapshots.
+    return false;
+  }
+  // continue the search if the next snapshot could be larger than prep_seq
+  return next_is_larger;
+}
+
 }  //  namespace rocksdb
 #endif  // ROCKSDB_LITE
index 489da30bfcb2ec8c995903b4ea02c4e8f5fd2cf5..e3eec6b602aa4809b5f8fff018479f297767b043 100644 (file)
@@ -107,6 +107,8 @@ class PessimisticTransactionDB : public TransactionDB {
   struct CommitEntry {
     uint64_t prep_seq;
     uint64_t commit_seq;
+    CommitEntry() : prep_seq(0), commit_seq(0) {}
+    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
   };
 
  protected:
@@ -114,8 +116,10 @@ class PessimisticTransactionDB : public TransactionDB {
       Transaction* txn, const WriteOptions& write_options,
       const TransactionOptions& txn_options = TransactionOptions());
   DBImpl* db_impl_;
+  std::shared_ptr<Logger> info_log_;
 
  private:
+  friend class WritePreparedTxnDB;
   const TransactionDBOptions txn_db_options_;
   TransactionLockMgr lock_mgr_;
 
@@ -162,6 +166,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   explicit WritePreparedTxnDB(DB* db,
                               const TransactionDBOptions& txn_db_options)
       : PessimisticTransactionDB(db, txn_db_options),
+        SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE),
         COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) {
     init(txn_db_options);
   }
@@ -169,6 +174,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   explicit WritePreparedTxnDB(StackableDB* db,
                               const TransactionDBOptions& txn_db_options)
       : PessimisticTransactionDB(db, txn_db_options),
+        SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE),
         COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) {
     init(txn_db_options);
   }
@@ -192,6 +198,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
 
   void init(const TransactionDBOptions& /* unused */) {
+    snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
+        new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
     commit_cache_ =
         unique_ptr<CommitEntry[]>(new CommitEntry[COMMIT_CACHE_SIZE]{});
   }
@@ -199,8 +207,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // A heap with the amortized O(1) complexity for erase. It uses one extra heap
   // to keep track of erased entries that are not yet on top of the main heap.
   class PreparedHeap {
-    std::priority_queue<uint64_t> heap_;
-    std::priority_queue<uint64_t> erased_heap_;
+    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
+        heap_;
+    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
+        erased_heap_;
 
    public:
     bool empty() { return heap_.empty(); }
@@ -216,7 +226,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     }
     void erase(uint64_t seq) {
       if (!heap_.empty()) {
-        if (heap_.top() < seq) {
+        if (seq < heap_.top()) {
           // Already popped, ignore it.
         } else if (heap_.top() == seq) {
           heap_.pop();
@@ -242,15 +252,42 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   bool ExchangeCommitEntry(uint64_t indexed_seq, CommitEntry& expected_entry,
                            CommitEntry new_entry);
 
+  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
+  // commit_seq. Return false if checking the next snapshot(s) is not needed.
+  // This is the case if the entry already added to old_commit_map_ or none of
+  // the next snapshots could satisfy the condition. next_is_larger: the next
+  // snapshot will be a larger value
+  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
+                               const uint64_t& commit_seq,
+                               const uint64_t& snapshot_seq,
+                               const bool next_is_larger);
+
   // The list of live snapshots at the last time that max_evicted_seq_ advanced.
-  // The list sorted in ascending order. Thread-safety is provided with
-  // snapshots_mutex_.
+  // The list stored into two data structures: in snapshot_cache_ that is
+  // efficient for concurrent reads, and in snapshots_ if the data does not fit
+  // into snapshot_cache_. The total number of snapshots in the two lists
+  std::atomic<size_t> snapshots_total_ = {};
+  // The list sorted in ascending order. Thread-safety for writes is provided
+  // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
+  // each entry. In x86_64 architecture such reads are compiled to simple read
+  // instructions. 128 entries
+  // TODO(myabandeh): avoid non-const static variables
+  static size_t DEF_SNAPSHOT_CACHE_SIZE;
+  const size_t SNAPSHOT_CACHE_SIZE;
+  unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
+  // 2nd list for storing snapshots. The list sorted in ascending order.
+  // Thread-safety is provided with snapshots_mutex_.
   std::vector<SequenceNumber> snapshots_;
+  // The version of the latest list of snapshots. This can be used to avoid
+  // rewrittiing a list that is concurrently updated with a more recent version.
+  SequenceNumber snapshots_version_ = 0;
+
   // A heap of prepared transactions. Thread-safety is provided with
   // prepared_mutex_.
   PreparedHeap prepared_txns_;
-  static uint64_t DEF_COMMIT_CACHE_SIZE;
-  const uint64_t COMMIT_CACHE_SIZE;
+  // TODO(myabandeh): avoid non-const static variables
+  static size_t DEF_COMMIT_CACHE_SIZE;
+  const size_t COMMIT_CACHE_SIZE;
   // commit_cache_ must be initialized to zero to tell apart an empty index from
   // a filled one. Thread-safety is provided with commit_cache_mutex_.
   unique_ptr<CommitEntry[]> commit_cache_;
index 2e8c87f49ac5ce3136349180e4f5dfd1ffd1934a..eac8e563d7b27ec4de230ff786687ffe1f379f7b 100644 (file)
@@ -5,6 +5,11 @@
 
 #ifndef ROCKSDB_LITE
 
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
 #include <algorithm>
 #include <functional>
 #include <string>
@@ -4734,8 +4739,10 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
   WriteOptions wo;
   // Use small commit cache to trigger lots of eviction and fast advance of
   // max_evicted_seq_
-  WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE =
-      8;  // will take effect after ReOpen
+  // will take effect after ReOpen
+  WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8;
+  // Same for snapshot cache size
+  WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5;
 
   // Take some preliminary snapshots first. This is to stress the data structure
   // that holds the old snapshots as it will be designed to be efficient when
@@ -4755,6 +4762,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
       uint64_t cur_txn = 0;
       // Number of snapshots taken so far
       int num_snapshots = 0;
+      std::vector<const Snapshot*> to_be_released;
       // Number of gaps applied so far
       int gap_cnt = 0;
       // The final snapshot that we will inspect
@@ -4800,14 +4808,17 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
 
         if (num_snapshots < max_snapshots - 1) {
           // Take preliminary snapshots
-          db->GetSnapshot();
+          auto tmp_snapshot = db->GetSnapshot();
+          to_be_released.push_back(tmp_snapshot);
           num_snapshots++;
         } else if (gap_cnt < max_gap) {
           // Wait for some gap before taking the final snapshot
           gap_cnt++;
         } else if (!snapshot) {
           // Take the final snapshot if it is not already taken
-          snapshot = db->GetSnapshot()->GetSequenceNumber();
+          auto tmp_snapshot = db->GetSnapshot();
+          to_be_released.push_back(tmp_snapshot);
+          snapshot = tmp_snapshot->GetSequenceNumber();
           // We increase the db seq artificailly by a dummy Put. Check that this
           // technique is effective and db seq is that same as ours.
           ASSERT_EQ(snapshot, seq);
@@ -4823,11 +4834,12 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
                 (committed_before.find(s) != committed_before.end());
             bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
             if (was_committed != is_in_snapshot) {
-              printf(
-                  "max_snapshots %d max_gap %d seq %lu max %lu snapshot %lu "
-                  "gap_cnt %d num_snapshots %d\n",
-                  max_snapshots, max_gap, seq, wp_db->max_evicted_seq_.load(),
-                  snapshot, gap_cnt, num_snapshots);
+              printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
+                     " snapshot %" PRIu64
+                     " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
+                     max_snapshots, max_gap, seq,
+                     wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
+                     num_snapshots, s);
             }
             ASSERT_EQ(was_committed, is_in_snapshot);
             found_committed = found_committed || is_in_snapshot;
@@ -4846,6 +4858,9 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
       }
       ASSERT_TRUE(wp_db->delayed_prepared_.empty());
       ASSERT_TRUE(wp_db->prepared_txns_.empty());
+      for (auto s : to_be_released) {
+        db->ReleaseSnapshot(s);
+      }
     }
   }
 }