]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Revisit the interface of MergeHelper::TimedFullMerge(WithEntity) (#10932)
authorLevi Tamasi <ltamasi@fb.com>
Wed, 9 Nov 2022 20:54:05 +0000 (12:54 -0800)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Wed, 9 Nov 2022 20:54:05 +0000 (12:54 -0800)
Summary:
The patch refines/reworks `MergeHelper::TimedFullMerge(WithEntity)`
a bit in two ways. First, it eliminates the recently introduced `TimedFullMerge`
overload, which makes the responsibilities clearer by making sure the query
result (`value` for `Get`, `columns` for `GetEntity`) is set uniformly in
`SaveValue` and `GetContext`. Second, it changes the interface of
`TimedFullMergeWithEntity` so it exposes its result in a serialized form; this
is a more decoupled design which will come in handy when adding support
for `Merge` with wide-column entities to `DBIter`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10932

Test Plan: `make check`

Reviewed By: akankshamahajan15

Differential Revision: D41129399

Pulled By: ltamasi

fbshipit-source-id: 69d8da358c77d4fc7e8c40f4dafc2c129a710677

db/memtable.cc
db/merge_helper.cc
db/merge_helper.h
db/version_set.cc
table/get_context.cc

index 829a3c09956e33cdd8ccb9b28a3e5917e80c0f42..98b86446882c09a97f6826b5cb315f148fa9dd05 100644 (file)
@@ -1066,11 +1066,21 @@ static bool SaveValue(void* arg, const char* entry) {
           assert(s->do_merge);
 
           if (s->value || s->columns) {
+            std::string result;
             *(s->status) = MergeHelper::TimedFullMerge(
                 merge_operator, s->key->user_key(), &v,
-                merge_context->GetOperands(), s->value, s->columns, s->logger,
-                s->statistics, s->clock, /* result_operand */ nullptr,
+                merge_context->GetOperands(), &result, s->logger, s->statistics,
+                s->clock, /* result_operand */ nullptr,
                 /* update_num_ops_stats */ true);
+
+            if (s->status->ok()) {
+              if (s->value) {
+                *(s->value) = std::move(result);
+              } else {
+                assert(s->columns);
+                s->columns->SetPlainValue(result);
+              }
+            }
           }
         } else if (s->value) {
           s->value->assign(v.data(), v.size());
@@ -1115,11 +1125,27 @@ static bool SaveValue(void* arg, const char* entry) {
         } else if (*(s->merge_in_progress)) {
           assert(s->do_merge);
 
-          if (s->value || s->columns) {
+          if (s->value) {
+            Slice value_of_default;
+            *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
+                v, value_of_default);
+            if (s->status->ok()) {
+              *(s->status) = MergeHelper::TimedFullMerge(
+                  merge_operator, s->key->user_key(), &value_of_default,
+                  merge_context->GetOperands(), s->value, s->logger,
+                  s->statistics, s->clock, /* result_operand */ nullptr,
+                  /* update_num_ops_stats */ true);
+            }
+          } else if (s->columns) {
+            std::string result;
             *(s->status) = MergeHelper::TimedFullMergeWithEntity(
                 merge_operator, s->key->user_key(), v,
-                merge_context->GetOperands(), s->value, s->columns, s->logger,
-                s->statistics, s->clock, /* update_num_ops_stats */ true);
+                merge_context->GetOperands(), &result, s->logger, s->statistics,
+                s->clock, /* update_num_ops_stats */ true);
+
+            if (s->status->ok()) {
+              *(s->status) = s->columns->SetWideColumnValue(result);
+            }
           }
         } else if (s->value) {
           Slice value_of_default;
@@ -1150,11 +1176,21 @@ static bool SaveValue(void* arg, const char* entry) {
       case kTypeRangeDeletion: {
         if (*(s->merge_in_progress)) {
           if (s->value || s->columns) {
+            std::string result;
             *(s->status) = MergeHelper::TimedFullMerge(
                 merge_operator, s->key->user_key(), nullptr,
-                merge_context->GetOperands(), s->value, s->columns, s->logger,
-                s->statistics, s->clock, /* result_operand */ nullptr,
+                merge_context->GetOperands(), &result, s->logger, s->statistics,
+                s->clock, /* result_operand */ nullptr,
                 /* update_num_ops_stats */ true);
+
+            if (s->status->ok()) {
+              if (s->value) {
+                *(s->value) = std::move(result);
+              } else {
+                assert(s->columns);
+                s->columns->SetPlainValue(result);
+              }
+            }
           }
         } else {
           *(s->status) = Status::NotFound();
@@ -1180,11 +1216,21 @@ static bool SaveValue(void* arg, const char* entry) {
         if (s->do_merge && merge_operator->ShouldMerge(
                                merge_context->GetOperandsDirectionBackward())) {
           if (s->value || s->columns) {
+            std::string result;
             *(s->status) = MergeHelper::TimedFullMerge(
                 merge_operator, s->key->user_key(), nullptr,
-                merge_context->GetOperands(), s->value, s->columns, s->logger,
-                s->statistics, s->clock, /* result_operand */ nullptr,
+                merge_context->GetOperands(), &result, s->logger, s->statistics,
+                s->clock, /* result_operand */ nullptr,
                 /* update_num_ops_stats */ true);
+
+            if (s->status->ok()) {
+              if (s->value) {
+                *(s->value) = std::move(result);
+              } else {
+                assert(s->columns);
+                s->columns->SetPlainValue(result);
+              }
+            }
           }
 
           *(s->found_final_value) = true;
index 5a7c5765e05d40579405c6831380179d444a673d..2d37d6d3320cfab3f29ab107c250e662ccafed0d 100644 (file)
@@ -112,44 +112,10 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
   return Status::OK();
 }
 
-Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
-                                   const Slice& key, const Slice* base_value,
-                                   const std::vector<Slice>& operands,
-                                   std::string* value,
-                                   PinnableWideColumns* columns, Logger* logger,
-                                   Statistics* statistics, SystemClock* clock,
-                                   Slice* result_operand,
-                                   bool update_num_ops_stats) {
-  assert(value || columns);
-  assert(!value || !columns);
-
-  std::string result;
-  const Status s =
-      TimedFullMerge(merge_operator, key, base_value, operands, &result, logger,
-                     statistics, clock, result_operand, update_num_ops_stats);
-  if (!s.ok()) {
-    return s;
-  }
-
-  if (value) {
-    *value = std::move(result);
-    return Status::OK();
-  }
-
-  assert(columns);
-  columns->SetPlainValue(result);
-
-  return Status::OK();
-}
-
 Status MergeHelper::TimedFullMergeWithEntity(
     const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
-    const std::vector<Slice>& operands, std::string* value,
-    PinnableWideColumns* columns, Logger* logger, Statistics* statistics,
-    SystemClock* clock, bool update_num_ops_stats) {
-  assert(value || columns);
-  assert(!value || !columns);
-
+    const std::vector<Slice>& operands, std::string* result, Logger* logger,
+    Statistics* statistics, SystemClock* clock, bool update_num_ops_stats) {
   WideColumns base_columns;
 
   {
@@ -168,44 +134,35 @@ Status MergeHelper::TimedFullMergeWithEntity(
     value_of_default = base_columns[0].value();
   }
 
-  std::string result;
+  std::string merge_result;
 
   {
     constexpr Slice* result_operand = nullptr;
 
     const Status s = TimedFullMerge(
-        merge_operator, key, &value_of_default, operands, &result, logger,
+        merge_operator, key, &value_of_default, operands, &merge_result, logger,
         statistics, clock, result_operand, update_num_ops_stats);
     if (!s.ok()) {
       return s;
     }
   }
 
-  if (value) {
-    *value = std::move(result);
-    return Status::OK();
-  }
-
-  assert(columns);
-
-  std::string output;
-
   if (has_default_column) {
-    base_columns[0].value() = result;
+    base_columns[0].value() = merge_result;
 
-    const Status s = WideColumnSerialization::Serialize(base_columns, output);
+    const Status s = WideColumnSerialization::Serialize(base_columns, *result);
     if (!s.ok()) {
       return s;
     }
   } else {
     const Status s =
-        WideColumnSerialization::Serialize(result, base_columns, output);
+        WideColumnSerialization::Serialize(merge_result, base_columns, *result);
     if (!s.ok()) {
       return s;
     }
   }
 
-  return columns->SetWideColumnValue(output);
+  return Status::OK();
 }
 
 // PRE:  iter points to the first merge type entry
index 923850a0898a51cede5873d209e3f11bb05a2b62..790ec6239055c61017fc7edc149c1be8a2ec6004 100644 (file)
@@ -57,19 +57,10 @@ class MergeHelper {
                                Slice* result_operand,
                                bool update_num_ops_stats);
 
-  static Status TimedFullMerge(const MergeOperator* merge_operator,
-                               const Slice& key, const Slice* base_value,
-                               const std::vector<Slice>& operands,
-                               std::string* value, PinnableWideColumns* columns,
-                               Logger* logger, Statistics* statistics,
-                               SystemClock* clock, Slice* result_operand,
-                               bool update_num_ops_stats);
-
   static Status TimedFullMergeWithEntity(
       const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
-      const std::vector<Slice>& operands, std::string* value,
-      PinnableWideColumns* columns, Logger* logger, Statistics* statistics,
-      SystemClock* clock, bool update_num_ops_stats);
+      const std::vector<Slice>& operands, std::string* result, Logger* logger,
+      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats);
 
   // During compaction, merge entries until we hit
   //     - a corrupted key
index aa0dc394ee32487384566d0a24c51405541ae4ee..cac3a0a9e1f32d3dfd383da79d1dd3c65c38815a 100644 (file)
@@ -2384,15 +2384,19 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
     }
     // merge_operands are in saver and we hit the beginning of the key history
     // do a final merge of nullptr and operands;
-    std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
-    if (str_value || columns) {
+    if (value || columns) {
+      std::string result;
       *status = MergeHelper::TimedFullMerge(
           merge_operator_, user_key, nullptr, merge_context->GetOperands(),
-          str_value, columns, info_log_, db_statistics_, clock_,
+          &result, info_log_, db_statistics_, clock_,
           /* result_operand */ nullptr, /* update_num_ops_stats */ true);
       if (status->ok()) {
         if (LIKELY(value != nullptr)) {
+          *(value->GetSelf()) = std::move(result);
           value->PinSelf();
+        } else {
+          assert(columns != nullptr);
+          columns->SetPlainValue(result);
         }
       }
     }
index b2daa1789f75ea3fa2ba45a88c4f858336f56e70..69e7527147588174b0285c51de3654d01d31ebf6 100644 (file)
@@ -468,10 +468,10 @@ void GetContext::Merge(const Slice* value) {
   assert(do_merge_);
   assert(!pinnable_val_ || !columns_);
 
+  std::string result;
   const Status s = MergeHelper::TimedFullMerge(
-      merge_operator_, user_key_, value, merge_context_->GetOperands(),
-      pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_,
-      statistics_, clock_, /* result_operand */ nullptr,
+      merge_operator_, user_key_, value, merge_context_->GetOperands(), &result,
+      logger_, statistics_, clock_, /* result_operand */ nullptr,
       /* update_num_ops_stats */ true);
   if (!s.ok()) {
     state_ = kCorrupt;
@@ -479,25 +479,66 @@ void GetContext::Merge(const Slice* value) {
   }
 
   if (LIKELY(pinnable_val_ != nullptr)) {
+    *(pinnable_val_->GetSelf()) = std::move(result);
     pinnable_val_->PinSelf();
+    return;
   }
+
+  assert(columns_);
+  columns_->SetPlainValue(result);
 }
 
 void GetContext::MergeWithEntity(Slice entity) {
   assert(do_merge_);
   assert(!pinnable_val_ || !columns_);
 
-  const Status s = MergeHelper::TimedFullMergeWithEntity(
-      merge_operator_, user_key_, entity, merge_context_->GetOperands(),
-      pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_,
-      statistics_, clock_, /* update_num_ops_stats */ true);
-  if (!s.ok()) {
-    state_ = kCorrupt;
+  if (LIKELY(pinnable_val_ != nullptr)) {
+    Slice value_of_default;
+
+    {
+      const Status s = WideColumnSerialization::GetValueOfDefaultColumn(
+          entity, value_of_default);
+      if (!s.ok()) {
+        state_ = kCorrupt;
+        return;
+      }
+    }
+
+    {
+      const Status s = MergeHelper::TimedFullMerge(
+          merge_operator_, user_key_, &value_of_default,
+          merge_context_->GetOperands(), pinnable_val_->GetSelf(), logger_,
+          statistics_, clock_, /* result_operand */ nullptr,
+          /* update_num_ops_stats */ true);
+      if (!s.ok()) {
+        state_ = kCorrupt;
+        return;
+      }
+    }
+
+    pinnable_val_->PinSelf();
     return;
   }
 
-  if (LIKELY(pinnable_val_ != nullptr)) {
-    pinnable_val_->PinSelf();
+  std::string result;
+
+  {
+    const Status s = MergeHelper::TimedFullMergeWithEntity(
+        merge_operator_, user_key_, entity, merge_context_->GetOperands(),
+        &result, logger_, statistics_, clock_, /* update_num_ops_stats */ true);
+    if (!s.ok()) {
+      state_ = kCorrupt;
+      return;
+    }
+  }
+
+  {
+    assert(columns_);
+    const Status s = columns_->SetWideColumnValue(result);
+    if (!s.ok()) {
+      state_ = kCorrupt;
+      return;
+    }
   }
 }