column_family_set_(column_family_set),
pending_flush_(false),
pending_compaction_(false),
- prev_compaction_needed_bytes_(0) {
+ prev_compaction_needed_bytes_(0),
+ allow_2pc_(db_options.allow_2pc) {
Ref();
// Convert user defined table properties collector factories to internal ones.
return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}
+uint64_t ColumnFamilyData::OldestLogToKeep() {
+ auto current_log = GetLogNumber();
+
+ if (allow_2pc_) {
+ auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
+ auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
+
+ if (imm_prep_log > 0 && imm_prep_log < current_log) {
+ current_log = imm_prep_log;
+ }
+
+ if (mem_prep_log > 0 && mem_prep_log < current_log) {
+ current_log = mem_prep_log;
+ }
+ }
+
+ return current_log;
+}
+
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
+ // calculate the oldest log needed for the durability of this column family
+ uint64_t OldestLogToKeep();
+
// See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
bool pending_compaction_;
uint64_t prev_compaction_needed_bytes_;
+
+ // if the database was opened with 2pc enabled
+ bool allow_2pc_;
};
// ColumnFamilySet has interesting thread-safety requirements
last_stats_dump_time_microsec_(0),
next_job_id_(1),
has_unpersisted_data_(false),
+ unable_to_flush_oldest_log_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
}
uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
+ if (!allow_2pc()) {
+ return 0;
+ }
+
uint64_t min_log = 0;
// we must look through the memtables for two phase transactions
}
uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
+
+ if (!allow_2pc()) {
+ return 0;
+ }
+
std::lock_guard<std::mutex> lock(prep_heap_mutex_);
uint64_t min_log = 0;
mutable_db_options_ = new_options;
if (total_log_size_ > GetMaxTotalWalSize()) {
- FlushColumnFamilies();
+ MaybeFlushColumnFamilies();
}
persist_options_status = PersistOptions();
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
if (UNLIKELY(!single_column_family_mode_ &&
- !alive_log_files_.begin()->getting_flushed &&
total_log_size_ > GetMaxTotalWalSize())) {
- FlushColumnFamilies();
+ MaybeFlushColumnFamilies();
} else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
// Before a new memtable is added in SwitchMemtable(),
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
return status;
}
-void DBImpl::FlushColumnFamilies() {
+void DBImpl::MaybeFlushColumnFamilies() {
mutex_.AssertHeld();
- WriteContext context;
-
if (alive_log_files_.begin()->getting_flushed) {
return;
}
- uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
- alive_log_files_.begin()->getting_flushed = true;
+ auto oldest_alive_log = alive_log_files_.begin()->number;
+ auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep();
+
+ if (allow_2pc() &&
+ unable_to_flush_oldest_log_ &&
+ oldest_log_with_uncommited_prep > 0 &&
+ oldest_log_with_uncommited_prep <= oldest_alive_log) {
+ // we already attempted to flush all column families dependent on
+ // the oldest alive log but the log still contained uncommited transactions.
+ // the oldest alive log STILL contains uncommited transaction so there
+ // is still nothing that we can do.
+ return;
+ }
+
+ WriteContext context;
+
Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
"Flushing all column families with data in WAL number %" PRIu64
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
- flush_column_family_if_log_file, total_log_size_, GetMaxTotalWalSize());
+ oldest_alive_log, total_log_size_, GetMaxTotalWalSize());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
- if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
+ if (cfd->OldestLogToKeep() <= oldest_alive_log) {
auto status = SwitchMemtable(cfd, &context);
if (!status.ok()) {
break;
}
}
MaybeScheduleFlushOrCompaction();
+
+ // we only mark this log as getting flushed if we have successfully
+ // flushed all data in this log. If this log contains outstanding prepred
+ // transactions then we cannot flush this log until those transactions are commited.
+
+ unable_to_flush_oldest_log_ = false;
+
+ if (allow_2pc()) {
+ if (oldest_log_with_uncommited_prep == 0 ||
+ oldest_log_with_uncommited_prep > oldest_alive_log) {
+ // this log contains no outstanding prepared transactions
+ alive_log_files_.begin()->getting_flushed = true;
+ } else {
+ Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log,
+ "Unable to release oldest log due to uncommited transaction");
+ unable_to_flush_oldest_log_ = true;
+ }
+ } else {
+ alive_log_files_.begin()->getting_flushed = true;
+ }
}
uint64_t DBImpl::GetMaxTotalWalSize() const {
ColumnFamilyHandle* column_family = nullptr,
bool disallow_trivial_move = false);
+ void TEST_MaybeFlushColumnFamilies();
+
+ bool TEST_UnableToFlushOldestLog() {
+ return unable_to_flush_oldest_log_;
+ }
+
+ bool TEST_IsLogGettingFlushed() {
+ return alive_log_files_.begin()->getting_flushed;
+ }
+
// Force current memtable contents to be flushed.
Status TEST_FlushMemTable(bool wait = true,
ColumnFamilyHandle* cfh = nullptr);
// REQUIRES: mutex locked
Status PersistOptions();
- void FlushColumnFamilies();
+ void MaybeFlushColumnFamilies();
uint64_t GetMaxTotalWalSize() const;
// Used when disableWAL is true.
bool has_unpersisted_data_;
+
+ // if an attempt was made to flush all column families that
+ // the oldest log depends on but uncommited data in the oldest
+ // log prevents the log from being released.
+ // We must attempt to free the dependent memtables again
+ // at a later time after the transaction in the oldest
+ // log is fully commited.
+ bool unable_to_flush_oldest_log_;
+
static const int KEEP_LOG_FILE_NUM = 1000;
// MSVC version 1800 still does not have constexpr for ::max()
static const uint64_t kNoTimeOut = port::kMaxUint64;
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
}
+void DBImpl::TEST_MaybeFlushColumnFamilies() {
+ InstrumentedMutexLock l(&mutex_);
+ MaybeFlushColumnFamilies();
+}
+
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
ColumnFamilyHandle* column_family) {
ColumnFamilyData* cfd;
// PickMemtablesToFlush() is called.
void FlushRequested() { flush_requested_ = true; }
+ bool HasFlushRequested() { return flush_requested_; }
+
// Copying allowed
// MemTableList(const MemTableList&);
// void operator=(const MemTableList&);
delete cfb;
}
+TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+
+ Status s;
+ ColumnFamilyHandle *cfa, *cfb;
+
+ ColumnFamilyOptions cf_options;
+ s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
+ ASSERT_OK(s);
+ s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
+ ASSERT_OK(s);
+
+ WriteOptions wopts;
+ wopts.disableWAL = false;
+ wopts.sync = true;
+
+ auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa);
+ auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb);
+
+ TransactionOptions topts1;
+ Transaction* txn1 = db->BeginTransaction(wopts, topts1);
+ s = txn1->SetName("xid1");
+ ASSERT_OK(s);
+ s = txn1->Put(cfa, "boys", "girls1");
+ ASSERT_OK(s);
+
+ Transaction* txn2 = db->BeginTransaction(wopts, topts1);
+ s = txn2->SetName("xid2");
+ ASSERT_OK(s);
+ s = txn2->Put(cfb, "up", "down1");
+ ASSERT_OK(s);
+
+ // prepre transaction in LOG A
+ s = txn1->Prepare();
+ ASSERT_OK(s);
+
+ // prepre transaction in LOG A
+ s = txn2->Prepare();
+ ASSERT_OK(s);
+
+ // regular put so that mem table can actually be flushed for log rolling
+ s = db->Put(wopts, "cats", "dogs1");
+ ASSERT_OK(s);
+
+ auto prepare_log_no = txn1->GetLogNumber();
+
+ // roll to LOG B
+ s = db_impl->TEST_FlushMemTable(true);
+ ASSERT_OK(s);
+
+ // now we pause background work so that
+ // imm()s are not flushed before we can check their status
+ s = db_impl->PauseBackgroundWork();
+ ASSERT_OK(s);
+
+ ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
+ ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
+ ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
+ prepare_log_no);
+ ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
+
+ // commit in LOG B
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), prepare_log_no);
+
+ ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
+
+ // request a flush for all column families such that the earliest
+ // alive log file can be killed
+ db_impl->TEST_MaybeFlushColumnFamilies();
+ // log cannot be flushed because txn2 has not been commited
+ ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
+ ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog());
+
+ // assert that cfa has a flush requested
+ ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
+
+ // cfb should not be flushed becuse it has no data from LOG A
+ ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
+
+ // cfb now has data from LOG A
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ db_impl->TEST_MaybeFlushColumnFamilies();
+ ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
+
+ // we should see that cfb now has a flush requested
+ ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
+
+ // all data in LOG A resides in a memtable that has been
+ // requested for a flush
+ ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
+
+ delete txn1;
+ delete txn2;
+ delete cfa;
+ delete cfb;
+}
/*
* 1) use prepare to keep first log around to determine starting sequence
* during recovery.