&blob_file_paths, blob_file_additions)
: nullptr);
+ const std::atomic<bool> kManualCompactionCanceledFalse{false};
CompactionIterator c_iter(
iter, tboptions.internal_comparator.user_comparator(), &merge,
kMaxSequenceNumber, &snapshots, earliest_write_conflict_snapshot,
true /* internal key corruption is not ok */, range_del_agg.get(),
blob_file_builder.get(), ioptions.allow_data_in_errors,
ioptions.enforce_single_del_contracts,
+ /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
/*compaction=*/nullptr, compaction_filter.get(),
- /*shutting_down=*/nullptr,
- /*manual_compaction_paused=*/nullptr,
- /*manual_compaction_canceled=*/nullptr, db_options.info_log,
- full_history_ts_low);
+ /*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);
c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) {
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
- bool enforce_single_del_contracts, const Compaction* compaction,
- const CompactionFilter* compaction_filter,
+ bool enforce_single_del_contracts,
+ const std::atomic<bool>& manual_compaction_canceled,
+ const Compaction* compaction, const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
- const std::atomic<int>* manual_compaction_paused,
- const std::atomic<bool>* manual_compaction_canceled,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low)
: CompactionIterator(
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
blob_file_builder, allow_data_in_errors, enforce_single_del_contracts,
+ manual_compaction_canceled,
std::unique_ptr<CompactionProxy>(
compaction ? new RealCompaction(compaction) : nullptr),
- compaction_filter, shutting_down, manual_compaction_paused,
- manual_compaction_canceled, info_log, full_history_ts_low) {}
+ compaction_filter, shutting_down, info_log, full_history_ts_low) {}
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
+ const std::atomic<bool>& manual_compaction_canceled,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
- const std::atomic<int>* manual_compaction_paused,
- const std::atomic<bool>* manual_compaction_canceled,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low)
: input_(input, cmp,
compaction_(std::move(compaction)),
compaction_filter_(compaction_filter),
shutting_down_(shutting_down),
- manual_compaction_paused_(manual_compaction_paused),
manual_compaction_canceled_(manual_compaction_canceled),
info_log_(info_log),
allow_data_in_errors_(allow_data_in_errors),
const Compaction* compaction_;
};
- CompactionIterator(
- InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
- SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
- SequenceNumber earliest_write_conflict_snapshot,
- SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
- Env* env, bool report_detailed_time, bool expect_valid_internal_key,
- CompactionRangeDelAggregator* range_del_agg,
- BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
- bool enforce_single_del_contracts, const Compaction* compaction = nullptr,
- const CompactionFilter* compaction_filter = nullptr,
- const std::atomic<bool>* shutting_down = nullptr,
- const std::atomic<int>* manual_compaction_paused = nullptr,
- const std::atomic<bool>* manual_compaction_canceled = nullptr,
- const std::shared_ptr<Logger> info_log = nullptr,
- const std::string* full_history_ts_low = nullptr);
+ CompactionIterator(InternalIterator* input, const Comparator* cmp,
+ MergeHelper* merge_helper, SequenceNumber last_sequence,
+ std::vector<SequenceNumber>* snapshots,
+ SequenceNumber earliest_write_conflict_snapshot,
+ SequenceNumber job_snapshot,
+ const SnapshotChecker* snapshot_checker, Env* env,
+ bool report_detailed_time, bool expect_valid_internal_key,
+ CompactionRangeDelAggregator* range_del_agg,
+ BlobFileBuilder* blob_file_builder,
+ bool allow_data_in_errors,
+ bool enforce_single_del_contracts,
+ const std::atomic<bool>& manual_compaction_canceled,
+ const Compaction* compaction = nullptr,
+ const CompactionFilter* compaction_filter = nullptr,
+ const std::atomic<bool>* shutting_down = nullptr,
+ const std::shared_ptr<Logger> info_log = nullptr,
+ const std::string* full_history_ts_low = nullptr);
// Constructor with custom CompactionProxy, used for tests.
- CompactionIterator(
- InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
- SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
- SequenceNumber earliest_write_conflict_snapshot,
- SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
- Env* env, bool report_detailed_time, bool expect_valid_internal_key,
- CompactionRangeDelAggregator* range_del_agg,
- BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
- bool enforce_single_del_contracts,
- std::unique_ptr<CompactionProxy> compaction,
- const CompactionFilter* compaction_filter = nullptr,
- const std::atomic<bool>* shutting_down = nullptr,
- const std::atomic<int>* manual_compaction_paused = nullptr,
- const std::atomic<bool>* manual_compaction_canceled = nullptr,
- const std::shared_ptr<Logger> info_log = nullptr,
- const std::string* full_history_ts_low = nullptr);
+ CompactionIterator(InternalIterator* input, const Comparator* cmp,
+ MergeHelper* merge_helper, SequenceNumber last_sequence,
+ std::vector<SequenceNumber>* snapshots,
+ SequenceNumber earliest_write_conflict_snapshot,
+ SequenceNumber job_snapshot,
+ const SnapshotChecker* snapshot_checker, Env* env,
+ bool report_detailed_time, bool expect_valid_internal_key,
+ CompactionRangeDelAggregator* range_del_agg,
+ BlobFileBuilder* blob_file_builder,
+ bool allow_data_in_errors,
+ bool enforce_single_del_contracts,
+ const std::atomic<bool>& manual_compaction_canceled,
+ std::unique_ptr<CompactionProxy> compaction,
+ const CompactionFilter* compaction_filter = nullptr,
+ const std::atomic<bool>* shutting_down = nullptr,
+ const std::shared_ptr<Logger> info_log = nullptr,
+ const std::string* full_history_ts_low = nullptr);
~CompactionIterator();
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_;
- const std::atomic<int>* manual_compaction_paused_;
- const std::atomic<bool>* manual_compaction_canceled_;
+ const std::atomic<bool>& manual_compaction_canceled_;
bool bottommost_level_;
bool valid_ = false;
bool visible_at_tip_;
bool IsPausingManualCompaction() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
- return (manual_compaction_paused_ &&
- manual_compaction_paused_->load(std::memory_order_relaxed) > 0) ||
- (manual_compaction_canceled_ &&
- manual_compaction_canceled_->load(std::memory_order_relaxed));
+ return manual_compaction_canceled_.load(std::memory_order_relaxed);
}
};
snapshot_checker_.get(), Env::Default(),
false /* report_detailed_time */, false, range_del_agg_.get(),
nullptr /* blob_file_builder */, true /*allow_data_in_errors*/,
- true /*enforce_single_del_contracts*/, std::move(compaction), filter,
- &shutting_down_,
- /*manual_compaction_paused=*/nullptr,
- /*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr,
+ true /*enforce_single_del_contracts*/,
+ /*manual_compaction_canceled=*/kManualCompactionCanceledFalse_,
+ std::move(compaction), filter, &shutting_down_, /*info_log=*/nullptr,
full_history_ts_low));
}
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
std::unique_ptr<SnapshotChecker> snapshot_checker_;
std::atomic<bool> shutting_down_{false};
+ const std::atomic<bool> kManualCompactionCanceledFalse_{false};
FakeCompaction* compaction_proxy_;
};
bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
const std::shared_ptr<IOTracer>& io_tracer,
- const std::atomic<int>* manual_compaction_paused,
- const std::atomic<bool>* manual_compaction_canceled,
+ const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low, std::string trim_ts,
BlobFileCompletionCallback* blob_callback)
fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
versions_(versions),
shutting_down_(shutting_down),
- manual_compaction_paused_(manual_compaction_paused),
manual_compaction_canceled_(manual_compaction_canceled),
db_directory_(db_directory),
blob_output_directory_(blob_output_directory),
if (shutting_down_->load(std::memory_order_acquire)) {
return;
}
- if (c->is_manual_compaction() && manual_compaction_paused_ &&
- manual_compaction_paused_->load(std::memory_order_acquire) > 0) {
+ if (c->is_manual_compaction() &&
+ manual_compaction_canceled_.load(std::memory_order_acquire)) {
return;
}
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:1",
reinterpret_cast<void*>(
- const_cast<std::atomic<int>*>(manual_compaction_paused_)));
+ const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
Status status;
const std::string* const full_history_ts_low =
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, &range_del_agg,
blob_file_builder.get(), db_options_.allow_data_in_errors,
- db_options_.enforce_single_del_contracts, sub_compact->compaction,
- compaction_filter, shutting_down_, manual_compaction_paused_,
- manual_compaction_canceled_, db_options_.info_log, full_history_ts_low));
+ db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
+ sub_compact->compaction, compaction_filter, shutting_down_,
+ db_options_.info_log, full_history_ts_low));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:2",
reinterpret_cast<void*>(
- const_cast<std::atomic<int>*>(manual_compaction_paused_)));
+ const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
if (partitioner.get()) {
last_key_for_partitioner.assign(c_iter->user_key().data_,
c_iter->user_key().size_);
status = Status::ShutdownInProgress("Database shutdown");
}
if ((status.ok() || status.IsColumnFamilyDropped()) &&
- ((manual_compaction_paused_ &&
- manual_compaction_paused_->load(std::memory_order_relaxed) > 0) ||
- (manual_compaction_canceled_ &&
- manual_compaction_canceled_->load(std::memory_order_relaxed)))) {
+ (manual_compaction_canceled_.load(std::memory_order_relaxed))) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
if (status.ok()) {
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
- const std::atomic<bool>* manual_compaction_canceled,
+ const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
const CompactionServiceInput& compaction_service_input,
compaction->mutable_cf_options()->paranoid_file_checks,
compaction->mutable_cf_options()->report_bg_io_stats, dbname,
&(compaction_service_result->stats), Env::Priority::USER, io_tracer,
- nullptr, manual_compaction_canceled, db_id, db_session_id,
+ manual_compaction_canceled, db_id, db_session_id,
compaction->column_family_data()->GetFullHistoryTsLow()),
output_path_(output_path),
compaction_input_(compaction_service_input),
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
- const std::atomic<int>* manual_compaction_paused = nullptr,
- const std::atomic<bool>* manual_compaction_canceled = nullptr,
+ const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "", std::string trim_ts = "",
BlobFileCompletionCallback* blob_callback = nullptr);
FileOptions file_options_for_read_;
VersionSet* versions_;
const std::atomic<bool>* shutting_down_;
- const std::atomic<int>* manual_compaction_paused_;
- const std::atomic<bool>* manual_compaction_canceled_;
+ const std::atomic<bool>& manual_compaction_canceled_;
FSDirectory* db_directory_;
FSDirectory* blob_output_directory_;
InstrumentedMutex* db_mutex_;
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
- const std::atomic<bool>* manual_compaction_canceled,
+ const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
const CompactionServiceInput& compaction_service_input,
SnapshotChecker* snapshot_checker = nullptr;
ASSERT_TRUE(full_history_ts_low_.empty() ||
ucmp_->timestamp_size() == full_history_ts_low_.size());
+ const std::atomic<bool> kManualCompactionCanceledFalse{false};
CompactionJob compaction_job(
0, &compaction, db_options_, mutable_db_options_, env_options_,
versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr,
earliest_write_conflict_snapshot, snapshot_checker, nullptr,
table_cache_, &event_logger, false, false, dbname_,
&compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */,
- /*manual_compaction_paused=*/nullptr,
- /*manual_compaction_canceled=*/nullptr, env_->GenerateUniqueId(),
- DBImpl::GenerateDbSessionId(nullptr), full_history_ts_low_);
+ /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
+ env_->GenerateUniqueId(), DBImpl::GenerateDbSessionId(nullptr),
+ full_history_ts_low_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
compaction_job.Prepare();
InstrumentedMutex trace_mutex_;
BlockCacheTracer block_cache_tracer_;
+ // constant false canceled flag, used when the compaction is not manual
+ const std::atomic<bool> kManualCompactionCanceledFalse_{false};
+
// State below is protected by mutex_
// With two_write_queues enabled, some of the variables that accessed during
// WriteToWAL need different synchronization: log_empty_, alive_log_files_,
output_path_id(_output_path_id),
exclusive(_exclusive),
disallow_trivial_move(_disallow_trivial_move),
- canceled(_canceled) {}
+ canceled(_canceled ? *_canceled : canceled_internal_storage) {}
+ // When _canceled is not provided by ther user, we assign the reference of
+ // canceled_internal_storage to it to consolidate canceled and
+ // manual_compaction_paused since DisableManualCompaction() might be
+ // called
ColumnFamilyData* cfd;
int input_level;
InternalKey* manual_end = nullptr; // how far we are compacting
InternalKey tmp_storage; // Used to keep track of compaction progress
InternalKey tmp_storage1; // Used to keep track of compaction progress
- std::atomic<bool>* canceled; // Compaction canceled by the user?
+
+ // When the user provides a canceled pointer in CompactRangeOptions, the
+ // above varaibe is the reference of the user-provided
+ // `canceled`, otherwise, it is the reference of canceled_internal_storage
+ std::atomic<bool> canceled_internal_storage = false;
+ std::atomic<bool>& canceled; // Compaction canceled pointer reference
};
struct PrepickedCompaction {
// background compaction takes ownership of `compaction`.
// Perform CompactFiles
TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
+ TEST_SYNC_POINT_CALLBACK(
+ "TestCompactFiles:PausingManualCompaction:3",
+ reinterpret_cast<void*>(
+ const_cast<std::atomic<int>*>(&manual_compaction_paused_)));
{
InstrumentedMutexLock l(&mutex_);
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, io_tracer_,
- &manual_compaction_paused_, nullptr, db_id_, db_session_id_,
+ kManualCompactionCanceledFalse_, db_id_, db_session_id_,
c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(),
&blob_callback_);
// and `CompactRangeOptions::canceled` might not work well together.
while (bg_bottom_compaction_scheduled_ > 0 ||
bg_compaction_scheduled_ > 0) {
- if (manual_compaction_paused_ > 0 ||
- (manual.canceled != nullptr && *manual.canceled == true)) {
+ if (manual_compaction_paused_ > 0 || manual.canceled == true) {
// Pretend the error came from compaction so the below cleanup/error
// handling code can process it.
manual.done = true;
return s;
}
+// NOTE: Calling DisableManualCompaction() may overwrite the
+// user-provided canceled variable in CompactRangeOptions
void DBImpl::DisableManualCompaction() {
InstrumentedMutexLock l(&mutex_);
manual_compaction_paused_.fetch_add(1, std::memory_order_release);
+ // Mark the canceled as true when the cancellation is triggered by
+ // manual_compaction_paused (may overwrite user-provided `canceled`)
+ for (const auto& manual_compaction : manual_compaction_dequeue_) {
+ manual_compaction->canceled = true;
+ }
+
// Wake up manual compactions waiting to start.
bg_cv_.SignalAll();
}
}
+// NOTE: In contrast to DisableManualCompaction(), calling
+// EnableManualCompaction() does NOT overwrite the user-provided *canceled
+// variable to be false since there is NO CHANCE a canceled compaction
+// is uncanceled. In other words, a canceled compaction must have been
+// dropped out of the manual compaction queue, when we disable it.
void DBImpl::EnableManualCompaction() {
InstrumentedMutexLock l(&mutex_);
assert(manual_compaction_paused_ > 0);
if (shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
} else if (is_manual &&
- manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
- status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
- } else if (is_manual && manual_compaction->canceled &&
- manual_compaction->canceled->load(std::memory_order_acquire)) {
+ manual_compaction->canceled.load(std::memory_order_acquire)) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
} else {
GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
assert(is_snapshot_supported_ || snapshots_.empty());
+
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_,
mutable_db_options_, file_options_for_compaction_, versions_.get(),
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri, io_tracer_,
- is_manual ? &manual_compaction_paused_ : nullptr,
- is_manual ? manual_compaction->canceled : nullptr, db_id_,
- db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
+ is_manual ? manual_compaction->canceled
+ : kManualCompactionCanceledFalse_,
+ db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
c->trim_ts(), &blob_callback_);
compaction_job.Prepare();
file_options_for_compaction_, versions_.get(), &shutting_down_,
&log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_,
- options.canceled, input.db_id, db_session_id_, secondary_path_, input,
- result);
+ options.canceled ? *options.canceled : kManualCompactionCanceledFalse_,
+ input.db_id, db_session_id_, secondary_path_, input, result);
mutex_.Unlock();
s = compaction_job.Run();
int manual_compactions_paused = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
+ auto canceled = static_cast<std::atomic<bool>*>(arg);
+ // CompactRange triggers manual compaction and cancel the compaction
+ // by set *canceled as true
+ if (canceled != nullptr) {
+ canceled->store(true, std::memory_order_release);
+ }
+ manual_compactions_paused += 1;
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
auto paused = static_cast<std::atomic<int>*>(arg);
+ // CompactFiles() relies on manual_compactions_paused to
+ // determine if thie compaction should be paused or not
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
- manual_compactions_paused += 1;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
for (int i = 0; i < 2; i++) {
- // Generate a file containing 10 keys.
+ // Generate a file containing 100 keys.
for (int j = 0; j < 100; j++) {
ASSERT_OK(Put(Key(j), rnd.RandomString(50)));
}
int run_manual_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) {
+ auto canceled = static_cast<std::atomic<bool>*>(arg);
+ // CompactRange triggers manual compaction and cancel the compaction
+ // by set *canceled as true
+ if (canceled != nullptr) {
+ canceled->store(true, std::memory_order_release);
+ }
+ run_manual_compactions++;
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
auto paused = static_cast<std::atomic<int>*>(arg);
+ // CompactFiles() relies on manual_compactions_paused to
+ // determine if thie compaction should be paused or not
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
- run_manual_compactions++;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"CompactionJob::Run():PausingManualCompaction:2");
- dbfull()->EnableManualCompaction();
ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
#ifndef ROCKSDB_LITE
"CompactionJob::FinishCompactionOutputFile1",
[&](void* /*arg*/) { running_compaction++; });
- // Case I: 1 Notify begin compaction, 2 DisableManualCompaction, 3 Compaction
- // not run, 4 Notify compaction end.
+ // Case I: 1 Notify begin compaction, 2 Set *canceled as true to disable
+ // manual compaction in the callback function, 3 Compaction not run,
+ // 4 Notify compaction end.
listener->code_ = Status::kIncomplete;
listener->subcode_ = Status::SubCode::kManualCompactionPaused;
listener->num_compaction_started_ = 0;
listener->num_compaction_ended_ = 0;
- // Case II: 1 DisableManualCompaction, 2 Notify begin compaction (return
- // without notifying), 3 Notify compaction end (return without notifying).
+ // Case II: 1 Set *canceled as true in the callback function to disable manual
+ // compaction, 2 Notify begin compaction (return without notifying), 3 Notify
+ // compaction end (return without notifying).
ASSERT_TRUE(dbfull()
->CompactRange(compact_options, nullptr, nullptr)
.IsManualCompactionPaused());
ASSERT_EQ(running_compaction, 0);
// Case III: 1 Notify begin compaction, 2 Compaction in between
- // 3. DisableManualCompaction, , 4 Notify compaction end.
- // compact_options.canceled->store(false, std::memory_order_release);
+ // 3. Set *canceled as true in the callback function to disable manual
+ // compaction, 4 Notify compaction end.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"CompactionIterator:ProcessKV");
snapshot_checker_);
assert(job_context_);
SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence();
+ const std::atomic<bool> kManualCompactionCanceledFalse{false};
CompactionIterator c_iter(
iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
kMaxSequenceNumber, &existing_snapshots_,
true /* internal key corruption is not ok */, range_del_agg.get(),
nullptr, ioptions->allow_data_in_errors,
ioptions->enforce_single_del_contracts,
+ /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
/*compaction=*/nullptr, compaction_filter.get(),
- /*shutting_down=*/nullptr,
- /*manual_compaction_paused=*/nullptr,
- /*manual_compaction_canceled=*/nullptr, ioptions->info_log,
+ /*shutting_down=*/nullptr, ioptions->info_log,
&(cfd_->GetFullHistoryTsLow()));
// Set earliest sequence number in the new memtable
// Cancellation can be delayed waiting on automatic compactions when used
// together with `exclusive_manual_compaction == true`.
std::atomic<bool>* canceled = nullptr;
+ // NOTE: Calling DisableManualCompaction() overwrites the uer-provided
+ // canceled variable in CompactRangeOptions.
+ // Typically, when CompactRange is being called in one thread (t1) with
+ // canceled = false, and DisableManualCompaction is being called in the
+ // other thread (t2), manual compaction is disabled normally, even if the
+ // compaction iterator may still scan a few items before *canceled is
+ // set to true
// If set to kForce, RocksDB will override enable_blob_file_garbage_collection
// to true; if set to kDisable, RocksDB will override it to false, and