virtual Status RollbackToSavePoint() = 0;
// This function is similar to DB::Get() except it will also read pending
- // changes in this transaction.
+ // changes in this transaction. Currently, this function will return
+ // Status::MergeInProgress if the most recent write to the queried key in
+ // this batch is a Merge.
//
// If read_options.snapshot is not set, the current version of the key will
// be read. Calling SetSnapshot() does not affect the version of the data
// snapshot is set in this transaction). The transaction behavior is the
// same regardless of whether the key exists or not.
//
+ // Note: Currently, this function will return Status::MergeInProgress
+ // if the most recent write to the queried key in this batch is a Merge.
+ //
// The values returned by this function are similar to Transaction::Get().
// If value==nullptr, then this function will not read any data, but will
// still ensure that this key cannot be written to by outside of this
// Status::TimedOut() if a lock could not be acquired,
// Status::TryAgain() if the memtable history size is not large enough
// (See max_write_buffer_number_to_maintain)
+ // Status::MergeInProgress() if merge operations cannot be resolved.
// or other errors if this key could not be read.
virtual Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family,
WBWIIterator* NewIterator();
// Will create a new Iterator that will use WBWIIterator as a delta and
- // base_iterator as base
+ // base_iterator as base.
+ //
+ // This function is only supported if the WriteBatchWithIndex was
+ // constructed with overwrite_key=true.
+ //
+ // The returned iterator should be deleted by the caller.
+ // The base_iterator is now 'owned' by the returned iterator. Deleting the
+ // returned iterator will also delete the base_iterator.
Iterator* NewIteratorWithBase(ColumnFamilyHandle* column_family,
Iterator* base_iterator);
// default column family
// Similar to previous function but does not require a column_family.
// Note: An InvalidArgument status will be returned if there are any Merge
- // operators for this key.
+ // operators for this key. Use previous method instead.
Status GetFromBatch(const DBOptions& options, const Slice& key,
std::string* value) {
return GetFromBatch(nullptr, options, key, value);
#include "rocksdb/utilities/transaction_db.h"
#include "util/logging.h"
#include "util/testharness.h"
+#include "utilities/merge_operators.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
using std::string;
TransactionTest() {
options.create_if_missing = true;
options.max_write_buffer_number = 2;
+ options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
dbname = test::TmpDir() + "/transaction_testdb";
DestroyDB(dbname, options);
delete txn2;
}
+TEST_F(TransactionTest, MergeTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ string value;
+ Status s;
+
+ Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
+ ASSERT_TRUE(txn);
+
+ s = db->Put(write_options, "A", "a0");
+ ASSERT_OK(s);
+
+ s = txn->Merge("A", "1");
+ ASSERT_OK(s);
+
+ s = txn->Merge("A", "2");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ s = txn->Put("A", "a");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a", value);
+
+ s = txn->Merge("A", "3");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ TransactionOptions txn_options;
+ txn_options.lock_timeout = 1; // 1 ms
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn2);
+
+ // verify that txn has "A" locked
+ s = txn2->Merge("A", "4");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+ delete txn2;
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+ delete txn;
+
+ s = db->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a,3", value);
+}
+
} // namespace rocksdb
int main(int argc, char** argv) {
MergeContext merge_context;
WriteBatchWithIndexInternal::Result result =
- WriteBatchWithIndexInternal::GetFromBatch(options, this, column_family,
- key, &merge_context,
- &rep->comparator, value, &s);
+ WriteBatchWithIndexInternal::GetFromBatch(
+ options, this, column_family, key, &merge_context, &rep->comparator,
+ value, rep->overwrite_key, &s);
switch (result) {
case WriteBatchWithIndexInternal::Result::kFound:
std::string batch_value;
WriteBatchWithIndexInternal::Result result =
WriteBatchWithIndexInternal::GetFromBatch(
- options, this, column_family, key, &merge_context,
- &rep->comparator, &batch_value, &s);
+ options, this, column_family, key, &merge_context, &rep->comparator,
+ &batch_value, rep->overwrite_key, &s);
if (result == WriteBatchWithIndexInternal::Result::kFound) {
value->assign(batch_value.data(), batch_value.size());
if (result == WriteBatchWithIndexInternal::Result::kError) {
return s;
}
+ if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
+ rep->overwrite_key == true) {
+ // Since we've overwritten keys, we do not know what other operations are
+ // in this batch for this key, so we cannot do a Merge to compute the
+ // result. Instead, we will simply return MergeInProgress.
+ return Status::MergeInProgress();
+ }
+
assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
result == WriteBatchWithIndexInternal::Result::kNotFound);
const DBOptions& options, WriteBatchWithIndex* batch,
ColumnFamilyHandle* column_family, const Slice& key,
MergeContext* merge_context, WriteBatchEntryComparator* cmp,
- std::string* value, Status* s) {
+ std::string* value, bool overwrite_key, Status* s) {
uint32_t cf_id = GetColumnFamilyID(column_family);
*s = Status::OK();
WriteBatchWithIndexInternal::Result result =
// We can stop iterating once we find a PUT or DELETE
break;
}
+ if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
+ overwrite_key == true) {
+ // Since we've overwritten keys, we do not know what other operations are
+ // in this batch for this key, so we cannot do a Merge to compute the
+ // result. Instead, we will simply return MergeInProgress.
+ break;
+ }
iter->Prev();
}
const DBOptions& options, WriteBatchWithIndex* batch,
ColumnFamilyHandle* column_family, const Slice& key,
MergeContext* merge_context, WriteBatchEntryComparator* cmp,
- std::string* value, Status* s);
+ std::string* value, bool overwrite_key, Status* s);
};
} // namespace rocksdb
DestroyDB(dbname, options);
Status s = DB::Open(options, dbname, &db);
- assert(s.ok());
+ ASSERT_OK(s);
ColumnFamilyHandle* column_family = db->DefaultColumnFamily();
WriteBatchWithIndex batch;
DestroyDB(dbname, options);
}
+TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge2) {
+ DB* db;
+ Options options;
+ options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
+ options.create_if_missing = true;
+
+ std::string dbname = test::TmpDir() + "/write_batch_with_index_test";
+
+ DestroyDB(dbname, options);
+ Status s = DB::Open(options, dbname, &db);
+ ASSERT_OK(s);
+
+ ColumnFamilyHandle* column_family = db->DefaultColumnFamily();
+
+ // Test batch with overwrite_key=true
+ WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
+ std::string value;
+
+ s = batch.GetFromBatch(column_family, options, "X", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ batch.Put(column_family, "X", "x");
+ s = batch.GetFromBatch(column_family, options, "X", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("x", value);
+
+ batch.Put(column_family, "X", "x2");
+ s = batch.GetFromBatch(column_family, options, "X", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("x2", value);
+
+ batch.Merge(column_family, "X", "aaa");
+ s = batch.GetFromBatch(column_family, options, "X", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ batch.Merge(column_family, "X", "bbb");
+ s = batch.GetFromBatch(column_family, options, "X", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ batch.Put(column_family, "X", "x3");
+ s = batch.GetFromBatch(column_family, options, "X", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("x3", value);
+
+ batch.Merge(column_family, "X", "ccc");
+ s = batch.GetFromBatch(column_family, options, "X", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ batch.Delete(column_family, "X");
+ s = batch.GetFromBatch(column_family, options, "X", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ batch.Merge(column_family, "X", "ddd");
+ s = batch.GetFromBatch(column_family, options, "X", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ delete db;
+ DestroyDB(dbname, options);
+}
+
TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) {
DB* db;
Options options;
DestroyDB(dbname, options);
Status s = DB::Open(options, dbname, &db);
- assert(s.ok());
+ ASSERT_OK(s);
WriteBatchWithIndex batch;
ReadOptions read_options;
DestroyDB(dbname, options);
}
+TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge2) {
+ DB* db;
+ Options options;
+
+ options.create_if_missing = true;
+ std::string dbname = test::TmpDir() + "/write_batch_with_index_test";
+
+ options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
+
+ DestroyDB(dbname, options);
+ Status s = DB::Open(options, dbname, &db);
+ assert(s.ok());
+
+ // Test batch with overwrite_key=true
+ WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
+
+ ReadOptions read_options;
+ WriteOptions write_options;
+ std::string value;
+
+ s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ batch.Merge("A", "xxx");
+
+ s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ batch.Merge("A", "yyy");
+
+ s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ s = db->Put(write_options, "A", "a0");
+ ASSERT_OK(s);
+
+ s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ batch.Delete("A");
+
+ s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ delete db;
+ DestroyDB(dbname, options);
+}
+
void AssertKey(std::string key, WBWIIterator* iter) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(key, iter->Entry().key.ToString());