]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/LevelDBStore: do compact_prefix() work asynchronously
authorSage Weil <sage@inktank.com>
Tue, 28 May 2013 23:35:55 +0000 (16:35 -0700)
committerSage Weil <sage@inktank.com>
Wed, 29 May 2013 03:40:28 +0000 (20:40 -0700)
We generally do not want to block while compacting a range of leveldb.
Push the blocking+waiting off to a separate thread.  (leveldb will do what
it can to avoid blocking internally; no reason for us to wait explicitly.)

This addresses part of #5176.

Signed-off-by: Sage Weil <sage@inktank.com>
src/mon/MonitorDBStore.h
src/os/LevelDBStore.cc
src/os/LevelDBStore.h

index f10d96d58a8606fd2672379bd5a77f9ebf0895ef..3ed144aa016029484393a41d0e79aa1e54e17104 100644 (file)
@@ -217,7 +217,7 @@ class MonitorDBStore
     int r = db->submit_transaction_sync(dbt);
     if (r >= 0) {
       while (!compact_prefixes.empty()) {
-       db->compact_prefix(compact_prefixes.front());
+       db->compact_prefix_async(compact_prefixes.front());
        compact_prefixes.pop_front();
       }
     }
index ff6c557574d2a7cd143cb0c44902c019d0a3d225..10238a7d98b2c5c12cced5be18cf8326d34edc5e 100644 (file)
@@ -141,3 +141,20 @@ int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key)
     *key= string(in_prefix, prefix_len + 1);
   return 0;
 }
+
+void LevelDBStore::compact_thread_entry()
+{
+  compact_queue_lock.Lock();
+  while (!compact_queue_stop) {
+    while (!compact_queue.empty()) {
+      string prefix = compact_queue.front();
+      compact_queue.pop_front();
+      compact_queue_lock.Unlock();
+      compact_prefix(prefix);
+      compact_queue_lock.Lock();
+      continue;
+    }
+    compact_queue_cond.Wait(compact_queue_lock);
+  }
+  compact_queue_lock.Unlock();
+}
index 557595181f69189fece7590c35056a30035bcf86..94a69492247982a0a85ca05ad1722eed50803a58 100644 (file)
@@ -33,6 +33,24 @@ class LevelDBStore : public KeyValueDB {
 
   int init(ostream &out, bool create_if_missing);
 
+  // manage async compactions
+  Mutex compact_queue_lock;
+  Cond compact_queue_cond;
+  list<string> compact_queue;
+  bool compact_queue_stop;
+  class CompactThread : public Thread {
+    LevelDBStore *db;
+  public:
+    CompactThread(LevelDBStore *d) : db(d) {}
+    void *entry() {
+      db->compact_thread_entry();
+      return NULL;
+    }
+    friend class LevelDBStore;
+  } compact_thread;
+
+  void compact_thread_entry();
+
 public:
   /// compact the underlying leveldb store
   void compact() {
@@ -50,6 +68,16 @@ public:
     db->CompactRange(&cstart, &cend);
   }
 
+  void compact_prefix_async(const string& prefix) {
+    Mutex::Locker l(compact_queue_lock);
+    compact_queue.remove(prefix);     // prevent unbounded dups
+    compact_queue.push_back(prefix);
+    compact_queue_cond.Signal();
+    if (!compact_thread.is_started()) {
+      compact_thread.create();
+    }
+  }
+
   /**
    * options_t: Holds options which are minimally interpreted
    * on initialization and then passed through to LevelDB.
@@ -94,10 +122,23 @@ public:
 #ifdef HAVE_LEVELDB_FILTER_POLICY
     filterpolicy(NULL),
 #endif
+    compact_queue_lock("LevelDBStore::compact_thread_lock"),
+    compact_queue_stop(false),
+    compact_thread(this),
     options()
   {}
 
-  ~LevelDBStore() {}
+  ~LevelDBStore() {
+    compact_queue_lock.Lock();
+    if (compact_thread.is_started()) {
+      compact_queue_stop = true;
+      compact_queue_cond.Signal();
+      compact_queue_lock.Unlock();
+      compact_thread.join();
+    } else {
+      compact_queue_lock.Unlock();
+    }
+  }
 
   /// Opens underlying db
   int open(ostream &out) {