# Rocksdb Change Log
+## Unreleased
+### Bug Fixes
+* Fixed a data race on `ColumnFamilyData::flush_reason` caused by concurrent flushes.
+
## 7.10.0 (01/23/2023)
### Behavior changes
* Make best-efforts recovery verify SST unique ID before Version construction (#10962)
next_(nullptr),
prev_(nullptr),
log_number_(0),
- flush_reason_(FlushReason::kOthers),
column_family_set_(column_family_set),
queued_for_flush_(false),
queued_for_compaction_(false),
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
uint64_t GetLogNumber() const { return log_number_; }
- void SetFlushReason(FlushReason flush_reason) {
- flush_reason_ = flush_reason;
- }
- FlushReason GetFlushReason() const { return flush_reason_; }
// thread-safe
const FileOptions* soptions() const;
const ImmutableOptions* ioptions() const { return &ioptions_; }
// recovered from
uint64_t log_number_;
- std::atomic<FlushReason> flush_reason_;
-
// An object that keeps all the compaction stats
// and picks the next compaction
std::unique_ptr<CompactionPicker> compaction_picker_;
};
#endif // !ROCKSDB_LITE
+// RocksDB lite does not support GetLiveFiles()
+#ifndef ROCKSDB_LITE
+TEST_F(DBFlushTest, FixFlushReasonRaceFromConcurrentFlushes) {
+ Options options = CurrentOptions();
+ options.atomic_flush = true;
+ options.disable_auto_compactions = true;
+ CreateAndReopenWithCF({"cf1"}, options);
+
+ for (int idx = 0; idx < 1; ++idx) {
+ ASSERT_OK(Put(0, Key(idx), std::string(1, 'v')));
+ ASSERT_OK(Put(1, Key(idx), std::string(1, 'v')));
+ }
+
+ // To coerce a manual flush happenning in the middle of GetLiveFiles's flush,
+ // we need to pause background flush thread and enable it later.
+ std::shared_ptr<test::SleepingBackgroundTask> sleeping_task =
+ std::make_shared<test::SleepingBackgroundTask>();
+ env_->SetBackgroundThreads(1, Env::HIGH);
+ env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+ sleeping_task.get(), Env::Priority::HIGH);
+ sleeping_task->WaitUntilSleeping();
+
+ // Coerce a manual flush happenning in the middle of GetLiveFiles's flush
+ bool get_live_files_paused_at_sync_point = false;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::AtomicFlushMemTables:AfterScheduleFlush", [&](void* /* arg */) {
+ if (get_live_files_paused_at_sync_point) {
+ // To prevent non-GetLiveFiles() flush from pausing at this sync point
+ return;
+ }
+ get_live_files_paused_at_sync_point = true;
+
+ FlushOptions fo;
+ fo.wait = false;
+ fo.allow_write_stall = true;
+ ASSERT_OK(dbfull()->Flush(fo));
+
+ // Resume background flush thread so GetLiveFiles() can finish
+ sleeping_task->WakeUp();
+ sleeping_task->WaitUntilDone();
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::vector<std::string> files;
+ uint64_t manifest_file_size;
+ // Before the fix, a race condition on default cf's flush reason due to
+ // concurrent GetLiveFiles's flush and manual flush will fail
+ // an internal assertion.
+ // After the fix, such race condition is fixed and there is no assertion
+ // failure.
+ ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, /*flush*/ true));
+ ASSERT_TRUE(get_live_files_paused_at_sync_point);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+#endif // !ROCKSDB_LITE
+
TEST_F(DBFlushTest, MemPurgeBasic) {
Options options = CurrentOptions();
options.atomic_flush = GetParam();
// 64MB so that memtable flush won't be trigger by the small writes.
options.write_buffer_size = (static_cast<size_t>(64) << 20);
-
+ auto flush_listener = std::make_shared<FlushCounterListener>();
+ flush_listener->expected_flush_reason = FlushReason::kManualFlush;
+ options.listeners.push_back(flush_listener);
// Destroy the DB to recreate as a TransactionDB.
Close();
Destroy(options, true);
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
- ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush);
}
// The recovered min log number with prepared data should be non-zero.
ASSERT_TRUE(db_impl->allow_2pc());
ASSERT_NE(db_impl->MinLogNumberToKeep(), 0);
}
-#endif // ROCKSDB_LITE
TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = GetParam();
options.write_buffer_size = (static_cast<size_t>(64) << 20);
+ auto flush_listener = std::make_shared<FlushCounterListener>();
+ flush_listener->expected_flush_reason = FlushReason::kManualFlush;
+ options.listeners.push_back(flush_listener);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
size_t num_cfs = handles_.size();
for (size_t i = 0; i != num_cfs; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
- ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
}
}
+#endif // ROCKSDB_LITE
TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) {
Options options = CurrentOptions();
while (!flush_queue_.empty()) {
const FlushRequest& flush_req = PopFirstFromFlushQueue();
- for (const auto& iter : flush_req) {
+ for (const auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
iter.first->UnrefAndTryDelete();
}
}
#include <map>
#include <set>
#include <string>
+#include <unordered_map>
#include <utility>
#include <vector>
void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
- int job_id);
+ int job_id, FlushReason flush_reason);
void NotifyOnFlushCompleted(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
// Argument required by background flush thread.
struct BGFlushArg {
BGFlushArg()
- : cfd_(nullptr), max_memtable_id_(0), superversion_context_(nullptr) {}
+ : cfd_(nullptr),
+ max_memtable_id_(0),
+ superversion_context_(nullptr),
+ flush_reason_(FlushReason::kOthers) {}
BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id,
- SuperVersionContext* superversion_context)
+ SuperVersionContext* superversion_context,
+ FlushReason flush_reason)
: cfd_(cfd),
max_memtable_id_(max_memtable_id),
- superversion_context_(superversion_context) {}
+ superversion_context_(superversion_context),
+ flush_reason_(flush_reason) {}
// Column family to flush.
ColumnFamilyData* cfd_;
// installs a new superversion for the column family. This operation
// requires a SuperVersionContext object (currently embedded in JobContext).
SuperVersionContext* superversion_context_;
+ FlushReason flush_reason_;
};
// Argument passed to flush thread.
// installs a new super version for the column family.
Status FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
- bool* madeProgress, JobContext* job_context,
+ bool* madeProgress, JobContext* job_context, FlushReason flush_reason,
SuperVersionContext* superversion_context,
std::vector<SequenceNumber>& snapshot_seqs,
SequenceNumber earliest_write_conflict_snapshot,
void MaybeScheduleFlushOrCompaction();
- // A flush request specifies the column families to flush as well as the
- // largest memtable id to persist for each column family. Once all the
- // memtables whose IDs are smaller than or equal to this per-column-family
- // specified value, this flush request is considered to have completed its
- // work of flushing this column family. After completing the work for all
- // column families in this request, this flush is considered complete.
- using FlushRequest = std::vector<std::pair<ColumnFamilyData*, uint64_t>>;
+ struct FlushRequest {
+ FlushReason flush_reason;
+ // A map from column family to flush to largest memtable id to persist for
+ // each column family. Once all the memtables whose IDs are smaller than or
+ // equal to this per-column-family specified value, this flush request is
+ // considered to have completed its work of flushing this column family.
+ // After completing the work for all column families in this request, this
+ // flush is considered complete.
+ std::unordered_map<ColumnFamilyData*, uint64_t>
+ cfd_to_max_mem_id_to_persist;
+ };
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
- FlushRequest* req);
+ FlushReason flush_reason, FlushRequest* req);
- void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason);
+ void SchedulePendingFlush(const FlushRequest& req);
void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
Status DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
- bool* made_progress, JobContext* job_context,
+ bool* made_progress, JobContext* job_context, FlushReason flush_reason,
SuperVersionContext* superversion_context,
std::vector<SequenceNumber>& snapshot_seqs,
SequenceNumber earliest_write_conflict_snapshot,
dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,
file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
- job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
+ job_context, flush_reason, log_buffer, directories_.GetDbDir(),
+ GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, thread_pri,
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
- NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
+ NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
+ flush_reason);
#endif // ROCKSDB_LITE
bool switched_to_mempurge = false;
MutableCFOptions mutable_cf_options_copy = *cfd->GetLatestMutableCFOptions();
SuperVersionContext* superversion_context =
bg_flush_arg.superversion_context_;
+ FlushReason flush_reason = bg_flush_arg.flush_reason_;
Status s = FlushMemTableToOutputFile(
- cfd, mutable_cf_options_copy, made_progress, job_context,
+ cfd, mutable_cf_options_copy, made_progress, job_context, flush_reason,
superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, log_buffer, thread_pri);
return s;
for (const auto cfd : cfds) {
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
- assert(cfd->GetFlushReason() == cfds[0]->GetFlushReason());
+ }
+ for (const auto bg_flush_arg : bg_flush_args) {
+ assert(bg_flush_arg.flush_reason_ == bg_flush_args[0].flush_reason_);
}
#endif /* !NDEBUG */
all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_;
+ FlushReason flush_reason = bg_flush_args[i].flush_reason_;
jobs.emplace_back(new FlushJob(
dbname_, cfd, immutable_db_options_, mutable_cf_options,
max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
&shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
- snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
- data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
- stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
+ snapshot_checker, job_context, flush_reason, log_buffer,
+ directories_.GetDbDir(), data_dir,
+ GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
+ &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */,
thread_pri, io_tracer_, seqno_time_mapping_, db_id_, db_session_id_,
cfd->GetFullHistoryTsLow(), &blob_callback_));
for (int i = 0; i != num_cfs; ++i) {
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
// may temporarily unlock and lock the mutex.
+ FlushReason flush_reason = bg_flush_args[i].flush_reason_;
NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
- job_context->job_id);
+ job_context->job_id, flush_reason);
}
#endif /* !ROCKSDB_LITE */
bool resuming_from_bg_err =
error_handler_.IsDBStopped() ||
- (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
- cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
+ (bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery ||
+ bg_flush_args[0].flush_reason_ ==
+ FlushReason::kErrorRecoveryRetryFlush);
while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) {
std::pair<Status, bool> res = wait_to_install_func();
resuming_from_bg_err =
error_handler_.IsDBStopped() ||
- (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
- cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
+ (bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery ||
+ bg_flush_args[0].flush_reason_ ==
+ FlushReason::kErrorRecoveryRetryFlush);
}
if (!resuming_from_bg_err) {
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
- int job_id) {
+ int job_id, FlushReason flush_reason) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.size() == 0U) {
return;
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
- info.flush_reason = cfd->GetFlushReason();
+ info.flush_reason = flush_reason;
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushBegin(this, info);
}
(void)file_meta;
(void)mutable_cf_options;
(void)job_id;
+ (void)flush_reason;
#endif // ROCKSDB_LITE
}
}
void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
- FlushRequest* req) {
+ FlushReason flush_reason, FlushRequest* req) {
assert(req != nullptr);
- req->reserve(cfds.size());
+ req->flush_reason = flush_reason;
+ req->cfd_to_max_mem_id_to_persist.reserve(cfds.size());
for (const auto cfd : cfds) {
if (nullptr == cfd) {
// cfd may be null, see DBImpl::ScheduleFlushes
continue;
}
uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
- req->emplace_back(cfd, max_memtable_id);
+ req->cfd_to_max_mem_id_to_persist.emplace(cfd, max_memtable_id);
}
}
if (s.ok()) {
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
- FlushRequest req{{cfd, flush_memtable_id}};
+ FlushRequest req{flush_reason, {{cfd, flush_memtable_id}}};
flush_reqs.emplace_back(std::move(req));
memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
}
"to avoid holding old logs",
cfd->GetName().c_str());
s = SwitchMemtable(cfd_stats, &context);
- FlushRequest req{{cfd_stats, flush_memtable_id}};
+ FlushRequest req{flush_reason, {{cfd_stats, flush_memtable_id}}};
flush_reqs.emplace_back(std::move(req));
memtable_ids_to_wait.emplace_back(
cfd_stats->imm()->GetLatestMemTableID());
if (s.ok() && !flush_reqs.empty()) {
for (const auto& req : flush_reqs) {
- assert(req.size() == 1);
- ColumnFamilyData* loop_cfd = req[0].first;
+ assert(req.cfd_to_max_mem_id_to_persist.size() == 1);
+ ColumnFamilyData* loop_cfd =
+ req.cfd_to_max_mem_id_to_persist.begin()->first;
loop_cfd->imm()->FlushRequested();
}
// If the caller wants to wait for this flush to complete, it indicates
// Therefore, we increase the cfd's ref count.
if (flush_options.wait) {
for (const auto& req : flush_reqs) {
- assert(req.size() == 1);
- ColumnFamilyData* loop_cfd = req[0].first;
+ assert(req.cfd_to_max_mem_id_to_persist.size() == 1);
+ ColumnFamilyData* loop_cfd =
+ req.cfd_to_max_mem_id_to_persist.begin()->first;
loop_cfd->Ref();
}
}
for (const auto& req : flush_reqs) {
- SchedulePendingFlush(req, flush_reason);
+ SchedulePendingFlush(req);
}
MaybeScheduleFlushOrCompaction();
}
autovector<const uint64_t*> flush_memtable_ids;
assert(flush_reqs.size() == memtable_ids_to_wait.size());
for (size_t i = 0; i < flush_reqs.size(); ++i) {
- assert(flush_reqs[i].size() == 1);
- cfds.push_back(flush_reqs[i][0].first);
+ assert(flush_reqs[i].cfd_to_max_mem_id_to_persist.size() == 1);
+ cfds.push_back(flush_reqs[i].cfd_to_max_mem_id_to_persist.begin()->first);
flush_memtable_ids.push_back(&(memtable_ids_to_wait[i]));
}
s = WaitForFlushMemTables(
cfd->Ref();
}
}
- GenerateFlushRequest(cfds, &flush_req);
- SchedulePendingFlush(flush_req, flush_reason);
+ GenerateFlushRequest(cfds, flush_reason, &flush_req);
+ SchedulePendingFlush(flush_req);
MaybeScheduleFlushOrCompaction();
}
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
if (s.ok() && flush_options.wait) {
autovector<const uint64_t*> flush_memtable_ids;
- for (auto& iter : flush_req) {
+ for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
flush_memtable_ids.push_back(&(iter.second));
}
s = WaitForFlushMemTables(
FlushRequest flush_req = flush_queue_.front();
flush_queue_.pop_front();
if (!immutable_db_options_.atomic_flush) {
- assert(flush_req.size() == 1);
+ assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
}
- for (const auto& elem : flush_req) {
+ for (const auto& elem : flush_req.cfd_to_max_mem_id_to_persist) {
if (!immutable_db_options_.atomic_flush) {
ColumnFamilyData* cfd = elem.first;
assert(cfd);
cfd->set_queued_for_flush(false);
}
}
- // TODO: need to unset flush reason?
return flush_req;
}
return cfd;
}
-void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
- FlushReason flush_reason) {
+void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
mutex_.AssertHeld();
- if (flush_req.empty()) {
+ if (flush_req.cfd_to_max_mem_id_to_persist.empty()) {
return;
}
if (!immutable_db_options_.atomic_flush) {
// For the non-atomic flush case, we never schedule multiple column
// families in the same flush request.
- assert(flush_req.size() == 1);
- ColumnFamilyData* cfd = flush_req[0].first;
+ assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
+ ColumnFamilyData* cfd =
+ flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
assert(cfd);
if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
cfd->Ref();
cfd->set_queued_for_flush(true);
- cfd->SetFlushReason(flush_reason);
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
}
} else {
- for (auto& iter : flush_req) {
+ for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
ColumnFamilyData* cfd = iter.first;
cfd->Ref();
- cfd->SetFlushReason(flush_reason);
}
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
while (!flush_queue_.empty()) {
// This cfd is already referenced
const FlushRequest& flush_req = PopFirstFromFlushQueue();
+ FlushReason flush_reason = flush_req.flush_reason;
superversion_contexts.clear();
- superversion_contexts.reserve(flush_req.size());
+ superversion_contexts.reserve(
+ flush_req.cfd_to_max_mem_id_to_persist.size());
- for (const auto& iter : flush_req) {
+ for (const auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
ColumnFamilyData* cfd = iter.first;
if (cfd->GetMempurgeUsed()) {
// If imm() contains silent memtables (e.g.: because
}
superversion_contexts.emplace_back(SuperVersionContext(true));
bg_flush_args.emplace_back(cfd, iter.second,
- &(superversion_contexts.back()));
+ &(superversion_contexts.back()), flush_reason);
}
if (!bg_flush_args.empty()) {
break;
status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer, thread_pri);
TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
- // All the CFDs in the FlushReq must have the same flush reason, so just
- // grab the first one
- *reason = bg_flush_args[0].cfd_->GetFlushReason();
+// All the CFD/bg_flush_arg in the FlushReq must have the same flush reason, so
+// just grab the first one
+#ifndef NDEBUG
+ for (const auto bg_flush_arg : bg_flush_args) {
+ assert(bg_flush_arg.flush_reason_ == bg_flush_args[0].flush_reason_);
+ }
+#endif /* !NDEBUG */
+ *reason = bg_flush_args[0].flush_reason_;
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
if (cfd->UnrefAndTryDelete()) {
cfd->imm()->FlushRequested();
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
- GenerateFlushRequest({cfd}, &flush_req);
- SchedulePendingFlush(flush_req, FlushReason::kWalFull);
+ GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req);
+ SchedulePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
- GenerateFlushRequest(cfds, &flush_req);
- SchedulePendingFlush(flush_req, FlushReason::kWalFull);
+ GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req);
+ SchedulePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
cfd->imm()->FlushRequested();
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
- GenerateFlushRequest({cfd}, &flush_req);
- SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
+ GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager,
+ &flush_req);
+ SchedulePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
- GenerateFlushRequest(cfds, &flush_req);
- SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
+ GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req);
+ SchedulePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds);
FlushRequest flush_req;
- GenerateFlushRequest(cfds, &flush_req);
- SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+ GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req);
+ SchedulePendingFlush(flush_req);
} else {
for (auto* cfd : cfds) {
FlushRequest flush_req;
- GenerateFlushRequest({cfd}, &flush_req);
- SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+ GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req);
+ SchedulePendingFlush(flush_req);
}
}
MaybeScheduleFlushOrCompaction();
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
- LogBuffer* log_buffer, FSDirectory* db_directory,
+ FlushReason flush_reason, LogBuffer* log_buffer, FSDirectory* db_directory,
FSDirectory* output_file_directory, CompressionType output_compression,
Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
job_context_(job_context),
+ flush_reason_(flush_reason),
log_buffer_(log_buffer),
db_directory_(db_directory),
output_file_directory_(output_file_directory),
}
Status mempurge_s = Status::NotFound("No MemPurge.");
if ((mempurge_threshold > 0.0) &&
- (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
- (!mems_.empty()) && MemPurgeDecider(mempurge_threshold) &&
- !(db_options_.atomic_flush)) {
+ (flush_reason_ == FlushReason::kWriteBufferFull) && (!mems_.empty()) &&
+ MemPurgeDecider(mempurge_threshold) && !(db_options_.atomic_flush)) {
cfd_->SetMempurgeUsed();
mempurge_s = MemPurge();
if (!mempurge_s.ok()) {
<< total_num_deletes << "total_data_size"
<< total_data_size << "memory_usage"
<< total_memory_usage << "flush_reason"
- << GetFlushReasonString(cfd_->GetFlushReason());
+ << GetFlushReasonString(flush_reason_);
{
ScopedArenaIterator iter(
info->smallest_seqno = meta_.fd.smallest_seqno;
info->largest_seqno = meta_.fd.largest_seqno;
info->table_properties = table_properties_;
- info->flush_reason = cfd_->GetFlushReason();
+ info->flush_reason = flush_reason_;
info->blob_compression_type = mutable_cf_options_.blob_compression_type;
// Update BlobFilesInfo.
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
- LogBuffer* log_buffer, FSDirectory* db_directory,
- FSDirectory* output_file_directory,
+ FlushReason flush_reason, LogBuffer* log_buffer,
+ FSDirectory* db_directory, FSDirectory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
SequenceNumber earliest_write_conflict_snapshot_;
SnapshotChecker* snapshot_checker_;
JobContext* job_context_;
+ FlushReason flush_reason_;
LogBuffer* log_buffer_;
FSDirectory* db_directory_;
FSDirectory* output_file_directory_;
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
- FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
- db_options_, *cfd->GetLatestMutableCFOptions(),
- std::numeric_limits<uint64_t>::max() /* memtable_id */,
- env_options_, versions_.get(), &mutex_, &shutting_down_,
- {}, kMaxSequenceNumber, snapshot_checker, &job_context,
- nullptr, nullptr, nullptr, kNoCompression, nullptr,
- &event_logger, false, true /* sync_output_directory */,
- true /* write_manifest */, Env::Priority::USER,
- nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
+ FlushJob flush_job(
+ dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
+ *cfd->GetLatestMutableCFOptions(),
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
+ versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
+ snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+ nullptr, kNoCompression, nullptr, &event_logger, false,
+ true /* sync_output_directory */, true /* write_manifest */,
+ Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
{
InstrumentedMutexLock l(&mutex_);
flush_job.PickMemTable();
*cfd->GetLatestMutableCFOptions(),
std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
- snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true,
- true /* sync_output_directory */, true /* write_manifest */,
+ snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+ nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+ true, true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
HistogramData hist;
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_,
versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
- snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true,
- true /* sync_output_directory */, true /* write_manifest */,
+ snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+ nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+ true, true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
HistogramData hist;
FileMetaData file_meta;
dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
memtable_ids[k], env_options_, versions_.get(), &mutex_,
&shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
- &job_context, nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true,
+ &job_context, FlushReason::kTest, nullptr, nullptr, nullptr,
+ kNoCompression, db_options_.statistics.get(), &event_logger, true,
false /* sync_output_directory */, false /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/,
empty_seqno_to_time_mapping_));
*cfd->GetLatestMutableCFOptions(),
std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
- snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true,
- true /* sync_output_directory */, true /* write_manifest */,
+ snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+ nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+ true, true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
mutex_.Lock();
flush_job.PickMemTable();
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_,
versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
- snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true,
- true /* sync_output_directory */, true /* write_manifest */,
+ snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+ nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+ true, true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
// When the state from WriteController is normal.
dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
- snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true,
- true /* sync_output_directory */, true /* write_manifest */,
+ snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+ nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+ true, true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_,
/*db_id=*/"",
/*db_session_id=*/"", full_history_ts_low);
dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
- snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true,
- true /* sync_output_directory */, true /* write_manifest */,
+ snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+ nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+ true, true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_,
/*db_id=*/"",
/*db_session_id=*/"", full_history_ts_low);