]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Return MergeInProgress when fetching from transactions or WBWI with overwrite_key
authoragiardullo <agiardullo@fb.com>
Tue, 29 Sep 2015 02:24:44 +0000 (19:24 -0700)
committeragiardullo <agiardullo@fb.com>
Wed, 30 Sep 2015 18:45:32 +0000 (11:45 -0700)
Summary:
WriteBatchWithIndex::GetFromBatchAndDB only works correctly for overwrite_key=false.  Transactions use overwrite_key=true (since WriteBatchWithIndex::GetIteratorWithBase only works when overwrite_key=true).  So currently, Transactions could return incorrectly merged results when calling Get/GetForUpdate().

Until a permanent fix can be put in place, Transaction::Get[ForUpdate] and WriteBatchWithIndex::GetFromBatch[AndDB] will now return MergeInProgress if the most recent write to a key in the batch is a Merge.

Test Plan: more tests

Reviewers: sdong, yhchiang, rven, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D47817

include/rocksdb/utilities/transaction.h
include/rocksdb/utilities/write_batch_with_index.h
utilities/transactions/transaction_test.cc
utilities/write_batch_with_index/write_batch_with_index.cc
utilities/write_batch_with_index/write_batch_with_index_internal.cc
utilities/write_batch_with_index/write_batch_with_index_internal.h
utilities/write_batch_with_index/write_batch_with_index_test.cc

