#include "common/errno.h"
#include "common/safe_io.h"
#include "common/PriorityCache.h"
+#include "common/RWLock.h"
#include "Allocator.h"
#include "FreelistManager.h"
#include "BlueFS.h"
: CollectionImpl(cid),
store(store_),
cache(bc),
- lock("BlueStore::Collection::lock", true, false),
exists(true),
onode_map(oc),
commit_queue(nullptr)
bool create,
bool is_createop)
{
- ceph_assert(create ? lock.is_wlocked() : lock.is_locked());
+ ceph_assert(create ? ceph_mutex_is_wlocked(lock) : ceph_mutex_is_locked(lock));
spg_t pgid;
if (cid.is_pg(&pgid)) {
void *BlueStore::MempoolThread::entry()
{
- std::unique_lock l(lock);
+ std::unique_lock l{lock};
uint64_t base = store->osd_memory_base;
double fragmentation = store->osd_memory_expected_fragmentation;
CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
: c(c), o(o), it(it)
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
if (o->onode.has_omap()) {
get_omap_key(o->onode.nid, string(), &head);
get_omap_tail(o->onode.nid, &tail);
int BlueStore::OmapIteratorImpl::seek_to_first()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
auto start1 = mono_clock::now();
if (o->onode.has_omap()) {
it->lower_bound(head);
int BlueStore::OmapIteratorImpl::upper_bound(const string& after)
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
auto start1 = mono_clock::now();
if (o->onode.has_omap()) {
string key;
int BlueStore::OmapIteratorImpl::lower_bound(const string& to)
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
auto start1 = mono_clock::now();
if (o->onode.has_omap()) {
string key;
bool BlueStore::OmapIteratorImpl::valid()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
bool r = o->onode.has_omap() && it && it->valid() &&
it->raw_key().second < tail;
if (it && it->valid()) {
int BlueStore::OmapIteratorImpl::next()
{
int r = -1;
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
auto start1 = mono_clock::now();
if (o->onode.has_omap()) {
it->next();
string BlueStore::OmapIteratorImpl::key()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
ceph_assert(it->valid());
string db_key = it->raw_key().second;
string user_key;
bufferlist BlueStore::OmapIteratorImpl::value()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
ceph_assert(it->valid());
return it->value();
}
dout(10) << __func__ << " " << oid << dendl;
store_statfs_t onode_statfs;
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
OnodeRef o = c->get_onode(oid, false);
if (o->onode.nid) {
if (o->onode.nid > nid_max) {
dout(20) << __func__ << " check misreference for col:" << c->cid
<< " obj:" << oid << dendl;
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
OnodeRef o = c->get_onode(oid, false);
o->extent_map.fault_range(db, 0, OBJECT_MAX_SIZE);
mempool::bluestore_fsck::set<BlobRef> blobs;
CollectionRef c = _get_collection(cid);
ceph_assert(c);
{
- RWLock::WLocker l(c->lock); // just to avoid internal asserts
+ std::unique_lock l{c->lock}; // just to avoid internal asserts
o = c->get_onode(oid, false);
ceph_assert(o);
o->extent_map.fault_range(db, 0, OBJECT_MAX_SIZE);
CollectionRef c1 = _get_collection(cid1);
ceph_assert(c1);
{
- RWLock::WLocker l(c1->lock); // just to avoid internal asserts
+ std::unique_lock l{c1->lock}; // just to avoid internal asserts
o1 = c1->get_onode(oid1, false);
ceph_assert(o1);
o1->extent_map.fault_range(db, offset, OBJECT_MAX_SIZE);
CollectionRef c2 = _get_collection(cid2);
ceph_assert(c2);
{
- RWLock::WLocker l(c2->lock); // just to avoid internal asserts
+ std::unique_lock l{c2->lock}; // just to avoid internal asserts
o2 = c2->get_onode(oid2, false);
ceph_assert(o2);
o2->extent_map.fault_range(db, offset, OBJECT_MAX_SIZE);
BlueStore::CollectionRef BlueStore::_get_collection(const coll_t& cid)
{
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l(coll_lock);
ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
if (cp == coll_map.end())
return CollectionRef();
ObjectStore::CollectionHandle BlueStore::create_new_collection(
const coll_t& cid)
{
- RWLock::WLocker l(coll_lock);
+ std::unique_lock l{coll_lock};
Collection *c = new Collection(
this,
onode_cache_shards[cid.hash_to_shard(onode_cache_shards.size())],
ContextQueue *commit_queue)
{
if (commit_queue) {
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l(coll_lock);
if (coll_map.count(cid)) {
coll_map[cid]->commit_queue = commit_queue;
} else if (new_coll_map.count(cid)) {
bool r = true;
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists)
r = false;
dout(10) << __func__ << " " << c->get_cid() << " " << oid << dendl;
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists)
return -ENOENT;
dout(15) << __func__ << " " << ch->cid << " options " << opts << dendl;
if (!c->exists)
return -ENOENT;
- RWLock::WLocker l(c->lock);
+ std::unique_lock l{c->lock};
c->pool_opts = opts;
return 0;
}
bl.clear();
int r;
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
auto start1 = mono_clock::now();
OnodeRef o = c->get_onode(oid, false);
log_latency("get_onode@read",
if (!c->exists)
return -ENOENT;
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
int r;
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
int r;
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
mempool::bluestore_cache_other::string k(name);
OnodeRef o = c->get_onode(oid, false);
int r;
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
int BlueStore::list_collections(vector<coll_t>& ls)
{
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l(coll_lock);
ls.reserve(coll_map.size());
for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
p != coll_map.end();
bool BlueStore::collection_exists(const coll_t& c)
{
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l(coll_lock);
return coll_map.count(c);
}
{
dout(15) << __func__ << " " << ch->cid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
dout(10) << __func__ << " " << ch->cid << " = " << c->cnode.bits << dendl;
return c->cnode.bits;
}
<< " start " << start << " end " << end << " max " << max << dendl;
int r;
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
r = _collection_list(c, start, end, max, ls, pnext);
}
dout(15) << __func__ << " " << c->get_cid() << " oid " << oid << dendl;
if (!c->exists)
return -ENOENT;
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
dout(15) << __func__ << " " << c->get_cid() << " oid " << oid << dendl;
if (!c->exists)
return -ENOENT;
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
dout(15) << __func__ << " " << c->get_cid() << " oid " << oid << dendl;
if (!c->exists)
return -ENOENT;
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
dout(15) << __func__ << " " << c->get_cid() << " oid " << oid << dendl;
if (!c->exists)
return -ENOENT;
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
int r = 0;
string final_key;
OnodeRef o = c->get_onode(oid, false);
dout(15) << __func__ << " " << c->get_cid() << " oid " << oid << dendl;
if (!c->exists)
return -ENOENT;
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
int r = 0;
string final_key;
OnodeRef o = c->get_onode(oid, false);
if (!c->exists) {
return ObjectMap::ObjectMapIterator();
}
- RWLock::RLocker l(c->lock);
+ std::shared_lock l(c->lock);
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
dout(10) << __func__ << " " << oid << "doesn't exist" <<dendl;
set<OpSequencerRef> s;
vector<OpSequencerRef> zombies;
{
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l(coll_lock);
for (auto& i : coll_map) {
s.insert(i.second->osr);
}
{
dout(10) << __func__ << dendl;
{
- std::unique_lock l(kv_lock);
+ std::unique_lock l{kv_lock};
while (!kv_sync_started) {
kv_cond.wait(l);
}
kv_cond.notify_all();
}
{
- std::unique_lock l(kv_finalize_lock);
+ std::unique_lock l{kv_finalize_lock};
while (!kv_finalize_started) {
kv_finalize_cond.wait(l);
}
{
dout(10) << __func__ << " start" << dendl;
deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
- std::unique_lock l(kv_lock);
+ std::unique_lock l{kv_lock};
ceph_assert(!kv_sync_started);
kv_sync_started = true;
kv_cond.notify_all();
ceph_assert(r == 0);
{
- std::unique_lock m(kv_finalize_lock);
+ std::unique_lock m{kv_finalize_lock};
if (kv_committing_to_finalize.empty()) {
kv_committing_to_finalize.swap(kv_committing);
} else {
}
// object operations
- RWLock::WLocker l(c->lock);
+ std::unique_lock l(c->lock);
OnodeRef &o = ovec[op->oid];
if (!o) {
ghobject_t oid = i.get_oid(op->oid);
bufferlist bl;
{
- RWLock::WLocker l(coll_lock);
+ std::unique_lock l(coll_lock);
if (*c) {
r = -EEXIST;
goto out;
(*c)->flush_all_but_last();
{
- RWLock::WLocker l(coll_lock);
+ std::unique_lock l(coll_lock);
if (!*c) {
r = -ENOENT;
goto out;
{
dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
<< " bits " << bits << dendl;
- RWLock::WLocker l(c->lock);
- RWLock::WLocker l2(d->lock);
+ std::unique_lock l(c->lock);
+ std::unique_lock l2(d->lock);
int r;
// flush all previous deferred writes on this sequencer. this is a bit
{
dout(15) << __func__ << " " << (*c)->cid << " to " << d->cid
<< " bits " << bits << dendl;
- RWLock::WLocker l((*c)->lock);
- RWLock::WLocker l2(d->lock);
+ std::unique_lock l((*c)->lock);
+ std::unique_lock l2(d->lock);
int r;
coll_t cid = (*c)->cid;
// remove source collection
{
- RWLock::WLocker l3(coll_lock);
+ std::unique_lock l3(coll_lock);
_do_remove_collection(txc, c);
}
#include "include/mempool.h"
#include "common/bloom_filter.hpp"
#include "common/Finisher.h"
+#include "common/ceph_mutex.h"
#include "common/Throttle.h"
#include "common/perf_counters.h"
#include "common/PriorityCache.h"
OpSequencerRef osr;
BufferCacheShard *cache; ///< our cache shard
bluestore_cnode_t cnode;
- RWLock lock;
+ ceph::shared_mutex lock =
+ ceph::make_shared_mutex("BlueStore::Collection::lock", true, false);
bool exists;
int fsid_fd = -1; ///< open handle (locked) to $path/fsid
bool mounted = false;
- RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
+ ceph::shared_mutex coll_lock = ceph::make_shared_mutex("BlueStore::coll_lock"); ///< rwlock to protect coll_map
mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
map<coll_t,CollectionRef> new_coll_map;
list<CollectionRef> removed_collections;
- RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
+ ceph::shared_mutex debug_read_error_lock =
+ ceph::make_shared_mutex("BlueStore::debug_read_error_lock");
set<ghobject_t> debug_data_error_objects;
set<ghobject_t> debug_mdata_error_objects;
// error injection
void inject_data_error(const ghobject_t& o) override {
- RWLock::WLocker l(debug_read_error_lock);
+ std::unique_lock l(debug_read_error_lock);
debug_data_error_objects.insert(o);
}
void inject_mdata_error(const ghobject_t& o) override {
- RWLock::WLocker l(debug_read_error_lock);
+ std::unique_lock l(debug_read_error_lock);
debug_mdata_error_objects.insert(o);
}
if (!cct->_conf->bluestore_debug_inject_read_err) {
return false;
}
- RWLock::RLocker l(debug_read_error_lock);
+ std::shared_lock l(debug_read_error_lock);
return debug_data_error_objects.count(o);
}
bool _debug_mdata_eio(const ghobject_t& o) {
if (!cct->_conf->bluestore_debug_inject_read_err) {
return false;
}
- RWLock::RLocker l(debug_read_error_lock);
+ std::shared_lock l(debug_read_error_lock);
return debug_mdata_error_objects.count(o);
}
void _debug_obj_on_delete(const ghobject_t& o) {
if (cct->_conf->bluestore_debug_inject_read_err) {
- RWLock::WLocker l(debug_read_error_lock);
+ std::unique_lock l(debug_read_error_lock);
debug_data_error_objects.erase(o);
debug_mdata_error_objects.erase(o);
}
};
public:
- RWLock access_lock;
+ ceph::shared_mutex access_lock =
+ ceph::make_shared_mutex("CollectionIndex::access_lock", true, false);
/// Type of returned paths
typedef std::shared_ptr<Path> IndexedPath;
virtual int prep_delete() { return 0; }
CollectionIndex(CephContext* cct, const coll_t& collection)
- : cct(cct), access_lock("CollectionIndex::access_lock", true, false) {}
+ : cct(cct) {}
/*
* Pre-hash the collection, this collection should map to a PG folder.
void DBObjectMap::set_state()
{
- Mutex::Locker l(header_lock);
+ std::lock_guard l{header_lock};
KeyValueDB::Transaction t = db->get_transaction();
write_state(t);
int ret = db->submit_transaction_sync(t);
*
* See 2b63dd25fc1c73fa42e52e9ea4ab5a45dd9422a0 and bug 9891.
*/
- Mutex::Locker l(header_lock);
+ std::lock_guard l{header_lock};
write_state(t);
return db->submit_transaction_sync(t);
} else {
- Mutex::Locker l(header_lock);
+ std::lock_guard l{header_lock};
write_state(t);
return db->submit_transaction_sync(t);
}
}
int DBObjectMap::write_state(KeyValueDB::Transaction _t) {
- ceph_assert(header_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(header_lock));
dout(20) << "dbobjectmap: seq is " << state.seq << dendl;
KeyValueDB::Transaction t = _t ? _t : db->get_transaction();
bufferlist bl;
_Header *header = new _Header();
{
- Mutex::Locker l(cache_lock);
+ std::lock_guard l{cache_lock};
if (caches.lookup(oid, header)) {
ceph_assert(!in_use.count(header->seq));
in_use.insert(header->seq);
auto iter = out.cbegin();
ret->decode(iter);
{
- Mutex::Locker l(cache_lock);
+ std::lock_guard l{cache_lock};
caches.add(oid, *ret);
}
DBObjectMap::Header DBObjectMap::lookup_parent(Header input)
{
- Mutex::Locker l(header_lock);
- while (in_use.count(input->parent))
- header_cond.Wait(header_lock);
+ std::unique_lock l{header_lock};
+ header_cond.wait(l, [&input, this] { return !in_use.count(input->parent); });
map<string, bufferlist> out;
set<string> keys;
keys.insert(HEADER_KEY);
const ghobject_t &oid,
KeyValueDB::Transaction t)
{
- Mutex::Locker l(header_lock);
+ std::lock_guard l{header_lock};
Header header = _lookup_map_header(hl, oid);
if (!header) {
header = _generate_new_header(oid, Header());
to_remove.insert(map_header_key(oid));
t->rmkeys(HOBJECT_TO_SEQ, to_remove);
{
- Mutex::Locker l(cache_lock);
+ std::lock_guard l{cache_lock};
caches.clear(oid);
}
}
header.encode(to_set[map_header_key(oid)]);
t->set(HOBJECT_TO_SEQ, to_set);
{
- Mutex::Locker l(cache_lock);
+ std::lock_guard l{cache_lock};
caches.add(oid, header);
}
}
#include "os/ObjectMap.h"
#include "kv/KeyValueDB.h"
#include "osd/osd_types.h"
-#include "common/Mutex.h"
-#include "common/Cond.h"
+#include "common/ceph_mutex.h"
#include "common/simple_cache.hpp"
#include <boost/optional/optional_io.hpp>
/**
* Serializes access to next_seq as well as the in_use set
*/
- Mutex header_lock;
- Cond header_cond;
- Cond map_header_cond;
+ ceph::mutex header_lock = ceph::make_mutex("DBOBjectMap");
+ ceph::condition_variable header_cond;
+ ceph::condition_variable map_header_cond;
/**
* Set of headers currently in use
public:
explicit MapHeaderLock(DBObjectMap *db) : db(db) {}
MapHeaderLock(DBObjectMap *db, const ghobject_t &oid) : db(db), locked(oid) {
- Mutex::Locker l(db->header_lock);
- while (db->map_header_in_use.count(*locked))
- db->map_header_cond.Wait(db->header_lock);
+ std::unique_lock l{db->header_lock};
+ db->map_header_cond.wait(l, [db, this] {
+ return !db->map_header_in_use.count(*locked);
+ });
db->map_header_in_use.insert(*locked);
}
~MapHeaderLock() {
if (locked) {
- Mutex::Locker l(db->header_lock);
+ std::lock_guard l{db->header_lock};
ceph_assert(db->map_header_in_use.count(*locked));
- db->map_header_cond.Signal();
+ db->map_header_cond.notify_all();
db->map_header_in_use.erase(*locked);
}
}
};
DBObjectMap(CephContext* cct, KeyValueDB *db)
- : ObjectMap(cct, db), header_lock("DBOBjectMap"),
- cache_lock("DBObjectMap::CacheLock"),
+ : ObjectMap(cct, db),
caches(cct->_conf->filestore_omap_header_cache_size)
{}
private:
/// Implicit lock on Header->seq
typedef std::shared_ptr<_Header> Header;
- Mutex cache_lock;
+ ceph::mutex cache_lock = ceph::make_mutex("DBObjectMap::CacheLock");
SimpleLRU<ghobject_t, _Header> caches;
string map_header_key(const ghobject_t &oid);
*/
Header _generate_new_header(const ghobject_t &oid, Header parent);
Header generate_new_header(const ghobject_t &oid, Header parent) {
- Mutex::Locker l(header_lock);
+ std::lock_guard l{header_lock};
return _generate_new_header(oid, parent);
}
Header lookup_map_header(
const MapHeaderLock &l2,
const ghobject_t &oid) {
- Mutex::Locker l(header_lock);
+ std::lock_guard l{header_lock};
return _lookup_map_header(l2, oid);
}
explicit RemoveOnDelete(DBObjectMap *db) :
db(db) {}
void operator() (_Header *header) {
- Mutex::Locker l(db->header_lock);
+ std::lock_guard l{db->header_lock};
ceph_assert(db->in_use.count(header->seq));
db->in_use.erase(header->seq);
- db->header_cond.Signal();
+ db->header_cond.notify_all();
delete header;
}
};
#include <cstdio>
#include "common/config_obs.h"
#include "common/hobject.h"
-#include "common/Mutex.h"
-#include "common/Cond.h"
#include "common/shared_cache.hpp"
#include "include/compat.h"
#include "include/intarith.h"
if (!write_stop)
{
{
- Mutex::Locker l(write_lock);
- Mutex::Locker p(writeq_lock);
+ std::lock_guard l{write_lock};
+ std::lock_guard p{writeq_lock};
write_stop = true;
- writeq_cond.Signal();
+ writeq_cond.notify_all();
// Doesn't hurt to signal commit_cond in case thread is waiting there
// and caller didn't use committed_thru() first.
- commit_cond.Signal();
+ commit_cond.notify_all();
}
write_thread.join();
// stop aio completeion thread *after* writer thread has stopped
// and has submitted all of its io
if (aio && !aio_stop) {
- aio_lock.Lock();
+ aio_lock.lock();
aio_stop = true;
- aio_cond.Signal();
- write_finish_cond.Signal();
- aio_lock.Unlock();
+ aio_cond.notify_all();
+ write_finish_cond.notify_all();
+ aio_lock.unlock();
write_finish_thread.join();
}
#endif
{
bufferlist bl;
{
- Mutex::Locker l(finisher_lock);
+ std::lock_guard l{finisher_lock};
header.committed_up_to = journaled_seq;
}
encode(header, bl);
void FileJournal::write_header_sync()
{
- Mutex::Locker locker(write_lock);
+ std::lock_guard locker{write_lock};
must_write_header = true;
bufferlist bl;
do_write(bl);
if (room >= (header.max_size >> 1) &&
room - size < (header.max_size >> 1)) {
dout(10) << " passing half full mark, triggering commit" << dendl;
- do_sync_cond->SloppySignal(); // initiate a real commit so we can trim
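+ // only the debug condition_variable accepts the "sloppy" flag that stands in
+ // for the old SloppySignal(); plain std::condition_variable::notify_all()
+ // takes no argument, hence the #ifdef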
+#ifdef CEPH_DEBUG_MUTEX
+ do_sync_cond->notify_all(true); // initiate a real commit so we can trim
+#else
+ do_sync_cond->notify_all();
+#endif
}
}
items.erase(it++);
#ifdef HAVE_LIBAIO
{
- Mutex::Locker locker(aio_lock);
+ std::lock_guard locker{aio_lock};
ceph_assert(aio_write_queue_ops > 0);
aio_write_queue_ops--;
ceph_assert(aio_write_queue_bytes >= bytes);
void FileJournal::queue_completions_thru(uint64_t seq)
{
- ceph_assert(finisher_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(finisher_lock));
utime_t now = ceph_clock_now();
list<completion_item> items;
batch_pop_completions(items);
items.erase(it++);
}
batch_unpop_completions(items);
- finisher_cond.Signal();
+ finisher_cond.notify_all();
}
if (write_pos >= header.max_size)
write_pos = write_pos - header.max_size + get_top();
- write_lock.Unlock();
+ write_lock.unlock();
// split?
off64_t split = 0;
utime_t lat = ceph_clock_now() - from;
dout(20) << "do_write latency " << lat << dendl;
- write_lock.Lock();
+ write_lock.lock();
ceph_assert(write_pos == pos);
ceph_assert(write_pos % header.alignment == 0);
{
- Mutex::Locker locker(finisher_lock);
+ std::lock_guard locker{finisher_lock};
journaled_seq = writing_seq;
// kick finisher?
{
dout(10) << "waiting for completions to empty" << dendl;
{
- Mutex::Locker l(finisher_lock);
- while (!completions_empty())
- finisher_cond.Wait(finisher_lock);
+ std::unique_lock l{finisher_lock};
+ finisher_cond.wait(l, [this] { return completions_empty(); });
}
dout(10) << "flush waiting for finisher" << dendl;
finisher->wait_for_empty();
dout(10) << "write_thread_entry start" << dendl;
while (1) {
{
- Mutex::Locker locker(writeq_lock);
+ std::unique_lock locker{writeq_lock};
if (writeq.empty() && !must_write_header) {
if (write_stop)
break;
dout(20) << "write_thread_entry going to sleep" << dendl;
- writeq_cond.Wait(writeq_lock);
+ writeq_cond.wait(locker);
dout(20) << "write_thread_entry woke up" << dendl;
continue;
}
#ifdef HAVE_LIBAIO
if (aio) {
- Mutex::Locker locker(aio_lock);
+ std::unique_lock locker{aio_lock};
// should we back off to limit aios in flight? try to do this
// adaptively so that we submit larger aios once we have lots of
// them in flight.
dout(20) << "write_thread_entry deferring until more aios complete: "
<< aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
<< " bytes to start a new aio (currently " << cur << " pending)" << dendl;
- aio_cond.Wait(aio_lock);
+ aio_cond.wait(locker);
dout(20) << "write_thread_entry woke up" << dendl;
}
}
#endif
- Mutex::Locker locker(write_lock);
+ std::unique_lock locker{write_lock};
uint64_t orig_ops = 0;
uint64_t orig_bytes = 0;
r = 0;
} else {
dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
- commit_cond.Wait(write_lock);
+ commit_cond.wait(locker);
dout(20) << "write_thread_entry woke up" << dendl;
continue;
}
// lock only aio_queue, current aio, aio_num, aio_bytes, which may be
// modified in check_aio_completion
- aio_lock.Lock();
+ aio_lock.lock();
aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
aio_info& aio = aio_queue.back();
aio.iov = iov;
// aio could be ereased from aio_queue once it is done
uint64_t cur_len = aio.len;
// unlock aio_lock because following io_submit might take time to return
- aio_lock.Unlock();
+ aio_lock.unlock();
iocb *piocb = &aio.iocb;
} while (true);
pos += cur_len;
}
- aio_lock.Lock();
- write_finish_cond.Signal();
- aio_lock.Unlock();
+ aio_lock.lock();
+ write_finish_cond.notify_all();
+ aio_lock.unlock();
return 0;
}
#endif
dout(10) << __func__ << " enter" << dendl;
while (true) {
{
- Mutex::Locker locker(aio_lock);
+ std::unique_lock locker{aio_lock};
if (aio_queue.empty()) {
if (aio_stop)
break;
dout(20) << __func__ << " sleeping" << dendl;
- write_finish_cond.Wait(aio_lock);
+ write_finish_cond.wait(locker);
continue;
}
}
}
{
- Mutex::Locker locker(aio_lock);
+ std::lock_guard locker{aio_lock};
for (int i=0; i<r; i++) {
aio_info *ai = (aio_info *)event[i].obj;
if (event[i].res != ai->len) {
*/
void FileJournal::check_aio_completion()
{
- ceph_assert(aio_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(aio_lock));
dout(20) << "check_aio_completion" << dendl;
bool completed_something = false, signal = false;
if (completed_something) {
// kick finisher?
// only if we haven't filled up recently!
- Mutex::Locker locker(finisher_lock);
+ std::lock_guard locker{finisher_lock};
journaled_seq = new_journaled_seq;
if (full_state != FULL_NOTFULL) {
dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
}
if (signal) {
// maybe write queue was waiting for aio count to drop?
- aio_cond.Signal();
+ aio_cond.notify_all();
}
}
#endif
}
}
{
- Mutex::Locker l1(writeq_lock);
+ std::lock_guard l1{writeq_lock};
#ifdef HAVE_LIBAIO
- Mutex::Locker l2(aio_lock);
+ std::lock_guard l2{aio_lock};
#endif
- Mutex::Locker l3(completions_lock);
+ std::lock_guard l3{completions_lock};
#ifdef HAVE_LIBAIO
aio_write_queue_ops++;
aio_write_queue_bytes += e.length();
- aio_cond.Signal();
+ aio_cond.notify_all();
#endif
completions.push_back(
completion_item(
seq, oncommit, ceph_clock_now(), osd_op));
if (writeq.empty())
- writeq_cond.Signal();
+ writeq_cond.notify_all();
writeq.push_back(write_item(seq, e, orig_len, osd_op));
if (osd_op)
osd_op->journal_trace.keyval("queue depth", writeq.size());
bool FileJournal::writeq_empty()
{
- Mutex::Locker locker(writeq_lock);
+ std::lock_guard locker{writeq_lock};
return writeq.empty();
}
FileJournal::write_item &FileJournal::peek_write()
{
- ceph_assert(write_lock.is_locked());
- Mutex::Locker locker(writeq_lock);
+ ceph_assert(ceph_mutex_is_locked(write_lock));
+ std::lock_guard locker{writeq_lock};
return writeq.front();
}
void FileJournal::pop_write()
{
- ceph_assert(write_lock.is_locked());
- Mutex::Locker locker(writeq_lock);
+ ceph_assert(ceph_mutex_is_locked(write_lock));
+ std::lock_guard locker{writeq_lock};
if (logger) {
logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len);
logger->dec(l_filestore_journal_queue_ops, 1);
void FileJournal::batch_pop_write(list<write_item> &items)
{
- ceph_assert(write_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(write_lock));
{
- Mutex::Locker locker(writeq_lock);
+ std::lock_guard locker{writeq_lock};
writeq.swap(items);
}
for (auto &&i : items) {
void FileJournal::batch_unpop_write(list<write_item> &items)
{
- ceph_assert(write_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(write_lock));
for (auto &&i : items) {
if (logger) {
logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
logger->inc(l_filestore_journal_queue_ops, 1);
}
}
- Mutex::Locker locker(writeq_lock);
+ std::lock_guard locker{writeq_lock};
writeq.splice(writeq.begin(), items);
}
void FileJournal::committed_thru(uint64_t seq)
{
- Mutex::Locker locker(write_lock);
+ std::lock_guard locker{write_lock};
auto released = throttle.flush(seq);
if (logger) {
// completions!
{
- Mutex::Locker locker(finisher_lock);
+ std::lock_guard locker{finisher_lock};
queue_completions_thru(seq);
if (plug_journal_completions && seq >= header.start_seq) {
dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
pop_write();
}
- commit_cond.Signal();
+ commit_cond.notify_all();
dout(10) << "committed_thru done" << dendl;
}
#ifndef CEPH_FILEJOURNAL_H
#define CEPH_FILEJOURNAL_H
-#include <stdlib.h>
+#include <condition_variable>
#include <deque>
+#include <mutex>
+#include <stdlib.h>
using std::deque;
#include "Journal.h"
#include "common/config_fwd.h"
#include "common/Cond.h"
-#include "common/Mutex.h"
#include "common/Thread.h"
#include "common/Throttle.h"
#include "JournalThrottle.h"
write_item() : seq(0), orig_len(0) {}
};
- Mutex finisher_lock;
- Cond finisher_cond;
+ ceph::mutex finisher_lock = ceph::make_mutex("FileJournal::finisher_lock");
+ ceph::condition_variable finisher_cond;
uint64_t journaled_seq;
bool plug_journal_completions;
- Mutex writeq_lock;
- Cond writeq_cond;
+ ceph::mutex writeq_lock = ceph::make_mutex("FileJournal::writeq_lock");
+ ceph::condition_variable writeq_cond;
list<write_item> writeq;
bool writeq_empty();
write_item &peek_write();
void batch_pop_write(list<write_item> &items);
void batch_unpop_write(list<write_item> &items);
- Mutex completions_lock;
+ ceph::mutex completions_lock =
+ ceph::make_mutex("FileJournal::completions_lock");
list<completion_item> completions;
bool completions_empty() {
- Mutex::Locker l(completions_lock);
+ std::lock_guard l{completions_lock};
return completions.empty();
}
void batch_pop_completions(list<completion_item> &items) {
- Mutex::Locker l(completions_lock);
+ std::lock_guard l{completions_lock};
completions.swap(items);
}
void batch_unpop_completions(list<completion_item> &items) {
- Mutex::Locker l(completions_lock);
+ std::lock_guard l{completions_lock};
completions.splice(completions.begin(), items);
}
completion_item completion_peek_front() {
- Mutex::Locker l(completions_lock);
+ std::lock_guard l{completions_lock};
ceph_assert(!completions.empty());
return completions.front();
}
void completion_pop_front() {
- Mutex::Locker l(completions_lock);
+ std::lock_guard l{completions_lock};
ceph_assert(!completions.empty());
completions.pop_front();
}
delete[] iov;
}
};
- Mutex aio_lock;
- Cond aio_cond;
- Cond write_finish_cond;
- io_context_t aio_ctx;
+ ceph::mutex aio_lock = ceph::make_mutex("FileJournal::aio_lock");
+ ceph::condition_variable aio_cond;
+ ceph::condition_variable write_finish_cond;
+ io_context_t aio_ctx = 0;
list<aio_info> aio_queue;
- int aio_num, aio_bytes;
- uint64_t aio_write_queue_ops;
- uint64_t aio_write_queue_bytes;
+ int aio_num = 0, aio_bytes = 0;
+ uint64_t aio_write_queue_ops = 0;
+ uint64_t aio_write_queue_bytes = 0;
/// End protected by aio_lock
#endif
JournalThrottle throttle;
// write thread
- Mutex write_lock;
+ ceph::mutex write_lock = ceph::make_mutex("FileJournal::write_lock");
bool write_stop;
bool aio_stop;
- Cond commit_cond;
+ ceph::condition_variable commit_cond;
int _open(bool wr, bool create=false);
int _open_block_device();
ZTracer::Endpoint trace_endpoint;
public:
- FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, Cond *sync_cond,
+ FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, ceph::condition_variable *sync_cond,
const char *f, bool dio=false, bool ai=true, bool faio=false) :
Journal(cct, fsid, fin, sync_cond),
- finisher_lock("FileJournal::finisher_lock", false, true, false),
journaled_seq(0),
plug_journal_completions(false),
- writeq_lock("FileJournal::writeq_lock", false, true, false),
- completions_lock(
- "FileJournal::completions_lock", false, true, false),
fn(f),
zero_buf(NULL),
max_size(0), block_size(0),
must_write_header(false),
write_pos(0), read_pos(0),
discard(false),
-#ifdef HAVE_LIBAIO
- aio_lock("FileJournal::aio_lock"),
- aio_ctx(0),
- aio_num(0), aio_bytes(0),
- aio_write_queue_ops(0),
- aio_write_queue_bytes(0),
-#endif
last_committed_seq(0),
journaled_since_start(0),
full_state(FULL_NOTFULL),
fd(-1),
writing_seq(0),
throttle(cct->_conf->filestore_caller_concurrency),
- write_lock("FileJournal::write_lock", false, true, false),
write_stop(true),
aio_stop(true),
write_thread(this),
return r;
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(oid, index, &path);
if (r < 0)
int fd, exist;
ceph_assert((*index).index);
if (need_lock) {
- ((*index).index)->access_lock.get_write();
+ ((*index).index)->access_lock.lock();
}
if (!replaying) {
*outfd = fdcache.lookup(oid);
if (*outfd) {
if (need_lock) {
- ((*index).index)->access_lock.put_write();
+ ((*index).index)->access_lock.unlock();
}
return 0;
}
}
if (need_lock) {
- ((*index).index)->access_lock.put_write();
+ ((*index).index)->access_lock.unlock();
}
return 0;
fail:
if (need_lock) {
- ((*index).index)->access_lock.put_write();
+ ((*index).index)->access_lock.unlock();
}
if (r == -EIO && m_filestore_fail_eio) handle_eio();
if (!index_same) {
- RWLock::RLocker l1((index_old.index)->access_lock);
+ std::shared_lock l1{(index_old.index)->access_lock};
r = index_old->lookup(o, &path_old, &exist);
if (r < 0) {
if (!exist)
return -ENOENT;
- RWLock::WLocker l2((index_new.index)->access_lock);
+ std::unique_lock l2{(index_new.index)->access_lock};
r = index_new->lookup(newoid, &path_new, &exist);
if (r < 0) {
return r;
}
} else {
- RWLock::WLocker l1((index_old.index)->access_lock);
+ std::unique_lock l1{(index_old.index)->access_lock};
r = index_old->lookup(o, &path_old, &exist);
if (r < 0) {
}
ceph_assert(index.index);
- RWLock::WLocker l((index.index)->access_lock);
+ std::unique_lock l{(index.index)->access_lock};
{
IndexedPath path;
basedir_fd(-1), current_fd(-1),
backend(nullptr),
index_manager(cct, do_update),
- lock("FileStore::lock"),
force_sync(false),
- sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
timer(cct, sync_entry_timeo_lock),
stop(false), sync_thread(this),
- coll_lock("FileStore::coll_lock"),
fdcache(cct),
wbthrottle(cct),
next_osr_id(0),
cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
logger(nullptr),
trace_endpoint("0.0.0.0", 0, "FileStore"),
- read_error_lock("FileStore::read_error_lock"),
m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
goto close_current_fd;
}
ceph_assert(index.index);
- RWLock::WLocker l((index.index)->access_lock);
+ std::unique_lock l{(index.index)->access_lock};
index->cleanup();
}
stop_sync:
// stop sync thread
- lock.Lock();
- stop = true;
- sync_cond.Signal();
- lock.Unlock();
+ {
+ std::lock_guard l{lock};
+ stop = true;
+ sync_cond.notify_all();
+ }
sync_thread.join();
if (!m_disable_wbthrottle) {
wbthrottle.stop();
do_force_sync();
{
- Mutex::Locker l(coll_lock);
+ std::lock_guard l(coll_lock);
coll_map.clear();
}
- lock.Lock();
- stop = true;
- sync_cond.Signal();
- lock.Unlock();
+ {
+ std::lock_guard l{lock};
+ stop = true;
+ sync_cond.notify_all();
+ }
sync_thread.join();
if (!m_disable_wbthrottle){
wbthrottle.stop();
object_map.reset();
{
- Mutex::Locker l(sync_entry_timeo_lock);
+ std::lock_guard l{sync_entry_timeo_lock};
timer.shutdown();
}
ObjectStore::CollectionHandle FileStore::open_collection(const coll_t& c)
{
- Mutex::Locker l(coll_lock);
+ std::lock_guard l{coll_lock};
auto p = coll_map.find(c);
if (p == coll_map.end()) {
return CollectionHandle();
ObjectStore::CollectionHandle FileStore::create_new_collection(const coll_t& c)
{
- Mutex::Locker l(coll_lock);
+ std::lock_guard l{coll_lock};
auto p = coll_map.find(c);
if (p == coll_map.end()) {
auto *r = new OpSequencer(cct, ++next_osr_id, c);
dout(5) << __FUNC__ << ": done stalling" << dendl;
}
- osr->apply_lock.Lock();
+ osr->apply_lock.lock();
Op *o = osr->peek_queue();
o->trace.event("op_apply_start");
apply_manager.op_apply_start(o->op);
lat -= o->start;
dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " lat " << lat << dendl;
- osr->apply_lock.Unlock(); // locked in _do_op
+ osr->apply_lock.unlock(); // locked in _do_op
o->trace.event("_finish_op");
// called with tp lock held
goto out2;
}
ceph_assert(index.index);
- RWLock::WLocker l((index.index)->access_lock);
+ std::unique_lock l{(index.index)->access_lock};
r = lfn_open(cid, newoid, true, &n, &index);
if (r < 0) {
void FileStore::sync_entry()
{
- lock.Lock();
+ std::unique_lock l{lock};
while (!stop) {
- utime_t max_interval;
- max_interval.set_from_double(m_filestore_max_sync_interval);
- utime_t min_interval;
- min_interval.set_from_double(m_filestore_min_sync_interval);
-
- utime_t startwait = ceph_clock_now();
+ auto min_interval = ceph::make_timespan(m_filestore_min_sync_interval);
+ auto max_interval = ceph::make_timespan(m_filestore_max_sync_interval);
+ auto startwait = ceph::real_clock::now();
if (!force_sync) {
dout(20) << __FUNC__ << ": waiting for max_interval " << max_interval << dendl;
- sync_cond.WaitInterval(lock, max_interval);
+ sync_cond.wait_for(l, max_interval);
} else {
dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
}
break;
} else {
// wait for at least the min interval
- utime_t woke = ceph_clock_now();
- woke -= startwait;
+ auto woke = ceph::real_clock::now() - startwait;
dout(20) << __FUNC__ << ": woke after " << woke << dendl;
if (woke < min_interval) {
- utime_t t = min_interval;
- t -= woke;
+ auto t = min_interval - woke;
dout(20) << __FUNC__ << ": waiting for another " << t
<< " to reach min interval " << min_interval << dendl;
- sync_cond.WaitInterval(lock, t);
+ sync_cond.wait_for(l, t);
}
}
list<Context*> fin;
again:
fin.swap(sync_waiters);
- lock.Unlock();
+ l.unlock();
op_tp.pause();
if (apply_manager.commit_start()) {
- utime_t start = ceph_clock_now();
+ auto start = ceph::real_clock::now();
uint64_t cp = apply_manager.get_committing_seq();
- sync_entry_timeo_lock.Lock();
+ sync_entry_timeo_lock.lock();
SyncEntryTimeout *sync_entry_timeo =
new SyncEntryTimeout(cct, m_filestore_commit_timeout);
if (!timer.add_event_after(m_filestore_commit_timeout,
sync_entry_timeo)) {
sync_entry_timeo = nullptr;
}
- sync_entry_timeo_lock.Unlock();
+ sync_entry_timeo_lock.unlock();
logger->set(l_filestore_committing, 1);
}
}
- utime_t done = ceph_clock_now();
- utime_t lat = done - start;
- utime_t dur = done - startwait;
+ auto done = ceph::real_clock::now();
+ auto lat = done - start;
+ auto dur = done - startwait;
dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
utime_t max_pause_lat = logger->tget(l_filestore_sync_pause_max_lat);
- if (max_pause_lat < dur - lat) {
+ if (max_pause_lat < utime_t{dur - lat}) {
logger->tinc(l_filestore_sync_pause_max_lat, dur - lat);
}
dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;
if (sync_entry_timeo) {
- Mutex::Locker lock(sync_entry_timeo_lock);
+ std::lock_guard lock{sync_entry_timeo_lock};
timer.cancel_event(sync_entry_timeo);
}
} else {
op_tp.unpause();
}
- lock.Lock();
+ l.lock();
finish_contexts(cct, fin, 0);
fin.clear();
if (!sync_waiters.empty()) {
}
}
stop = false;
- lock.Unlock();
}
void FileStore::do_force_sync()
{
dout(10) << __FUNC__ << dendl;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
force_sync = true;
- sync_cond.Signal();
+ sync_cond.notify_all();
}
void FileStore::start_sync(Context *onsafe)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
sync_waiters.push_back(onsafe);
- sync_cond.Signal();
+ sync_cond.notify_all();
force_sync = true;
dout(10) << __FUNC__ << dendl;
}
void FileStore::sync()
{
- Mutex l("FileStore::sync");
- Cond c;
+ ceph::mutex m = ceph::make_mutex("FileStore::sync");
+ ceph::condition_variable c;
bool done;
- C_SafeCond *fin = new C_SafeCond(&l, &c, &done);
+ C_SafeCond *fin = new C_SafeCond(m, c, &done);
start_sync(fin);
- l.Lock();
- while (!done) {
- dout(10) << "sync waiting" << dendl;
- c.Wait(l);
- }
- l.Unlock();
+ std::unique_lock l{m};
+ c.wait(l, [&done, this] {
+ if (!done) {
+ dout(10) << "sync waiting" << dendl;
+ }
+ return done;
+ });
dout(10) << "sync done" << dendl;
}
if (cct->_conf->filestore_blackhole) {
// wait forever
- Mutex lock("FileStore::flush::lock");
- Cond cond;
- lock.Lock();
- while (true)
- cond.Wait(lock);
+ ceph::mutex lock = ceph::make_mutex("FileStore::flush::lock");
+ ceph::condition_variable cond;
+ std::unique_lock l{lock};
+ cond.wait(l, [] { return false; });
ceph_abort();
}
// debug EIO injection
void FileStore::inject_data_error(const ghobject_t &oid) {
- Mutex::Locker l(read_error_lock);
+ std::lock_guard l{read_error_lock};
dout(10) << __FUNC__ << ": init error on " << oid << dendl;
data_error_set.insert(oid);
}
void FileStore::inject_mdata_error(const ghobject_t &oid) {
- Mutex::Locker l(read_error_lock);
+ std::lock_guard l{read_error_lock};
dout(10) << __FUNC__ << ": init error on " << oid << dendl;
mdata_error_set.insert(oid);
}
void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
- Mutex::Locker l(read_error_lock);
+ std::lock_guard l{read_error_lock};
dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
data_error_set.erase(oid);
mdata_error_set.erase(oid);
}
bool FileStore::debug_data_eio(const ghobject_t &oid) {
- Mutex::Locker l(read_error_lock);
+ std::lock_guard l{read_error_lock};
if (data_error_set.count(oid)) {
dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
return true;
}
}
bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
- Mutex::Locker l(read_error_lock);
+ std::lock_guard l{read_error_lock};
if (mdata_error_set.count(oid)) {
dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
return true;
}
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
vector<ghobject_t> ls;
r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
return r;
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = index->collection_list_partial(start, end, max, ls, next);
return r;
{
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(hoid, index);
if (r < 0)
return r;
return r;
{
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(hoid, index);
if (r < 0)
return r;
return r;
{
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(hoid, index);
if (r < 0)
return r;
}
{
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(hoid, index);
if (r < 0) {
where = " (lfn_find)";
return r;
{
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(hoid, index);
if (r < 0)
return r;
}
{
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(hoid, index);
if (r < 0) {
dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
if (r < 0)
goto out;
ceph_assert(from.index);
- RWLock::WLocker l((from.index)->access_lock);
+ std::unique_lock l{(from.index)->access_lock};
r = from->prep_delete();
if (r < 0)
return r;
{
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(hoid, index);
if (r < 0)
return r;
}
{
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(hoid, index);
if (r < 0) {
dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
return r;
{
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(hoid, index);
if (r < 0)
return r;
return r;
{
ceph_assert(index.index);
- RWLock::RLocker l((index.index)->access_lock);
+ std::shared_lock l{(index.index)->access_lock};
r = lfn_find(hoid, index);
if (r < 0)
return r;
if (!r) {
ceph_assert(from.index);
- RWLock::WLocker l1((from.index)->access_lock);
+ std::unique_lock l1{(from.index)->access_lock};
ceph_assert(to.index);
- RWLock::WLocker l2((to.index)->access_lock);
+ std::unique_lock l2{(to.index)->access_lock};
r = from->merge(bits, to.index);
}
if (!r) {
ceph_assert(from.index);
- RWLock::WLocker l1((from.index)->access_lock);
+ std::unique_lock l1{(from.index)->access_lock};
ceph_assert(to.index);
- RWLock::WLocker l2((to.index)->access_lock);
+ std::unique_lock l2{(to.index)->access_lock};
r = from->merge(bits, to.index);
}
if (!r) {
ceph_assert(from.index);
- RWLock::WLocker l1((from.index)->access_lock);
+ std::unique_lock l1{(from.index)->access_lock};
ceph_assert(to.index);
- RWLock::WLocker l2((to.index)->access_lock);
+ std::unique_lock l2{(to.index)->access_lock};
r = from->split(rem, bits, to.index);
}
changed.count("filestore_max_xattr_value_size_btrfs") ||
changed.count("filestore_max_xattr_value_size_other")) {
if (backend) {
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
set_xattr_limits_via_conf();
}
}
changed.count("filestore_queue_high_threshhold") ||
changed.count("filestore_queue_high_delay_multiple") ||
changed.count("filestore_queue_max_delay_multiple")) {
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
set_throttle_params();
}
changed.count("filestore_sloppy_crc_block_size") ||
changed.count("filestore_max_alloc_hint_size") ||
changed.count("filestore_fadvise")) {
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
m_filestore_kill_at = conf->filestore_kill_at;
m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
}
if (changed.count("filestore_commit_timeout")) {
- Mutex::Locker l(sync_entry_timeo_lock);
+ std::lock_guard l(sync_entry_timeo_lock);
m_filestore_commit_timeout = conf->filestore_commit_timeout;
}
if (changed.count("filestore_dump_file")) {
void FileStore::OpSequencer::wait_for_apply(const ghobject_t& oid)
{
- Mutex::Locker l(qlock);
+ std::unique_lock l{qlock};
uint32_t key = oid.hobj.get_hash();
retry:
while (true) {
if (*p->second == oid) {
dout(20) << __func__ << " " << oid << " waiting on " << p->second
<< dendl;
- cond.Wait(qlock);
+ cond.wait(l);
goto retry;
}
++p;
#include "common/perf_counters.h"
#include "common/zipkin_trace.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "HashIndex.h"
#include "IndexManager.h"
#include "os/ObjectMap.h"
int lock_fsid();
// sync thread
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("FileStore::lock");
bool force_sync;
- Cond sync_cond;
+ ceph::condition_variable sync_cond;
- Mutex sync_entry_timeo_lock;
+ ceph::mutex sync_entry_timeo_lock = ceph::make_mutex("FileStore::sync_entry_timeo_lock");
SafeTimer timer;
list<Context*> sync_waiters;
};
class OpSequencer : public CollectionImpl {
CephContext *cct;
- Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
+ // to protect q, for benefit of flush (peek/dequeue also protected by lock)
+ ceph::mutex qlock =
+ ceph::make_mutex("FileStore::OpSequencer::qlock", false);
list<Op*> q;
list<uint64_t> jq;
list<pair<uint64_t, Context*> > flush_commit_waiters;
- Cond cond;
+ ceph::condition_variable cond;
string osr_name_str;
/// hash of pointers to ghobject_t's for in-flight writes
unordered_multimap<uint32_t,const ghobject_t*> applying;
public:
- Mutex apply_lock; // for apply mutual exclusion
+ // for apply mutual exclusion
+ ceph::mutex apply_lock =
+ ceph::make_mutex("FileStore::OpSequencer::apply_lock", false);
int id;
const char *osr_name;
bool _get_max_uncompleted(
uint64_t *seq ///< [out] max uncompleted seq
) {
- ceph_assert(qlock.is_locked());
ceph_assert(seq);
*seq = 0;
if (q.empty() && jq.empty())
bool _get_min_uncompleted(
uint64_t *seq ///< [out] min uncompleted seq
) {
- ceph_assert(qlock.is_locked());
ceph_assert(seq);
*seq = 0;
if (q.empty() && jq.empty())
}
void queue_journal(Op *o) {
- Mutex::Locker l(qlock);
+ std::lock_guard l{qlock};
jq.push_back(o->op);
_register_apply(o);
}
void dequeue_journal(list<Context*> *to_queue) {
- Mutex::Locker l(qlock);
+ std::lock_guard l{qlock};
jq.pop_front();
- cond.Signal();
+ cond.notify_all();
_wake_flush_waiters(to_queue);
}
void queue(Op *o) {
- Mutex::Locker l(qlock);
+ std::lock_guard l{qlock};
q.push_back(o);
_register_apply(o);
o->trace.keyval("queue depth", q.size());
void _unregister_apply(Op *o);
void wait_for_apply(const ghobject_t& oid);
Op *peek_queue() {
- Mutex::Locker l(qlock);
- ceph_assert(apply_lock.is_locked());
+ std::lock_guard l{qlock};
+ ceph_assert(ceph_mutex_is_locked(apply_lock));
return q.front();
}
Op *dequeue(list<Context*> *to_queue) {
ceph_assert(to_queue);
- ceph_assert(apply_lock.is_locked());
- Mutex::Locker l(qlock);
+ ceph_assert(ceph_mutex_is_locked(apply_lock));
+ std::lock_guard l{qlock};
Op *o = q.front();
q.pop_front();
- cond.Signal();
+ cond.notify_all();
_unregister_apply(o);
_wake_flush_waiters(to_queue);
return o;
}
void flush() override {
- Mutex::Locker l(qlock);
-
- while (cct->_conf->filestore_blackhole)
- cond.Wait(qlock); // wait forever
-
+ std::unique_lock l{qlock};
+ // wait forever while filestore_blackhole is set
+ cond.wait(l, [this] { return !cct->_conf->filestore_blackhole; });
// get max for journal _or_ op queues
uint64_t seq = 0;
if (seq) {
// everything prior to our watermark to drain through either/both queues
- while ((!q.empty() && q.front()->op <= seq) ||
- (!jq.empty() && jq.front() <= seq))
- cond.Wait(qlock);
+ cond.wait(l, [seq, this] {
+ return ((q.empty() || q.front()->op > seq) &&
+ (jq.empty() || jq.front() > seq));
+ });
}
}
bool flush_commit(Context *c) override {
- Mutex::Locker l(qlock);
+ std::lock_guard l{qlock};
uint64_t seq = 0;
if (_get_max_uncompleted(&seq)) {
return true;
OpSequencer(CephContext* cct, int i, coll_t cid)
: CollectionImpl(cid),
cct(cct),
- qlock("FileStore::OpSequencer::qlock", false, false),
osr_name_str(stringify(cid)),
- apply_lock("FileStore::OpSequencer::apply_lock", false, false),
id(i),
osr_name(osr_name_str.c_str()) {}
~OpSequencer() override {
};
typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
- Mutex coll_lock;
+ ceph::mutex coll_lock = ceph::make_mutex("FileStore::coll_lock");
map<coll_t,OpSequencerRef> coll_map;
friend ostream& operator<<(ostream& out, const OpSequencer& s);
uint64_t estimate_objects_overhead(uint64_t num_objects) override;
// DEBUG read error injection, an object is removed from both on delete()
- Mutex read_error_lock;
+ ceph::mutex read_error_lock = ceph::make_mutex("FileStore::read_error_lock");
set<ghobject_t> data_error_set; // read() will return -EIO
set<ghobject_t> mdata_error_set; // getattr(),stat() will return -EIO
void inject_data_error(const ghobject_t &oid) override;
#include <errno.h>
-#include "common/Mutex.h"
#include "common/Cond.h"
#include "common/config.h"
#include "common/debug.h"
int IndexManager::init_index(coll_t c, const char *path, uint32_t version) {
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
int r = set_version(path, version);
if (r < 0)
return r;
}
bool IndexManager::get_index_optimistic(coll_t c, Index *index) {
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
ceph::unordered_map<coll_t, CollectionIndex* > ::iterator it = col_indices.find(c);
if (it == col_indices.end())
return false;
int IndexManager::get_index(coll_t c, const string& baseDir, Index *index) {
if (get_index_optimistic(c, index))
return 0;
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
ceph::unordered_map<coll_t, CollectionIndex* > ::iterator it = col_indices.find(c);
if (it == col_indices.end()) {
char path[PATH_MAX];
#include "include/unordered_map.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/Cond.h"
#include "common/config.h"
#include "common/debug.h"
*/
class IndexManager {
CephContext* cct;
- RWLock lock; ///< Lock for Index Manager
+ /// Lock for Index Manager
+ ceph::shared_mutex lock = ceph::make_shared_mutex("IndexManager lock");
bool upgrade;
ceph::unordered_map<coll_t, CollectionIndex* > col_indices;
/// Constructor
explicit IndexManager(CephContext* cct,
bool upgrade) : cct(cct),
- lock("IndexManager lock"),
upgrade(upgrade) {}
~IndexManager();
CephContext* cct;
PerfCounters *logger;
protected:
- Cond *do_sync_cond;
+ ceph::condition_variable *do_sync_cond;
bool wait_on_full;
public:
- Journal(CephContext* cct, uuid_d f, Finisher *fin, Cond *c=0) :
+ Journal(CephContext* cct, uuid_d f, Finisher *fin, ceph::condition_variable *c=0) :
fsid(f), finisher(fin), cct(cct), logger(NULL),
do_sync_cond(c),
wait_on_full(false) { }
uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
{
- Mutex::Locker l(apply_lock);
- while (blocked) {
- dout(10) << "op_apply_start blocked, waiting" << dendl;
- blocked_cond.Wait(apply_lock);
- }
+ std::unique_lock l{apply_lock};
+ blocked_cond.wait(l, [this] {
+ if (blocked) {
+ dout(10) << "op_apply_start blocked, waiting" << dendl;
+ }
+ return !blocked;
+ });
dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> "
<< (open_ops+1) << dendl;
ceph_assert(!blocked);
void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
{
- Mutex::Locker l(apply_lock);
+ std::lock_guard l{apply_lock};
dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> "
<< (open_ops-1) << ", max_applied_seq " << max_applied_seq << " -> "
<< std::max(op, max_applied_seq) << dendl;
// signal a blocked commit_start
if (blocked) {
- blocked_cond.Signal();
+ blocked_cond.notify_all();
}
// there can be multiple applies in flight; track the max value we
uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
{
- lock.Lock();
+ lock.lock();
uint64_t op = ++op_seq;
dout(10) << "op_submit_start " << op << dendl;
return op;
ceph_abort_msg("out of order op_submit_finish");
}
op_submitted = op;
- lock.Unlock();
+ lock.unlock();
}
void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c)
{
- Mutex::Locker l(com_lock);
+ std::lock_guard l{com_lock};
ceph_assert(c);
commit_waiters[op].push_back(c);
}
bool ret = false;
{
- Mutex::Locker l(apply_lock);
+ std::unique_lock l{apply_lock};
dout(10) << "commit_start max_applied_seq " << max_applied_seq
<< ", open_ops " << open_ops << dendl;
blocked = true;
- while (open_ops > 0) {
- dout(10) << "commit_start waiting for " << open_ops
- << " open ops to drain" << dendl;
- blocked_cond.Wait(apply_lock);
- }
+ blocked_cond.wait(l, [this] {
+ if (open_ops > 0) {
+ dout(10) << "commit_start waiting for " << open_ops
+ << " open ops to drain" << dendl;
+ }
+ return open_ops == 0;
+ });
ceph_assert(open_ops == 0);
dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
{
- Mutex::Locker l(com_lock);
+ std::lock_guard l{com_lock};
if (max_applied_seq == committed_seq) {
dout(10) << "commit_start nothing to do" << dendl;
blocked = false;
void JournalingObjectStore::ApplyManager::commit_started()
{
- Mutex::Locker l(apply_lock);
+ std::lock_guard l{apply_lock};
// allow new ops. (underlying fs should now be committing all prior ops)
dout(10) << "commit_started committing " << committing_seq << ", unblocking"
<< dendl;
blocked = false;
- blocked_cond.Signal();
+ blocked_cond.notify_all();
}
void JournalingObjectStore::ApplyManager::commit_finish()
{
- Mutex::Locker l(com_lock);
+ std::lock_guard l{com_lock};
dout(10) << "commit_finish thru " << committing_seq << dendl;
if (journal)
class SubmitManager {
CephContext* cct;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("JOS::SubmitManager::lock");
uint64_t op_seq;
uint64_t op_submitted;
public:
SubmitManager(CephContext* cct) :
- cct(cct), lock("JOS::SubmitManager::lock", false, true, false),
+ cct(cct),
op_seq(0), op_submitted(0)
{}
uint64_t op_submit_start();
void op_submit_finish(uint64_t op);
void set_op_seq(uint64_t seq) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
op_submitted = op_seq = seq;
}
uint64_t get_op_seq() {
Journal *&journal;
Finisher &finisher;
- Mutex apply_lock;
+ ceph::mutex apply_lock = ceph::make_mutex("JOS::ApplyManager::apply_lock");
bool blocked;
- Cond blocked_cond;
+ ceph::condition_variable blocked_cond;
int open_ops;
uint64_t max_applied_seq;
- Mutex com_lock;
+ ceph::mutex com_lock = ceph::make_mutex("JOS::ApplyManager::com_lock");
map<version_t, vector<Context*> > commit_waiters;
uint64_t committing_seq, committed_seq;
public:
ApplyManager(CephContext* cct, Journal *&j, Finisher &f) :
cct(cct), journal(j), finisher(f),
- apply_lock("JOS::ApplyManager::apply_lock", false, true, false),
blocked(false),
open_ops(0),
max_applied_seq(0),
- com_lock("JOS::ApplyManager::com_lock", false, true, false),
committing_seq(0), committed_seq(0) {}
void reset() {
ceph_assert(open_ops == 0);
void commit_started();
void commit_finish();
bool is_committing() {
- Mutex::Locker l(com_lock);
+ std::lock_guard l{com_lock};
return committing_seq != committed_seq;
}
uint64_t get_committed_seq() {
- Mutex::Locker l(com_lock);
+ std::lock_guard l{com_lock};
return committed_seq;
}
uint64_t get_committing_seq() {
- Mutex::Locker l(com_lock);
+ std::lock_guard l{com_lock};
return committing_seq;
}
void init_seq(uint64_t fs_op_seq) {
{
- Mutex::Locker l(com_lock);
+ std::lock_guard l{com_lock};
committed_seq = fs_op_seq;
committing_seq = fs_op_seq;
}
{
- Mutex::Locker l(apply_lock);
+ std::lock_guard l{apply_lock};
max_applied_seq = fs_op_seq;
}
}
cct(cct),
logger(NULL),
stopping(true),
- lock("WBThrottle::lock", false, true, false),
fs(XFS)
{
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
set_from_conf();
}
ceph_assert(cct);
void WBThrottle::start()
{
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
stopping = false;
}
create("wb_throttle");
void WBThrottle::stop()
{
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
stopping = true;
- cond.Signal();
+ cond.notify_all();
}
join();
void WBThrottle::set_from_conf()
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (fs == BTRFS) {
size_limits.first =
cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
} else {
ceph_abort_msg("invalid value for fs");
}
- cond.Signal();
+ cond.notify_all();
}
void WBThrottle::handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
for (const char** i = get_tracked_conf_keys(); *i; ++i) {
if (changed.count(*i)) {
set_from_conf();
}
bool WBThrottle::get_next_should_flush(
+ std::unique_lock<ceph::mutex>& locker,
boost::tuple<ghobject_t, FDRef, PendingWB> *next)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph_assert(next);
- while (!stopping && (!beyond_limit() || pending_wbs.empty()))
- cond.Wait(lock);
+ {
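+ // wake when we are stopping, or when we are over a throttle limit and there are pending writebacks to flush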
+ cond.wait(locker, [this] {
+ return stopping || (beyond_limit() && !pending_wbs.empty());
+ });
+ }
if (stopping)
return false;
ceph_assert(!pending_wbs.empty());
void *WBThrottle::entry()
{
- Mutex::Locker l(lock);
+ std::unique_lock l{lock};
boost::tuple<ghobject_t, FDRef, PendingWB> wb;
- while (get_next_should_flush(&wb)) {
+ while (get_next_should_flush(l, &wb)) {
clearing = wb.get<0>();
cur_ios -= wb.get<2>().ios;
logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios);
logger->inc(l_wbthrottle_bytes_wb, wb.get<2>().size);
logger->dec(l_wbthrottle_inodes_dirtied);
logger->inc(l_wbthrottle_inodes_wb);
- lock.Unlock();
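+ // drop the lock while we issue the (potentially slow) flush syscall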
+ l.unlock();
#if defined(HAVE_FDATASYNC)
int r = ::fdatasync(**wb.get<1>());
#else
ceph_assert(fa_r == 0);
}
#endif
- lock.Lock();
+ l.lock();
clearing = ghobject_t();
- cond.Signal();
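+ // wake threads waiting in clear_object() or throttle() now that this flush is done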
+ cond.notify_all();
wb = boost::tuple<ghobject_t, FDRef, PendingWB>();
}
return 0;
FDRef fd, const ghobject_t &hoid, uint64_t offset, uint64_t len,
bool nocache)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator wbiter =
pending_wbs.find(hoid);
if (wbiter == pending_wbs.end()) {
wbiter->second.first.add(nocache, len, 1);
insert_object(hoid);
if (beyond_limit())
- cond.Signal();
+ cond.notify_all();
}
void WBThrottle::clear()
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
for (ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
pending_wbs.begin();
i != pending_wbs.end();
pending_wbs.clear();
lru.clear();
rev_lru.clear();
- cond.Signal();
+ cond.notify_all();
}
void WBThrottle::clear_object(const ghobject_t &hoid)
{
- Mutex::Locker l(lock);
- while (clearing == hoid)
- cond.Wait(lock);
+ std::unique_lock l{lock};
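+ // wait until any in-flight flush of this object has completed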
+ cond.wait(l, [hoid, this] { return clearing != hoid; });
ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
pending_wbs.find(hoid);
if (i == pending_wbs.end())
pending_wbs.erase(i);
remove_object(hoid);
- cond.Signal();
+ cond.notify_all();
}
void WBThrottle::throttle()
{
- Mutex::Locker l(lock);
- while (!stopping && need_flush())
- cond.Wait(lock);
+ std::unique_lock l{lock};
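+ // block the caller until we no longer need to flush, or we are shutting down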
+ cond.wait(l, [this] { return stopping || !need_flush(); });
}
CephContext *cct;
PerfCounters *logger;
bool stopping;
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("WBThrottle::lock");
+ ceph::condition_variable cond;
/**
list<ghobject_t> lru;
ceph::unordered_map<ghobject_t, list<ghobject_t>::iterator> rev_lru;
void remove_object(const ghobject_t &oid) {
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph::unordered_map<ghobject_t, list<ghobject_t>::iterator>::iterator iter =
rev_lru.find(oid);
if (iter == rev_lru.end())
/// get next flush to perform
bool get_next_should_flush(
+ std::unique_lock<ceph::mutex>& locker, ///< [in] held lock; released while waiting
boost::tuple<ghobject_t, FDRef, PendingWB> *next ///< [out] next to flush
); ///< @return false if we are shutting down
public:
void stop();
/// Set fs as XFS or BTRFS
void set_fs(FS new_fs) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
fs = new_fs;
set_from_conf();
}
#include <string>
#include "include/types.h"
-#include "common/Mutex.h"
#include "common/Cond.h"
class FS {
KStore::Collection::Collection(KStore *ns, coll_t cid)
: CollectionImpl(cid),
store(ns),
- lock("KStore::Collection::lock", true, false),
osr(new OpSequencer()),
onode_map(store->cct)
{
const ghobject_t& oid,
bool create)
{
- ceph_assert(create ? lock.is_wlocked() : lock.is_locked());
+ ceph_assert(create ? ceph_mutex_is_wlocked(lock) : ceph_mutex_is_locked(lock));
spg_t pgid;
if (cid.is_pg(&pgid)) {
path_fd(-1),
fsid_fd(-1),
mounted(false),
- coll_lock("KStore::coll_lock"),
nid_last(0),
nid_max(0),
throttle_ops(cct, "kstore_max_ops", cct->_conf->kstore_max_ops),
ObjectStore::CollectionHandle KStore::create_new_collection(const coll_t& cid)
{
auto *c = new Collection(this, cid);
- RWLock::WLocker l(coll_lock);
+ std::unique_lock l{coll_lock};
new_coll_map[cid] = c;
return c;
}
KStore::CollectionRef KStore::_get_collection(coll_t cid)
{
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l{coll_lock};
ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
if (cp == coll_map.end())
return CollectionRef();
{
dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists)
return false;
{
dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists)
return -ENOENT;
<< dendl;
bl.clear();
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r;
CollectionRef c = static_cast<Collection*>(ch.get());
if (!c)
return -ENOENT;
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
{
dout(15) << __func__ << " " << ch->cid << " " << oid << " " << name << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r;
string k(name);
{
dout(15) << __func__ << " " << ch->cid << " " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r;
OnodeRef o = c->get_onode(oid, false);
int KStore::list_collections(vector<coll_t>& ls)
{
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l{coll_lock};
for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
p != coll_map.end();
++p)
bool KStore::collection_exists(const coll_t& c)
{
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l{coll_lock};
return coll_map.count(c);
}
{
dout(15) << __func__ << " " << ch->cid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
dout(10) << __func__ << " " << ch->cid << " = " << c->cnode.bits << dendl;
return c->cnode.bits;
}
<< " start " << start << " end " << end << " max " << max << dendl;
int r;
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
r = _collection_list(c, start, end, max, ls, pnext);
}
CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
: c(c), o(o), it(it)
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head) {
get_omap_key(o->onode.omap_head, string(), &head);
get_omap_tail(o->onode.omap_head, &tail);
int KStore::OmapIteratorImpl::seek_to_first()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head) {
it->lower_bound(head);
} else {
int KStore::OmapIteratorImpl::upper_bound(const string& after)
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head) {
string key;
get_omap_key(o->onode.omap_head, after, &key);
int KStore::OmapIteratorImpl::lower_bound(const string& to)
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head) {
string key;
get_omap_key(o->onode.omap_head, to, &key);
bool KStore::OmapIteratorImpl::valid()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head && it->valid() && it->raw_key().second <= tail) {
return true;
} else {
int KStore::OmapIteratorImpl::next()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head) {
it->next();
return 0;
string KStore::OmapIteratorImpl::key()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
ceph_assert(it->valid());
string db_key = it->raw_key().second;
string user_key;
bufferlist KStore::OmapIteratorImpl::value()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
ceph_assert(it->valid());
return it->value();
}
{
dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
{
dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
{
dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
{
dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
{
dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
dout(10) << __func__ << " " << oid << " doesn't exist" << dendl;
}
// object operations
- RWLock::WLocker l(c->lock);
+ std::unique_lock l{c->lock};
OnodeRef &o = ovec[op->oid];
if (!o) {
// these operations implicitly create the object
bufferlist bl;
{
- RWLock::WLocker l(coll_lock);
+ std::unique_lock l{coll_lock};
if (*c) {
r = -EEXIST;
goto out;
int r;
{
- RWLock::WLocker l(coll_lock);
+ std::unique_lock l{coll_lock};
if (!*c) {
r = -ENOENT;
goto out;
dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
<< " bits " << bits << dendl;
int r;
- RWLock::WLocker l(c->lock);
- RWLock::WLocker l2(d->lock);
+ std::unique_lock l{c->lock};
+ std::unique_lock l2{d->lock};
c->onode_map.clear();
d->onode_map.clear();
c->cnode.bits = bits;
dout(15) << __func__ << " " << (*c)->cid << " to " << d->cid << " "
<< " bits " << bits << dendl;
int r;
- RWLock::WLocker l((*c)->lock);
- RWLock::WLocker l2(d->lock);
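+ // lock both collections at once; std::scoped_lock acquires them without risking deadlock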
+ std::scoped_lock l{(*c)->lock, d->lock};
(*c)->onode_map.clear();
d->onode_map.clear();
d->cnode.bits = bits;
struct Collection : public CollectionImpl {
KStore *store;
kstore_cnode_t cnode;
- RWLock lock;
+ ceph::shared_mutex lock =
+ ceph::make_shared_mutex("KStore::Collection::lock", true, false);
OpSequencerRef osr;
int fsid_fd; ///< open handle (locked) to $path/fsid
bool mounted;
- RWLock coll_lock; ///< rwlock to protect coll_map
+ /// rwlock to protect coll_map
+ ceph::shared_mutex coll_lock = ceph::make_shared_mutex("KStore::coll_lock");
ceph::unordered_map<coll_t, CollectionRef> coll_map;
map<coll_t,CollectionRef> new_coll_map;