From: Sage Weil Date: Tue, 28 May 2013 23:35:55 +0000 (-0700) Subject: os/LevelDBStore: do compact_prefix() work asynchronously X-Git-Tag: v0.64~35^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4af917d4478ec07734a69447420280880d775fa2;p=ceph.git os/LevelDBStore: do compact_prefix() work asynchronously We generally do not want to block while compacting a range of leveldb. Push the blocking+waiting off to a separate thread. (leveldb will do what it can to avoid blocking internally; no reason for us to wait explicitly.) This addresses part of #5176. Signed-off-by: Sage Weil --- diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h index f10d96d58a86..3ed144aa0160 100644 --- a/src/mon/MonitorDBStore.h +++ b/src/mon/MonitorDBStore.h @@ -217,7 +217,7 @@ class MonitorDBStore int r = db->submit_transaction_sync(dbt); if (r >= 0) { while (!compact_prefixes.empty()) { - db->compact_prefix(compact_prefixes.front()); + db->compact_prefix_async(compact_prefixes.front()); compact_prefixes.pop_front(); } } diff --git a/src/os/LevelDBStore.cc b/src/os/LevelDBStore.cc index ff6c557574d2..10238a7d98b2 100644 --- a/src/os/LevelDBStore.cc +++ b/src/os/LevelDBStore.cc @@ -141,3 +141,20 @@ int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key) *key= string(in_prefix, prefix_len + 1); return 0; } + +void LevelDBStore::compact_thread_entry() +{ + compact_queue_lock.Lock(); + while (!compact_queue_stop) { + while (!compact_queue.empty()) { + string prefix = compact_queue.front(); + compact_queue.pop_front(); + compact_queue_lock.Unlock(); + compact_prefix(prefix); + compact_queue_lock.Lock(); + continue; + } + compact_queue_cond.Wait(compact_queue_lock); + } + compact_queue_lock.Unlock(); +} diff --git a/src/os/LevelDBStore.h b/src/os/LevelDBStore.h index 557595181f69..94a694922479 100644 --- a/src/os/LevelDBStore.h +++ b/src/os/LevelDBStore.h @@ -33,6 +33,24 @@ class LevelDBStore : public KeyValueDB { int init(ostream &out, bool create_if_missing); + // manage async compactions + Mutex compact_queue_lock; + Cond compact_queue_cond; + list compact_queue; + bool compact_queue_stop; + class CompactThread : public Thread { + LevelDBStore *db; + public: + CompactThread(LevelDBStore *d) : db(d) {} + void *entry() { + db->compact_thread_entry(); + return NULL; + } + friend class LevelDBStore; + } compact_thread; + + void compact_thread_entry(); + public: /// compact the underlying leveldb store void compact() { @@ -50,6 +68,16 @@ public: db->CompactRange(&cstart, &cend); } + void compact_prefix_async(const string& prefix) { + Mutex::Locker l(compact_queue_lock); + compact_queue.remove(prefix); // prevent unbounded dups + compact_queue.push_back(prefix); + compact_queue_cond.Signal(); + if (!compact_thread.is_started()) { + compact_thread.create(); + } + } + /** * options_t: Holds options which are minimally interpreted * on initialization and then passed through to LevelDB. @@ -94,10 +122,23 @@ public: #ifdef HAVE_LEVELDB_FILTER_POLICY filterpolicy(NULL), #endif + compact_queue_lock("LevelDBStore::compact_thread_lock"), + compact_queue_stop(false), + compact_thread(this), options() {} - ~LevelDBStore() {} + ~LevelDBStore() { + compact_queue_lock.Lock(); + if (compact_thread.is_started()) { + compact_queue_stop = true; + compact_queue_cond.Signal(); + compact_queue_lock.Unlock(); + compact_thread.join(); + } else { + compact_queue_lock.Unlock(); + } + } /// Opens underlying db int open(ostream &out) {