Summary:
**Context/Summary:**
A truncated range deletion in an input file can be output by CompactionIterator with type kTypeMaxValid instead of kTypeRangeDeletion, to satisfy the ordering requirement between the truncated range deletion's start key and the file's point keys. https://github.com/facebook/rocksdb/pull/14122 planned to skip such keys, but blockers remain before that plan can be carried out.
Resumable compaction cannot yet handle resumption from a range deletion well, so it should treat kTypeMaxValid the same as kTypeRangeDeletion when choosing a resumption point. Previously it did not, and mistakenly allowed resumption from a truncated range deletion. That led to the assertion failure below, which complains about missing information needed to update file boundaries in the presence of range deletions when cutting an output file. It fires when the compaction resumes from that range deletion and cuts the output file shortly afterwards, with no point keys in between.
```
frame #9: 0x00007f4f4743bc93 libc.so.6`__GI___assert_fail(assertion="meta.smallest.size() > 0", file="db/compaction/compaction_outputs.cc", line=530, function="rocksdb::Status rocksdb::CompactionOutputs::AddRangeDels(rocksdb::CompactionRangeDelAggregator&, const rocksdb::Slice*, const rocksdb::Slice*, rocksdb::CompactionIterationStats&, bool, const rocksdb::InternalKeyComparator&, rocksdb::SequenceNumber, std::pair<long unsigned int, long unsigned int>, const rocksdb::Slice&, const string&)") at assert.c:101:3
frame #10: 0x00007f4f4808c68c librocksdb.so.10.9`rocksdb::CompactionOutputs::AddRangeDels(this=0x00007f4f0c27e1a0, range_del_agg=0x00007f4f0c21ecc0, comp_start_user_key=0x0000000000000000, comp_end_user_key=0x0000000000000000, range_del_out_stats=0x00007f4f0dffa140, bottommost_level=false, icmp=0x00007f4ef4c93040, earliest_snapshot=13108729, keep_seqno_range=<unavailable>, next_table_min_key=0x00007f4ef4c8f540, full_history_ts_low="") at compaction_outputs.cc:530:7
frame #11: 0x00007f4f480480dd librocksdb.so.10.9`rocksdb::CompactionJob::FinishCompactionOutputFile(this=0x00007f4f0dffb890, input_status=<unavailable>, prev_table_last_internal_key=0x00007f4f0dffa650, next_table_min_key=0x00007f4ef4c8f540, comp_start_user_key=0x0000000000000000, comp_end_user_key=0x0000000000000000, c_iter=0x00007f4ef4c8f400, sub_compact=0x00007f4f0c27e000, outputs=0x00007f4f0c27e1a0) at compaction_job.cc:1917:31
```
This PR simply prevents kTypeMaxValid from being a resumption point, as is already done for a regular range deletion - see commit 842d66eb18ea67e965d6acb1fce12c18eeb778d2.
Besides that, the PR also improves the testing, variable naming, and logging in the resumable compaction code, which were needed to debug this assertion failure - see commit https://github.com/facebook/rocksdb/pull/14184/commits/aecd4e7f971f6dd4df672d9e5f1409fe4747c561. These improvements are covered by existing tests.
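For readers who want the gist of the check without reading the diff, here is a minimal standalone sketch of the idea. It is not the RocksDB code: `BoundaryKey`, `IsRangeDeletionLike`, and `CanResumeAtBoundary` are hypothetical names, the enum is a stand-in for the real `ValueType`, the other conditions of `ShouldUpdateSubcompactionProgress` are omitted, and the final user-key comparison is assumed to reject equal keys.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for RocksDB's ValueType; values are illustrative only.
enum class ValueType { kTypeValue, kTypeRangeDeletion, kTypeMaxValid };

// Hypothetical stand-in for a parsed internal key at an output file boundary.
struct BoundaryKey {
  std::string user_key;  // empty means "the iterator has not output a key yet"
  ValueType type;
};

// True for a range deletion, including a truncated one that the compaction
// iterator reports as kTypeMaxValid.
bool IsRangeDeletionLike(ValueType t) {
  return t == ValueType::kTypeRangeDeletion || t == ValueType::kTypeMaxValid;
}

// Returns true when the boundary between `prev` (the previous key output by
// the iterator) and `next` (the first key of the next output file) may be
// recorded as a resumption point.
bool CanResumeAtBoundary(const BoundaryKey& prev, const BoundaryKey& next) {
  // An empty previous key is treated as a plain value, as in the diff.
  const ValueType prev_type =
      prev.user_key.empty() ? ValueType::kTypeValue : prev.type;
  if (IsRangeDeletionLike(prev_type) || IsRangeDeletionLike(next.type)) {
    return false;
  }
  // Assumption: adjacent output tables must not share a user key.
  return prev.user_key != next.user_key;
}
```

Before the fix, only the `kTypeRangeDeletion` comparison existed, so a boundary whose key carried `kTypeMaxValid` passed this check and became a resumption point.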
Pull Request resolved: https://github.com/facebook/rocksdb/pull/14184
Test Plan:
- The stress test initially surfaced the error. Using the exact same LSM shape and files from the stress test in a unit test, I was able to get a deterministic repro and confirmed that the fix resolves the error. This is the repro test: https://github.com/hx235/rocksdb/commit/1075936e693c68c960761855900c53f5b894f57a
```
./compaction_service_test --gtest_filter=ResumableCompactionServiceTest.CompactSpecificFilesFromExistingDBWithCancelAndResume
# Before fix
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from ResumableCompactionServiceTest
[ RUN ] ResumableCompactionServiceTest.CompactSpecificFilesFromExistingDBWithCancelAndResume
compaction_service_test: db/compaction/compaction_outputs.cc:530: rocksdb::Status rocksdb::CompactionOutputs::AddRangeDels(rocksdb::CompactionRangeDelAggregator&, const rocksdb::Slice*, const rocksdb::Slice*, rocksdb::CompactionIterationStats&, bool, const rocksdb::InternalKeyComparator&, rocksdb::SequenceNumber, std::pair<long unsigned int, long unsigned int>, const rocksdb::Slice&, const string&): Assertion `meta.smallest.size() > 0' failed.
Received signal 6 (Aborted)
Invoking GDB for stack trace...
[New LWP 2621610]
[New LWP 2621611]
[New LWP 2621612]
[New LWP 2621613]
[New LWP 2621614]
[New LWP 2621630]
[New LWP 2621631]
# After fix
Note: Google Test filter = ResumableCompactionServiceTest.CompactSpecificFilesFromExistingDBWithCancelAndResume
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from ResumableCompactionServiceTest
[ RUN ] ResumableCompactionServiceTest.CompactSpecificFilesFromExistingDBWithCancelAndResume
[ OK ] ResumableCompactionServiceTest.CompactSpecificFilesFromExistingDBWithCancelAndResume (4722 ms)
[----------] 1 test from ResumableCompactionServiceTest (4722 ms total)
[----------] Global test environment tear-down
[==========] 1 test from 1 test case ran. (4722 ms total)
[ PASSED ] 1 test.
```
- Follow-up: I tried a couple of times to coerce a truncated range deletion from scratch in the unit test but failed. Considering that (1) kTypeMaxValid may no longer be output by the compaction iterator once https://github.com/facebook/rocksdb/pull/14122/files lands again (making this bug obsolete), (2) the fix 842d66eb18ea67e965d6acb1fce12c18eeb778d2 is simple, and (3) the worst case of the fix going wrong is just less resumption, I decided to leave writing a unit test that coerces a truncated range deletion from scratch as a follow-up. Maybe I will draw inspiration from https://github.com/facebook/rocksdb/pull/14122/files.
Reviewed By: jaykorean
Differential Revision: D88912663
Pulled By: hx235
fbshipit-source-id: 80a01135684c8fea659650faaa00c2dc452c482a
const CompactionFileCloseFunc close_file_func =
[this, sub_compact, start_user_key, end_user_key](
const Status& status,
- const ParsedInternalKey& prev_table_last_internal_key,
+ const ParsedInternalKey& prev_iter_output_internal_key,
const Slice& next_table_min_key, const CompactionIterator* c_iter,
CompactionOutputs& outputs) {
return this->FinishCompactionOutputFile(
- status, prev_table_last_internal_key, next_table_min_key,
+ status, prev_iter_output_internal_key, next_table_min_key,
start_user_key, end_user_key, c_iter, sub_compact, outputs);
};
const uint64_t kRecordStatsEvery = 1000;
[[maybe_unused]] const std::optional<const Slice> end = sub_compact->end;
- IterKey last_output_key;
- ParsedInternalKey last_output_ikey;
+ IterKey prev_iter_output_key;
+ ParsedInternalKey prev_iter_output_internal_key;
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::ProcessKeyValueCompaction()::Processing",
// and `close_file_func`.
// TODO: it would be better to have the compaction file open/close moved
// into `CompactionOutputs` which has the output file information.
- status =
- sub_compact->AddToOutput(*c_iter, use_proximal_output, open_file_func,
- close_file_func, last_output_ikey);
+ status = sub_compact->AddToOutput(*c_iter, use_proximal_output,
+ open_file_func, close_file_func,
+ prev_iter_output_internal_key);
if (!status.ok()) {
break;
}
static_cast<void*>(const_cast<std::atomic<bool>*>(
&manual_compaction_canceled_)));
- last_output_key.SetInternalKey(c_iter->key(), &last_output_ikey);
- last_output_ikey.sequence = ikey.sequence;
- last_output_ikey.type = ikey.type;
+ prev_iter_output_key.SetInternalKey(c_iter->key(),
+ &prev_iter_output_internal_key);
+ prev_iter_output_internal_key.sequence = ikey.sequence;
+ prev_iter_output_internal_key.type = ikey.type;
c_iter->Next();
#ifndef NDEBUG
Status CompactionJob::FinishCompactionOutputFile(
const Status& input_status,
- const ParsedInternalKey& prev_table_last_internal_key,
+ const ParsedInternalKey& prev_iter_output_internal_key,
const Slice& next_table_min_key, const Slice* comp_start_user_key,
const Slice* comp_end_user_key, const CompactionIterator* c_iter,
SubcompactionState* sub_compact, CompactionOutputs& outputs) {
}
if (s.ok() && ShouldUpdateSubcompactionProgress(sub_compact, c_iter,
- prev_table_last_internal_key,
+ prev_iter_output_internal_key,
next_table_min_key, meta)) {
UpdateSubcompactionProgress(c_iter, next_table_min_key, sub_compact);
s = PersistSubcompactionProgress(sub_compact);
bool CompactionJob::ShouldUpdateSubcompactionProgress(
const SubcompactionState* sub_compact, const CompactionIterator* c_iter,
- const ParsedInternalKey& prev_table_last_internal_key,
+ const ParsedInternalKey& prev_iter_output_internal_key,
const Slice& next_table_min_internal_key, const FileMetaData* meta) const {
const auto* cfd = sub_compact->compaction->column_family_data();
- // No need to update when the output will not get persisted
+ // No need to update when the progress will not get persisted
if (compaction_progress_writer_ == nullptr) {
return false;
}
}
// LIMITATION: Compaction progress persistence disabled for file boundaries
- // contaning range deletions. Range deletions can span file boundaries, making
- // it difficult (but possible) to ensure adjacent output tables have different
- // user keys. See the last check for why different users keys of adjacent
- // output tables are needed
+ // containing range deletions. Range deletions can span file boundaries,
+ // making it difficult to ensure adjacent output tables have different user
+ // keys. See the last check for why different user keys of adjacent output
+ // tables are needed
const ValueType next_table_min_internal_key_type =
ExtractValueType(next_table_min_internal_key);
- const ValueType prev_table_last_internal_key_type =
- prev_table_last_internal_key.user_key.empty()
+ const ValueType prev_iter_output_internal_key_type =
+ prev_iter_output_internal_key.user_key.empty()
? ValueType::kTypeValue
- : prev_table_last_internal_key.type;
-
- if (next_table_min_internal_key_type == ValueType::kTypeRangeDeletion ||
- prev_table_last_internal_key_type == ValueType::kTypeRangeDeletion) {
+ : prev_iter_output_internal_key.type;
+
+ // Range deletes truncated to align with file boundaries may be output by the
+ // compaction iterator with `ValueType::kTypeMaxValid` instead of the original
+ // type.
+ if ((next_table_min_internal_key_type == ValueType::kTypeRangeDeletion ||
+ next_table_min_internal_key_type == ValueType::kTypeMaxValid) ||
+ (prev_iter_output_internal_key_type == ValueType::kTypeRangeDeletion ||
+ prev_iter_output_internal_key_type == ValueType::kTypeMaxValid)) {
return false;
}
const Slice next_table_min_user_key =
ExtractUserKey(next_table_min_internal_key);
const Slice prev_table_last_user_key =
- prev_table_last_internal_key.user_key.empty()
+ prev_iter_output_internal_key.user_key.empty()
? Slice()
- : prev_table_last_internal_key.user_key;
+ : prev_iter_output_internal_key.user_key;
if (cfd->user_comparator()->EqualWithoutTimestamp(next_table_min_user_key,
prev_table_last_user_key)) {
Status FinishCompactionOutputFile(
const Status& input_status,
- const ParsedInternalKey& prev_table_last_internal_key,
+ const ParsedInternalKey& prev_iter_output_internal_key,
const Slice& next_table_min_key, const Slice* comp_start_user_key,
const Slice* comp_end_user_key, const CompactionIterator* c_iter,
SubcompactionState* sub_compact, CompactionOutputs& outputs);
bool ShouldUpdateSubcompactionProgress(
const SubcompactionState* sub_compact, const CompactionIterator* c_iter,
- const ParsedInternalKey& prev_table_last_internal_key,
+ const ParsedInternalKey& prev_iter_output_internal_key,
const Slice& next_table_min_internal_key, const FileMetaData* meta) const;
void UpdateSubcompactionProgress(const CompactionIterator* c_iter,
bool enable_cancel_ = false;
std::atomic<int> stop_count_{0};
std::atomic<bool> cancel_{false};
+ SequenceNumber cancel_before_seqno = kMaxSequenceNumber;
void SetUp() override {
CompactionJobTestBase::SetUp();
if (enable_cancel_) {
ParsedInternalKey parsed_key;
if (ParseInternalKey(pair->second, &parsed_key, true).ok()) {
- if (parsed_key.user_key == kCancelBeforeThisKey) {
+ if (parsed_key.user_key == kCancelBeforeThisKey &&
+ (cancel_before_seqno == kMaxSequenceNumber ||
+ parsed_key.sequence == cancel_before_seqno)) {
cancel_.store(true);
}
}
const std::initializer_list<mock::KVPair>& input_file_2,
uint64_t last_sequence, const std::vector<uint64_t>& snapshots,
const std::string& expected_next_key_to_compact,
- const std::vector<std::string>& expected_input_keys, bool exists_progress,
+ const std::vector<std::string>& expected_input_keys,
bool cancelled_past_mid_point = false) {
std::shared_ptr<Statistics> stats = ROCKSDB_NAMESPACE::CreateDBStatistics();
// Resume compaction
CompactionProgress compaction_progress;
- if (exists_progress) {
+ if (expected_next_key_to_compact != "") {
compaction_progress.push_back(
ReadAndParseProgress(compaction_progress_file));
}
4U /* last_sequence */, {} /* snapshots */,
kCancelBeforeThisKey /* expected_next_key_to_compact */,
{"a", "b", "bb", kCancelBeforeThisKey} /* expected_input_keys */,
- true /* exists_progress */, true /* cancelled_past_mid_point*/);
+ true /* cancelled_past_mid_point */);
}
TEST_F(ResumableCompactionJobTest, NoProgressResumeOnSameKey) {
NewDB();
+ // `cancel_before_seqno` is set to 0U to force cancellation after
+ // `kCancelBeforeThisKey@1` instead of `kCancelBeforeThisKey@2`.
+ // The seqno is 0 because `kCancelBeforeThisKey@1` will have its sequence
+ // number zeroed during compaction while `kCancelBeforeThisKey@2` won't be
+ cancel_before_seqno = 0U;
RunCancelAndResumeTest(
{{KeyStr(kCancelBeforeThisKey, 1U, kTypeValue),
"val1"}} /* input_file_1 */,
- {{KeyStr(kCancelBeforeThisKey, 2U, kTypeValue),
- "val2"}} /* input_file_2 */,
- 2U /* last_sequence */, {1U} /* snapshots */,
+ {{KeyStr(kCancelBeforeThisKey, 2U, kTypeValue), "val11"},
+ {KeyStr("d", 3U, kTypeValue), "val2"}} /* input_file_2 */,
+ 3U /* last_sequence */, {1U} /* snapshots */,
"" /* expected_next_key_to_compact */,
- {kCancelBeforeThisKey, kCancelBeforeThisKey} /* expected_input_keys */,
- false /* exists_progress */);
+ {kCancelBeforeThisKey, kCancelBeforeThisKey,
+ "d"} /* expected_input_keys */);
}
TEST_F(ResumableCompactionJobTest, NoProgressResumeOnDeleteRange) {
NewDB();
RunCancelAndResumeTest(
- {{KeyStr(kCancelBeforeThisKey, 1U, kTypeValue),
- "val1"}} /* input_file_1 */,
- {{KeyStr(kCancelBeforeThisKey, 2U, kTypeRangeDeletion),
- "val2"}} /* input_file_2 */,
- 2U /* last_sequence */, {1U} /* snapshots */,
- "" /* expected_next_key_to_compact */,
- {kCancelBeforeThisKey, kCancelBeforeThisKey} /* expected_input_keys */,
- false /* exists_progress */);
+ {{KeyStr("a", 1U, kTypeValue), "val1"},
+ {KeyStr("b", 2U, kTypeValue), "val2"},
+ {KeyStr(kCancelBeforeThisKey, 3U, kTypeValue),
+ "val3"}} /* input_file_1 */,
+ {{KeyStr(kCancelBeforeThisKey, 4U, kTypeRangeDeletion),
+ "range_deletion_end_key"},
+ {KeyStr("d", 5U, kTypeValue), "val4"}} /* input_file_2 */,
+ 5U /* last_sequence */, {3U} /* snapshots */,
+ "b" /* expected_next_key_to_compact */,
+ {"a", "b", kCancelBeforeThisKey, kCancelBeforeThisKey,
+ "d"} /* expected_input_keys */);
}
TEST_F(ResumableCompactionJobTest, NoProgressResumeOnMerge) {
"val4"}} /* input_file_2 */,
4U /* last_sequence */, {} /* snapshots */,
"bb" /* expected_next_key_to_compact */,
- {"a", "b", "bb", kCancelBeforeThisKey} /* expected_input_keys */,
- true /* exists_progress */);
+ {"a", "b", "bb", kCancelBeforeThisKey} /* expected_input_keys */);
}
TEST_F(ResumableCompactionJobTest, NoProgressResumeOnSingleDelete) {
5U /* last_sequence */, {3U} /* snapshots */,
"b" /* expected_next_key_to_compact */,
{"a", "b", kCancelBeforeThisKey, kCancelBeforeThisKey,
- "d"} /* expected_input_keys */,
- true /* exists_progress */);
+ "d"} /* expected_input_keys */);
}
TEST_F(ResumableCompactionJobTest, NoProgressResumeOnDeletionAtBottom) {
5U /* last_sequence */, {3U} /* snapshots */,
"b" /* expected_next_key_to_compact */,
{"a", "b", kCancelBeforeThisKey, kCancelBeforeThisKey,
- "d"} /* expected_input_keys */,
- true /* exists_progress */);
+ "d"} /* expected_input_keys */);
}
} // namespace ROCKSDB_NAMESPACE
const CompactionIterator& c_iter,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func,
- const ParsedInternalKey& prev_table_last_internal_key) {
+ const ParsedInternalKey& prev_iter_output_internal_key) {
Status s;
bool is_range_del = c_iter.IsDeleteRangeSentinelKey();
if (is_range_del && compaction_->bottommost_level()) {
}
const Slice& key = c_iter.key();
if (ShouldStopBefore(c_iter) && HasBuilder()) {
- s = close_file_func(c_iter.InputStatus(), prev_table_last_internal_key, key,
- &c_iter, *this);
+ s = close_file_func(c_iter.InputStatus(), prev_iter_output_internal_key,
+ key, &c_iter, *this);
if (!s.ok()) {
return s;
}
Status AddToOutput(const CompactionIterator& c_iter,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func,
- const ParsedInternalKey& prev_table_last_internal_key);
+ const ParsedInternalKey& prev_iter_output_internal_key);
// Close the current output. `open_file_func` is needed for creating new file
// for range-dels only output file.
const CompactionIterator& iter, bool use_proximal_output,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func,
- const ParsedInternalKey& prev_table_last_internal_key) {
+ const ParsedInternalKey& prev_iter_output_internal_key) {
// update target output
current_outputs_ =
use_proximal_output ? &proximal_level_outputs_ : &compaction_outputs_;
return current_outputs_->AddToOutput(iter, open_file_func, close_file_func,
- prev_table_last_internal_key);
+ prev_iter_output_internal_key);
}
} // namespace ROCKSDB_NAMESPACE
Status AddToOutput(const CompactionIterator& iter, bool use_proximal_output,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func,
- const ParsedInternalKey& prev_table_last_internal_key);
+ const ParsedInternalKey& prev_iter_output_internal_key);
// Close all compaction output files, both output_to_proximal_level outputs
// and normal outputs.
return s;
}
- ROCKS_LOG_INFO(immutable_db_options_.info_log,
- "Initialized compaction workspace with %zu subcompaction "
- "progress to resume",
- compaction_progress_.size());
-
return Status::OK();
}
return HandleInvalidOrNoCompactionProgress(compaction_progress_file_path,
scan_result);
}
+
+ ROCKS_LOG_DEBUG(
+ immutable_db_options_.info_log,
+ "Loaded compaction progress with %zu subcompaction(s) from %s",
+ compaction_progress_.size(), compaction_progress_file_path.c_str());
return s;
} else {
return HandleInvalidOrNoCompactionProgress(
return HandleCompactionProgressWriterCreationFailure(
"" /* temp_file_path */, final_file_path, compaction_progress_writer);
}
+
+ ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+ "Finalized compaction progress writer onto %s",
+ final_file_path.c_str());
+
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE
Slice key_slice(next_internal_key_to_compact);
if (ParseInternalKey(key_slice, &parsed_key, false /* log_err_key */)
.ok()) {
- oss << "user_key=\"" << parsed_key.user_key.ToString(false /* hex */)
- << "\" (hex:" << parsed_key.user_key.ToString(true /* hex */)
- << ")";
+ oss << "user_key(hex)=" << parsed_key.user_key.ToString(true /* hex */);
oss << ", seq=";
if (parsed_key.sequence == kMaxSequenceNumber) {
oss << "kMaxSequenceNumber";
} else {
oss << parsed_key.sequence;
}
- oss << ", type=" << static_cast<int>(parsed_key.type);
+ oss << ", type=";
+ if (parsed_key.type == kValueTypeForSeek) {
+ oss << "kValueTypeForSeek";
+ } else {
+ oss << static_cast<int>(parsed_key.type);
+ }
} else {
oss << "raw=" << key_slice.ToString(true /* hex */);
}
--- /dev/null
+Fix resumable compaction incorrectly allowing resumption from a truncated range deletion that is not well handled currently.