ObjectMap::ObjectMapIterator DBObjectMap::get_iterator(
const ghobject_t &oid)
{
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return ObjectMapIterator(new EmptyIteratorImpl());
- return _get_iterator(header);
+ DBObjectMapIterator iter = _get_iterator(header);
+ iter->hlock.swap(hl);
+ return iter;
}
int DBObjectMap::DBObjectMapIteratorImpl::seek_to_first()
const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
- Header header = lookup_create_map_header(oid, t);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_create_map_header(hl, oid, t);
if (!header)
return -EINVAL;
if (check_spos(oid, header, spos))
const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
- Header header = lookup_create_map_header(oid, t);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_create_map_header(hl, oid, t);
if (!header)
return -EINVAL;
if (check_spos(oid, header, spos))
int DBObjectMap::get_header(const ghobject_t &oid,
bufferlist *bl)
{
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header) {
return 0;
}
const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return -ENOENT;
if (check_spos(oid, header, spos))
return 0;
- remove_map_header(oid, header, t);
+ remove_map_header(hl, oid, header, t);
assert(header->num_children > 0);
header->num_children--;
int r = _clear(header, t);
const set<string> &to_clear,
const SequencerPosition *spos)
{
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return -ENOENT;
KeyValueDB::Transaction t = db->get_transaction();
parent->num_children--;
_clear(parent, t);
header->parent = 0;
- set_map_header(oid, *header, t);
+ set_map_header(hl, oid, *header, t);
t->rmkeys_by_prefix(complete_prefix(header));
}
return db->submit_transaction(t);
const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return -ENOENT;
if (check_spos(oid, header, spos))
return iter->status();
// remove current header
- remove_map_header(oid, header, t);
+ remove_map_header(hl, oid, header, t);
assert(header->num_children > 0);
header->num_children--;
int r = _clear(header, t);
// create new header
Header newheader = generate_new_header(oid, Header());
- set_map_header(oid, *newheader, t);
+ set_map_header(hl, oid, *newheader, t);
if (!attrs.empty())
t->set(xattr_prefix(newheader), attrs);
return db->submit_transaction(t);
bufferlist *_header,
map<string, bufferlist> *out)
{
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return -ENOENT;
_get_header(header, _header);
int DBObjectMap::get_keys(const ghobject_t &oid,
set<string> *keys)
{
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return -ENOENT;
ObjectMapIterator iter = get_iterator(oid);
const set<string> &keys,
map<string, bufferlist> *out)
{
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return -ENOENT;
return scan(header, keys, 0, out);
const set<string> &keys,
set<string> *out)
{
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return -ENOENT;
return scan(header, keys, out, 0);
const set<string> &to_get,
map<string, bufferlist> *out)
{
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return -ENOENT;
return db->get(xattr_prefix(header), to_get, out);
int DBObjectMap::get_all_xattrs(const ghobject_t &oid,
set<string> *out)
{
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return -ENOENT;
KeyValueDB::Iterator iter = db->get_iterator(xattr_prefix(header));
const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
- Header header = lookup_create_map_header(oid, t);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_create_map_header(hl, oid, t);
if (!header)
return -EINVAL;
if (check_spos(oid, header, spos))
const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
- Header header = lookup_map_header(oid);
+ MapHeaderLock hl(this, oid);
+ Header header = lookup_map_header(hl, oid);
if (!header)
return -ENOENT;
if (check_spos(oid, header, spos))
if (oid == target)
return 0;
+ MapHeaderLock _l1(this, MIN(oid, target));
+ MapHeaderLock _l2(this, MAX(oid, target));
+ MapHeaderLock *lsource, *ltarget;
+ if (oid > target) {
+ lsource = &_l2;
+ ltarget= &_l1;
+ } else {
+ lsource = &_l1;
+ ltarget= &_l2;
+ }
+
KeyValueDB::Transaction t = db->get_transaction();
{
- Header destination = lookup_map_header(target);
+ Header destination = lookup_map_header(*ltarget, target);
if (destination) {
- remove_map_header(target, destination, t);
+ remove_map_header(*ltarget, target, destination, t);
if (check_spos(target, destination, spos))
return 0;
destination->num_children--;
}
}
- Header parent = lookup_map_header(oid);
+ Header parent = lookup_map_header(*lsource, oid);
if (!parent)
return db->submit_transaction(t);
parent->num_children = 2;
set_header(parent, t);
- set_map_header(oid, *source, t);
- set_map_header(target, *destination, t);
+ set_map_header(*lsource, oid, *source, t);
+ set_map_header(*ltarget, target, *destination, t);
map<string, bufferlist> to_set;
KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(parent));
write_state(t);
if (oid) {
assert(spos);
- Header header = lookup_map_header(*oid);
+ MapHeaderLock hl(this, *oid);
+ Header header = lookup_map_header(hl, *oid);
if (header) {
dout(10) << "oid: " << *oid << " setting spos to "
<< *spos << dendl;
header->spos = *spos;
- set_map_header(*oid, *header, t);
+ set_map_header(hl, *oid, *header, t);
}
}
return db->submit_transaction_sync(t);
}
-DBObjectMap::Header DBObjectMap::_lookup_map_header(const ghobject_t &oid)
+DBObjectMap::Header DBObjectMap::_lookup_map_header(
+ const MapHeaderLock &l,
+ const ghobject_t &oid)
{
- while (map_header_in_use.count(oid))
- header_cond.Wait(header_lock);
+ assert(l.get_locked() == oid);
_Header *header = new _Header();
{
Mutex::Locker l(cache_lock);
if (caches.lookup(oid, header)) {
- return Header(header, RemoveMapHeaderOnDelete(this, oid));
+ assert(!in_use.count(header->seq));
+ in_use.insert(header->seq);
+ return Header(header, RemoveOnDelete(this));
}
}
return Header();
}
- Header ret(header, RemoveMapHeaderOnDelete(this, oid));
+ Header ret(header, RemoveOnDelete(this));
bufferlist::iterator iter = out.begin()->second.begin();
ret->decode(iter);
{
caches.add(oid, *ret);
}
+ assert(!in_use.count(header->seq));
+ in_use.insert(header->seq);
return ret;
}
}
DBObjectMap::Header DBObjectMap::lookup_create_map_header(
+ const MapHeaderLock &hl,
const ghobject_t &oid,
KeyValueDB::Transaction t)
{
Mutex::Locker l(header_lock);
- Header header = _lookup_map_header(oid);
+ Header header = _lookup_map_header(hl, oid);
if (!header) {
header = _generate_new_header(oid, Header());
- set_map_header(oid, *header, t);
+ set_map_header(hl, oid, *header, t);
}
return header;
}
t->set(sys_prefix(header), to_write);
}
-void DBObjectMap::remove_map_header(const ghobject_t &oid,
- Header header,
- KeyValueDB::Transaction t)
+void DBObjectMap::remove_map_header(
+ const MapHeaderLock &l,
+ const ghobject_t &oid,
+ Header header,
+ KeyValueDB::Transaction t)
{
+ assert(l.get_locked() == oid);
dout(20) << "remove_map_header: removing " << header->seq
<< " oid " << oid << dendl;
set<string> to_remove;
}
}
-void DBObjectMap::set_map_header(const ghobject_t &oid, _Header header,
- KeyValueDB::Transaction t)
+void DBObjectMap::set_map_header(
+ const MapHeaderLock &l,
+ const ghobject_t &oid, _Header header,
+ KeyValueDB::Transaction t)
{
+ assert(l.get_locked() == oid);
dout(20) << "set_map_header: setting " << header.seq
<< " oid " << oid << " parent seq "
<< header.parent << dendl;
#include "common/Mutex.h"
#include "common/Cond.h"
#include "common/simple_cache.hpp"
+#include <boost/optional.hpp>
/**
* DBObjectMap: Implements ObjectMap in terms of KeyValueDB
set<uint64_t> in_use;
set<ghobject_t> map_header_in_use;
+ /**
+ * Takes the map_header_in_use entry in constructor, releases in
+ * destructor
+ */
+ class MapHeaderLock {
+ DBObjectMap *db;
+ boost::optional<ghobject_t> locked;
+
+ MapHeaderLock(const MapHeaderLock &);
+ MapHeaderLock &operator=(const MapHeaderLock &);
+ public:
+ MapHeaderLock(DBObjectMap *db) : db(db) {}
+ MapHeaderLock(DBObjectMap *db, const ghobject_t &oid) : db(db), locked(oid) {
+ Mutex::Locker l(db->header_lock);
+ while (db->map_header_in_use.count(*locked))
+ db->map_header_cond.Wait(db->header_lock);
+ db->map_header_in_use.insert(*locked);
+ }
+
+ const ghobject_t &get_locked() const {
+ assert(locked);
+ return *locked;
+ }
+
+ void swap(MapHeaderLock &o) {
+ assert(db == o.db);
+
+ // centos6's boost optional doesn't seem to have swap :(
+ boost::optional<ghobject_t> _locked = o.locked;
+ o.locked = locked;
+ locked = _locked;
+ }
+
+ ~MapHeaderLock() {
+ if (locked) {
+ Mutex::Locker l(db->header_lock);
+ assert(db->map_header_in_use.count(*locked));
+ db->map_header_cond.Signal();
+ db->map_header_in_use.erase(*locked);
+ }
+ }
+ };
+
DBObjectMap(KeyValueDB *db) : db(db), header_lock("DBOBjectMap"),
cache_lock("DBObjectMap::CacheLock"),
caches(g_conf->filestore_omap_header_cache_size)
public:
DBObjectMap *map;
+ /// NOTE: implicit lock hlock->get_locked() when returned out of the class
+ MapHeaderLock hlock;
/// NOTE: implicit lock on header->seq AND for all ancestors
Header header;
bool invalid;
DBObjectMapIteratorImpl(DBObjectMap *map, Header header) :
- map(map), header(header), r(0), ready(false), invalid(true) {}
+ map(map), hlock(map), header(header), r(0), ready(false), invalid(true) {}
int seek_to_first();
int seek_to_last();
int upper_bound(const string &after);
void set_header(Header input, KeyValueDB::Transaction t);
/// Remove leaf node corresponding to oid in c
- void remove_map_header(const ghobject_t &oid,
- Header header,
- KeyValueDB::Transaction t);
+ void remove_map_header(
+ const MapHeaderLock &l,
+ const ghobject_t &oid,
+ Header header,
+ KeyValueDB::Transaction t);
/// Set leaf node for c and oid to the value of header
- void set_map_header(const ghobject_t &oid, _Header header,
- KeyValueDB::Transaction t);
+ void set_map_header(
+ const MapHeaderLock &l,
+ const ghobject_t &oid, _Header header,
+ KeyValueDB::Transaction t);
/// Set leaf node for c and oid to the value of header
bool check_spos(const ghobject_t &oid,
const SequencerPosition *spos);
/// Lookup or create header for c oid
- Header lookup_create_map_header(const ghobject_t &oid,
- KeyValueDB::Transaction t);
+ Header lookup_create_map_header(
+ const MapHeaderLock &l,
+ const ghobject_t &oid,
+ KeyValueDB::Transaction t);
/**
* Generate new header for c oid with new seq number
}
/// Lookup leaf header for c oid
- Header _lookup_map_header(const ghobject_t &oid);
- Header lookup_map_header(const ghobject_t &oid) {
+ Header _lookup_map_header(
+ const MapHeaderLock &l,
+ const ghobject_t &oid);
+ Header lookup_map_header(
+ const MapHeaderLock &l2,
+ const ghobject_t &oid) {
Mutex::Locker l(header_lock);
- return _lookup_map_header(oid);
+ return _lookup_map_header(l2, oid);
}
/// Lookup header node for input
KeyValueDB::Transaction t);
/**
- * Removes map header lock once Header is out of scope
- * @see lookup_map_header
- */
- class RemoveMapHeaderOnDelete {
- public:
- DBObjectMap *db;
- ghobject_t oid;
- RemoveMapHeaderOnDelete(DBObjectMap *db, const ghobject_t &oid) :
- db(db), oid(oid) {}
- void operator() (_Header *header) {
- Mutex::Locker l(db->header_lock);
- db->map_header_in_use.erase(oid);
- db->map_header_cond.Signal();
- delete header;
- }
- };
-
- /**
- * Removes header seq lock once Header is out of scope
+ * Removes header seq lock and possibly object lock
+ * once Header is out of scope
* @see lookup_parent
* @see generate_new_header
*/
db(db) {}
void operator() (_Header *header) {
Mutex::Locker l(db->header_lock);
+ assert(db->in_use.count(header->seq));
db->in_use.erase(header->seq);
db->header_cond.Signal();
delete header;