]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Merge operator fixes part 1.
authorDeon Nicholas <dnicholas@fb.com>
Mon, 19 Aug 2013 18:42:47 +0000 (11:42 -0700)
committerDeon Nicholas <dnicholas@fb.com>
Mon, 19 Aug 2013 18:42:47 +0000 (11:42 -0700)
Summary:
-Added null checks and revisions to DBIter::MergeValuesNewToOld()
-Added DBIter test to stringappend_test
-Major fix with Merge and TTL
More plans for fixes later.

Test Plan:
-make clean; make stringappend_test -j 32; ./stringappend_test
-make all check;

Reviewers: haobo, emayanke, vamsi, dhruba

Reviewed By: haobo

CC: leveldb
Differential Revision: https://reviews.facebook.net/D12315

15 files changed:
Makefile
db/db_iter.cc
db/memtable.cc
db/merge_helper.cc
db/merge_operator.cc
db/version_set.cc
include/leveldb/merge_operator.h
include/utilities/stackable_db.h
utilities/merge_operators/put.cc
utilities/merge_operators/string_append/stringappend.h
utilities/merge_operators/string_append/stringappend2.cc
utilities/merge_operators/string_append/stringappend2.h
utilities/merge_operators/string_append/stringappend_test.cc
utilities/ttl/db_ttl.cc
utilities/ttl/db_ttl.h

index d16956dc51c1033c3715adbac2daaf1d35bfe54d..196b88c4cd80051afccccbe7351e975abca7e3b1 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -20,7 +20,7 @@ include build_config.mk
 
 WARNING_FLAGS = -Wall -Werror
 CFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
-CXXFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -std=gnu++0x
+CXXFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -std=gnu++0x -Woverloaded-virtual
 
 LDFLAGS += $(PLATFORM_LDFLAGS)
 
