void LevelDBStore::close()
{
// stop compaction thread
- compact_queue_lock.Lock();
+ compact_queue_lock.lock();
if (compact_thread.is_started()) {
compact_queue_stop = true;
- compact_queue_cond.Signal();
- compact_queue_lock.Unlock();
+ compact_queue_cond.notify_all();
+ compact_queue_lock.unlock();
compact_thread.join();
} else {
- compact_queue_lock.Unlock();
+ compact_queue_lock.unlock();
}
if (logger)
void LevelDBStore::compact_thread_entry()
{
- compact_queue_lock.Lock();
+ std::unique_lock l{compact_queue_lock};
while (!compact_queue_stop) {
while (!compact_queue.empty()) {
pair<string,string> range = compact_queue.front();
compact_queue.pop_front();
logger->set(l_leveldb_compact_queue_len, compact_queue.size());
- compact_queue_lock.Unlock();
+ l.unlock();
logger->inc(l_leveldb_compact_range);
if (range.first.empty() && range.second.empty()) {
compact();
} else {
compact_range(range.first, range.second);
}
- compact_queue_lock.Lock();
+ l.lock();
continue;
}
if (compact_queue_stop)
break;
- compact_queue_cond.Wait(compact_queue_lock);
+ compact_queue_cond.wait(l);
}
- compact_queue_lock.Unlock();
}
void LevelDBStore::compact_range_async(const string& start, const string& end)
compact_queue.push_back(make_pair(start, end));
logger->set(l_leveldb_compact_queue_len, compact_queue.size());
}
- compact_queue_cond.Signal();
+ compact_queue_cond.notify_all();
if (!compact_thread.is_started()) {
compact_thread.create("levdbst_compact");
}
int do_open(ostream &out, bool create_if_missing);
// manage async compactions
- Mutex compact_queue_lock;
- Cond compact_queue_cond;
+ ceph::mutex compact_queue_lock =
+ ceph::make_mutex("LevelDBStore::compact_thread_lock");
+ ceph::condition_variable compact_queue_cond;
list< pair<string,string> > compact_queue;
bool compact_queue_stop;
class CompactThread : public Thread {
#ifdef HAVE_LEVELDB_FILTER_POLICY
filterpolicy(NULL),
#endif
- compact_queue_lock("LevelDBStore::compact_thread_lock"),
compact_queue_stop(false),
compact_thread(this),
options()
void RocksDBStore::close()
{
// stop compaction thread
- compact_queue_lock.Lock();
+ compact_queue_lock.lock();
if (compact_thread.is_started()) {
compact_queue_stop = true;
- compact_queue_cond.Signal();
- compact_queue_lock.Unlock();
+ compact_queue_cond.notify_all();
+ compact_queue_lock.unlock();
compact_thread.join();
} else {
- compact_queue_lock.Unlock();
+ compact_queue_lock.unlock();
}
if (logger)
void RocksDBStore::compact_thread_entry()
{
- compact_queue_lock.Lock();
+ std::unique_lock l{compact_queue_lock};
while (!compact_queue_stop) {
while (!compact_queue.empty()) {
pair<string,string> range = compact_queue.front();
compact_queue.pop_front();
logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
- compact_queue_lock.Unlock();
+ l.unlock();
logger->inc(l_rocksdb_compact_range);
if (range.first.empty() && range.second.empty()) {
compact();
} else {
compact_range(range.first, range.second);
}
- compact_queue_lock.Lock();
+ l.lock();
continue;
}
- compact_queue_cond.Wait(compact_queue_lock);
+ compact_queue_cond.wait(l);
}
- compact_queue_lock.Unlock();
}
void RocksDBStore::compact_range_async(const string& start, const string& end)
compact_queue.push_back(make_pair(start, end));
logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
}
- compact_queue_cond.Signal();
+ compact_queue_cond.notify_all();
if (!compact_thread.is_started()) {
compact_thread.create("rstore_compact");
}
int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt);
// manage async compactions
- Mutex compact_queue_lock;
- Cond compact_queue_cond;
+ ceph::mutex compact_queue_lock =
+ ceph::make_mutex("RocksDBStore::compact_thread_lock");
+ ceph::condition_variable compact_queue_cond;
list< pair<string,string> > compact_queue;
bool compact_queue_stop;
class CompactThread : public Thread {
db(NULL),
env(static_cast<rocksdb::Env*>(p)),
dbstats(NULL),
- compact_queue_lock("RocksDBStore::compact_thread_lock"),
compact_queue_stop(false),
compact_thread(this),
compact_on_mount(false),