#pragma once
-#include "port/port.h"
-
#include <assert.h>
#include <chrono>
#include <condition_variable>
#include <utility>
#include <vector>
+#include "port/port.h"
+#include "util/sync_point.h"
+
// Allows execution of handlers at a specified time in the future
// Guarantees:
// - All handlers are executed ONCE, even if cancelled (aborted parameter will
public:
TimerQueue() : m_th(&TimerQueue::run, this) {}
- ~TimerQueue() {
+ ~TimerQueue() { shutdown(); }
+
+ // This function is not thread-safe.
+ void shutdown() {
+ if (closed_) {
+ return;
+ }
cancelAll();
// Abusing the timer queue to trigger the shutdown.
add(0, [this](bool) {
return std::make_pair(false, 0);
});
m_th.join();
+ closed_ = true;
}
// Adds a new timer
WorkItem item;
Clock::time_point tp = Clock::now();
item.end = tp + std::chrono::milliseconds(milliseconds);
+ TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end);
item.period = milliseconds;
item.handler = std::move(handler);
std::vector<WorkItem>& getContainer() { return this->c; }
} m_items;
rocksdb::port::Thread m_th;
+ bool closed_ = false;
};
}
BlobDBImpl::~BlobDBImpl() {
+ tqueue_.shutdown();
// CancelAllBackgroundWork(db_, true);
Status s __attribute__((__unused__)) = Close();
assert(s.ok());
return std::make_pair(false, -1);
}
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
+
std::vector<std::shared_ptr<BlobFile>> process_files;
uint64_t now = EpochNow();
{
}
}
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
+ TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
+
SequenceNumber seq = GetLatestSequenceNumber();
{
MutexLock l(&write_mutex_);
#ifndef ROCKSDB_LITE
#include <algorithm>
+#include <chrono>
#include <cstdlib>
#include <map>
#include <memory>
Open(bdb_options, options);
}
+ void Close() {
+ assert(blob_db_ != nullptr);
+ delete blob_db_;
+ blob_db_ = nullptr;
+ }
+
void Destroy() {
if (blob_db_) {
Options options = blob_db_->GetOptions();
}
}
+TEST_F(BlobDBTest, ShutdownWait) {
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = 100;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = false;
+ Options options;
+ options.env = mock_env_.get();
+
+ SyncPoint::GetInstance()->LoadDependency({
+ {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
+ {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
+ {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
+ {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
+ });
+ // Force all tasks to be scheduled immediately.
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "TimeQueue::Add:item.end", [&](void *arg) {
+ std::chrono::steady_clock::time_point *tp =
+ static_cast<std::chrono::steady_clock::time_point *>(arg);
+ *tp =
+ std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
+ });
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
+ // Sleep 3 ms to increase the chance of data race.
+ // We've synced up the code so that EvictExpiredFiles()
+ // is called concurrently with ~BlobDBImpl().
+ // ~BlobDBImpl() is supposed to wait for all background
+ // task to shutdown before doing anything else. In order
+ // to use the same test to reproduce a bug of the waiting
+ // logic, we wait a little bit here, so that TSAN can
+ // catch the data race.
+ // We should improve the test if we find a better way.
+ Env::Default()->SleepForMicroseconds(3000);
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Open(bdb_options, options);
+ mock_env_->set_current_time(50);
+ std::map<std::string, std::string> data;
+ ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ auto blob_file = blob_files[0];
+ ASSERT_FALSE(blob_file->Immutable());
+ ASSERT_FALSE(blob_file->Obsolete());
+ VerifyDB(data);
+
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
+ mock_env_->set_current_time(250);
+ // The key should expired now.
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
+
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
+ Close();
+
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
} // namespace blob_db
} // namespace rocksdb