index 1d8c84427fa221f98dc210616c4a79759803ca8e..5051b355393bcb1155be985ee83e9602a6324ef4 100644 (file)
@@ -212,11 +212,8 @@ void DBIter::FindNextUserEntry(bool skipping) {
             SaveKey(ikey.user_key, &saved_key_);
             current_entry_is_merged_ = true;
             valid_ = true;
-            // Go to a different state machine
-            MergeValuesNewToOld();
-            // TODO: what if !iter_->Valid()
+            MergeValuesNewToOld();  // Go to a different state machine
             return;
-            break;
           case kTypeLogData:
             assert(false);
             break;
@@ -235,7 +232,11 @@ void DBIter::FindNextUserEntry(bool skipping) {
 // POST: saved_value_ has the merged value for the user key
 //       iter_ points to the next entry (or invalid)
 void DBIter::MergeValuesNewToOld() {
-  // TODO: Is there a way to unite with MergeHelper or other similar code?
+  if (!user_merge_operator_) {
+    Log(logger_, "Options::merge_operator is null.");
+    throw std::logic_error("DBIter::MergeValuesNewToOld() with"
+                           " Options::merge_operator null");
+  }
 
   // Start the merge process by pushing the first operand
   std::deque<std::string> operands;
@@ -266,8 +267,8 @@ void DBIter::MergeValuesNewToOld() {
       // final result in saved_value_. We are done!
       // ignore corruption if there is any.
       const Slice value = iter_->value();
-      user_merge_operator_->Merge(ikey.user_key, &value, operands,
-                                  &saved_value_, logger_.get());
+      user_merge_operator_->FullMerge(ikey.user_key, &value, operands,
+                                      &saved_value_, logger_.get());
       // iter_ is positioned after put
       iter_->Next();
       return;
@@ -300,8 +301,8 @@ void DBIter::MergeValuesNewToOld() {
   // a deletion marker.
   // feed null as the existing value to the merge operator, such that
   // client can differentiate this scenario and do things accordingly.
-  user_merge_operator_->Merge(ikey.user_key, nullptr, operands,
-                              &saved_value_, logger_.get());
+  user_merge_operator_->FullMerge(saved_key_, nullptr, operands,
+                                  &saved_value_, logger_.get());
 }
 
 void DBIter::Prev() {
index 24d6bd8dd29e354a5c2da36fa3e0d3b1f72976b5..37ab558521255bb1c51125b0a7bcc5fe93d96865 100644 (file)
@@ -167,8 +167,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
           *s = Status::OK();
           if (merge_in_progress) {
             assert(merge_operator);
-            if (!merge_operator->Merge(key.user_key(), &v, *operands,
-                                       value, logger.get())) {
+            if (!merge_operator->FullMerge(key.user_key(), &v, *operands,
+                                           value, logger.get())) {
               RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
               *s = Status::Corruption("Error: Could not perform merge.");
             }
@@ -181,8 +181,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
           if (merge_in_progress) {
             assert(merge_operator);
             *s = Status::OK();
-            if (!merge_operator->Merge(key.user_key(), nullptr, *operands,
-                                       value, logger.get())) {
+            if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands,
+                                           value, logger.get())) {
               RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
               *s = Status::Corruption("Error: Could not perform merge.");
             }
index d1f3c6683de980907bfe8cb7467a5985792b62c0..32d4874cbe3059e3508b88ab0234b6b330acf4fb 100644 (file)
@@ -67,9 +67,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
       //   => store result in operands_.back() (and update keys_.back())
       //   => change the entry type to kTypeValue for keys_.back()
       // We are done! Return a success if the merge passes.
-      success_ = user_merge_operator_->Merge(ikey.user_key, nullptr,
-                                             operands_, &merge_result,
-                                             logger_);
+      success_ = user_merge_operator_->FullMerge(ikey.user_key, nullptr,
+                                                 operands_, &merge_result,
+                                                 logger_);
 
       // We store the result in keys_.back() and operands_.back()
       // if nothing went wrong (i.e.: no operand corruption on disk)
@@ -95,9 +95,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
       //   => change the entry type to kTypeValue for keys_.back()
       // We are done! Success!
       const Slice value = iter->value();
-      success_ = user_merge_operator_->Merge(ikey.user_key, &value,
-                                             operands_, &merge_result,
-                                             logger_);
+      success_ = user_merge_operator_->FullMerge(ikey.user_key, &value,
+                                                 operands_, &merge_result,
+                                                 logger_);
 
       // We store the result in keys_.back() and operands_.back()
       // if nothing went wrong (i.e.: no operand corruption on disk)
@@ -170,9 +170,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
     assert(kTypeMerge == orig_ikey.type);
     assert(operands_.size() >= 1);
     assert(operands_.size() == keys_.size());
-    success_ = user_merge_operator_->Merge(ikey.user_key, nullptr,
-                                           operands_, &merge_result,
-                                           logger_);
+    success_ = user_merge_operator_->FullMerge(ikey.user_key, nullptr,
+                                               operands_, &merge_result,
+                                               logger_);
 
     if (success_) {
       std::string& key = keys_.back();  // The original key encountered
index 89c64b5d99c781c3e1e63af852e039830986666e..aee99128a5585c1adedc6114262ce5533846f8fb 100644 (file)
@@ -12,7 +12,7 @@ namespace leveldb {
 // Given a "real" merge from the library, call the user's
 // associative merge function one-by-one on each of the operands.
 // NOTE: It is assumed that the client's merge-operator will handle any errors.
-bool AssociativeMergeOperator::Merge(
+bool AssociativeMergeOperator::FullMerge(
     const Slice& key,
     const Slice* existing_value,
     const std::deque<std::string>& operand_list,
index 63a58f6713d619d955ce71219222895db7742e03..609965224ea7d696806f847c7f866f51d6b9b41a 100644 (file)
@@ -286,8 +286,8 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
           } else if (kMerge == s->state) {
             assert(s->merge_operator != nullptr);
             s->state = kFound;
-            if (!s->merge_operator->Merge(s->user_key, &v, *ops,
-                                          s->value, s->logger)) {
+            if (!s->merge_operator->FullMerge(s->user_key, &v, *ops,
+                                              s->value, s->logger)) {
               RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
               s->state = kCorrupt;
             }
@@ -301,8 +301,8 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
             s->state = kDeleted;
           } else if (kMerge == s->state) {
             s->state = kFound;
-            if (!s->merge_operator->Merge(s->user_key, nullptr, *ops,
-                                          s->value, s->logger)) {
+            if (!s->merge_operator->FullMerge(s->user_key, nullptr, *ops,
+                                              s->value, s->logger)) {
               RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
               s->state = kCorrupt;
             }
@@ -521,8 +521,8 @@ void Version::Get(const ReadOptions& options,
   if (kMerge == saver.state) {
     // merge_operands are in saver and we hit the beginning of the key history
     // do a final merge of nullptr and operands;
-    if (merge_operator->Merge(user_key, nullptr, *saver.merge_operands,
-                              value, logger.get())) {
+    if (merge_operator->FullMerge(user_key, nullptr, *saver.merge_operands,
+                                  value, logger.get())) {
       *status = Status::OK();
     } else {
       RecordTick(db_options.statistics, NUMBER_MERGE_FAILURES);
index 44c4db66ba6759e1a5a810b1e9317d48034c9e6b..e64418aee59a1dcc2ddf7d867b749e723fc230e8 100644 (file)
@@ -29,10 +29,11 @@ class Logger;
 //    into rocksdb); numeric addition and string concatenation are examples;
 //
 //  b) MergeOperator - the generic class for all the more abstract / complex
-//    operations; one method to merge a Put/Delete value with a merge operand;
-//    and another method (PartialMerge) that merges two operands together.
-//    this is especially useful if your key values have a complex structure,
-//    but you would still like to support client-specific incremental updates.
+//    operations; one method (FullMerge) to merge a Put/Delete value with a
+//    merge operand; and another method (PartialMerge) that merges two
+//    operands together. this is especially useful if your key values have a
+//    complex structure but you would still like to support client-specific
+//    incremental updates.
 //
 // AssociativeMergeOperator is simpler to implement. MergeOperator is simply
 // more powerful.
@@ -60,11 +61,11 @@ class MergeOperator {
   // internal corruption. This will be treated as an error by the library.
   //
   // Also make use of the *logger for error messages.
-  virtual bool Merge(const Slice& key,
-                     const Slice* existing_value,
-                     const std::deque<std::string>& operand_list,
-                     std::string* new_value,
-                     Logger* logger) const = 0;
+  virtual bool FullMerge(const Slice& key,
+                         const Slice* existing_value,
+                         const std::deque<std::string>& operand_list,
+                         std::string* new_value,
+                         Logger* logger) const = 0;
 
   // This function performs merge(left_op, right_op)
   // when both the operands are themselves merge operation types
@@ -85,7 +86,7 @@ class MergeOperator {
   // TODO: Presently there is no way to differentiate between error/corruption
   // and simply "return false". For now, the client should simply return
   // false in any case it cannot perform partial-merge, regardless of reason.
-  // If there is corruption in the data, handle it in the above Merge() function,
+  // If there is corruption in the data, handle it in the FullMerge() function,
   // and return false there.
   virtual bool PartialMerge(const Slice& key,
                             const Slice& left_operand,
@@ -128,11 +129,11 @@ class AssociativeMergeOperator : public MergeOperator {
 
  private:
   // Default implementations of the MergeOperator functions
-  virtual bool Merge(const Slice& key,
-                     const Slice* existing_value,
-                     const std::deque<std::string>& operand_list,
-                     std::string* new_value,
-                     Logger* logger) const override;
+  virtual bool FullMerge(const Slice& key,
+                         const Slice* existing_value,
+                         const std::deque<std::string>& operand_list,
+                         std::string* new_value,
+                         Logger* logger) const override;
 
   virtual bool PartialMerge(const Slice& key,
                             const Slice& left_operand,
index 28d5a7701b623a20723795048a75b2664ace05e5..916496e0496c683330a8b8082405f40bead59bf1 100644 (file)
@@ -60,7 +60,7 @@ class StackableDB : public DB {
                            const Slice& key,
                            std::string* value,
                            bool* value_found = nullptr) override {
-    return KeyMayExist(options, key, value, value_found);
+    return sdb_->KeyMayExist(options, key, value, value_found);
   }
 
   virtual Status Delete(const WriteOptions& wopts, const Slice& key) override {
index 123dce66a6991c1097cc99fac02eae254407e7a5..39da05a7e1d516cc7350369ed8853db7228da464 100644 (file)
@@ -17,11 +17,11 @@ namespace { // anonymous namespace
 // From the client-perspective, semantics are the same.
 class PutOperator : public MergeOperator {
  public:
-  virtual bool Merge(const Slice& key,
-                     const Slice* existing_value,
-                     const std::deque<std::string>& operand_sequence,
-                     std::string* new_value,
-                     Logger* logger) const override {
+  virtual bool FullMerge(const Slice& key,
+                         const Slice* existing_value,
+                         const std::deque<std::string>& operand_sequence,
+                         std::string* new_value,
+                         Logger* logger) const override {
     // Put basically only looks at the current/latest value
     assert(!operand_sequence.empty());
     assert(new_value != nullptr);
index 34cca69449e1be9b670e061da23fc90b45783cec..3787773a80763b0854a7b4f9772944e6d08262a5 100644 (file)
@@ -11,7 +11,6 @@ namespace leveldb {
 
 class StringAppendOperator : public AssociativeMergeOperator {
  public:
-
   StringAppendOperator(char delim_char);    /// Constructor: specify delimiter
 
   virtual bool Merge(const Slice& key,
index 93e65a7b810cfe82a957ce0116b9c2a4cf8d8e46..c409a1105d2cd41930b322a5f3f19c393f39bc27 100644 (file)
@@ -20,11 +20,12 @@ StringAppendTESTOperator::StringAppendTESTOperator(char delim_char)
 }
 
 // Implementation for the merge operation (concatenates two strings)
-bool StringAppendTESTOperator::Merge(const Slice& key,
-                                     const Slice* existing_value,
-                                     const std::deque<std::string>& operands,
-                                     std::string* new_value,
-                                     Logger* logger) const {
+bool StringAppendTESTOperator::FullMerge(
+    const Slice& key,
+    const Slice* existing_value,
+    const std::deque<std::string>& operands,
+    std::string* new_value,
+    Logger* logger) const {
 
   // Clear the *new_value for writing.
   assert(new_value);
index 2e9f505bd13eee3e557c458b244b70706954af2f..a8f090ffd35a62d32bd6de95d27b22d72ec1c575 100644 (file)
@@ -20,11 +20,11 @@ class StringAppendTESTOperator : public MergeOperator {
 
   StringAppendTESTOperator(char delim_char);    /// Constructor with delimiter
 
-  virtual bool Merge(const Slice& key,
-                     const Slice* existing_value,
-                     const std::deque<std::string>& operand_sequence,
-                     std::string* new_value,
-                     Logger* logger) const override;
+  virtual bool FullMerge(const Slice& key,
+                         const Slice* existing_value,
+                         const std::deque<std::string>& operand_sequence,
+                         std::string* new_value,
+                         Logger* logger) const override;
 
   virtual bool PartialMerge(const Slice& key,
                             const Slice& left_operand,
index b852d30d9c8eb0983b69884adafd3141fd0fd7c4..5ec30a4c40c95c016c58b2e140bba88d77b6e553 100644 (file)
@@ -96,6 +96,101 @@ class StringLists {
 // THE TEST CASES BEGIN HERE
 
 class StringAppendOperatorTest { };
+TEST(StringAppendOperatorTest, IteratorTest) {
+  DestroyDB(kDbName, Options());    // Start this test with a fresh DB
+
+  StringAppendOperator append_op(',');
+  auto db_ = OpenDb(&append_op);
+  StringLists slists(db_);
+
+  slists.Append("k1","v1");
+  slists.Append("k1","v2");
+  slists.Append("k1","v3");
+
+  slists.Append("k2","a1");
+  slists.Append("k2","a2");
+  slists.Append("k2","a3");
+
+  std::string res;
+  std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(ReadOptions()));
+  std::string k1("k1");
+  std::string k2("k2");
+  bool first = true;
+  for (it->Seek(k1); it->Valid(); it->Next()) {
+    res = it->value().ToString();
+    if (first) {
+      ASSERT_EQ(res, "v1,v2,v3");
+      first = false;
+    } else {
+      ASSERT_EQ(res, "a1,a2,a3");
+    }
+  }
+  slists.Append("k2", "a4");
+  slists.Append("k1", "v4");
+
+  // Snapshot should still be the same. Should ignore a4 and v4.
+  first = true;
+  for (it->Seek(k1); it->Valid(); it->Next()) {
+    res = it->value().ToString();
+    if (first) {
+      ASSERT_EQ(res, "v1,v2,v3");
+      first = false;
+    } else {
+      ASSERT_EQ(res, "a1,a2,a3");
+    }
+  }
+
+
+  // Should release the snapshot and be aware of the new stuff now
+  it.reset(db_->NewIterator(ReadOptions()));
+  first = true;
+  for (it->Seek(k1); it->Valid(); it->Next()) {
+    res = it->value().ToString();
+    if (first) {
+      ASSERT_EQ(res, "v1,v2,v3,v4");
+      first = false;
+    } else {
+      ASSERT_EQ(res, "a1,a2,a3,a4");
+    }
+  }
+
+  // start from k2 this time.
+  for (it->Seek(k2); it->Valid(); it->Next()) {
+    res = it->value().ToString();
+    if (first) {
+      ASSERT_EQ(res, "v1,v2,v3,v4");
+      first = false;
+    } else {
+      ASSERT_EQ(res, "a1,a2,a3,a4");
+    }
+  }
+
+  slists.Append("k3","g1");
+
+  it.reset(db_->NewIterator(ReadOptions()));
+  first = true;
+  std::string k3("k3");
+  for(it->Seek(k2); it->Valid(); it->Next()) {
+    res = it->value().ToString();
+    if (first) {
+      ASSERT_EQ(res, "a1,a2,a3,a4");
+      first = false;
+    } else {
+      ASSERT_EQ(res, "g1");
+    }
+  }
+  for(it->Seek(k3); it->Valid(); it->Next()) {
+    res = it->value().ToString();
+    if (first) {
+      // should not be hit
+      ASSERT_EQ(res, "a1,a2,a3,a4");
+      first = false;
+    } else {
+      ASSERT_EQ(res, "g1");
+    }
+  }
+
+}
 
 TEST(StringAppendOperatorTest,SimpleTest) {
   DestroyDB(kDbName, Options());    // Start this test with a fresh DB
index 0791cdc878848a2a5e6d622c54d21fc927e0a0bf..b6235ba4a4eedf1c821ba59dd41c2ba0b857f4cb 100644 (file)
@@ -154,7 +154,7 @@ std::vector<Status> DBWithTTL::MultiGet(const ReadOptions& options,
                                supported with TTL"));
 }
 
-bool DBWithTTL::KeyMayExist(ReadOptions& options,
+bool DBWithTTL::KeyMayExist(const ReadOptions& options,
                             const Slice& key,
                             std::string* value,
                             bool* value_found) {
index ada7f99bac660985cf3ac9b7162a047da64e9b2f..1285b448e26f78132c0ef197eab2e34c4dabf825 100644 (file)
@@ -36,10 +36,10 @@ class DBWithTTL : public StackableDB {
                                        const std::vector<Slice>& keys,
                                        std::vector<std::string>* values);
 
-  virtual bool KeyMayExist(ReadOptions& options,
+  virtual bool KeyMayExist(const ReadOptions& options,
                            const Slice& key,
                            std::string* value,
-                           bool* value_found = nullptr);
+                           bool* value_found = nullptr) override;
 
   virtual Status Delete(const WriteOptions& wopts, const Slice& key);
 
@@ -259,11 +259,11 @@ class TtlMergeOperator : public MergeOperator {
     assert(merge_op);
   }
 
-  virtual bool Merge(const Slice& key,
-                     const Slice* existing_value,
-                     const std::deque<std::string>& operands,
-                     std::string* new_value,
-                     Logger* logger) const override {
+  virtual bool FullMerge(const Slice& key,
+                         const Slice* existing_value,
+                         const std::deque<std::string>& operands,
+                         std::string* new_value,
+                         Logger* logger) const override {
     const uint32_t ts_len = DBWithTTL::kTSLength;
     if (existing_value && existing_value->size() < ts_len) {
       Log(logger, "Error: Could not remove timestamp from existing value.");
@@ -281,14 +281,20 @@ class TtlMergeOperator : public MergeOperator {
     }
 
     // Apply the user merge operator (store result in *new_value)
+    bool good = true;
     if (existing_value) {
       Slice existing_value_without_ts(existing_value->data(),
                                       existing_value->size() - ts_len);
-      user_merge_op_->Merge(key, &existing_value_without_ts,
-                            operands_without_ts, new_value, logger);
+      good = user_merge_op_->FullMerge(key, &existing_value_without_ts,
+                                       operands_without_ts, new_value, logger);
     } else {
-      user_merge_op_->Merge(key, nullptr, operands_without_ts, new_value,
-                            logger);
+      good = user_merge_op_->FullMerge(key, nullptr, operands_without_ts,
+                                       new_value, logger);
+    }
+
+    // Return false if the user merge operator returned false
+    if (!good) {
+      return false;
     }
 
     // Augment the *new_value with the ttl time-stamp
@@ -321,8 +327,10 @@ class TtlMergeOperator : public MergeOperator {
     assert(new_value);
     Slice left_without_ts(left_operand.data(), left_operand.size() - ts_len);
     Slice right_without_ts(right_operand.data(), right_operand.size() - ts_len);
-    user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts,
-                                 new_value, logger);
+    if (!user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts,
+                                      new_value, logger)) {
+      return false;
+    }
 
     // Augment the *new_value with the ttl time-stamp
     int32_t curtime;