git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add API to limit blast radius of merge operator failure (#11092)
author: Andrew Kryczka <andrewkr@fb.com>
Fri, 20 Jan 2023 22:40:30 +0000 (14:40 -0800)
committer: Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 20 Jan 2023 22:40:30 +0000 (14:40 -0800)
Summary:
Prior to this PR, `FullMergeV2()` could only return `false` to indicate failure, which caused any operation invoking it to fail. During a compaction, such a failure caused the compaction to fail and the DB to irreversibly enter read-only mode. Some users asked for a way to allow the merge operator to fail without such widespread damage.

To limit the blast radius of merge operator failures, this PR introduces the `MergeOperationOutput::op_failure_scope` API. When unpopulated (`kDefault`) or set to `kTryMerge`, the merge operator failure handling is the same as before. When set to `kMustMerge`, a merge operator failure still fails operations that must merge (`Get()`, iterator, `MultiGet()`, etc.). However, under `kMustMerge`, flushes/compactions can survive merge operator failures by outputting the unmerged input operands.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11092

Reviewed By: siying

Differential Revision: D42525673

Pulled By: ajkr

fbshipit-source-id: 951dc3bf190f86347dccf3381be967565cda52ee

15 files changed:
HISTORY.md
db/compaction/compaction_iterator.cc
db/compaction/compaction_iterator.h
db/db_iter.cc
db/db_merge_operator_test.cc
db/db_test_util.h
db/memtable.cc
db/merge_helper.cc
db/merge_helper.h
db/version_set.cc
db/write_batch.cc
include/rocksdb/merge_operator.h
table/get_context.cc
utilities/ttl/db_ttl_impl.cc
utilities/write_batch_with_index/write_batch_with_index_internal.cc

index 4ff99985b2b2e1ebd1375d696fba25b3197725a0..ec264ff117f59c8f1f74a7bbf4d3acdc55b5d6b4 100644 (file)
@@ -23,6 +23,7 @@
 
 ### Public API Changes
 * Substantial changes have been made to the Cache class to support internal development goals. Direct use of Cache class members is discouraged and further breaking modifications are expected in the future. SecondaryCache has some related changes and implementations will need to be updated. (Unlike Cache, SecondaryCache is still intended to support user implementations, and disruptive changes will be avoided.) (#10975)
+* Add `MergeOperationOutput::op_failure_scope` for merge operator users to control the blast radius of merge operator failures. Existing merge operator users do not need to make any change to preserve the old behavior.
 
 ### Performance Improvements
 * Updated xxHash source code, which should improve kXXH3 checksum speed, at least on ARM (#11098).
index 9f54f781384a7d5f8ace4ec110057aca6f8b3602..e1bdddcb750c49662f29ef8c346ef4bd2bc80b2f 100644 (file)
@@ -124,6 +124,9 @@ CompactionIterator::CompactionIterator(
          timestamp_size_ == full_history_ts_low_->size());
 #endif
   input_.SetPinnedItersMgr(&pinned_iters_mgr_);
+  // The default `merge_until_status_` does not need to be checked since it is
+  // overwritten as soon as `MergeUntil()` is called
+  merge_until_status_.PermitUncheckedError();
   TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
 }
 
@@ -178,6 +181,20 @@ void CompactionIterator::Next() {
       ikey_.user_key = current_key_.GetUserKey();
       validity_info_.SetValid(ValidContext::kMerge1);
     } else {
+      if (merge_until_status_.IsMergeInProgress()) {
+        // `Status::MergeInProgress()` tells us that the previous `MergeUntil()`
+        // produced only merge operands. Those merge operands were accessed and
+        // written out using `merge_out_iter_`. Since `merge_out_iter_` is
+        // exhausted at this point, all merge operands have been written out.
+        //
+        // Still, there may be a base value (PUT, DELETE, SINGLEDEL, etc.) that
+        // needs to be written out. Normally, `CompactionIterator` would skip it
+        // on the basis that it has already output something in the same
+        // snapshot stripe. To prevent this, we reset `has_current_user_key_` so
+        // that the next iteration does not detect that the snapshot stripe is
+        // unchanged.
+        has_current_user_key_ = false;
+      }
       // We consumed all pinned merge operands, release pinned iterators
       pinned_iters_mgr_.ReleasePinnedData();
       // MergeHelper moves the iterator to the first record after the merged
@@ -880,14 +897,15 @@ void CompactionIterator::NextFromInput() {
       // have hit (A)
       // We encapsulate the merge related state machine in a different
       // object to minimize change to the existing flow.
-      Status s = merge_helper_->MergeUntil(
+      merge_until_status_ = merge_helper_->MergeUntil(
           &input_, range_del_agg_, prev_snapshot, bottommost_level_,
           allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_,
           prefetch_buffers_.get(), &iter_stats_);
       merge_out_iter_.SeekToFirst();
 
-      if (!s.ok() && !s.IsMergeInProgress()) {
-        status_ = s;
+      if (!merge_until_status_.ok() &&
+          !merge_until_status_.IsMergeInProgress()) {
+        status_ = merge_until_status_;
         return;
       } else if (merge_out_iter_.Valid()) {
         // NOTE: key, value, and ikey_ refer to old entries.
index c215d2bbbd0648a638377693c4993587ebe98247..a224a8e0e2990247b75ddacd37b4f2e0b46cfdba 100644 (file)
@@ -432,6 +432,7 @@ class CompactionIterator {
   bool clear_and_output_next_key_ = false;
 
   MergeOutputIterator merge_out_iter_;
+  Status merge_until_status_;
   // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
   // merge operands and then releasing them after consuming them.
   PinnedIteratorsManager pinned_iters_mgr_;
index e1375deb7d2130730fb33a092e79d1c826d243d7..1e4a735dca4a36c957b77ac312706f54148171c3 100644 (file)
@@ -1247,10 +1247,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
 }
 
 bool DBIter::Merge(const Slice* val, const Slice& user_key) {
+  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
+  // since a failure must be propagated regardless of its value.
   Status s = MergeHelper::TimedFullMerge(
       merge_operator_, user_key, val, merge_context_.GetOperands(),
       &saved_value_, logger_, statistics_, clock_, &pinned_value_,
-      /* update_num_ops_stats */ true);
+      /* update_num_ops_stats */ true,
+      /* op_failure_scope */ nullptr);
   if (!s.ok()) {
     valid_ = false;
     status_ = s;
@@ -1265,10 +1268,13 @@ bool DBIter::Merge(const Slice* val, const Slice& user_key) {
 }
 
 bool DBIter::MergeEntity(const Slice& entity, const Slice& user_key) {
+  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
+  // since a failure must be propagated regardless of its value.
   Status s = MergeHelper::TimedFullMergeWithEntity(
       merge_operator_, user_key, entity, merge_context_.GetOperands(),
       &saved_value_, logger_, statistics_, clock_,
-      /* update_num_ops_stats */ true);
+      /* update_num_ops_stats */ true,
+      /* op_failure_scope */ nullptr);
   if (!s.ok()) {
     valid_ = false;
     status_ = s;
index 7c5505bd1feabd7f4e01239ad04dd0fd150926da..f8c90c15871f5083f398a0199c0d528067fa97d7 100644 (file)
@@ -202,6 +202,161 @@ TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
   VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
 }
 
+#ifndef ROCKSDB_LITE
+
+TEST_F(DBMergeOperatorTest, MergeOperatorFailsWithMustMerge) {
+  // This is like a mini-stress test dedicated to `OpFailureScope::kMustMerge`.
+  // Some or most of it might be deleted upon adding that option to the actual
+  // stress test.
+  //
+  // "k0" and "k2" are stable (uncorrupted) keys before and after a corrupted
+  // key ("k1"). The outer loop (`i`) varies which write (`j`) to "k1" triggers
+  // the corruption. Inside that loop there are three cases:
+  //
+  // - Case 1: pure `Merge()`s
+  // - Case 2: `Merge()`s on top of a `Put()`
+  // - Case 3: `Merge()`s on top of a `Delete()`
+  //
+  // For each case we test query results before flush, after flush, and after
+  // compaction, as well as cleanup after deletion+compaction. The queries
+  // expect "k0" and "k2" to always be readable. "k1" is expected to be readable
+  // only by APIs that do not require merging, such as `GetMergeOperands()`.
+  const int kNumOperands = 3;
+  Options options;
+  options.merge_operator.reset(new TestPutOperator());
+  options.env = env_;
+  Reopen(options);
+
+  for (int i = 0; i < kNumOperands; ++i) {
+    auto check_query = [&]() {
+      {
+        std::string value;
+        ASSERT_OK(db_->Get(ReadOptions(), "k0", &value));
+        ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
+        ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
+      }
+
+      {
+        std::unique_ptr<Iterator> iter;
+        iter.reset(db_->NewIterator(ReadOptions()));
+        iter->SeekToFirst();
+        ASSERT_TRUE(iter->Valid());
+        ASSERT_EQ("k0", iter->key());
+        iter->Next();
+        ASSERT_TRUE(iter->status().IsCorruption());
+
+        iter->SeekToLast();
+        ASSERT_TRUE(iter->Valid());
+        ASSERT_EQ("k2", iter->key());
+        iter->Prev();
+        ASSERT_TRUE(iter->status().IsCorruption());
+
+        iter->Seek("k2");
+        ASSERT_TRUE(iter->Valid());
+        ASSERT_EQ("k2", iter->key());
+      }
+
+      std::vector<PinnableSlice> values(kNumOperands);
+      GetMergeOperandsOptions merge_operands_info;
+      merge_operands_info.expected_max_number_of_operands = kNumOperands;
+      int num_operands_found = 0;
+      ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+                                      "k1", values.data(), &merge_operands_info,
+                                      &num_operands_found));
+      ASSERT_EQ(kNumOperands, num_operands_found);
+      for (int j = 0; j < num_operands_found; ++j) {
+        if (i == j) {
+          ASSERT_EQ(values[j], "corrupted_must_merge");
+        } else {
+          ASSERT_EQ(values[j], "ok");
+        }
+      }
+    };
+
+    ASSERT_OK(Put("k0", "val"));
+    ASSERT_OK(Put("k2", "val"));
+
+    // Case 1
+    for (int j = 0; j < kNumOperands; ++j) {
+      if (j == i) {
+        ASSERT_OK(Merge("k1", "corrupted_must_merge"));
+      } else {
+        ASSERT_OK(Merge("k1", "ok"));
+      }
+    }
+    check_query();
+    ASSERT_OK(Flush());
+    check_query();
+    {
+      CompactRangeOptions cro;
+      cro.bottommost_level_compaction =
+          BottommostLevelCompaction::kForceOptimized;
+      ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+    }
+    check_query();
+
+    // Case 2
+    for (int j = 0; j < kNumOperands; ++j) {
+      Slice val;
+      if (j == i) {
+        val = "corrupted_must_merge";
+      } else {
+        val = "ok";
+      }
+      if (j == 0) {
+        ASSERT_OK(Put("k1", val));
+      } else {
+        ASSERT_OK(Merge("k1", val));
+      }
+    }
+    check_query();
+    ASSERT_OK(Flush());
+    check_query();
+    {
+      CompactRangeOptions cro;
+      cro.bottommost_level_compaction =
+          BottommostLevelCompaction::kForceOptimized;
+      ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+    }
+    check_query();
+
+    // Case 3
+    ASSERT_OK(Delete("k1"));
+    for (int j = 0; j < kNumOperands; ++j) {
+      if (i == j) {
+        ASSERT_OK(Merge("k1", "corrupted_must_merge"));
+      } else {
+        ASSERT_OK(Merge("k1", "ok"));
+      }
+    }
+    check_query();
+    ASSERT_OK(Flush());
+    check_query();
+    {
+      CompactRangeOptions cro;
+      cro.bottommost_level_compaction =
+          BottommostLevelCompaction::kForceOptimized;
+      ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+    }
+    check_query();
+
+    // Verify obsolete data removal still happens
+    ASSERT_OK(Delete("k0"));
+    ASSERT_OK(Delete("k1"));
+    ASSERT_OK(Delete("k2"));
+    ASSERT_EQ("NOT_FOUND", Get("k0"));
+    ASSERT_EQ("NOT_FOUND", Get("k1"));
+    ASSERT_EQ("NOT_FOUND", Get("k2"));
+    CompactRangeOptions cro;
+    cro.bottommost_level_compaction =
+        BottommostLevelCompaction::kForceOptimized;
+    ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+    ASSERT_EQ("", FilesPerLevel());
+  }
+}
+
+#endif  // ROCKSDB_LITE
+
 class MergeOperatorPinningTest : public DBMergeOperatorTest,
                                  public testing::WithParamInterface<bool> {
  public:
index c174304229ffcf031977c7e0dfbceb5050a8fa05..4e6ac10071edfe0bcc342163e94801915c5d970d 100644 (file)
@@ -873,17 +873,34 @@ class FlushCounterListener : public EventListener {
 #endif
 
 // A test merge operator mimics put but also fails if one of merge operands is
-// "corrupted".
+// "corrupted", "corrupted_try_merge", or "corrupted_must_merge".
 class TestPutOperator : public MergeOperator {
  public:
   virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                            MergeOperationOutput* merge_out) const override {
+    static const std::map<std::string, MergeOperator::OpFailureScope>
+        bad_operand_to_op_failure_scope = {
+            {"corrupted", MergeOperator::OpFailureScope::kDefault},
+            {"corrupted_try_merge", MergeOperator::OpFailureScope::kTryMerge},
+            {"corrupted_must_merge",
+             MergeOperator::OpFailureScope::kMustMerge}};
+    auto check_operand =
+        [](Slice operand_val,
+           MergeOperator::OpFailureScope* op_failure_scope) -> bool {
+      auto iter = bad_operand_to_op_failure_scope.find(operand_val.ToString());
+      if (iter != bad_operand_to_op_failure_scope.end()) {
+        *op_failure_scope = iter->second;
+        return false;
+      }
+      return true;
+    };
     if (merge_in.existing_value != nullptr &&
-        *(merge_in.existing_value) == "corrupted") {
+        !check_operand(*merge_in.existing_value,
+                       &merge_out->op_failure_scope)) {
       return false;
     }
     for (auto value : merge_in.operand_list) {
-      if (value == "corrupted") {
+      if (!check_operand(value, &merge_out->op_failure_scope)) {
         return false;
       }
     }
index 98b86446882c09a97f6826b5cb315f148fa9dd05..1ea12d8c6dbf84a1b5f4ff06d561d4d445059f0f 100644 (file)
@@ -1067,11 +1067,15 @@ static bool SaveValue(void* arg, const char* entry) {
 
           if (s->value || s->columns) {
             std::string result;
+            // `op_failure_scope` (an output parameter) is not provided (set to
+            // nullptr) since a failure must be propagated regardless of its
+            // value.
             *(s->status) = MergeHelper::TimedFullMerge(
                 merge_operator, s->key->user_key(), &v,
                 merge_context->GetOperands(), &result, s->logger, s->statistics,
                 s->clock, /* result_operand */ nullptr,
-                /* update_num_ops_stats */ true);
+                /* update_num_ops_stats */ true,
+                /* op_failure_scope */ nullptr);
 
             if (s->status->ok()) {
               if (s->value) {
@@ -1130,18 +1134,26 @@ static bool SaveValue(void* arg, const char* entry) {
             *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
                 v, value_of_default);
             if (s->status->ok()) {
+              // `op_failure_scope` (an output parameter) is not provided (set
+              // to nullptr) since a failure must be propagated regardless of
+              // its value.
               *(s->status) = MergeHelper::TimedFullMerge(
                   merge_operator, s->key->user_key(), &value_of_default,
                   merge_context->GetOperands(), s->value, s->logger,
                   s->statistics, s->clock, /* result_operand */ nullptr,
-                  /* update_num_ops_stats */ true);
+                  /* update_num_ops_stats */ true,
+                  /* op_failure_scope */ nullptr);
             }
           } else if (s->columns) {
             std::string result;
+            // `op_failure_scope` (an output parameter) is not provided (set to
+            // nullptr) since a failure must be propagated regardless of its
+            // value.
             *(s->status) = MergeHelper::TimedFullMergeWithEntity(
                 merge_operator, s->key->user_key(), v,
                 merge_context->GetOperands(), &result, s->logger, s->statistics,
-                s->clock, /* update_num_ops_stats */ true);
+                s->clock, /* update_num_ops_stats */ true,
+                /* op_failure_scope */ nullptr);
 
             if (s->status->ok()) {
               *(s->status) = s->columns->SetWideColumnValue(result);
@@ -1177,11 +1189,15 @@ static bool SaveValue(void* arg, const char* entry) {
         if (*(s->merge_in_progress)) {
           if (s->value || s->columns) {
             std::string result;
+            // `op_failure_scope` (an output parameter) is not provided (set to
+            // nullptr) since a failure must be propagated regardless of its
+            // value.
             *(s->status) = MergeHelper::TimedFullMerge(
                 merge_operator, s->key->user_key(), nullptr,
                 merge_context->GetOperands(), &result, s->logger, s->statistics,
                 s->clock, /* result_operand */ nullptr,
-                /* update_num_ops_stats */ true);
+                /* update_num_ops_stats */ true,
+                /* op_failure_scope */ nullptr);
 
             if (s->status->ok()) {
               if (s->value) {
@@ -1217,11 +1233,15 @@ static bool SaveValue(void* arg, const char* entry) {
                                merge_context->GetOperandsDirectionBackward())) {
           if (s->value || s->columns) {
             std::string result;
+            // `op_failure_scope` (an output parameter) is not provided (set to
+            // nullptr) since a failure must be propagated regardless of its
+            // value.
             *(s->status) = MergeHelper::TimedFullMerge(
                 merge_operator, s->key->user_key(), nullptr,
                 merge_context->GetOperands(), &result, s->logger, s->statistics,
                 s->clock, /* result_operand */ nullptr,
-                /* update_num_ops_stats */ true);
+                /* update_num_ops_stats */ true,
+                /* op_failure_scope */ nullptr);
 
             if (s->status->ok()) {
               if (s->value) {
index 6df841012673cbbec384b28728038f8d2deecbde..e29d9c5badb86ea01da5e90041eeeed1a8b7c699 100644 (file)
@@ -56,13 +56,12 @@ MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
   }
 }
 
-Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
-                                   const Slice& key, const Slice* value,
-                                   const std::vector<Slice>& operands,
-                                   std::string* result, Logger* logger,
-                                   Statistics* statistics, SystemClock* clock,
-                                   Slice* result_operand,
-                                   bool update_num_ops_stats) {
+Status MergeHelper::TimedFullMerge(
+    const MergeOperator* merge_operator, const Slice& key, const Slice* value,
+    const std::vector<Slice>& operands, std::string* result, Logger* logger,
+    Statistics* statistics, SystemClock* clock, Slice* result_operand,
+    bool update_num_ops_stats,
+    MergeOperator::OpFailureScope* op_failure_scope) {
   assert(merge_operator != nullptr);
 
   if (operands.empty()) {
@@ -104,6 +103,14 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
                statistics ? timer.ElapsedNanos() : 0);
   }
 
+  if (op_failure_scope != nullptr) {
+    *op_failure_scope = merge_out.op_failure_scope;
+    // Apply default per merge_operator.h
+    if (*op_failure_scope == MergeOperator::OpFailureScope::kDefault) {
+      *op_failure_scope = MergeOperator::OpFailureScope::kTryMerge;
+    }
+  }
+
   if (!success) {
     RecordTick(statistics, NUMBER_MERGE_FAILURES);
     return Status::Corruption("Error: Could not perform merge.");
@@ -115,7 +122,8 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
 Status MergeHelper::TimedFullMergeWithEntity(
     const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
     const std::vector<Slice>& operands, std::string* result, Logger* logger,
-    Statistics* statistics, SystemClock* clock, bool update_num_ops_stats) {
+    Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
+    MergeOperator::OpFailureScope* op_failure_scope) {
   WideColumns base_columns;
 
   {
@@ -137,11 +145,10 @@ Status MergeHelper::TimedFullMergeWithEntity(
   std::string merge_result;
 
   {
-    constexpr Slice* result_operand = nullptr;
-
-    const Status s = TimedFullMerge(
-        merge_operator, key, &value_of_default, operands, &merge_result, logger,
-        statistics, clock, result_operand, update_num_ops_stats);
+    const Status s = TimedFullMerge(merge_operator, key, &value_of_default,
+                                    operands, &merge_result, logger, statistics,
+                                    clock, nullptr /* result_operand */,
+                                    update_num_ops_stats, op_failure_scope);
     if (!s.ok()) {
       return s;
     }
@@ -286,11 +293,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
         return s;
       }
 
-      // TODO(noetzli) If the merge operator returns false, we are currently
-      // (almost) silently dropping the put/delete. That's probably not what we
-      // want. Also if we're in compaction and it's a put, it would be nice to
-      // run compaction filter on it.
+      // TODO: if we're in compaction and it's a put, it would be nice to run
+      // compaction filter on it.
       std::string merge_result;
+      MergeOperator::OpFailureScope op_failure_scope;
 
       if (range_del_agg &&
           range_del_agg->ShouldDelete(
@@ -299,7 +305,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
                            merge_context_.GetOperands(), &merge_result, logger_,
                            stats_, clock_,
                            /* result_operand */ nullptr,
-                           /* update_num_ops_stats */ false);
+                           /* update_num_ops_stats */ false, &op_failure_scope);
       } else if (ikey.type == kTypeValue) {
         const Slice val = iter->value();
 
@@ -307,7 +313,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
                            merge_context_.GetOperands(), &merge_result, logger_,
                            stats_, clock_,
                            /* result_operand */ nullptr,
-                           /* update_num_ops_stats */ false);
+                           /* update_num_ops_stats */ false, &op_failure_scope);
       } else if (ikey.type == kTypeBlobIndex) {
         BlobIndex blob_index;
 
@@ -341,18 +347,18 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
                            merge_context_.GetOperands(), &merge_result, logger_,
                            stats_, clock_,
                            /* result_operand */ nullptr,
-                           /* update_num_ops_stats */ false);
+                           /* update_num_ops_stats */ false, &op_failure_scope);
       } else if (ikey.type == kTypeWideColumnEntity) {
         s = TimedFullMergeWithEntity(
             user_merge_operator_, ikey.user_key, iter->value(),
             merge_context_.GetOperands(), &merge_result, logger_, stats_,
-            clock_, /* update_num_ops_stats */ false);
+            clock_, /* update_num_ops_stats */ false, &op_failure_scope);
       } else {
         s = TimedFullMerge(user_merge_operator_, ikey.user_key, nullptr,
                            merge_context_.GetOperands(), &merge_result, logger_,
                            stats_, clock_,
                            /* result_operand */ nullptr,
-                           /* update_num_ops_stats */ false);
+                           /* update_num_ops_stats */ false, &op_failure_scope);
       }
 
       // We store the result in keys_.back() and operands_.back()
@@ -368,10 +374,16 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
         merge_context_.Clear();
         keys_.emplace_front(std::move(original_key));
         merge_context_.PushOperand(merge_result);
-      }
 
-      // move iter to the next entry
-      iter->Next();
+        // move iter to the next entry
+        iter->Next();
+      } else if (op_failure_scope ==
+                 MergeOperator::OpFailureScope::kMustMerge) {
+        // Change to `Status::MergeInProgress()` to denote output consists of
+        // merge operands only. Leave `iter` at the non-merge entry so it will
+        // be output after.
+        s = Status::MergeInProgress();
+      }
       return s;
     } else {
       // hit a merge
@@ -482,10 +494,12 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
     assert(merge_context_.GetNumOperands() >= 1);
     assert(merge_context_.GetNumOperands() == keys_.size());
     std::string merge_result;
-    s = TimedFullMerge(
-        user_merge_operator_, orig_ikey.user_key, nullptr,
-        merge_context_.GetOperands(), &merge_result, logger_, stats_, clock_,
-        /* result_operand */ nullptr, /* update_num_ops_stats */ false);
+    MergeOperator::OpFailureScope op_failure_scope;
+    s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr,
+                       merge_context_.GetOperands(), &merge_result, logger_,
+                       stats_, clock_,
+                       /* result_operand */ nullptr,
+                       /* update_num_ops_stats */ false, &op_failure_scope);
     if (s.ok()) {
       // The original key encountered
       // We are certain that keys_ is not empty here (see assertions couple of
@@ -497,6 +511,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
       merge_context_.Clear();
       keys_.emplace_front(std::move(original_key));
       merge_context_.PushOperand(merge_result);
+    } else if (op_failure_scope == MergeOperator::OpFailureScope::kMustMerge) {
+      // Change to `Status::MergeInProgress()` to denote output consists of
+      // merge operands only.
+      s = Status::MergeInProgress();
     }
   } else {
     // We haven't seen the beginning of the key nor a Put/Delete.
index 790ec6239055c61017fc7edc149c1be8a2ec6004..7f624b74328df928c0b449da49555f00731126a2 100644 (file)
@@ -48,19 +48,22 @@ class MergeHelper {
   // the latency is sensitive.
   // Returns one of the following statuses:
   // - OK: Entries were successfully merged.
-  // - Corruption: Merge operator reported unsuccessful merge.
+  // - Corruption: Merge operator reported unsuccessful merge. The scope of the
+  //   damage will be stored in `*op_failure_scope` when `op_failure_scope` is
+  //   not nullptr
   static Status TimedFullMerge(const MergeOperator* merge_operator,
                                const Slice& key, const Slice* value,
                                const std::vector<Slice>& operands,
                                std::string* result, Logger* logger,
                                Statistics* statistics, SystemClock* clock,
-                               Slice* result_operand,
-                               bool update_num_ops_stats);
+                               Slice* result_operand, bool update_num_ops_stats,
+                               MergeOperator::OpFailureScope* op_failure_scope);
 
   static Status TimedFullMergeWithEntity(
       const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
       const std::vector<Slice>& operands, std::string* result, Logger* logger,
-      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats);
+      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
+      MergeOperator::OpFailureScope* op_failure_scope);
 
   // During compaction, merge entries until we hit
   //     - a corrupted key
@@ -69,6 +72,12 @@ class MergeHelper {
   //     - a specific sequence number (snapshot boundary),
   //     - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
   //  or - the end of iteration
+  //
+  // The result(s) of the merge can be accessed in `MergeHelper::keys()` and
+  // `MergeHelper::values()`, which are invalidated the next time `MergeUntil()`
+  // is called. `MergeOutputIterator` is specially designed to iterate the
+  // results of a `MergeHelper`'s most recent `MergeUntil()`.
+  //
   // iter: (IN)  points to the first merge type entry
   //       (OUT) points to the first entry not included in the merge process
   // range_del_agg: (IN) filters merge operands covered by range tombstones.
@@ -85,8 +94,7 @@ class MergeHelper {
   //
   // Returns one of the following statuses:
   // - OK: Entries were successfully merged.
-  // - MergeInProgress: Put/Delete not encountered, and didn't reach the start
-  //   of key's history. Output consists of merge operands only.
+  // - MergeInProgress: Output consists of merge operands only.
   // - Corruption: Merge operator reported unsuccessful merge or a corrupted
   //   key has been encountered and not expected (applies only when compiling
   //   with asserts removed).
index f29ffbead368d81a0fc847a0ea67aa66c3ba135b..267f0c2d56687d97ca8bd54a6a0e8b97a4fbccbe 100644 (file)
@@ -2390,10 +2390,13 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
     // do a final merge of nullptr and operands;
     if (value || columns) {
       std::string result;
+      // `op_failure_scope` (an output parameter) is not provided (set to
+      // nullptr) since a failure must be propagated regardless of its value.
       *status = MergeHelper::TimedFullMerge(
           merge_operator_, user_key, nullptr, merge_context->GetOperands(),
           &result, info_log_, db_statistics_, clock_,
-          /* result_operand */ nullptr, /* update_num_ops_stats */ true);
+          /* result_operand */ nullptr, /* update_num_ops_stats */ true,
+          /* op_failure_scope */ nullptr);
       if (status->ok()) {
         if (LIKELY(value != nullptr)) {
           *(value->GetSelf()) = std::move(result);
@@ -2638,10 +2641,13 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
       // do a final merge of nullptr and operands;
       std::string* str_value =
           iter->value != nullptr ? iter->value->GetSelf() : nullptr;
+      // `op_failure_scope` (an output parameter) is not provided (set to
+      // nullptr) since a failure must be propagated regardless of its value.
       *status = MergeHelper::TimedFullMerge(
           merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
           str_value, info_log_, db_statistics_, clock_,
-          /* result_operand */ nullptr, /* update_num_ops_stats */ true);
+          /* result_operand */ nullptr, /* update_num_ops_stats */ true,
+          /* op_failure_scope */ nullptr);
       if (LIKELY(iter->value != nullptr)) {
         iter->value->PinSelf();
         range->AddValueSize(iter->value->size());
index 796697cfc0774c3e34ee2fb15e3b6c90ba19ec8b..5f5c0bfcd21f1d1e8dd574a178bbd7dac4e1b079 100644 (file)
@@ -2499,11 +2499,14 @@ class MemTableInserter : public WriteBatch::Handler {
         assert(merge_operator);
 
         std::string new_value;
+        // `op_failure_scope` (an output parameter) is not provided (set to
+        // nullptr) since a failure must be propagated regardless of its value.
         Status merge_status = MergeHelper::TimedFullMerge(
             merge_operator, key, &get_value_slice, {value}, &new_value,
             moptions->info_log, moptions->statistics,
             SystemClock::Default().get(), /* result_operand */ nullptr,
-            /* update_num_ops_stats */ false);
+            /* update_num_ops_stats */ false,
+            /* op_failure_scope */ nullptr);
 
         if (!merge_status.ok()) {
           // Failed to merge!
index ae795220ba3af25429976a8ea438028f1dfae85e..077130475dab9ac7bb3f4b26cbea7a5f64931b6a 100644 (file)
@@ -104,6 +104,13 @@ class MergeOperator : public Customizable {
     Logger* logger;
   };
 
+  enum class OpFailureScope {  // Blast radius of a merge operator failure.
+    kDefault,  // Falls back to `kTryMerge`.
+    kTryMerge,  // Operations that try to merge the key fail.
+    kMustMerge,  // Only operations that must merge the key fail.
+    kOpFailureScopeMax,  // NOTE(review): looks like a sentinel; confirm.
+  };
+
   struct MergeOperationOutput {
     explicit MergeOperationOutput(std::string& _new_value,
                                   Slice& _existing_operand)
@@ -115,6 +122,20 @@ class MergeOperator : public Customizable {
     // client can set this field to the operand (or existing_value) instead of
     // using new_value.
     Slice& existing_operand;
+    // Indicates the blast radius of the failure. It is only meaningful to
+    // provide a failure scope when returning `false` from the API populating
+    // the `MergeOperationOutput`. Currently RocksDB operations handle these
+    // values as follows:
+    //
+    // - `OpFailureScope::kDefault`: falls back to the default
+    //   (`OpFailureScope::kTryMerge`)
+    // - `OpFailureScope::kTryMerge`: operations that try to merge that key will
+    //   fail. This includes flush and compaction, which puts the DB in
+    //   read-only mode.
+    // - `OpFailureScope::kMustMerge`: operations that must merge that key will
+    //   fail (e.g., `Get()`, `MultiGet()`, iteration). Flushes/compactions can
+    //   still proceed by copying the original input operands to the output.
+    OpFailureScope op_failure_scope = OpFailureScope::kDefault;
   };
 
   // This function applies a stack of merge operands in chronological order
index 69e7527147588174b0285c51de3654d01d31ebf6..2b5a7ae65966b7ea4d56303536a46df099474ffd 100644 (file)
@@ -469,10 +469,13 @@ void GetContext::Merge(const Slice* value) {
   assert(!pinnable_val_ || !columns_);
 
   std::string result;
+  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
+  // since a failure must be propagated regardless of its value.
   const Status s = MergeHelper::TimedFullMerge(
       merge_operator_, user_key_, value, merge_context_->GetOperands(), &result,
       logger_, statistics_, clock_, /* result_operand */ nullptr,
-      /* update_num_ops_stats */ true);
+      /* update_num_ops_stats */ true,
+      /* op_failure_scope */ nullptr);
   if (!s.ok()) {
     state_ = kCorrupt;
     return;
@@ -505,11 +508,14 @@ void GetContext::MergeWithEntity(Slice entity) {
     }
 
     {
+      // `op_failure_scope` (an output parameter) is not provided (set to
+      // nullptr) since a failure must be propagated regardless of its value.
       const Status s = MergeHelper::TimedFullMerge(
           merge_operator_, user_key_, &value_of_default,
           merge_context_->GetOperands(), pinnable_val_->GetSelf(), logger_,
           statistics_, clock_, /* result_operand */ nullptr,
-          /* update_num_ops_stats */ true);
+          /* update_num_ops_stats */ true,
+          /* op_failure_scope */ nullptr);
       if (!s.ok()) {
         state_ = kCorrupt;
         return;
@@ -523,9 +529,12 @@ void GetContext::MergeWithEntity(Slice entity) {
   std::string result;
 
   {
+    // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
+    // since a failure must be propagated regardless of its value.
     const Status s = MergeHelper::TimedFullMergeWithEntity(
         merge_operator_, user_key_, entity, merge_context_->GetOperands(),
-        &result, logger_, statistics_, clock_, /* update_num_ops_stats */ true);
+        &result, logger_, statistics_, clock_, /* update_num_ops_stats */ true,
+        /* op_failure_scope */ nullptr);
     if (!s.ok()) {
       state_ = kCorrupt;
       return;
index 6ec9d87b08fc14f9bd557b086d3f053d907f67eb..3bfc66649a7664649dc0af4c3253682bf2434309 100644 (file)
@@ -68,6 +68,7 @@ bool TtlMergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
                             merge_in.logger),
         &user_merge_out);
   }
+  merge_out->op_failure_scope = user_merge_out.op_failure_scope;
 
   // Return false if the user merge operator returned false
   if (!good) {
index 3c9205bf78f1e5f248106a29d0d447b152ee29de..5ae4df7dd05ef21ad9d80f4782ad72516f6df32b 100644 (file)
@@ -664,25 +664,34 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
       Statistics* statistics = immutable_db_options.statistics.get();
       Logger* logger = immutable_db_options.info_log.get();
       SystemClock* clock = immutable_db_options.clock;
+      // `op_failure_scope` (an output parameter) is not provided (set to
+      // nullptr) since a failure must be propagated regardless of its value.
       return MergeHelper::TimedFullMerge(
           merge_operator, key, value, context.GetOperands(), result, logger,
           statistics, clock, /* result_operand */ nullptr,
-          /* update_num_ops_stats */ false);
+          /* update_num_ops_stats */ false,
+          /* op_failure_scope */ nullptr);
     } else if (db_options_ != nullptr) {
       Statistics* statistics = db_options_->statistics.get();
       Env* env = db_options_->env;
       Logger* logger = db_options_->info_log.get();
       SystemClock* clock = env->GetSystemClock().get();
+      // `op_failure_scope` (an output parameter) is not provided (set to
+      // nullptr) since a failure must be propagated regardless of its value.
       return MergeHelper::TimedFullMerge(
           merge_operator, key, value, context.GetOperands(), result, logger,
           statistics, clock, /* result_operand */ nullptr,
-          /* update_num_ops_stats */ false);
+          /* update_num_ops_stats */ false,
+          /* op_failure_scope */ nullptr);
     } else {
       const auto cf_opts = cfh->cfd()->ioptions();
+      // `op_failure_scope` (an output parameter) is not provided (set to
+      // nullptr) since a failure must be propagated regardless of its value.
       return MergeHelper::TimedFullMerge(
           merge_operator, key, value, context.GetOperands(), result,
           cf_opts->logger, cf_opts->stats, cf_opts->clock,
-          /* result_operand */ nullptr, /* update_num_ops_stats */ false);
+          /* result_operand */ nullptr, /* update_num_ops_stats */ false,
+          /* op_failure_scope */ nullptr);
     }
   } else {
     return Status::InvalidArgument("Must provide a column_family");