*key= string(in_prefix, prefix_len + 1);
return 0;
}
+
+void LevelDBStore::compact_thread_entry()
+{
+ compact_queue_lock.Lock();
+ while (!compact_queue_stop) {
+ while (!compact_queue.empty()) {
+ string prefix = compact_queue.front();
+ compact_queue.pop_front();
+ compact_queue_lock.Unlock();
+ compact_prefix(prefix);
+ compact_queue_lock.Lock();
+ continue;
+ }
+ compact_queue_cond.Wait(compact_queue_lock);
+ }
+ compact_queue_lock.Unlock();
+}
int init(ostream &out, bool create_if_missing);
+ // manage async compactions
+ Mutex compact_queue_lock;
+ Cond compact_queue_cond;
+ list<string> compact_queue;
+ bool compact_queue_stop;
+ class CompactThread : public Thread {
+ LevelDBStore *db;
+ public:
+ CompactThread(LevelDBStore *d) : db(d) {}
+ void *entry() {
+ db->compact_thread_entry();
+ return NULL;
+ }
+ friend class LevelDBStore;
+ } compact_thread;
+
+ void compact_thread_entry();
+
public:
/// compact the underlying leveldb store
void compact() {
db->CompactRange(&cstart, &cend);
}
+ void compact_prefix_async(const string& prefix) {
+ Mutex::Locker l(compact_queue_lock);
+ compact_queue.remove(prefix); // prevent unbounded dups
+ compact_queue.push_back(prefix);
+ compact_queue_cond.Signal();
+ if (!compact_thread.is_started()) {
+ compact_thread.create();
+ }
+ }
+
/**
* options_t: Holds options which are minimally interpreted
* on initialization and then passed through to LevelDB.
#ifdef HAVE_LEVELDB_FILTER_POLICY
filterpolicy(NULL),
#endif
+ compact_queue_lock("LevelDBStore::compact_thread_lock"),
+ compact_queue_stop(false),
+ compact_thread(this),
options()
{}
- ~LevelDBStore() {}
+ ~LevelDBStore() {
+ compact_queue_lock.Lock();
+ if (compact_thread.is_started()) {
+ compact_queue_stop = true;
+ compact_queue_cond.Signal();
+ compact_queue_lock.Unlock();
+ compact_thread.join();
+ } else {
+ compact_queue_lock.Unlock();
+ }
+ }
/// Opens underlying db
int open(ostream &out) {