]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Exit and Join the background compaction threads while running rocksdb tests
authorMayank Agarwal <amayank@fb.com>
Tue, 19 Mar 2013 21:39:28 +0000 (14:39 -0700)
committerMayank Agarwal <amayank@fb.com>
Wed, 10 Apr 2013 21:50:25 +0000 (14:50 -0700)
Summary:
The background compaction threads are never exitted and therefore caused
memory-leaks while running rpcksdb tests. Have changed the PosixEnv destructor to exit and join them and changed the tests likewise
The memory leaked has reduced from 320 bytes to 64 bytes in all the tests. The 64
bytes is relating to
pthread_exit, but still have to figure out why. The stack-trace right now with
table_test.cc = 64 bytes in 1 blocks are possibly lost in loss record 4 of 5
   at 0x475D8C: malloc (jemalloc.c:914)
   by 0x400D69E: _dl_map_object_deps (dl-deps.c:505)
   by 0x4013393: dl_open_worker (dl-open.c:263)
   by 0x400F015: _dl_catch_error (dl-error.c:178)
   by 0x4013B2B: _dl_open (dl-open.c:569)
   by 0x5D3E913: do_dlopen (dl-libc.c:86)
   by 0x400F015: _dl_catch_error (dl-error.c:178)
   by 0x5D3E9D6: __libc_dlopen_mode (dl-libc.c:47)
   by 0x5048BF3: pthread_cancel_init (unwind-forcedunwind.c:53)
   by 0x5048DC9: _Unwind_ForcedUnwind (unwind-forcedunwind.c:126)
   by 0x5046D9F: __pthread_unwind (unwind.c:130)
   by 0x50413A4: pthread_exit (pthreadP.h:289)

Test Plan: make all check

Reviewers: dhruba, sheki, haobo

Reviewed By: dhruba

CC: leveldb, chip
Differential Revision: https://reviews.facebook.net/D9573

util/env_posix.cc

index 7cdf66cec996b2d2e1060693906f6e04d535c0f6..957c5b1052e5cd993875d00b2a8fc480957b4bc1 100644 (file)
@@ -600,9 +600,9 @@ class PosixFileLock : public FileLock {
 class PosixEnv : public Env {
  public:
   PosixEnv();
-  virtual ~PosixEnv() {
-    fprintf(stderr, "Destroying Env::Default()\n");
-    exit(1);
+
+  virtual ~PosixEnv(){
+    WaitForBGThreads();
   }
 
   void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
@@ -804,6 +804,8 @@ class PosixEnv : public Env {
 
   virtual void Schedule(void (*function)(void*), void* arg);
 
+  virtual void WaitForBGThreads();
+
   virtual void StartThread(void (*function)(void* arg), void* arg);
 
   virtual Status GetTestDirectory(std::string* result) {
@@ -973,22 +975,43 @@ class PosixEnv : public Env {
   // Entry per Schedule() call
   struct BGItem { void* arg; void (*function)(void*); };
   typedef std::deque<BGItem> BGQueue;
+  int queue_size_; // number of items in BGQueue
+  bool exit_all_threads_;
   BGQueue queue_;
+  std::vector<pthread_t> threads_to_join_;
 };
 
 PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
                        forceMmapOff(false),
                        page_size_(getpagesize()),
                        started_bgthread_(0),
-                       num_threads_(1) {
+                       num_threads_(1),
+                       queue_size_(0),
+                       exit_all_threads_(false) {
   PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
   PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
   bgthread_.resize(num_threads_);
 }
 
+// Signal and Join all background threads started by calls to Schedule
+void PosixEnv::WaitForBGThreads() {
+  PthreadCall("lock", pthread_mutex_lock(&mu_));
+  assert(! exit_all_threads_);
+  exit_all_threads_ = true;
+  PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
+  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+  for (unsigned int i = 0; i < threads_to_join_.size(); i++) {
+    pthread_join(threads_to_join_[i], nullptr);
+  }
+}
+
 void PosixEnv::Schedule(void (*function)(void*), void* arg) {
   PthreadCall("lock", pthread_mutex_lock(&mu_));
 
+  if (exit_all_threads_) {
+    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+    return;
+  }
   // Start background thread if necessary
   for (; started_bgthread_ < num_threads_; started_bgthread_++) {
     PthreadCall(
@@ -997,6 +1020,7 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg) {
                        nullptr,
                        &PosixEnv::BGThreadWrapper,
                        this));
+    threads_to_join_.push_back(bgthread_[started_bgthread_]);
     fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]);
   }
 
@@ -1015,10 +1039,13 @@ void PosixEnv::BGThread() {
   while (true) {
     // Wait until there is an item that is ready to run
     PthreadCall("lock", pthread_mutex_lock(&mu_));
-    while (queue_.empty()) {
+    while (queue_.empty() && !exit_all_threads_) {
       PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
     }
-
+    if (exit_all_threads_) { // mechanism to let BG threads exit safely
+      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+      break;
+    }
     void (*function)(void*) = queue_.front().function;
     void* arg = queue_.front().arg;
     queue_.pop_front();
@@ -1048,17 +1075,15 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
   state->arg = arg;
   PthreadCall("start thread",
               pthread_create(&t, nullptr,  &StartThreadWrapper, state));
+  threads_to_join_.push_back(t);
 }
 
 }  // namespace
 
-static pthread_once_t once = PTHREAD_ONCE_INIT;
-static Env* default_env;
-static void InitDefaultEnv() { default_env = new PosixEnv; }
+static PosixEnv default_env;
 
 Env* Env::Default() {
-  pthread_once(&once, InitDefaultEnv);
-  return default_env;
+  return &default_env;
 }
 
 }  // namespace leveldb