buffer_size_.store(new_size, std::memory_order_relaxed);
mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
// Check if stall is active and can be ended.
- if (allow_stall_) {
- EndWriteStall();
- }
+ MaybeEndWriteStall();
}
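Routing SetBufferSize() through MaybeEndWriteStall() matters because shrinking the budget, including SetBufferSize(0) which disables the manager, can resolve the stall condition and must wake any queued writers. A minimal usage sketch, assuming the public three-argument constructor with allow_stall:

```cpp
#include "rocksdb/write_buffer_manager.h"

int main() {
  // Sketch only: with allow_stall = true, writers can be parked in the
  // manager's queue once usage reaches the 64 MiB budget.
  ROCKSDB_NAMESPACE::WriteBufferManager wbm(
      64 << 20 /* buffer_size */, nullptr /* cache */, true /* allow_stall */);
  // ... writers stall when memory_usage() reaches buffer_size_ ...
  wbm.SetBufferSize(0);  // disables the limit; stalled writers must be woken
  return 0;
}
```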
// Below functions should be called by RocksDB internally.
// Returns true if the write path should be stalled. Stalling is allowed only
// if the user passed allow_stall = true during WriteBufferManager instance
// creation.
//
// Should only be called by RocksDB internally.
- bool ShouldStall() {
- if (allow_stall_ && enabled()) {
- if (IsStallActive()) {
- return true;
- }
- if (IsStallThresholdExceeded()) {
- stall_active_.store(true, std::memory_order_relaxed);
- return true;
- }
+ bool ShouldStall() const {
+ if (!allow_stall_ || !enabled()) {
+ return false;
}
- return false;
+
+ return IsStallActive() || IsStallThresholdExceeded();
}
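The rewrite turns ShouldStall() into a pure predicate: the old version flipped stall_active_ without holding mu_, while the new one leaves every write to that flag to BeginWriteStall() and MaybeEndWriteStall(). A hedged sketch of the caller sequence this shape implies (the function and variable names here are illustrative, not the actual DBImpl code):

```cpp
#include "rocksdb/write_buffer_manager.h"

// Illustrative caller sequence only. StallInterface and its Block()/Signal()
// methods come from write_buffer_manager.h; `wbm` and `stall` are owned by
// the caller.
void MaybeStallWrite(ROCKSDB_NAMESPACE::WriteBufferManager* wbm,
                     ROCKSDB_NAMESPACE::StallInterface* stall) {
  if (wbm->ShouldStall()) {       // lock-free pre-check, no side effects
    wbm->BeginWriteStall(stall);  // re-checks under mu_; may enqueue `stall`
    stall->Block();               // returns once Signal() has been called
  }
}
```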
// Returns true if stall is active.
bool IsStallActive() const {
  return stall_active_.load(std::memory_order_relaxed);
}
// Returns true if stalling condition is met.
- bool IsStallThresholdExceeded() { return memory_usage() >= buffer_size_; }
+ bool IsStallThresholdExceeded() const {
+ return memory_usage() >= buffer_size_;
+ }
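For concreteness, the two thresholds set in SetBufferSize() are deliberately staggered: flushes are triggered at mutable_limit_ (7/8 of the budget, which is what ShouldFlush() compares against), while writers stall only at the full buffer_size_, so flushing normally starts before any stall. A standalone arithmetic check:

```cpp
#include <cassert>
#include <cstddef>

int main() {
  std::size_t buffer_size = 64u << 20;              // 64 MiB budget
  std::size_t mutable_limit = buffer_size * 7 / 8;  // flush trigger
  assert(mutable_limit == 56u << 20);               // 56 MiB
  assert(mutable_limit < buffer_size);              // stall threshold is last
  return 0;
}
```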
void ReserveMem(size_t mem);
// Should only be called by RocksDB internally.
void BeginWriteStall(StallInterface* wbm_stall);
- // Remove DB instances from queue and signal them to continue.
- void EndWriteStall();
+ // If stall conditions have resolved, remove DB instances from queue and
+ // signal them to continue.
+ void MaybeEndWriteStall();
void RemoveDBFromQueue(StallInterface* wbm_stall);
std::mutex cache_rev_mng_mu_;
std::list<StallInterface*> queue_;
- // Protects the queue_
+ // Protects the queue_ and stall_active_.
std::mutex mu_;
bool allow_stall_;
+ // Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
+ // while holding mu_, but it can be read without a lock.
std::atomic<bool> stall_active_;
void ReserveMemWithCache(size_t mem);
#endif // ROCKSDB_LITE
};
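queue_ stores raw StallInterface pointers, so each DB supplies an object whose Block() parks the writer thread until Signal() releases it. A minimal sketch of such an implementation, assuming only the virtual Block()/Signal() interface declared in write_buffer_manager.h (the in-tree implementation is more elaborate):

```cpp
#include <condition_variable>
#include <mutex>

#include "rocksdb/write_buffer_manager.h"

// Minimal sketch of a StallInterface implementation: Block() parks the
// calling thread; Signal() releases it. A Signal() that arrives before
// Block() must not be lost, hence the `signaled_` flag: BeginWriteStall()
// may signal immediately when the stall has already ended.
class SimpleStall : public ROCKSDB_NAMESPACE::StallInterface {
 public:
  void Block() override {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return signaled_; });
    signaled_ = false;  // reset so the object can be reused
  }

  void Signal() override {
    {
      std::lock_guard<std::mutex> lock(mu_);
      signaled_ = true;
    }
    cv_.notify_one();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool signaled_ = false;
};
```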
-WriteBufferManager::~WriteBufferManager() = default;
+WriteBufferManager::~WriteBufferManager() {
+#ifndef NDEBUG
+ std::unique_lock<std::mutex> lock(mu_);
+ assert(queue_.empty());
+#endif
+}
void WriteBufferManager::FreeMem(size_t mem) {
  if (cache_rev_mng_ != nullptr) {
    FreeMemWithCache(mem);
  } else {
    memory_used_.fetch_sub(mem, std::memory_order_relaxed);
  }
// Check if stall is active and can be ended.
- if (allow_stall_) {
- EndWriteStall();
- }
+ MaybeEndWriteStall();
}
void WriteBufferManager::FreeMemWithCache(size_t mem) {
void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
assert(wbm_stall != nullptr);
- if (wbm_stall) {
+ assert(allow_stall_);
+
+ // Allocate outside of the lock.
+ std::list<StallInterface*> new_node = {wbm_stall};
+
+ {
std::unique_lock<std::mutex> lock(mu_);
- queue_.push_back(wbm_stall);
+ // Check whether the stall conditions are still active.
+ if (ShouldStall()) {
+ stall_active_.store(true, std::memory_order_relaxed);
+ queue_.splice(queue_.end(), std::move(new_node));
+ }
}
- // In case thread enqueue itself and memory got freed in parallel, end the
- // stall.
- if (!ShouldStall()) {
- EndWriteStall();
+
+ // If the node was not consumed, the stall has ended already and we can signal
+ // the caller.
+ if (!new_node.empty()) {
+ new_node.front()->Signal();
}
}
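The allocation trick above is worth spelling out: the list node for the queue is heap-allocated before mu_ is taken, and std::list::splice() transfers it in O(1) with no allocation or copy while the lock is held; if the stall has already ended, the node stays in new_node and is freed after the lock is released. A standalone sketch of the same pattern (generic, not RocksDB-specific):

```cpp
#include <list>
#include <mutex>

std::mutex mu;
std::list<int> shared_queue;

// Sketch of the pattern used in BeginWriteStall(): allocate the list node
// before taking the lock, then splice it in (O(1), no allocation, no copy)
// only if the condition still holds. An unconsumed node is destroyed after
// the lock is released.
void Enqueue(int value, bool condition_still_holds) {
  std::list<int> new_node = {value};  // node allocated outside the lock
  {
    std::lock_guard<std::mutex> lock(mu);
    if (condition_still_holds) {
      shared_queue.splice(shared_queue.end(), std::move(new_node));
    }
  }
  // new_node's destructor (and the node deallocation) runs here, unlocked.
}
```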
-// Called when memory is freed in FreeMem.
-void WriteBufferManager::EndWriteStall() {
- if (enabled() && !IsStallThresholdExceeded()) {
- {
- std::unique_lock<std::mutex> lock(mu_);
- stall_active_.store(false, std::memory_order_relaxed);
- if (queue_.empty()) {
- return;
- }
- }
+// Called when memory is freed in FreeMem or the buffer size has changed.
+void WriteBufferManager::MaybeEndWriteStall() {
+ // Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock
+ // the writers.
+ if (!allow_stall_) {
+ return;
+ }
- // Get the instances from the list and call WBMStallInterface::Signal to
- // change the state to running and unblock the DB instances.
- // Check ShouldStall() incase stall got active by other DBs.
- while (!ShouldStall() && !queue_.empty()) {
- std::unique_lock<std::mutex> lock(mu_);
- StallInterface* wbm_stall = queue_.front();
- queue_.pop_front();
- wbm_stall->Signal();
- }
+ if (IsStallThresholdExceeded()) {
+ return; // Stall conditions have not resolved.
+ }
+
+ // Perform all deallocations outside of the lock.
+ std::list<StallInterface*> cleanup;
+
+ std::unique_lock<std::mutex> lock(mu_);
+ if (!stall_active_.load(std::memory_order_relaxed)) {
+ return; // Nothing to do.
+ }
+
+ // Unblock new writers.
+ stall_active_.store(false, std::memory_order_relaxed);
+
+ // Unblock the writers in the queue.
+ for (StallInterface* wbm_stall : queue_) {
+ wbm_stall->Signal();
+ }
+ cleanup = std::move(queue_);
}
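Two details here are easy to miss: Signal() is invoked while mu_ is still held, and `cleanup` is declared before the lock so that, with locals destroyed in reverse order of declaration, mu_ is released before the list nodes are freed. A standalone sketch of that destruction-order trick:

```cpp
#include <list>
#include <mutex>

std::mutex mu;
std::list<int> shared_queue;

// Sketch of the drain in MaybeEndWriteStall(): `cleanup` is declared
// *before* the lock on purpose. Locals are destroyed in reverse order of
// declaration, so the mutex is released first and the node deallocations
// run unlocked.
void DrainQueue() {
  std::list<int> cleanup;
  std::lock_guard<std::mutex> lock(mu);
  for (int value : shared_queue) {
    // ... act on each element while the queue is still consistent ...
    (void)value;
  }
  cleanup = std::move(shared_queue);  // leaves shared_queue empty
}  // ~lock_guard runs first, then ~cleanup frees the nodes
```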
void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
assert(wbm_stall != nullptr);
+
+ // Deallocate the removed nodes outside of the lock.
+ std::list<StallInterface*> cleanup;
+
if (enabled() && allow_stall_) {
std::unique_lock<std::mutex> lock(mu_);
- queue_.remove(wbm_stall);
- wbm_stall->Signal();
+ for (auto it = queue_.begin(); it != queue_.end();) {
+ auto next = std::next(it);
+ if (*it == wbm_stall) {
+ cleanup.splice(cleanup.end(), queue_, std::move(it));
+ }
+ it = next;
+ }
}
+ wbm_stall->Signal();
}
} // namespace ROCKSDB_NAMESPACE
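Finally, a hedged end-to-end sketch of the stall state machine these changes produce, assuming the public WriteBufferManager(buffer_size, cache, allow_stall) constructor: reserving past the budget makes ShouldStall() report true, and freeing the memory runs MaybeEndWriteStall() so the condition clears.

```cpp
#include <cassert>

#include "rocksdb/write_buffer_manager.h"

int main() {
  ROCKSDB_NAMESPACE::WriteBufferManager wbm(
      1 << 20 /* buffer_size: 1 MiB */, nullptr /* cache */,
      true /* allow_stall */);

  wbm.ReserveMem(2 << 20);    // usage (2 MiB) >= buffer_size: threshold hit
  assert(wbm.ShouldStall());  // pure read; stall_active_ is still false

  wbm.FreeMem(2 << 20);       // drops usage and runs MaybeEndWriteStall()
  assert(!wbm.ShouldStall());
  return 0;
}
```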