class PosixEnv : public Env {
public:
PosixEnv();
- virtual ~PosixEnv() {
- fprintf(stderr, "Destroying Env::Default()\n");
- exit(1);
+
+ virtual ~PosixEnv(){
+ WaitForBGThreads();
}
void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
virtual void Schedule(void (*function)(void*), void* arg);
+ virtual void WaitForBGThreads();
+
virtual void StartThread(void (*function)(void* arg), void* arg);
virtual Status GetTestDirectory(std::string* result) {
// Entry per Schedule() call
struct BGItem { void* arg; void (*function)(void*); };
typedef std::deque<BGItem> BGQueue;
+ int queue_size_; // number of items in BGQueue
+ bool exit_all_threads_;
BGQueue queue_;
+ std::vector<pthread_t> threads_to_join_;
};
PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
forceMmapOff(false),
page_size_(getpagesize()),
started_bgthread_(0),
- num_threads_(1) {
+ num_threads_(1),
+ queue_size_(0),
+ exit_all_threads_(false) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
bgthread_.resize(num_threads_);
}
+// Signal and Join all background threads started by calls to Schedule
+void PosixEnv::WaitForBGThreads() {
+ PthreadCall("lock", pthread_mutex_lock(&mu_));
+ assert(! exit_all_threads_);
+ exit_all_threads_ = true;
+ PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
+ PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ for (unsigned int i = 0; i < threads_to_join_.size(); i++) {
+ pthread_join(threads_to_join_[i], nullptr);
+ }
+}
+
void PosixEnv::Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
+ if (exit_all_threads_) {
+ PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ return;
+ }
// Start background thread if necessary
for (; started_bgthread_ < num_threads_; started_bgthread_++) {
PthreadCall(
nullptr,
&PosixEnv::BGThreadWrapper,
this));
+ threads_to_join_.push_back(bgthread_[started_bgthread_]);
fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]);
}
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
- while (queue_.empty()) {
+ while (queue_.empty() && !exit_all_threads_) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
-
+ if (exit_all_threads_) { // mechanism to let BG threads exit safely
+ PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ break;
+ }
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
state->arg = arg;
PthreadCall("start thread",
pthread_create(&t, nullptr, &StartThreadWrapper, state));
+ threads_to_join_.push_back(t);
}
} // namespace
-static pthread_once_t once = PTHREAD_ONCE_INIT;
-static Env* default_env;
-static void InitDefaultEnv() { default_env = new PosixEnv; }
+static PosixEnv default_env;
Env* Env::Default() {
- pthread_once(&once, InitDefaultEnv);
- return default_env;
+ return &default_env;
}
} // namespace leveldb