index 0e3a0b22894603496b5ce342b03a893bc6eee6fb..63ad89457ecff8e38c264e9e267fd6f1e72765c7 100644 (file)
@@ -99,7 +99,9 @@ class Transaction {
   virtual Status RollbackToSavePoint() = 0;
 
   // This function is similar to DB::Get() except it will also read pending
-  // changes in this transaction.
+  // changes in this transaction.  Currently, this function will return
+  // Status::MergeInProgress if the most recent write to the queried key in
+  // this batch is a Merge.
   //
   // If read_options.snapshot is not set, the current version of the key will
   // be read.  Calling SetSnapshot() does not affect the version of the data
@@ -131,6 +133,9 @@ class Transaction {
   // snapshot is set in this transaction).  The transaction behavior is the
   // same regardless of whether the key exists or not.
   //
+  // Note: Currently, this function will return Status::MergeInProgress
+  // if the most recent write to the queried key in this batch is a Merge.
+  //
   // The values returned by this function are similar to Transaction::Get().
   // If value==nullptr, then this function will not read any data, but will
   // still ensure that this key cannot be written to by outside of this
@@ -146,6 +151,7 @@ class Transaction {
   // Status::TimedOut() if a lock could not be acquired,
   // Status::TryAgain() if the memtable history size is not large enough
   //  (See max_write_buffer_number_to_maintain)
+  // Status::MergeInProgress() if merge operations cannot be resolved.
   // or other errors if this key could not be read.
   virtual Status GetForUpdate(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
index 402e3f3a7d073c15cac4465ff418df8b41c7221a..5cf0d5d3ca6308a338322b7942a12984d3a8a705 100644 (file)
@@ -120,7 +120,14 @@ class WriteBatchWithIndex : public WriteBatchBase {
   WBWIIterator* NewIterator();
 
   // Will create a new Iterator that will use WBWIIterator as a delta and
-  // base_iterator as base
+  // base_iterator as base.
+  //
+  // This function is only supported if the WriteBatchWithIndex was
+  // constructed with overwrite_key=true.
+  //
+  // The returned iterator should be deleted by the caller.
+  // The base_iterator is now 'owned' by the returned iterator. Deleting the
+  // returned iterator will also delete the base_iterator.
   Iterator* NewIteratorWithBase(ColumnFamilyHandle* column_family,
                                 Iterator* base_iterator);
   // default column family
@@ -135,7 +142,7 @@ class WriteBatchWithIndex : public WriteBatchBase {
 
   // Similar to previous function but does not require a column_family.
   // Note:  An InvalidArgument status will be returned if there are any Merge
-  // operators for this key.
+  // operators for this key.  Use previous method instead.
   Status GetFromBatch(const DBOptions& options, const Slice& key,
                       std::string* value) {
     return GetFromBatch(nullptr, options, key, value);
index 473d738bba52f7f3543ba211e04cf495fcb8d489..e009d0c0fce3b227ee0de7419de7570c2a173bb8 100644 (file)
@@ -12,6 +12,8 @@
 #include "rocksdb/utilities/transaction_db.h"
 #include "util/logging.h"
 #include "util/testharness.h"
+#include "utilities/merge_operators.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
 
 using std::string;
 
@@ -28,6 +30,7 @@ class TransactionTest : public testing::Test {
   TransactionTest() {
     options.create_if_missing = true;
     options.max_write_buffer_number = 2;
+    options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
     dbname = test::TmpDir() + "/transaction_testdb";
 
     DestroyDB(dbname, options);
@@ -1722,6 +1725,62 @@ TEST_F(TransactionTest, TimeoutTest) {
   delete txn2;
 }
 
+TEST_F(TransactionTest, MergeTest) {
+  WriteOptions write_options;
+  ReadOptions read_options;
+  string value;
+  Status s;
+
+  Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
+  ASSERT_TRUE(txn);
+
+  s = db->Put(write_options, "A", "a0");
+  ASSERT_OK(s);
+
+  s = txn->Merge("A", "1");
+  ASSERT_OK(s);
+
+  s = txn->Merge("A", "2");
+  ASSERT_OK(s);
+
+  s = txn->Get(read_options, "A", &value);
+  ASSERT_TRUE(s.IsMergeInProgress());
+
+  s = txn->Put("A", "a");
+  ASSERT_OK(s);
+
+  s = txn->Get(read_options, "A", &value);
+  ASSERT_OK(s);
+  ASSERT_EQ("a", value);
+
+  s = txn->Merge("A", "3");
+  ASSERT_OK(s);
+
+  s = txn->Get(read_options, "A", &value);
+  ASSERT_TRUE(s.IsMergeInProgress());
+
+  TransactionOptions txn_options;
+  txn_options.lock_timeout = 1;  // 1 ms
+  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+  ASSERT_TRUE(txn2);
+
+  // verify that txn has "A" locked
+  s = txn2->Merge("A", "4");
+  ASSERT_TRUE(s.IsTimedOut());
+
+  s = txn2->Commit();
+  ASSERT_OK(s);
+  delete txn2;
+
+  s = txn->Commit();
+  ASSERT_OK(s);
+  delete txn;
+
+  s = db->Get(read_options, "A", &value);
+  ASSERT_OK(s);
+  ASSERT_EQ("a,3", value);
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
index e3caa79329e2b2bb9203c68679fdd97015ccce08..cdc12e8cf14a2ed5797f151f138815a22c868091 100644 (file)
@@ -619,9 +619,9 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
   MergeContext merge_context;
 
   WriteBatchWithIndexInternal::Result result =
-      WriteBatchWithIndexInternal::GetFromBatch(options, this, column_family,
-                                                key, &merge_context,
-                                                &rep->comparator, value, &s);
+      WriteBatchWithIndexInternal::GetFromBatch(
+          options, this, column_family, key, &merge_context, &rep->comparator,
+          value, rep->overwrite_key, &s);
 
   switch (result) {
     case WriteBatchWithIndexInternal::Result::kFound:
@@ -662,8 +662,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
   std::string batch_value;
   WriteBatchWithIndexInternal::Result result =
       WriteBatchWithIndexInternal::GetFromBatch(
-          options, this, column_family, key, &merge_context,
-          &rep->comparator, &batch_value, &s);
+          options, this, column_family, key, &merge_context, &rep->comparator,
+          &batch_value, rep->overwrite_key, &s);
 
   if (result == WriteBatchWithIndexInternal::Result::kFound) {
     value->assign(batch_value.data(), batch_value.size());
@@ -675,6 +675,14 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
   if (result == WriteBatchWithIndexInternal::Result::kError) {
     return s;
   }
+  if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
+      rep->overwrite_key == true) {
+    // Since we've overwritten keys, we do not know what other operations are
+    // in this batch for this key, so we cannot do a Merge to compute the
+    // result.  Instead, we will simply return MergeInProgress.
+    return Status::MergeInProgress();
+  }
+
   assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
          result == WriteBatchWithIndexInternal::Result::kNotFound);
 
index f5c1411213f834e607565b45d8a461721796b92c..bc5c3800d568e28b6ba4b7798724fcc8ce778045 100644 (file)
@@ -132,7 +132,7 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
     const DBOptions& options, WriteBatchWithIndex* batch,
     ColumnFamilyHandle* column_family, const Slice& key,
     MergeContext* merge_context, WriteBatchEntryComparator* cmp,
-    std::string* value, Status* s) {
+    std::string* value, bool overwrite_key, Status* s) {
   uint32_t cf_id = GetColumnFamilyID(column_family);
   *s = Status::OK();
   WriteBatchWithIndexInternal::Result result =
@@ -205,6 +205,13 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
       // We can stop iterating once we find a PUT or DELETE
       break;
     }
+    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
+        overwrite_key == true) {
+      // Since we've overwritten keys, we do not know what other operations are
+      // in this batch for this key, so we cannot do a Merge to compute the
+      // result.  Instead, we will simply return MergeInProgress.
+      break;
+    }
 
     iter->Prev();
   }
index 54bbd81da8dc2314b56d94d577c5e351426730a9..bba9e8bf3e9d2b9f1f519e38ee7f232f5a5e1133 100644 (file)
@@ -91,7 +91,7 @@ class WriteBatchWithIndexInternal {
       const DBOptions& options, WriteBatchWithIndex* batch,
       ColumnFamilyHandle* column_family, const Slice& key,
       MergeContext* merge_context, WriteBatchEntryComparator* cmp,
-      std::string* value, Status* s);
+      std::string* value, bool overwrite_key, Status* s);
 };
 
 }  // namespace rocksdb
index 3e509ca93cbb6c4814e69b04299670174abb79c4..0671322721adbc63841e14b05b5689658e203229 100644 (file)
@@ -971,7 +971,7 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) {
 
   DestroyDB(dbname, options);
   Status s = DB::Open(options, dbname, &db);
-  assert(s.ok());
+  ASSERT_OK(s);
 
   ColumnFamilyHandle* column_family = db->DefaultColumnFamily();
   WriteBatchWithIndex batch;
@@ -1009,6 +1009,66 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) {
   DestroyDB(dbname, options);
 }
 
+TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge2) {
+  DB* db;
+  Options options;
+  options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
+  options.create_if_missing = true;
+
+  std::string dbname = test::TmpDir() + "/write_batch_with_index_test";
+
+  DestroyDB(dbname, options);
+  Status s = DB::Open(options, dbname, &db);
+  ASSERT_OK(s);
+
+  ColumnFamilyHandle* column_family = db->DefaultColumnFamily();
+
+  // Test batch with overwrite_key=true
+  WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
+  std::string value;
+
+  s = batch.GetFromBatch(column_family, options, "X", &value);
+  ASSERT_TRUE(s.IsNotFound());
+
+  batch.Put(column_family, "X", "x");
+  s = batch.GetFromBatch(column_family, options, "X", &value);
+  ASSERT_OK(s);
+  ASSERT_EQ("x", value);
+
+  batch.Put(column_family, "X", "x2");
+  s = batch.GetFromBatch(column_family, options, "X", &value);
+  ASSERT_OK(s);
+  ASSERT_EQ("x2", value);
+
+  batch.Merge(column_family, "X", "aaa");
+  s = batch.GetFromBatch(column_family, options, "X", &value);
+  ASSERT_TRUE(s.IsMergeInProgress());
+
+  batch.Merge(column_family, "X", "bbb");
+  s = batch.GetFromBatch(column_family, options, "X", &value);
+  ASSERT_TRUE(s.IsMergeInProgress());
+
+  batch.Put(column_family, "X", "x3");
+  s = batch.GetFromBatch(column_family, options, "X", &value);
+  ASSERT_OK(s);
+  ASSERT_EQ("x3", value);
+
+  batch.Merge(column_family, "X", "ccc");
+  s = batch.GetFromBatch(column_family, options, "X", &value);
+  ASSERT_TRUE(s.IsMergeInProgress());
+
+  batch.Delete(column_family, "X");
+  s = batch.GetFromBatch(column_family, options, "X", &value);
+  ASSERT_TRUE(s.IsNotFound());
+
+  batch.Merge(column_family, "X", "ddd");
+  s = batch.GetFromBatch(column_family, options, "X", &value);
+  ASSERT_TRUE(s.IsMergeInProgress());
+
+  delete db;
+  DestroyDB(dbname, options);
+}
+
 TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) {
   DB* db;
   Options options;
@@ -1017,7 +1077,7 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) {
 
   DestroyDB(dbname, options);
   Status s = DB::Open(options, dbname, &db);
-  assert(s.ok());
+  ASSERT_OK(s);
 
   WriteBatchWithIndex batch;
   ReadOptions read_options;
@@ -1185,6 +1245,54 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) {
   DestroyDB(dbname, options);
 }
 
+TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge2) {
+  DB* db;
+  Options options;
+
+  options.create_if_missing = true;
+  std::string dbname = test::TmpDir() + "/write_batch_with_index_test";
+
+  options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
+
+  DestroyDB(dbname, options);
+  Status s = DB::Open(options, dbname, &db);
+  assert(s.ok());
+
+  // Test batch with overwrite_key=true
+  WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
+
+  ReadOptions read_options;
+  WriteOptions write_options;
+  std::string value;
+
+  s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
+  ASSERT_TRUE(s.IsNotFound());
+
+  batch.Merge("A", "xxx");
+
+  s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
+  ASSERT_TRUE(s.IsMergeInProgress());
+
+  batch.Merge("A", "yyy");
+
+  s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
+  ASSERT_TRUE(s.IsMergeInProgress());
+
+  s = db->Put(write_options, "A", "a0");
+  ASSERT_OK(s);
+
+  s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
+  ASSERT_TRUE(s.IsMergeInProgress());
+
+  batch.Delete("A");
+
+  s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
+  ASSERT_TRUE(s.IsNotFound());
+
+  delete db;
+  DestroyDB(dbname, options);
+}
+
 void AssertKey(std::string key, WBWIIterator* iter) {
   ASSERT_TRUE(iter->Valid());
   ASSERT_EQ(key, iter->Entry().key.ToString());