From: Yi Wu Date: Fri, 19 Jan 2018 01:32:50 +0000 (-0800) Subject: Fix Flush() keep waiting after flush finish X-Git-Tag: rocksdb-5.10.2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e7e0aa7a2aabfb63e21ee63f66400ea09c14b999;p=rocksdb.git Fix Flush() keep waiting after flush finish Summary: Flush() call could be waiting indefinitely if min_write_buffer_number_to_merge is used. Consider the sequence: 1. User call Flush() with flush_options.wait = true 2. The manual flush started in the background 3. New memtable become immutable because of writes. The new memtable will not trigger flush if min_write_buffer_number_to_merge is not reached. 4. The manual flush finish. Because of the new memtable created at step 3 not being flush, previous logic of WaitForFlushMemTable() keep waiting, despite the memtables it intent to flush has been flushed. Here instead of checking if there are any more memtables to flush, WaitForFlushMemTable() also check the id of the earliest memtable. If the id is larger than that of latest memtable at the time flush was initiated, it means all the memtable at the time of flush start has all been flush. Closes https://github.com/facebook/rocksdb/pull/3378 Differential Revision: D6746789 Pulled By: yiwu-arbug fbshipit-source-id: 35e698f71c7f90b06337a93e6825f4ea3b619bfa --- diff --git a/HISTORY.md b/HISTORY.md index 6cdf01d0..2f8383de 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,7 @@ # Rocksdb Change Log -## Unreleased +## 5.10.1 (01/18/2018) +### Bug Fixes +* Fix DB::Flush() keep waiting after flush finish under certain condition. ## 5.10.0 (12/11/2017) ### Public API Change diff --git a/db/column_family.cc b/db/column_family.cc index 81b63262..c32e97f0 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -385,7 +385,8 @@ ColumnFamilyData::ColumnFamilyData( pending_flush_(false), pending_compaction_(false), prev_compaction_needed_bytes_(0), - allow_2pc_(db_options.allow_2pc) { + allow_2pc_(db_options.allow_2pc), + last_memtable_id_(0) { Ref(); // Convert user defined table properties collector factories to internal ones. diff --git a/db/column_family.h b/db/column_family.h index 9a125aa1..ce1fd473 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -239,7 +239,11 @@ class ColumnFamilyData { void SetCurrent(Version* _current); uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held - void SetMemtable(MemTable* new_mem) { mem_ = new_mem; } + void SetMemtable(MemTable* new_mem) { + uint64_t memtable_id = last_memtable_id_.fetch_add(1) + 1; + new_mem->SetID(memtable_id); + mem_ = new_mem; + } // calculate the oldest log needed for the durability of this column family uint64_t OldestLogToKeep(); @@ -419,6 +423,9 @@ class ColumnFamilyData { // if the database was opened with 2pc enabled bool allow_2pc_; + + // Memtable id to track flush. + std::atomic last_memtable_id_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 107e8246..83895ea6 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -126,6 +126,41 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) { ASSERT_EQ(1, num_compactions); } +TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) { + Options options = CurrentOptions(); + options.write_buffer_size = 100; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 3; + Reopen(options); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush", + "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"}, + {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2", + "DBImpl::FlushMemTableToOutputFile:BeforeInstallSV"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("key1", "value1")); + + port::Thread t([&]() { + // The call wait for flush to finish, i.e. with flush_options.wait = true. + ASSERT_OK(Flush()); + }); + + // Wait for flush start. + TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"); + // Insert a second memtable before the manual flush finish. + // At the end of the manual flush job, it will check if further flush + // is needed, but it will not trigger flush of the second memtable because + // min_write_buffer_number_to_merge is not reached. + ASSERT_OK(Put("key2", "value2")); + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2"); + + // Manual flush should return, without waiting for flush indefinitely. + t.join(); +} + TEST_P(DBFlushDirectIOTest, DirectIO) { Options options; options.create_if_missing = true; diff --git a/db/db_impl.h b/db/db_impl.h index 3db65048..a9eaad72 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -810,8 +810,12 @@ class DBImpl : public DB { Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, bool writes_stopped = false); - // Wait for memtable flushed - Status WaitForFlushMemTable(ColumnFamilyData* cfd); + // Wait for memtable flushed. + // If flush_memtable_id is non-null, wait until the memtable with the ID + // gets flush. Otherwise, wait until the column family don't have any + // memtable pending flush. + Status WaitForFlushMemTable(ColumnFamilyData* cfd, + const uint64_t* flush_memtable_id = nullptr); // REQUIRES: mutex locked Status SwitchWAL(WriteContext* write_context); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 02f35df2..1985c9fe 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -134,6 +134,7 @@ Status DBImpl::FlushMemTableToOutputFile( } if (s.ok()) { + TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforeInstallSV"); InstallSuperVersionAndScheduleWork(cfd, &job_context->superversion_context, mutable_cf_options); if (made_progress) { @@ -809,7 +810,13 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { Status DBImpl::Flush(const FlushOptions& flush_options, ColumnFamilyHandle* column_family) { auto cfh = reinterpret_cast(column_family); - return FlushMemTable(cfh->cfd(), flush_options); + ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.", + cfh->GetName().c_str()); + Status s = FlushMemTable(cfh->cfd(), flush_options); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "[%s] Manual flush finished, status: %s\n", + cfh->GetName().c_str(), s.ToString().c_str()); + return s; } Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, @@ -944,6 +951,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& flush_options, bool writes_stopped) { Status s; + uint64_t flush_memtable_id = 0; { WriteContext context; InstrumentedMutexLock guard_lock(&mutex_); @@ -961,6 +969,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, // SwitchMemtable() will release and reacquire mutex during execution s = SwitchMemtable(cfd, &context); + flush_memtable_id = cfd->imm()->GetLatestMemTableID(); if (!writes_stopped) { write_thread_.ExitUnbatched(&w); @@ -975,16 +984,19 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, if (s.ok() && flush_options.wait) { // Wait until the compaction completes - s = WaitForFlushMemTable(cfd); + s = WaitForFlushMemTable(cfd, &flush_memtable_id); } return s; } -Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { +Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd, + const uint64_t* flush_memtable_id) { Status s; // Wait until the compaction completes InstrumentedMutexLock l(&mutex_); - while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) { + while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok() && + (flush_memtable_id == nullptr || + cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) { if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); } diff --git a/db/memtable.h b/db/memtable.h index 76e3cf1b..bfafbeac 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -368,6 +368,11 @@ class MemTable { return oldest_key_time_.load(std::memory_order_relaxed); } + // REQUIRES: db_mutex held. + void SetID(uint64_t id) { id_ = id; } + + uint64_t GetID() const { return id_; } + private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; @@ -437,6 +442,9 @@ class MemTable { // Timestamp of oldest key std::atomic oldest_key_time_; + // Memtable id to track flush. + uint64_t id_ = 0; + // Returns a heuristic flush decision bool ShouldFlushNow() const; diff --git a/db/memtable_list.h b/db/memtable_list.h index 1e46642a..c2ac65a2 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -5,11 +5,12 @@ // #pragma once -#include +#include +#include #include -#include #include -#include +#include +#include #include "db/dbformat.h" #include "db/memtable.h" @@ -244,6 +245,22 @@ class MemTableList { uint64_t GetMinLogContainingPrepSection(); + uint64_t GetEarliestMemTableID() const { + auto& memlist = current_->memlist_; + if (memlist.empty()) { + return std::numeric_limits::max(); + } + return memlist.back()->GetID(); + } + + uint64_t GetLatestMemTableID() const { + auto& memlist = current_->memlist_; + if (memlist.empty()) { + return 0; + } + return memlist.front()->GetID(); + } + private: // DB mutex held void InstallNewVersion();