### Public API Changes
* Substantial changes have been made to the Cache class to support internal development goals. Direct use of Cache class members is discouraged and further breaking modifications are expected in the future. SecondaryCache has some related changes and implementations will need to be updated. (Unlike Cache, SecondaryCache is still intended to support user implementations, and disruptive changes will be avoided.) (#10975)
+* Add `MergeOperationOutput::op_failure_scope` for merge operator users to control the blast radius of merge operator failures. Existing merge operator users do not need to make any changes to preserve the old behavior.
### Performance Improvements
* Updated xxHash source code, which should improve kXXH3 checksum speed, at least on ARM (#11098).
timestamp_size_ == full_history_ts_low_->size());
#endif
input_.SetPinnedItersMgr(&pinned_iters_mgr_);
+ // The default `merge_until_status_` does not need to be checked since it is
+ // overwritten as soon as `MergeUntil()` is called
+ merge_until_status_.PermitUncheckedError();
TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
}
ikey_.user_key = current_key_.GetUserKey();
validity_info_.SetValid(ValidContext::kMerge1);
} else {
+ if (merge_until_status_.IsMergeInProgress()) {
+ // `Status::MergeInProgress()` tells us that the previous `MergeUntil()`
+ // produced only merge operands. Those merge operands were accessed and
+ // written out using `merge_out_iter_`. Since `merge_out_iter_` is
+ // exhausted at this point, all merge operands have been written out.
+ //
+ // Still, there may be a base value (PUT, DELETE, SINGLEDEL, etc.) that
+ // needs to be written out. Normally, `CompactionIterator` would skip it
+ // on the basis that it has already output something in the same
+ // snapshot stripe. To prevent this, we reset `has_current_user_key_` so
+ // that the next iteration cannot tell that the snapshot stripe is
+ // unchanged.
+ has_current_user_key_ = false;
+ }
// We consumed all pinned merge operands, release pinned iterators
pinned_iters_mgr_.ReleasePinnedData();
// MergeHelper moves the iterator to the first record after the merged
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow.
- Status s = merge_helper_->MergeUntil(
+ merge_until_status_ = merge_helper_->MergeUntil(
&input_, range_del_agg_, prev_snapshot, bottommost_level_,
allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_,
prefetch_buffers_.get(), &iter_stats_);
merge_out_iter_.SeekToFirst();
- if (!s.ok() && !s.IsMergeInProgress()) {
- status_ = s;
+ if (!merge_until_status_.ok() &&
+ !merge_until_status_.IsMergeInProgress()) {
+ status_ = merge_until_status_;
return;
} else if (merge_out_iter_.Valid()) {
// NOTE: key, value, and ikey_ refer to old entries.
bool clear_and_output_next_key_ = false;
MergeOutputIterator merge_out_iter_;
+ Status merge_until_status_;
// PinnedIteratorsManager used to pin input_ Iterator blocks while reading
// merge operands and then releasing them after consuming them.
PinnedIteratorsManager pinned_iters_mgr_;
}
bool DBIter::Merge(const Slice* val, const Slice& user_key) {
+ // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
+ // since a failure must be propagated regardless of its value.
Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key, val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, clock_, &pinned_value_,
- /* update_num_ops_stats */ true);
+ /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (!s.ok()) {
valid_ = false;
status_ = s;
}
bool DBIter::MergeEntity(const Slice& entity, const Slice& user_key) {
+ // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
+ // since a failure must be propagated regardless of its value.
Status s = MergeHelper::TimedFullMergeWithEntity(
merge_operator_, user_key, entity, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, clock_,
- /* update_num_ops_stats */ true);
+ /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (!s.ok()) {
valid_ = false;
status_ = s;
VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
}
+#ifndef ROCKSDB_LITE
+
+TEST_F(DBMergeOperatorTest, MergeOperatorFailsWithMustMerge) {
+ // This is like a mini-stress test dedicated to `OpFailureScope::kMustMerge`.
+ // Some or most of it might be deleted upon adding that option to the actual
+ // stress test.
+ //
+ // "k0" and "k2" are stable (uncorrupted) keys before and after a corrupted
+ // key ("k1"). The outer loop (`i`) varies which write (`j`) to "k1" triggers
+ // the corruption. Inside that loop there are three cases:
+ //
+ // - Case 1: pure `Merge()`s
+ // - Case 2: `Merge()`s on top of a `Put()` (the `Put()` is write `j == 0`)
+ // - Case 3: `Merge()`s on top of a `Delete()`
+ //
+ // Each case checks query results before flush, after flush, and after
+ // compaction; each outer iteration ends with cleanup after deletion+compaction.
+ // The queries expect "k0" and "k2" to always be readable. "k1" is expected to
+ // be readable only by APIs that do not require merging (`GetMergeOperands()`).
+ const int kNumOperands = 3;
+ Options options;
+ options.merge_operator.reset(new TestPutOperator());
+ options.env = env_;
+ Reopen(options);
+
+ for (int i = 0; i < kNumOperands; ++i) {
+ auto check_query = [&]() {  // shared read checks; reads the current `i`
+ {
+ std::string value;
+ ASSERT_OK(db_->Get(ReadOptions(), "k0", &value));
+ ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
+ ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
+ }
+
+ {
+ std::unique_ptr<Iterator> iter;
+ iter.reset(db_->NewIterator(ReadOptions()));
+ iter->SeekToFirst();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("k0", iter->key());
+ iter->Next();  // forward step from "k0" reaches "k1"
+ ASSERT_TRUE(iter->status().IsCorruption());
+
+ iter->SeekToLast();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("k2", iter->key());
+ iter->Prev();  // backward step from "k2" reaches "k1"
+ ASSERT_TRUE(iter->status().IsCorruption());
+
+ iter->Seek("k2");  // a fresh seek past "k1" must be valid again
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("k2", iter->key());
+ }
+
+ std::vector<PinnableSlice> values(kNumOperands);
+ GetMergeOperandsOptions merge_operands_info;
+ merge_operands_info.expected_max_number_of_operands = kNumOperands;
+ int num_operands_found = 0;
+ ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+ "k1", values.data(), &merge_operands_info,
+ &num_operands_found));
+ ASSERT_EQ(kNumOperands, num_operands_found);
+ for (int j = 0; j < num_operands_found; ++j) {
+ if (i == j) {  // write `i` is the one that carried the failing value
+ ASSERT_EQ(values[j], "corrupted_must_merge");
+ } else {
+ ASSERT_EQ(values[j], "ok");
+ }
+ }
+ };
+
+ ASSERT_OK(Put("k0", "val"));
+ ASSERT_OK(Put("k2", "val"));
+
+ // Case 1: pure `Merge()`s; write `i` carries the failing operand
+ for (int j = 0; j < kNumOperands; ++j) {
+ if (j == i) {
+ ASSERT_OK(Merge("k1", "corrupted_must_merge"));
+ } else {
+ ASSERT_OK(Merge("k1", "ok"));
+ }
+ }
+ check_query();
+ ASSERT_OK(Flush());
+ check_query();
+ {
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction =
+ BottommostLevelCompaction::kForceOptimized;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ }
+ check_query();
+
+ // Case 2: the first write is a `Put()`, the remaining writes are `Merge()`s
+ for (int j = 0; j < kNumOperands; ++j) {
+ Slice val;
+ if (j == i) {
+ val = "corrupted_must_merge";
+ } else {
+ val = "ok";
+ }
+ if (j == 0) {
+ ASSERT_OK(Put("k1", val));
+ } else {
+ ASSERT_OK(Merge("k1", val));
+ }
+ }
+ check_query();
+ ASSERT_OK(Flush());
+ check_query();
+ {
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction =
+ BottommostLevelCompaction::kForceOptimized;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ }
+ check_query();
+
+ // Case 3: `Merge()`s on top of a `Delete()`
+ ASSERT_OK(Delete("k1"));
+ for (int j = 0; j < kNumOperands; ++j) {
+ if (i == j) {
+ ASSERT_OK(Merge("k1", "corrupted_must_merge"));
+ } else {
+ ASSERT_OK(Merge("k1", "ok"));
+ }
+ }
+ check_query();
+ ASSERT_OK(Flush());
+ check_query();
+ {
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction =
+ BottommostLevelCompaction::kForceOptimized;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ }
+ check_query();
+
+ // Verify obsolete data removal still happens
+ ASSERT_OK(Delete("k0"));
+ ASSERT_OK(Delete("k1"));
+ ASSERT_OK(Delete("k2"));
+ ASSERT_EQ("NOT_FOUND", Get("k0"));
+ ASSERT_EQ("NOT_FOUND", Get("k1"));
+ ASSERT_EQ("NOT_FOUND", Get("k2"));
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction =
+ BottommostLevelCompaction::kForceOptimized;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ ASSERT_EQ("", FilesPerLevel());  // NOTE(review): presumably "" means no files remain at any level - confirm
+ }
+}
+
+#endif // ROCKSDB_LITE
+
class MergeOperatorPinningTest : public DBMergeOperatorTest,
public testing::WithParamInterface<bool> {
public:
#endif
// A test merge operator mimics put but also fails if one of merge operands is
-// "corrupted".
+// "corrupted", "corrupted_try_merge", or "corrupted_must_merge".
class TestPutOperator : public MergeOperator {
public:
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
+ static const std::map<std::string, MergeOperator::OpFailureScope>
+ bad_operand_to_op_failure_scope = {
+ {"corrupted", MergeOperator::OpFailureScope::kDefault},
+ {"corrupted_try_merge", MergeOperator::OpFailureScope::kTryMerge},
+ {"corrupted_must_merge",
+ MergeOperator::OpFailureScope::kMustMerge}};
+ auto check_operand =
+ [](Slice operand_val,
+ MergeOperator::OpFailureScope* op_failure_scope) -> bool {
+ auto iter = bad_operand_to_op_failure_scope.find(operand_val.ToString());
+ if (iter != bad_operand_to_op_failure_scope.end()) {
+ *op_failure_scope = iter->second;
+ return false;
+ }
+ return true;
+ };
if (merge_in.existing_value != nullptr &&
- *(merge_in.existing_value) == "corrupted") {
+ !check_operand(*merge_in.existing_value,
+ &merge_out->op_failure_scope)) {
return false;
}
for (auto value : merge_in.operand_list) {
- if (value == "corrupted") {
+ if (!check_operand(value, &merge_out->op_failure_scope)) {
return false;
}
}
if (s->value || s->columns) {
std::string result;
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its
+ // value.
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), &result, s->logger, s->statistics,
s->clock, /* result_operand */ nullptr,
- /* update_num_ops_stats */ true);
+ /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (s->status->ok()) {
if (s->value) {
*(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
v, value_of_default);
if (s->status->ok()) {
+ // `op_failure_scope` (an output parameter) is not provided (set
+ // to nullptr) since a failure must be propagated regardless of
+ // its value.
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &value_of_default,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->clock, /* result_operand */ nullptr,
- /* update_num_ops_stats */ true);
+ /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
}
} else if (s->columns) {
std::string result;
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its
+ // value.
*(s->status) = MergeHelper::TimedFullMergeWithEntity(
merge_operator, s->key->user_key(), v,
merge_context->GetOperands(), &result, s->logger, s->statistics,
- s->clock, /* update_num_ops_stats */ true);
+ s->clock, /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (s->status->ok()) {
*(s->status) = s->columns->SetWideColumnValue(result);
if (*(s->merge_in_progress)) {
if (s->value || s->columns) {
std::string result;
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its
+ // value.
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), &result, s->logger, s->statistics,
s->clock, /* result_operand */ nullptr,
- /* update_num_ops_stats */ true);
+ /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (s->status->ok()) {
if (s->value) {
merge_context->GetOperandsDirectionBackward())) {
if (s->value || s->columns) {
std::string result;
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its
+ // value.
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), &result, s->logger, s->statistics,
s->clock, /* result_operand */ nullptr,
- /* update_num_ops_stats */ true);
+ /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (s->status->ok()) {
if (s->value) {
}
}
-Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
- const Slice& key, const Slice* value,
- const std::vector<Slice>& operands,
- std::string* result, Logger* logger,
- Statistics* statistics, SystemClock* clock,
- Slice* result_operand,
- bool update_num_ops_stats) {
+Status MergeHelper::TimedFullMerge(
+ const MergeOperator* merge_operator, const Slice& key, const Slice* value,
+ const std::vector<Slice>& operands, std::string* result, Logger* logger,
+ Statistics* statistics, SystemClock* clock, Slice* result_operand,
+ bool update_num_ops_stats,
+ MergeOperator::OpFailureScope* op_failure_scope) {
assert(merge_operator != nullptr);
if (operands.empty()) {
statistics ? timer.ElapsedNanos() : 0);
}
+ if (op_failure_scope != nullptr) {
+ *op_failure_scope = merge_out.op_failure_scope;
+ // Apply default per merge_operator.h
+ if (*op_failure_scope == MergeOperator::OpFailureScope::kDefault) {
+ *op_failure_scope = MergeOperator::OpFailureScope::kTryMerge;
+ }
+ }
+
if (!success) {
RecordTick(statistics, NUMBER_MERGE_FAILURES);
return Status::Corruption("Error: Could not perform merge.");
Status MergeHelper::TimedFullMergeWithEntity(
const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
const std::vector<Slice>& operands, std::string* result, Logger* logger,
- Statistics* statistics, SystemClock* clock, bool update_num_ops_stats) {
+ Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
+ MergeOperator::OpFailureScope* op_failure_scope) {
WideColumns base_columns;
{
std::string merge_result;
{
- constexpr Slice* result_operand = nullptr;
-
- const Status s = TimedFullMerge(
- merge_operator, key, &value_of_default, operands, &merge_result, logger,
- statistics, clock, result_operand, update_num_ops_stats);
+ const Status s = TimedFullMerge(merge_operator, key, &value_of_default,
+ operands, &merge_result, logger, statistics,
+ clock, nullptr /* result_operand */,
+ update_num_ops_stats, op_failure_scope);
if (!s.ok()) {
return s;
}
return s;
}
- // TODO(noetzli) If the merge operator returns false, we are currently
- // (almost) silently dropping the put/delete. That's probably not what we
- // want. Also if we're in compaction and it's a put, it would be nice to
- // run compaction filter on it.
+ // TODO: if we're in compaction and it's a put, it would be nice to run
+ // compaction filter on it.
std::string merge_result;
+ MergeOperator::OpFailureScope op_failure_scope;
if (range_del_agg &&
range_del_agg->ShouldDelete(
merge_context_.GetOperands(), &merge_result, logger_,
stats_, clock_,
/* result_operand */ nullptr,
- /* update_num_ops_stats */ false);
+ /* update_num_ops_stats */ false, &op_failure_scope);
} else if (ikey.type == kTypeValue) {
const Slice val = iter->value();
merge_context_.GetOperands(), &merge_result, logger_,
stats_, clock_,
/* result_operand */ nullptr,
- /* update_num_ops_stats */ false);
+ /* update_num_ops_stats */ false, &op_failure_scope);
} else if (ikey.type == kTypeBlobIndex) {
BlobIndex blob_index;
merge_context_.GetOperands(), &merge_result, logger_,
stats_, clock_,
/* result_operand */ nullptr,
- /* update_num_ops_stats */ false);
+ /* update_num_ops_stats */ false, &op_failure_scope);
} else if (ikey.type == kTypeWideColumnEntity) {
s = TimedFullMergeWithEntity(
user_merge_operator_, ikey.user_key, iter->value(),
merge_context_.GetOperands(), &merge_result, logger_, stats_,
- clock_, /* update_num_ops_stats */ false);
+ clock_, /* update_num_ops_stats */ false, &op_failure_scope);
} else {
s = TimedFullMerge(user_merge_operator_, ikey.user_key, nullptr,
merge_context_.GetOperands(), &merge_result, logger_,
stats_, clock_,
/* result_operand */ nullptr,
- /* update_num_ops_stats */ false);
+ /* update_num_ops_stats */ false, &op_failure_scope);
}
// We store the result in keys_.back() and operands_.back()
merge_context_.Clear();
keys_.emplace_front(std::move(original_key));
merge_context_.PushOperand(merge_result);
- }
- // move iter to the next entry
- iter->Next();
+ // move iter to the next entry
+ iter->Next();
+ } else if (op_failure_scope ==
+ MergeOperator::OpFailureScope::kMustMerge) {
+ // Change to `Status::MergeInProgress()` to denote output consists of
+ // merge operands only. Leave `iter` at the non-merge entry so it will
+ // be output after.
+ s = Status::MergeInProgress();
+ }
return s;
} else {
// hit a merge
assert(merge_context_.GetNumOperands() >= 1);
assert(merge_context_.GetNumOperands() == keys_.size());
std::string merge_result;
- s = TimedFullMerge(
- user_merge_operator_, orig_ikey.user_key, nullptr,
- merge_context_.GetOperands(), &merge_result, logger_, stats_, clock_,
- /* result_operand */ nullptr, /* update_num_ops_stats */ false);
+ MergeOperator::OpFailureScope op_failure_scope;
+ s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr,
+ merge_context_.GetOperands(), &merge_result, logger_,
+ stats_, clock_,
+ /* result_operand */ nullptr,
+ /* update_num_ops_stats */ false, &op_failure_scope);
if (s.ok()) {
// The original key encountered
// We are certain that keys_ is not empty here (see assertions couple of
merge_context_.Clear();
keys_.emplace_front(std::move(original_key));
merge_context_.PushOperand(merge_result);
+ } else if (op_failure_scope == MergeOperator::OpFailureScope::kMustMerge) {
+ // Change to `Status::MergeInProgress()` to denote output consists of
+ // merge operands only.
+ s = Status::MergeInProgress();
}
} else {
// We haven't seen the beginning of the key nor a Put/Delete.
// the latency is sensitive.
// Returns one of the following statuses:
// - OK: Entries were successfully merged.
- // - Corruption: Merge operator reported unsuccessful merge.
+ // - Corruption: Merge operator reported unsuccessful merge. The scope of the
+ // damage will be stored in `*op_failure_scope` when `op_failure_scope` is
+ // not nullptr (`kDefault` is reported as `kTryMerge`).
static Status TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* value,
const std::vector<Slice>& operands,
std::string* result, Logger* logger,
Statistics* statistics, SystemClock* clock,
- Slice* result_operand,
- bool update_num_ops_stats);
+ Slice* result_operand, bool update_num_ops_stats,
+ MergeOperator::OpFailureScope* op_failure_scope);
static Status TimedFullMergeWithEntity(
const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
const std::vector<Slice>& operands, std::string* result, Logger* logger,
- Statistics* statistics, SystemClock* clock, bool update_num_ops_stats);
+ Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
+ MergeOperator::OpFailureScope* op_failure_scope);
// During compaction, merge entries until we hit
// - a corrupted key
// - a specific sequence number (snapshot boundary),
// - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
// or - the end of iteration
+ //
+ // The result(s) of the merge can be accessed in `MergeHelper::keys()` and
+ // `MergeHelper::values()`, which are invalidated the next time `MergeUntil()`
+ // is called. `MergeOutputIterator` is specially designed to iterate the
+ // results of a `MergeHelper`'s most recent `MergeUntil()`.
+ //
// iter: (IN) points to the first merge type entry
// (OUT) points to the first entry not included in the merge process
// range_del_agg: (IN) filters merge operands covered by range tombstones.
//
// Returns one of the following statuses:
// - OK: Entries were successfully merged.
- // - MergeInProgress: Put/Delete not encountered, and didn't reach the start
- // of key's history. Output consists of merge operands only.
+ // - MergeInProgress: Output consists of merge operands only.
// - Corruption: Merge operator reported unsuccessful merge or a corrupted
// key has been encountered and not expected (applies only when compiling
// with asserts removed).
// do a final merge of nullptr and operands;
if (value || columns) {
std::string result;
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its value.
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, merge_context->GetOperands(),
&result, info_log_, db_statistics_, clock_,
- /* result_operand */ nullptr, /* update_num_ops_stats */ true);
+ /* result_operand */ nullptr, /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (status->ok()) {
if (LIKELY(value != nullptr)) {
*(value->GetSelf()) = std::move(result);
// do a final merge of nullptr and operands;
std::string* str_value =
iter->value != nullptr ? iter->value->GetSelf() : nullptr;
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its value.
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
str_value, info_log_, db_statistics_, clock_,
- /* result_operand */ nullptr, /* update_num_ops_stats */ true);
+ /* result_operand */ nullptr, /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (LIKELY(iter->value != nullptr)) {
iter->value->PinSelf();
range->AddValueSize(iter->value->size());
assert(merge_operator);
std::string new_value;
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its value.
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, &get_value_slice, {value}, &new_value,
moptions->info_log, moptions->statistics,
SystemClock::Default().get(), /* result_operand */ nullptr,
- /* update_num_ops_stats */ false);
+ /* update_num_ops_stats */ false,
+ /* op_failure_scope */ nullptr);
if (!merge_status.ok()) {
// Failed to merge!
Logger* logger;
};
+ enum class OpFailureScope {  // blast radius reported by a failed merge operation
+ kDefault,  // resolved to kTryMerge by MergeHelper::TimedFullMerge
+ kTryMerge,  // operations that try to merge the key fail, including flush and compaction
+ kMustMerge,  // only operations that must merge the key fail; flush/compaction copy the operands through
+ kOpFailureScopeMax,  // NOTE(review): presumably a sentinel marking the end of the valid scopes - confirm
+ };
+
struct MergeOperationOutput {
explicit MergeOperationOutput(std::string& _new_value,
Slice& _existing_operand)
// client can set this field to the operand (or existing_value) instead of
// using new_value.
Slice& existing_operand;
+ // Indicates the blast radius of the failure. It is only meaningful to
+ // provide a failure scope when returning `false` from the API populating
+ // the `MergeOperationOutput`. Currently RocksDB operations handle these
+ // values as follows:
+ //
+ // - `OpFailureScope::kDefault`: falls back to the default
+ // (`OpFailureScope::kTryMerge`)
+ // - `OpFailureScope::kTryMerge`: operations that try to merge that key will
+ // fail. This includes flush and compaction, which puts the DB in
+ // read-only mode.
+ // - `OpFailureScope::kMustMerge`: operations that must merge that key will
+ // fail (e.g., `Get()`, `MultiGet()`, iteration). Flushes/compactions can
+ // still proceed by copying the original input operands to the output.
+ OpFailureScope op_failure_scope = OpFailureScope::kDefault;
};
// This function applies a stack of merge operands in chronological order
assert(!pinnable_val_ || !columns_);
std::string result;
+ // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
+ // since a failure must be propagated regardless of its value.
const Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, value, merge_context_->GetOperands(), &result,
logger_, statistics_, clock_, /* result_operand */ nullptr,
- /* update_num_ops_stats */ true);
+ /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (!s.ok()) {
state_ = kCorrupt;
return;
}
{
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its value.
const Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, &value_of_default,
merge_context_->GetOperands(), pinnable_val_->GetSelf(), logger_,
statistics_, clock_, /* result_operand */ nullptr,
- /* update_num_ops_stats */ true);
+ /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (!s.ok()) {
state_ = kCorrupt;
return;
std::string result;
{
+ // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
+ // since a failure must be propagated regardless of its value.
const Status s = MergeHelper::TimedFullMergeWithEntity(
merge_operator_, user_key_, entity, merge_context_->GetOperands(),
- &result, logger_, statistics_, clock_, /* update_num_ops_stats */ true);
+ &result, logger_, statistics_, clock_, /* update_num_ops_stats */ true,
+ /* op_failure_scope */ nullptr);
if (!s.ok()) {
state_ = kCorrupt;
return;
merge_in.logger),
&user_merge_out);
}
+ merge_out->op_failure_scope = user_merge_out.op_failure_scope;
// Return false if the user merge operator returned false
if (!good) {
Statistics* statistics = immutable_db_options.statistics.get();
Logger* logger = immutable_db_options.info_log.get();
SystemClock* clock = immutable_db_options.clock;
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its value.
return MergeHelper::TimedFullMerge(
merge_operator, key, value, context.GetOperands(), result, logger,
statistics, clock, /* result_operand */ nullptr,
- /* update_num_ops_stats */ false);
+ /* update_num_ops_stats */ false,
+ /* op_failure_scope */ nullptr);
} else if (db_options_ != nullptr) {
Statistics* statistics = db_options_->statistics.get();
Env* env = db_options_->env;
Logger* logger = db_options_->info_log.get();
SystemClock* clock = env->GetSystemClock().get();
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its value.
return MergeHelper::TimedFullMerge(
merge_operator, key, value, context.GetOperands(), result, logger,
statistics, clock, /* result_operand */ nullptr,
- /* update_num_ops_stats */ false);
+ /* update_num_ops_stats */ false,
+ /* op_failure_scope */ nullptr);
} else {
const auto cf_opts = cfh->cfd()->ioptions();
+ // `op_failure_scope` (an output parameter) is not provided (set to
+ // nullptr) since a failure must be propagated regardless of its value.
return MergeHelper::TimedFullMerge(
merge_operator, key, value, context.GetOperands(), result,
cf_opts->logger, cf_opts->stats, cf_opts->clock,
- /* result_operand */ nullptr, /* update_num_ops_stats */ false);
+ /* result_operand */ nullptr, /* update_num_ops_stats */ false,
+ /* op_failure_scope */ nullptr);
}
} else {
return Status::InvalidArgument("Must provide a column_family");