* Fix FIFO compaction causing corruption of overlapping seqnos in L0 files due to ingesting files of overlapping seqnos with memtable's under `CompactionOptionsFIFO::allow_compaction=true` or `CompactionOptionsFIFO::age_for_warm>0` or `CompactRange()/CompactFiles()` is used. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected.
* Fix memory corruption error in scans if async_io is enabled. Memory corruption happened if there is IOError while reading the data leading to empty buffer and other buffer already in progress of async read goes again for reading.
* Fix failed memtable flush retry bug that could cause wrongly ordered updates, which would surface to writers as `Status::Corruption` in case of `force_consistency_checks=true` (default). It affects use cases that enable both parallel flush (`max_background_flushes > 1` or `max_background_jobs >= 8`) and non-default memtable count (`max_write_buffer_number > 2`).
+* Fixed an issue where the `READ_NUM_MERGE_OPERANDS` ticker was not updated when the base key-value or tombstone was read from an SST file.
### New Features
* Add basic support for user-defined timestamp to Merge (#10819).
Status DBIter::Merge(const Slice* val, const Slice& user_key) {
Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key, val, merge_context_.GetOperands(),
- &saved_value_, logger_, statistics_, clock_, &pinned_value_, true);
+ &saved_value_, logger_, statistics_, clock_, &pinned_value_,
+ /* update_num_ops_stats */ true);
if (!s.ok()) {
valid_ = false;
status_ = s;
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->columns, s->logger,
- s->statistics, s->clock, nullptr /* result_operand */, true);
+ s->statistics, s->clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ true);
}
} else if (s->value) {
s->value->assign(v.data(), v.size());
*(s->status) = MergeHelper::TimedFullMergeWithEntity(
merge_operator, s->key->user_key(), v,
merge_context->GetOperands(), s->value, s->columns, s->logger,
- s->statistics, s->clock, nullptr /* result_operand */, true);
+ s->statistics, s->clock, /* update_num_ops_stats */ true);
}
} else if (s->value) {
Slice value_of_default;
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->columns, s->logger,
- s->statistics, s->clock, nullptr /* result_operand */, true);
+ s->statistics, s->clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ true);
}
} else {
*(s->status) = Status::NotFound();
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->columns, s->logger,
- s->statistics, s->clock, nullptr /* result_operand */, true);
+ s->statistics, s->clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ true);
}
*(s->found_final_value) = true;
const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
const std::vector<Slice>& operands, std::string* value,
PinnableWideColumns* columns, Logger* logger, Statistics* statistics,
- SystemClock* clock, Slice* result_operand, bool update_num_ops_stats) {
+ SystemClock* clock, bool update_num_ops_stats) {
assert(value || columns);
assert(!value || !columns);
std::string result;
{
+ constexpr Slice* result_operand = nullptr;
+
const Status s = TimedFullMerge(
merge_operator, key, &value_of_default, operands, &result, logger,
statistics, clock, result_operand, update_num_ops_stats);
val_ptr = nullptr;
}
std::string merge_result;
- s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr,
- merge_context_.GetOperands(), &merge_result, logger_,
- stats_, clock_);
+ s = TimedFullMerge(
+ user_merge_operator_, ikey.user_key, val_ptr,
+ merge_context_.GetOperands(), &merge_result, logger_, stats_, clock_,
+ /* result_operand */ nullptr, /* update_num_ops_stats */ false);
// We store the result in keys_.back() and operands_.back()
// if nothing went wrong (i.e.: no operand corruption on disk)
assert(merge_context_.GetNumOperands() >= 1);
assert(merge_context_.GetNumOperands() == keys_.size());
std::string merge_result;
- s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr,
- merge_context_.GetOperands(), &merge_result, logger_,
- stats_, clock_);
+ s = TimedFullMerge(
+ user_merge_operator_, orig_ikey.user_key, nullptr,
+ merge_context_.GetOperands(), &merge_result, logger_, stats_, clock_,
+ /* result_operand */ nullptr, /* update_num_ops_stats */ false);
if (s.ok()) {
// The original key encountered
// We are certain that keys_ is not empty here (see assertions couple of
const std::vector<Slice>& operands,
std::string* result, Logger* logger,
Statistics* statistics, SystemClock* clock,
- Slice* result_operand = nullptr,
- bool update_num_ops_stats = false);
+ Slice* result_operand,
+ bool update_num_ops_stats);
static Status TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* base_value,
const std::vector<Slice>& operands,
std::string* value, PinnableWideColumns* columns,
Logger* logger, Statistics* statistics,
- SystemClock* clock,
- Slice* result_operand = nullptr,
- bool update_num_ops_stats = false);
+ SystemClock* clock, Slice* result_operand,
+ bool update_num_ops_stats);
static Status TimedFullMergeWithEntity(
const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
const std::vector<Slice>& operands, std::string* value,
PinnableWideColumns* columns, Logger* logger, Statistics* statistics,
- SystemClock* clock, Slice* result_operand = nullptr,
- bool update_num_ops_stats = false);
+ SystemClock* clock, bool update_num_ops_stats);
// During compaction, merge entries until we hit
// - a corrupted key
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, merge_context->GetOperands(),
str_value, columns, info_log_, db_statistics_, clock_,
- nullptr /* result_operand */, true);
+ /* result_operand */ nullptr, /* update_num_ops_stats */ true);
if (status->ok()) {
if (LIKELY(value != nullptr)) {
value->PinSelf();
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
str_value, info_log_, db_statistics_, clock_,
- nullptr /* result_operand */, true);
+ /* result_operand */ nullptr, /* update_num_ops_stats */ true);
if (LIKELY(iter->value != nullptr)) {
iter->value->PinSelf();
range->AddValueSize(iter->value->size());
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, &get_value_slice, {value}, &new_value,
moptions->info_log, moptions->statistics,
- SystemClock::Default().get());
+ SystemClock::Default().get(), /* result_operand */ nullptr,
+ /* update_num_ops_stats */ false);
if (!merge_status.ok()) {
// Failed to merge!
const Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, value, merge_context_->GetOperands(),
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_,
- statistics_, clock_);
+ statistics_, clock_, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ true);
if (!s.ok()) {
state_ = kCorrupt;
return;
const Status s = MergeHelper::TimedFullMergeWithEntity(
merge_operator_, user_key_, entity, merge_context_->GetOperands(),
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_,
- statistics_, clock_);
+ statistics_, clock_, /* update_num_ops_stats */ true);
if (!s.ok()) {
state_ = kCorrupt;
return;
Statistics* statistics = immutable_db_options.statistics.get();
Logger* logger = immutable_db_options.info_log.get();
SystemClock* clock = immutable_db_options.clock;
- return MergeHelper::TimedFullMerge(merge_operator, key, value,
- context.GetOperands(), result, logger,
- statistics, clock);
+ return MergeHelper::TimedFullMerge(
+ merge_operator, key, value, context.GetOperands(), result, logger,
+ statistics, clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ false);
} else if (db_options_ != nullptr) {
Statistics* statistics = db_options_->statistics.get();
Env* env = db_options_->env;
Logger* logger = db_options_->info_log.get();
SystemClock* clock = env->GetSystemClock().get();
- return MergeHelper::TimedFullMerge(merge_operator, key, value,
- context.GetOperands(), result, logger,
- statistics, clock);
+ return MergeHelper::TimedFullMerge(
+ merge_operator, key, value, context.GetOperands(), result, logger,
+ statistics, clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ false);
} else {
const auto cf_opts = cfh->cfd()->ioptions();
return MergeHelper::TimedFullMerge(
merge_operator, key, value, context.GetOperands(), result,
- cf_opts->logger, cf_opts->stats, cf_opts->clock);
+ cf_opts->logger, cf_opts->stats, cf_opts->clock,
+ /* result_operand */ nullptr, /* update_num_ops_stats */ false);
}
} else {
return Status::InvalidArgument("Must provide a column_family");