### New Features
* Add FileSystem::ReadAsync API in io_tracing
+* Add `CompactRangeOptions::blob_garbage_collection_policy` and `CompactRangeOptions::blob_garbage_collection_age_cutoff`, which let a manual `CompactRange` call force-enable or force-disable blob garbage collection and selectively override the age cutoff.
* Add an extra sanity check in `GetSortedWalFiles()` (also used by `GetLiveFilesStorageInfo()`, `BackupEngine`, and `Checkpoint`) to reduce risk of successfully created backup or checkpoint failing to open because of missing WAL file.
### Behavior changes
CompressionOptions _compression_opts, Temperature _output_temperature,
uint32_t _max_subcompactions, std::vector<FileMetaData*> _grandparents,
bool _manual_compaction, const std::string& _trim_ts, double _score,
- bool _deletion_compaction, CompactionReason _compaction_reason)
+ bool _deletion_compaction, CompactionReason _compaction_reason,
+ BlobGarbageCollectionPolicy _blob_garbage_collection_policy,
+ double _blob_garbage_collection_age_cutoff)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
trim_ts_(_trim_ts),
is_trivial_move_(false),
compaction_reason_(_compaction_reason),
- notify_on_compaction_completion_(false) {
+ notify_on_compaction_completion_(false),
+ enable_blob_garbage_collection_(
+ _blob_garbage_collection_policy == BlobGarbageCollectionPolicy::kForce
+ ? true
+ : (_blob_garbage_collection_policy ==
+ BlobGarbageCollectionPolicy::kDisable
+ ? false
+ : mutable_cf_options()->enable_blob_garbage_collection)),
+ blob_garbage_collection_age_cutoff_(
+ _blob_garbage_collection_age_cutoff < 0 ||
+ _blob_garbage_collection_age_cutoff > 1
+ ? mutable_cf_options()->blob_garbage_collection_age_cutoff
+ : _blob_garbage_collection_age_cutoff) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;
}
if (!(start_level_ != output_level_ && num_input_levels() == 1 &&
- input(0, 0)->fd.GetPathId() == output_path_id() &&
- InputCompressionMatchesOutput())) {
+ input(0, 0)->fd.GetPathId() == output_path_id() &&
+ InputCompressionMatchesOutput())) {
return false;
}
std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, const std::string& trim_ts = "",
double score = -1, bool deletion_compaction = false,
- CompactionReason compaction_reason = CompactionReason::kUnknown);
+ CompactionReason compaction_reason = CompactionReason::kUnknown,
+ BlobGarbageCollectionPolicy blob_garbage_collection_policy =
+ BlobGarbageCollectionPolicy::kUseDefault,
+ double blob_garbage_collection_age_cutoff = -1);
// No copying allowed
Compaction(const Compaction&) = delete;
uint32_t max_subcompactions() const { return max_subcompactions_; }
+ // Returns whether blob garbage collection is enabled for this compaction.
+ // The member is initialized in the constructor: kForce yields true, kDisable
+ // yields false, and any other policy uses the column family's mutable option
+ // enable_blob_garbage_collection.
+ bool enable_blob_garbage_collection() const {
+ return enable_blob_garbage_collection_;
+ }
+
+ // Returns the blob garbage collection age cutoff used by this compaction.
+ // The constructor keeps the caller-supplied cutoff when it lies in [0, 1];
+ // otherwise it uses the column family's mutable option
+ // blob_garbage_collection_age_cutoff.
+ double blob_garbage_collection_age_cutoff() const {
+ return blob_garbage_collection_age_cutoff_;
+ }
+
// start and end are sub compact range. Null if no boundary.
// This is used to filter out some input files' ancestor's time range.
uint64_t MinInputFileOldestAncesterTime(const InternalKey* start,
// Get the atomic file boundaries for all files in the compaction. Necessary
// in order to avoid the scenario described in
- // https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and plumb
- // down appropriate key boundaries to RangeDelAggregator during compaction.
+ // https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and
+ // plumb down appropriate key boundaries to RangeDelAggregator during
+ // compaction.
static std::vector<CompactionInputFiles> PopulateWithAtomicBoundaries(
VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs);
VersionStorageInfo* input_vstorage_;
- const int start_level_; // the lowest level to be compacted
+ const int start_level_; // the lowest level to be compacted
const int output_level_; // levels to which output files are stored
uint64_t max_output_file_size_;
uint64_t max_compaction_bytes_;
VersionEdit edit_;
const int number_levels_;
ColumnFamilyData* cfd_;
- Arena arena_; // Arena used to allocate space for file_levels_
+ Arena arena_; // Arena used to allocate space for file_levels_
const uint32_t output_path_id_;
CompressionType output_compression_;
// State used to check for number of overlapping grandparent files
// (grandparent == "output_level_ + 1")
std::vector<FileMetaData*> grandparents_;
- const double score_; // score that was used to pick this compaction.
+ const double score_; // score that was used to pick this compaction.
// Is this compaction creating a file in the bottom most level?
const bool bottommost_level_;
// Notify on compaction completion only if listener was notified on compaction
// begin.
bool notify_on_compaction_completion_;
+
+ // Enable/disable garbage collection for blobs during compaction.
+ bool enable_blob_garbage_collection_;
+
+ // Blob garbage collection age cutoff.
+ double blob_garbage_collection_age_cutoff_;
};
// Return sum of sizes of all files in `files`.
}
bool enable_blob_garbage_collection() const override {
- return compaction_->mutable_cf_options()->enable_blob_garbage_collection;
+ return compaction_->enable_blob_garbage_collection();
}
double blob_garbage_collection_age_cutoff() const override {
- return compaction_->mutable_cf_options()
- ->blob_garbage_collection_age_cutoff;
+ return compaction_->blob_garbage_collection_age_cutoff();
}
uint64_t blob_compaction_readahead_size() const override {
GetCompressionType(vstorage, mutable_cf_options, output_level, 1),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions,
- /* grandparents */ {}, /* is manual */ true, trim_ts);
+ /* grandparents */ {}, /* is manual */ true, trim_ts, /* score */ -1,
+ /* deletion_compaction */ false, CompactionReason::kUnknown,
+ compact_range_options.blob_garbage_collection_policy,
+ compact_range_options.blob_garbage_collection_age_cutoff);
+
RegisterCompaction(c);
vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);
return c;
vstorage->base_level()),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions,
- std::move(grandparents),
- /* is manual compaction */ true, trim_ts);
+ std::move(grandparents), /* is manual */ true, trim_ts, /* score */ -1,
+ /* deletion_compaction */ false, CompactionReason::kUnknown,
+ compact_range_options.blob_garbage_collection_policy,
+ compact_range_options.blob_garbage_collection_age_cutoff);
TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
RegisterCompaction(compaction);
::testing::Combine(::testing::Values(0.0, 0.5, 1.0),
::testing::Bool()));
+TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGCOverrides) {
+  // Verifies that CompactRangeOptions can override the column family's blob
+  // garbage collection settings for a single manual compaction. The column
+  // family is configured with an age cutoff of 0, while the CompactRange call
+  // supplies kForce plus the parameterized cutoff, so the number of surviving
+  // blob files is expected to follow the cutoff passed in CompactRangeOptions.
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.blob_file_size = 32;  // one blob per file
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0;
+
+  DestroyAndReopen(options);
+
+  // Two keys per flush; with one blob per file this yields 128 blob files.
+  for (int i = 0; i < 128; i += 2) {
+    ASSERT_OK(Put("key" + std::to_string(i), "value" + std::to_string(i)));
+    ASSERT_OK(
+        Put("key" + std::to_string(i + 1), "value" + std::to_string(i + 1)));
+    ASSERT_OK(Flush());
+  }
+
+  std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
+  ASSERT_EQ(original_blob_files.size(), 128);
+
+  // Note: turning off enable_blob_files before the compaction results in
+  // garbage collected values getting inlined.
+  ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
+
+  CompactRangeOptions cro;
+  cro.blob_garbage_collection_policy = BlobGarbageCollectionPolicy::kForce;
+  cro.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_;
+
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+  // Check that the GC stats are correct
+  {
+    // Use gtest assertions rather than assert() so that a violated
+    // precondition is reported as a test failure and is still checked when
+    // NDEBUG is defined.
+    VersionSet* const versions = dbfull()->GetVersionSet();
+    ASSERT_NE(versions, nullptr);
+    ASSERT_NE(versions->GetColumnFamilySet(), nullptr);
+
+    ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+    ASSERT_NE(cfd, nullptr);
+
+    const InternalStats* const internal_stats = cfd->internal_stats();
+    ASSERT_NE(internal_stats, nullptr);
+
+    const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+    ASSERT_GE(compaction_stats.size(), 2);
+
+    // bytes_read_blob is unsigned, so ">= 0" can never fail. Only require
+    // blob reads when the overriding cutoff makes some blob files eligible
+    // for garbage collection.
+    if (cro.blob_garbage_collection_age_cutoff > 0.0) {
+      ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
+    }
+
+    // Blob files are disabled at this point, so nothing is written to blobs.
+    ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
+  }
+
+  const size_t cutoff_index = static_cast<size_t>(
+      cro.blob_garbage_collection_age_cutoff * original_blob_files.size());
+  const size_t expected_num_files = original_blob_files.size() - cutoff_index;
+
+  const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
+
+  ASSERT_EQ(new_blob_files.size(), expected_num_files);
+
+  // Original blob files below the cutoff should be gone, original blob files
+  // at or above the cutoff should be still there
+  for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
+    ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
+  }
+
+  // All values must remain readable regardless of where they ended up.
+  for (size_t i = 0; i < 128; ++i) {
+    ASSERT_EQ(Get("key" + std::to_string(i)), "value" + std::to_string(i));
+  }
+}
+
TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) {
Options options;
options.env = env_;
#ifndef NDEBUG
if (FLAGS_read_fault_one_in) {
fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
- FLAGS_read_fault_one_in);
+ FLAGS_read_fault_one_in);
}
#endif // NDEBUG
if (FLAGS_write_fault_one_in) {
static_cast<uint32_t>(bottom_level_styles.size())];
cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2);
cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4);
+ std::vector<BlobGarbageCollectionPolicy> blob_gc_policies = {
+ BlobGarbageCollectionPolicy::kForce,
+ BlobGarbageCollectionPolicy::kDisable,
+ BlobGarbageCollectionPolicy::kUseDefault};
+ cro.blob_garbage_collection_policy =
+ blob_gc_policies[thread->rand.Next() %
+ static_cast<uint32_t>(blob_gc_policies.size())];
+ cro.blob_garbage_collection_age_cutoff =
+ static_cast<double>(thread->rand.Next() % 100) / 100.0;
const Snapshot* pre_snapshot = nullptr;
uint32_t pre_hash = 0;
fprintf(stdout, "Open metadata write fault one in:\n");
fprintf(stdout, " %d\n",
FLAGS_open_metadata_write_fault_one_in);
- fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection);
+ fprintf(stdout, "Sync fault injection : %d\n",
+ FLAGS_sync_fault_injection);
fprintf(stdout, "Best efforts recovery : %d\n",
static_cast<int>(FLAGS_best_efforts_recovery));
fprintf(stdout, "Fail if OPTIONS file error: %d\n",
}
assert(!write_prepared || bg_canceled);
#else
- (void) thread;
+ (void)thread;
#endif
for (auto cf : column_families_) {
kForceOptimized,
};
+// Per-call blob garbage collection (GC) policy for manual compactions; it is
+// passed via CompactRangeOptions::blob_garbage_collection_policy.
+enum class BlobGarbageCollectionPolicy {
+ // Run blob GC even if enable_blob_garbage_collection is false for the CF.
+ kForce,
+ // Skip blob GC even if enable_blob_garbage_collection is true for the CF.
+ kDisable,
+ // Use the column family's enable_blob_garbage_collection setting as-is.
+ kUseDefault,
+};
+
// CompactRangeOptions is used by CompactRange() call.
struct CompactRangeOptions {
// If true, no other compaction will run at the same time as this
// Cancellation can be delayed waiting on automatic compactions when used
// together with `exclusive_manual_compaction == true`.
std::atomic<bool>* canceled = nullptr;
+
+ // If set to kForce, RocksDB will override enable_blob_garbage_collection to
+ // true; if set to kDisable, RocksDB will override it to false; kUseDefault
+ // leaves the ColumnFamilyOptions setting in effect. This enables customers
+ // to both force-enable and force-disable GC when calling CompactRange.
+ BlobGarbageCollectionPolicy blob_garbage_collection_policy =
+ BlobGarbageCollectionPolicy::kUseDefault;
+
+ // If set to < 0 or > 1, RocksDB leaves blob_garbage_collection_age_cutoff
+ // from ColumnFamilyOptions in effect. Otherwise (a value in [0, 1]), it
+ // overrides the ColumnFamilyOptions setting for this CompactRange call. This
+ // enables customers to selectively override the age cutoff.
+ double blob_garbage_collection_age_cutoff = -1;
};
// IngestExternalFileOptions is used by IngestExternalFile()