ldout(cct, 1) << __func__ << dendl;
- const OSDMap *osdmap = objecter->get_osdmap_read();
- const epoch_t osd_epoch = osdmap->get_epoch();
- objecter->put_osdmap_read();
+ const epoch_t osd_epoch
+ = objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
if (f) {
f->open_object_section("metadata");
- {
- for (std::map<std::string, std::string>::const_iterator i = metadata.begin();
- i != metadata.end(); ++i) {
- f->dump_string(i->first.c_str(), i->second);
- }
- }
+ for (const auto& kv : metadata)
+ f->dump_string(kv.first.c_str(), kv.second);
f->close_section();
f->dump_int("dentry_count", lru.lru_get_size());
}
r->set_mdsmap_epoch(mdsmap->get_epoch());
if (r->head.op == CEPH_MDS_OP_SETXATTR) {
- const OSDMap *osdmap = objecter->get_osdmap_read();
- r->set_osdmap_epoch(osdmap->get_epoch());
- objecter->put_osdmap_read();
+ objecter->with_osdmap([r](const OSDMap& o) {
+ r->set_osdmap_epoch(o.get_epoch());
+ });
}
if (request->mds == -1) {
// cancel_writes
std::vector<int64_t> full_pools;
- const OSDMap *osd_map = objecter->get_osdmap_read();
- const map<int64_t,pg_pool_t>& pools = osd_map->get_pools();
- for (map<int64_t,pg_pool_t>::const_iterator i = pools.begin();
- i != pools.end(); ++i) {
- if (i->second.has_flag(pg_pool_t::FLAG_FULL)) {
- full_pools.push_back(i->first);
- }
- }
-
- objecter->put_osdmap_read();
+ objecter->with_osdmap([&full_pools](const OSDMap &o) {
+ for (const auto& kv : o.get_pools()) {
+ if (kv.second.has_flag(pg_pool_t::FLAG_FULL)) {
+ full_pools.push_back(kv.first);
+ }
+ }
+ });
- for (std::vector<int64_t>::iterator i = full_pools.begin();
- i != full_pools.end(); ++i) {
- _handle_full_flag(*i);
- }
+ for (auto p : full_pools)
+ _handle_full_flag(p);
// Subscribe to subsequent maps to watch for the full flag going
// away. For the global full flag objecter does this for us, but
strcmp(name, "ceph.file.layout") == 0 || strcmp(name, "ceph.dir.layout") == 0) {
string rest(strstr(name, "layout"));
string v((const char*)value);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- int r = check_data_pool_exist(rest, v, osdmap);
- objecter->put_osdmap_read();
+ int r = objecter->with_osdmap([&](const OSDMap& o) {
+ return check_data_pool_exist(rest, v, &o);
+ });
if (r == -ENOENT) {
C_SaferCond ctx;
(unsigned long long)in->layout.fl_stripe_unit,
(unsigned long long)in->layout.fl_stripe_count,
(unsigned long long)in->layout.fl_object_size);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- if (osdmap->have_pg_pool(in->layout.fl_pg_pool))
- r += snprintf(val + r, size - r, "%s",
- osdmap->get_pool_name(in->layout.fl_pg_pool).c_str());
- else
- r += snprintf(val + r, size - r, "%lld",
- (unsigned long long)in->layout.fl_pg_pool);
- objecter->put_osdmap_read();
+ objecter->with_osdmap([&](const OSDMap& o) {
+ if (o.have_pg_pool(in->layout.fl_pg_pool))
+ r += snprintf(val + r, size - r, "%s",
+ o.get_pool_name(in->layout.fl_pg_pool).c_str());
+ else
+ r += snprintf(val + r, size - r, "%" PRIu64,
+ (uint64_t)in->layout.fl_pg_pool);
+ });
return r;
}
size_t Client::_vxattrcb_layout_stripe_unit(Inode *in, char *val, size_t size)
size_t Client::_vxattrcb_layout_pool(Inode *in, char *val, size_t size)
{
size_t r;
- const OSDMap *osdmap = objecter->get_osdmap_read();
- if (osdmap->have_pg_pool(in->layout.fl_pg_pool))
- r = snprintf(val, size, "%s", osdmap->get_pool_name(in->layout.fl_pg_pool).c_str());
- else
- r = snprintf(val, size, "%lld", (unsigned long long)in->layout.fl_pg_pool);
- objecter->put_osdmap_read();
+ objecter->with_osdmap([&](const OSDMap& o) {
+ if (o.have_pg_pool(in->layout.fl_pg_pool))
+ r = snprintf(val, size, "%s", o.get_pool_name(
+ in->layout.fl_pg_pool).c_str());
+ else
+ r = snprintf(val, size, "%" PRIu64, (uint64_t)in->layout.fl_pg_pool);
+ });
return r;
}
size_t Client::_vxattrcb_dir_entries(Inode *in, char *val, size_t size)
int64_t pool_id = -1;
if (data_pool && *data_pool) {
- const OSDMap * osdmap = objecter->get_osdmap_read();
- pool_id = osdmap->lookup_pg_pool_name(data_pool);
- objecter->put_osdmap_read();
+ pool_id = objecter->with_osdmap(
+ std::mem_fn(&OSDMap::lookup_pg_pool_name), data_pool);
if (pool_id < 0)
return -EINVAL;
if (pool_id > 0xffffffffll)
int Client::ll_num_osds(void)
{
Mutex::Locker lock(client_lock);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- int ret = osdmap->get_num_osds();
- objecter->put_osdmap_read();
- return ret;
+ return objecter->with_osdmap(std::mem_fn(&OSDMap::get_num_osds));
}
int Client::ll_osdaddr(int osd, uint32_t *addr)
{
Mutex::Locker lock(client_lock);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- bool exists = osdmap->exists(osd);
entity_addr_t g;
- if (exists)
- g = osdmap->get_addr(osd);
- objecter->put_osdmap_read();
- if (!exists) {
+ bool exists = objecter->with_osdmap([&](const OSDMap& o) {
+ if (!o.exists(osd))
+ return false;
+ g = o.get_addr(osd);
+ return true;
+ });
+ if (!exists)
return -1;
- }
uint32_t nb_addr = (g.in4_addr()).sin_addr.s_addr;
*addr = ntohl(nb_addr);
return 0;
}
-
uint32_t Client::ll_stripe_unit(Inode *in)
{
Mutex::Locker lock(client_lock);
uint64_t objectno = objectsetno * stripe_count + stripepos; // object id
object_t oid = file_object_t(ino, objectno);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- ceph_object_layout olayout = osdmap->file_to_object_layout(oid, *layout, "");
- objecter->put_osdmap_read();
-
- pg_t pg = (pg_t)olayout.ol_pgid;
- vector<int> osds;
- int primary;
- osdmap->pg_to_osds(pg, &osds, &primary);
- return osds[0];
+ return objecter->with_osdmap([&](const OSDMap& o) {
+ ceph_object_layout olayout =
+ o.file_to_object_layout(oid, *layout, string());
+ pg_t pg = (pg_t)olayout.ol_pgid;
+ vector<int> osds;
+ int primary;
+ o.pg_to_osds(pg, &osds, &primary);
+ return osds[0];
+ });
}
/* Return the offset of the block, internal to the object */
int64_t Client::get_pool_id(const char *pool_name)
{
Mutex::Locker lock(client_lock);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
- objecter->put_osdmap_read();
- return pool;
+ return objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
+ pool_name);
}
string Client::get_pool_name(int64_t pool)
{
Mutex::Locker lock(client_lock);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- string ret;
- if (osdmap->have_pg_pool(pool))
- ret = osdmap->get_pool_name(pool);
- objecter->put_osdmap_read();
- return ret;
+ return objecter->with_osdmap([pool](const OSDMap& o) {
+ return o.have_pg_pool(pool) ? o.get_pool_name(pool) : string();
+ });
}
int Client::get_pool_replication(int64_t pool)
{
Mutex::Locker lock(client_lock);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- int ret;
- if (!osdmap->have_pg_pool(pool))
- ret = -ENOENT;
- else
- ret = osdmap->get_pg_pool(pool)->get_size();
- objecter->put_osdmap_read();
- return ret;
+ return objecter->with_osdmap([pool](const OSDMap& o) {
+ return o.have_pg_pool(pool) ? o.get_pg_pool(pool)->get_size() : -ENOENT;
+ });
}
int Client::get_file_extent_osds(int fd, loff_t off, loff_t *len, vector<int>& osds)
Striper::file_to_extents(cct, in->ino, &in->layout, off, 1, in->truncate_size, extents);
assert(extents.size() == 1);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- pg_t pg = osdmap->object_locator_to_pg(extents[0].oid, extents[0].oloc);
- osdmap->pg_to_acting_osds(pg, osds);
- objecter->put_osdmap_read();
+ objecter->with_osdmap([&](const OSDMap& o) {
+ pg_t pg = o.object_locator_to_pg(extents[0].oid, extents[0].oloc);
+ o.pg_to_acting_osds(pg, osds);
+ });
if (osds.empty())
return -EINVAL;
Mutex::Locker lock(client_lock);
if (id < 0)
return -EINVAL;
- const OSDMap *osdmap = objecter->get_osdmap_read();
- int ret = osdmap->crush->get_full_location_ordered(id, path);
- objecter->put_osdmap_read();
- return ret;
+ return objecter->with_osdmap([&](const OSDMap& o) {
+ return o.crush->get_full_location_ordered(id, path);
+ });
}
-int Client::get_file_stripe_address(int fd, loff_t offset, vector<entity_addr_t>& address)
+int Client::get_file_stripe_address(int fd, loff_t offset,
+ vector<entity_addr_t>& address)
{
Mutex::Locker lock(client_lock);
// which object?
vector<ObjectExtent> extents;
- Striper::file_to_extents(cct, in->ino, &in->layout, offset, 1, in->truncate_size, extents);
+ Striper::file_to_extents(cct, in->ino, &in->layout, offset, 1,
+ in->truncate_size, extents);
assert(extents.size() == 1);
// now we have the object and its 'layout'
- const OSDMap *osdmap = objecter->get_osdmap_read();
- pg_t pg = osdmap->object_locator_to_pg(extents[0].oid, extents[0].oloc);
- vector<int> osds;
- osdmap->pg_to_acting_osds(pg, osds);
- int ret = 0;
- if (!osds.empty()) {
- ret = -EINVAL;
- } else {
- for (unsigned i = 0; i < osds.size(); i++) {
- entity_addr_t addr = osdmap->get_addr(osds[i]);
- address.push_back(addr);
- }
- }
- objecter->put_osdmap_read();
- return ret;
+ return objecter->with_osdmap([&](const OSDMap& o) {
+ pg_t pg = o.object_locator_to_pg(extents[0].oid, extents[0].oloc);
+ vector<int> osds;
+ o.pg_to_acting_osds(pg, osds);
+ if (osds.empty())
+ return -EINVAL;
+ for (unsigned i = 0; i < osds.size(); i++) {
+ entity_addr_t addr = o.get_addr(osds[i]);
+ address.push_back(addr);
+ }
+ return 0;
+ });
}
int Client::get_osd_addr(int osd, entity_addr_t& addr)
{
Mutex::Locker lock(client_lock);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- int ret = 0;
- if (!osdmap->exists(osd))
- ret = -ENOENT;
- else
- addr = osdmap->get_addr(osd);
- objecter->put_osdmap_read();
- return ret;
+ return objecter->with_osdmap([&](const OSDMap& o) {
+ if (!o.exists(osd))
+ return -ENOENT;
+
+ addr = o.get_addr(osd);
+ return 0;
+ });
}
int Client::enumerate_layout(int fd, vector<ObjectExtent>& result,
int Client::get_local_osd()
{
Mutex::Locker lock(client_lock);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- if (osdmap->get_epoch() != local_osd_epoch) {
- local_osd = osdmap->find_osd_on_ip(messenger->get_myaddr());
- local_osd_epoch = osdmap->get_epoch();
- }
- objecter->put_osdmap_read();
+ objecter->with_osdmap([this](const OSDMap& o) {
+ if (o.get_epoch() != local_osd_epoch) {
+ local_osd = o.find_osd_on_ip(messenger->get_myaddr());
+ local_osd_epoch = o.get_epoch();
+ }
+ });
return local_osd;
}
NULL
};
-Mutex *Objecter::OSDSession::get_lock(object_t& oid)
+Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
+ object_t& oid)
{
-#define HASH_PRIME 1021
+ if (oid.name.empty())
+ return unique_completion_lock();
+
+ static constexpr uint32_t HASH_PRIME = 1021;
uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
% HASH_PRIME;
- return completion_locks[h % num_locks];
+ return unique_completion_lock(completion_locks[h % num_locks],
+ std::defer_lock);
}
const char** Objecter::get_tracked_conf_keys() const
void Objecter::update_crush_location()
{
- RWLock::WLocker rwlocker(rwlock);
+ unique_lock wl(rwlock);
std::multimap<string,string> new_crush_location;
vector<string> lvec;
get_str_vec(cct->_conf->crush_location, ";, \t", lvec);
*/
void Objecter::start()
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
start_tick();
if (osdmap->get_epoch() == 0) {
{
assert(initialized.read());
- rwlock.get_write();
+ unique_lock wl(rwlock);
initialized.set(0);
ldout(cct, 10) << " linger_op " << i->first << dendl;
LingerOp *lop = i->second;
{
- RWLock::WLocker wl(homeless_session->lock);
+ OSDSession::unique_lock swl(homeless_session->lock);
_session_linger_op_remove(homeless_session, lop);
}
linger_ops.erase(lop->linger_id);
ldout(cct, 10) << " op " << i->first << dendl;
Op *op = i->second;
{
- RWLock::WLocker wl(homeless_session->lock);
+ OSDSession::unique_lock swl(homeless_session->lock);
_session_op_remove(homeless_session, op);
}
op->put();
ldout(cct, 10) << " command_op " << i->first << dendl;
CommandOp *cop = i->second;
{
- RWLock::WLocker wl(homeless_session->lock);
+ OSDSession::unique_lock swl(homeless_session->lock);
_session_command_op_remove(homeless_session, cop);
}
cop->put();
}
// Let go of Objecter write lock so timer thread can shutdown
- rwlock.unlock();
+ wl.unlock();
}
-void Objecter::_send_linger(LingerOp *info)
+void Objecter::_send_linger(LingerOp *info,
+ shunique_lock& sul)
{
- assert(rwlock.is_wlocked());
-
- RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+ assert(sul.owns_lock() && sul.mutex() == &rwlock);
vector<OSDOp> opv;
Context *oncommit = NULL;
- info->watch_lock.get_read(); // just to read registered status
+ LingerOp::shared_lock watchl(info->watch_lock);
bufferlist *poutbl = NULL;
if (info->registered && info->is_watch) {
ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
}
oncommit = c;
}
- info->watch_lock.put_read();
+ watchl.unlock();
Op *o = new Op(info->target.base_oid, info->target.base_oloc,
opv, info->target.flags | CEPH_OSD_FLAG_READ,
NULL, NULL,
if (info->register_tid) {
// repeat send. cancel old registeration op, if any.
- info->session->lock.get_write();
+ OSDSession::unique_lock sl(info->session->lock);
if (info->session->ops.count(info->register_tid)) {
Op *o = info->session->ops[info->register_tid];
_op_cancel_map_check(o);
_cancel_linger_op(o);
}
- info->session->lock.unlock();
+ sl.unlock();
- info->register_tid = _op_submit(o, lc);
+ info->register_tid = _op_submit(o, sul);
} else {
// first send
- info->register_tid = _op_submit_with_budget(o, lc);
+ info->register_tid = _op_submit_with_budget(o, sul);
}
logger->inc(l_osdc_linger_send);
void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
{
- RWLock::WLocker wl(info->watch_lock);
+ LingerOp::unique_lock wl(info->watch_lock);
ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
if (info->on_reg_commit) {
info->on_reg_commit->complete(r);
info->_queued_async();
}
void finish(int r) {
- objecter->rwlock.get_read();
+ Objecter::unique_lock wl(objecter->rwlock);
bool canceled = info->canceled;
- objecter->rwlock.put_read();
+ wl.unlock();
if (!canceled) {
info->watch_context->handle_error(info->get_cookie(), err);
ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
<< " (last_error " << info->last_error << ")" << dendl;
if (r < 0) {
- info->watch_lock.get_write();
+ LingerOp::unique_lock wl(info->watch_lock);
if (!info->last_error) {
r = _normalize_watch_error(r);
info->last_error = r;
_linger_callback_queue();
}
}
- info->watch_lock.put_write();
+ wl.unlock();
}
}
void Objecter::_send_linger_ping(LingerOp *info)
{
- assert(rwlock.is_locked());
- assert(info->session->lock.is_locked());
+ // rwlock is locked unique
+ // info->session->lock is locked
if (cct->_conf->objecter_inject_no_watch_ping) {
ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
uint32_t register_gen)
{
- RWLock::WLocker l(info->watch_lock);
+ LingerOp::unique_lock l(info->watch_lock);
ldout(cct, 10) << __func__ << " " << info->linger_id
<< " sent " << sent << " gen " << register_gen << " = " << r
<< " (last_error " << info->last_error
int Objecter::linger_check(LingerOp *info)
{
- RWLock::RLocker l(info->watch_lock);
+ LingerOp::shared_lock l(info->watch_lock);
mono_time stamp = info->watch_valid_thru;
if (!info->watch_pending_async.empty())
void Objecter::linger_cancel(LingerOp *info)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
_linger_cancel(info);
info->put();
}
void Objecter::_linger_cancel(LingerOp *info)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
if (!info->canceled) {
OSDSession *s = info->session;
- s->lock.get_write();
+ OSDSession::unique_lock sl(s->lock);
_session_linger_op_remove(s, info);
- s->lock.unlock();
+ sl.unlock();
linger_ops.erase(info->linger_id);
linger_ops_set.erase(info);
info->target.flags = flags;
info->watch_valid_thru = mono_clock::now();
- RWLock::WLocker l(rwlock);
+ unique_lock l(rwlock);
// Acquire linger ID
info->linger_id = ++max_linger_id;
info->pobjver = objver;
info->on_reg_commit = oncommit;
- RWLock::WLocker wl(rwlock);
- _linger_submit(info);
+ shunique_lock sul(rwlock, ceph::acquire_unique);
+ _linger_submit(info, sul);
logger->inc(l_osdc_linger_active);
return info->linger_id;
info->pobjver = objver;
info->on_reg_commit = onfinish;
- RWLock::WLocker wl(rwlock);
- _linger_submit(info);
+ shunique_lock sul(rwlock, ceph::acquire_unique);
+ _linger_submit(info, sul);
logger->inc(l_osdc_linger_active);
return info->linger_id;
}
-void Objecter::_linger_submit(LingerOp *info)
+void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
- assert(rwlock.is_wlocked());
- RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
-
+ assert(sul.owns_lock() && sul.mutex() == &rwlock);
assert(info->linger_id);
// Populate Op::target
_calc_target(&info->target, &info->last_force_resend);
// Create LingerOp<->OSDSession relation
- int r = _get_session(info->target.osd, &s, lc);
+ int r = _get_session(info->target.osd, &s, sul);
assert(r == 0);
- s->lock.get_write();
+ OSDSession::unique_lock sl(s->lock);
_session_linger_op_assign(s, info);
- s->lock.unlock();
+ sl.unlock();
put_session(s);
- _send_linger(info);
+ _send_linger(info, sul);
}
struct C_DoWatchNotify : public Context {
void Objecter::handle_watch_notify(MWatchNotify *m)
{
- RWLock::RLocker l(rwlock);
+ shared_lock l(rwlock);
if (!initialized.read()) {
return;
}
ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
return;
}
- RWLock::WLocker wl(info->watch_lock);
+ LingerOp::unique_lock wl(info->watch_lock);
if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
if (!info->last_error) {
info->last_error = -ENOTCONN;
{
ldout(cct, 10) << __func__ << " " << *m << dendl;
- rwlock.get_read();
+ shared_lock l(rwlock);
assert(initialized.read());
if (info->canceled) {
- rwlock.put_read();
+ l.unlock();
goto out;
}
assert(info->watch_context);
assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
- rwlock.put_read();
+ l.unlock();
switch (m->opcode) {
case CEPH_WATCH_EVENT_NOTIFY:
map<int64_t, bool> *pool_full_map,
map<ceph_tid_t, Op*>& need_resend,
list<LingerOp*>& need_resend_linger,
- map<ceph_tid_t, CommandOp*>& need_resend_command)
+ map<ceph_tid_t, CommandOp*>& need_resend_command,
+ shunique_lock& sul)
{
- assert(rwlock.is_wlocked());
+ assert(sul.owns_lock() && sul.mutex() == &rwlock);
list<LingerOp*> unregister_lingers;
- RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
-
- s->lock.get_write();
+ OSDSession::unique_lock sl(s->lock);
// check for changed linger mappings (_before_ regular ops)
map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
++lp;
ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
bool unregister, force_resend_writes = cluster_full;
- int r = _recalc_linger_op_target(op, lc);
+ int r = _recalc_linger_op_target(op, sul);
if (pool_full_map)
force_resend_writes = force_resend_writes ||
(*pool_full_map)[op->target.base_oloc.pool];
_op_cancel_map_check(op);
break;
case RECALC_OP_TARGET_POOL_DNE:
- _check_op_pool_dne(op, true);
+ _check_op_pool_dne(op, sl);
break;
}
}
if (pool_full_map)
force_resend_writes = force_resend_writes ||
(*pool_full_map)[c->target_pg.pool()];
- int r = _calc_command_target(c);
+ int r = _calc_command_target(c, sul);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
// resend if skipped map; otherwise do nothing.
}
}
- s->lock.unlock();
+ sl.unlock();
for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
iter != unregister_lingers.end();
void Objecter::handle_osd_map(MOSDMap *m)
{
- RWLock::WLocker wl(rwlock);
+ shunique_lock sul(rwlock, acquire_unique);
if (!initialized.read())
return;
update_pool_full_map(pool_full_map);
_scan_requests(homeless_session, skipped_map, cluster_full,
&pool_full_map, need_resend,
- need_resend_linger, need_resend_command);
+ need_resend_linger, need_resend_command, sul);
// osd addr changes?
for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
OSDSession *s = p->second;
_scan_requests(s, skipped_map, cluster_full,
&pool_full_map, need_resend,
- need_resend_linger, need_resend_command);
+ need_resend_linger, need_resend_command, sul);
++p;
if (!osdmap->is_up(s->osd) ||
(s->con &&
p != osd_sessions.end(); ++p) {
OSDSession *s = p->second;
_scan_requests(s, false, false, NULL, need_resend,
- need_resend_linger, need_resend_command);
+ need_resend_linger, need_resend_command, sul);
}
ldout(cct, 3) << "handle_osd_map decoding full epoch "
<< m->get_last() << dendl;
_scan_requests(homeless_session, false, false, NULL,
need_resend, need_resend_linger,
- need_resend_command);
+ need_resend_command, sul);
} else {
ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
<< dendl;
_maybe_request_map();
}
- RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
-
// resend requests
for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
p != need_resend.end(); ++p) {
OSDSession *s = op->session;
bool mapped_session = false;
if (!s) {
- int r = _map_session(&op->target, &s, lc);
+ int r = _map_session(&op->target, &s, sul);
assert(r == 0);
mapped_session = true;
} else {
get_session(s);
}
- s->lock.get_write();
+ OSDSession::unique_lock sl(s->lock);
if (mapped_session) {
_session_op_assign(s, op);
}
_op_cancel_map_check(op);
_cancel_linger_op(op);
}
- s->lock.unlock();
+ sl.unlock();
put_session(s);
}
for (list<LingerOp*>::iterator p = need_resend_linger.begin();
if (!op->session) {
_calc_target(&op->target, &op->last_force_resend);
OSDSession *s = NULL;
- int const r = _get_session(op->target.osd, &s, lc);
+ int const r = _get_session(op->target.osd, &s, sul);
assert(r == 0);
assert(s != NULL);
op->session = s;
}
if (!op->session->is_homeless()) {
logger->inc(l_osdc_linger_resend);
- _send_linger(op);
+ _send_linger(op, sul);
}
}
for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
p != need_resend_command.end(); ++p) {
CommandOp *c = p->second;
- _assign_command_session(c);
+ _assign_command_session(c, sul);
if (c->session && !c->session->is_homeless()) {
_send_command(c);
}
<< "op_map_latest r=" << r << " tid=" << tid
<< " latest " << latest << dendl;
- RWLock::WLocker wl(objecter->rwlock);
+ Objecter::unique_lock wl(objecter->rwlock);
map<ceph_tid_t, Op*>::iterator iter =
objecter->check_latest_map_ops.find(tid);
if (op->map_dne_bound == 0)
op->map_dne_bound = latest;
- objecter->_check_op_pool_dne(op, false);
+ OSDSession::unique_lock sl(op->session->lock, defer_lock);
+ objecter->_check_op_pool_dne(op, sl);
op->put();
}
int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
snapid_t *snap)
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
const map<int64_t, pg_pool_t>& pools = osdmap->get_pools();
map<int64_t, pg_pool_t>::const_iterator iter = pools.find(poolid);
int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
pool_snap_info_t *info)
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
const map<int64_t, pg_pool_t>& pools = osdmap->get_pools();
map<int64_t, pg_pool_t>::const_iterator iter = pools.find(poolid);
int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
if (!pi)
return 0;
}
-void Objecter::_check_op_pool_dne(Op *op, bool session_locked)
+// sl may be unlocked.
+void Objecter::_check_op_pool_dne(Op *op, unique_lock& sl)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
if (op->attempts) {
// we send a reply earlier, which means that previously the pool
OSDSession *s = op->session;
assert(s != NULL);
+ assert(sl.mutex() == &s->lock);
+ bool session_locked = sl.owns_lock();
if (!session_locked) {
- s->lock.get_write();
+ sl.lock();
}
_finish_op(op, 0);
if (!session_locked) {
- s->lock.unlock();
+ sl.unlock();
}
}
} else {
void Objecter::_send_op_map_check(Op *op)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
// ask the monitor
if (check_latest_map_ops.count(op->tid) == 0) {
op->get();
void Objecter::_op_cancel_map_check(Op *op)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
map<ceph_tid_t, Op*>::iterator iter =
check_latest_map_ops.find(op->tid);
if (iter != check_latest_map_ops.end()) {
return;
}
- RWLock::WLocker wl(objecter->rwlock);
+ unique_lock wl(objecter->rwlock);
map<uint64_t, LingerOp*>::iterator iter =
objecter->check_latest_map_lingers.find(linger_id);
void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
*need_unregister = false;
void Objecter::_linger_cancel_map_check(LingerOp *op)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
map<uint64_t, LingerOp*>::iterator iter =
check_latest_map_lingers.find(op->linger_id);
return;
}
- RWLock::WLocker wl(objecter->rwlock);
+ unique_lock wl(objecter->rwlock);
map<uint64_t, CommandOp*>::iterator iter =
objecter->check_latest_map_commands.find(tid);
void Objecter::_check_command_map_dne(CommandOp *c)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
<< " current " << osdmap->get_epoch()
void Objecter::_send_command_map_check(CommandOp *c)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
// ask the monitor
if (check_latest_map_commands.count(c->tid) == 0) {
void Objecter::_command_cancel_map_check(CommandOp *c)
{
- assert(rwlock.is_wlocked());
+  // rwlock is locked unique
map<uint64_t, CommandOp*>::iterator iter =
check_latest_map_commands.find(c->tid);
* @returns 0 on success, or -EAGAIN if the lock context requires
* promotion to write.
*/
-int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc)
+int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
{
- assert(rwlock.is_locked());
+ assert(sul && sul.mutex() == &rwlock);
if (osd < 0) {
*session = homeless_session;
<< s->get_nref() << dendl;
return 0;
}
- if (!lc.is_wlocked()) {
+ if (!sul.owns_lock()) {
return -EAGAIN;
}
OSDSession *s = new OSDSession(cct, osd);
void Objecter::_reopen_session(OSDSession *s)
{
- assert(s->lock.is_locked());
+ // s->lock is locked
entity_inst_t inst = osdmap->get_inst(s->osd);
ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
void Objecter::close_session(OSDSession *s)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
if (s->con) {
s->con->mark_down();
logger->inc(l_osdc_osd_session_close);
}
- s->lock.get_write();
+ OSDSession::unique_lock sl(s->lock);
std::list<LingerOp*> homeless_lingers;
std::list<CommandOp*> homeless_commands;
}
osd_sessions.erase(s->osd);
- s->lock.unlock();
+ sl.unlock();
put_session(s);
// Assign any leftover ops to the homeless session
{
- RWLock::WLocker wl(homeless_session->lock);
+ OSDSession::unique_lock hsl(homeless_session->lock);
for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
i != homeless_lingers.end(); ++i) {
_session_linger_op_assign(homeless_session, *i);
void Objecter::wait_for_osd_map()
{
- rwlock.get_write();
+ unique_lock l(rwlock);
if (osdmap->get_epoch()) {
- rwlock.put_write();
+ l.unlock();
return;
}
+ // Leave this since it goes with C_SafeCond
Mutex lock("");
Cond cond;
bool done;
lock.Lock();
C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
- rwlock.put_write();
+ l.unlock();
while (!done)
cond.Wait(lock);
lock.Unlock();
void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
_get_latest_version(oldest, newest, fin);
}
void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
Context *fin)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
if (osdmap->get_epoch() >= newest) {
ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
if (fin)
void Objecter::maybe_request_map()
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
_maybe_request_map();
}
void Objecter::_maybe_request_map()
{
- assert(rwlock.is_locked());
+ // rwlock is locked
int flag = 0;
if (_osdmap_full_flag()
|| osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
_maybe_request_map();
}
*/
bool Objecter::have_map(const epoch_t epoch)
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
if (osdmap->get_epoch() >= epoch) {
return true;
} else {
bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
if (osdmap->get_epoch() >= epoch) {
return true;
}
ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
map<uint64_t, LingerOp *> lresend;
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
- session->lock.get_write();
+ OSDSession::unique_lock sl(session->lock);
_kick_requests(session, lresend);
- session->lock.unlock();
+ sl.unlock();
- _linger_ops_resend(lresend);
+ _linger_ops_resend(lresend, wl);
}
void Objecter::_kick_requests(OSDSession *session,
map<uint64_t, LingerOp *>& lresend)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
// resend ops
map<ceph_tid_t,Op*> resend; // resend in tid order
}
}
-void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend)
+void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
+ unique_lock& ul)
{
- assert(rwlock.is_wlocked());
-
+ assert(ul.owns_lock());
+ shunique_lock sul(std::move(ul));
while (!lresend.empty()) {
LingerOp *op = lresend.begin()->second;
if (!op->canceled) {
- _send_linger(op);
+ _send_linger(op, sul);
}
op->put();
lresend.erase(lresend.begin());
}
+ ul = unique_lock(sul.release_to_unique());
}
void Objecter::start_tick()
void Objecter::tick()
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
ldout(cct, 10) << "tick" << dendl;
for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
OSDSession *s = siter->second;
- RWLock::WLocker l(s->lock);
+ OSDSession::lock_guard l(s->lock);
bool found = false;
for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
p != s->ops.end();
p != s->linger_ops.end();
++p) {
LingerOp *op = p->second;
- RWLock::WLocker wl(op->watch_lock);
+ LingerOp::unique_lock wl(op->watch_lock);
assert(op->session);
ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
<< " (osd." << op->session->osd << ")" << dendl;
void Objecter::resend_mon_ops()
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
ldout(cct, 10) << "resend_mon_ops" << dendl;
ceph_tid_t Objecter::op_submit(Op *op, int *ctx_budget)
{
- RWLock::RLocker rl(rwlock);
- RWLock::Context lc(rwlock, RWLock::Context::TakenForRead);
- return _op_submit_with_budget(op, lc, ctx_budget);
+ shunique_lock rl(rwlock, ceph::acquire_shared);
+ return _op_submit_with_budget(op, rl, ctx_budget);
}
-ceph_tid_t Objecter::_op_submit_with_budget(Op *op, RWLock::Context& lc,
+ceph_tid_t Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
int *ctx_budget)
{
assert(initialized.read());
// throttle. before we look at any state, because
// _take_op_budget() may drop our lock while it blocks.
if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
- int op_budget = _take_op_budget(op);
+ int op_budget = _take_op_budget(op, sul);
// take and pass out the budget for the first OP
// in the context session
if (ctx_budget && (*ctx_budget == -1)) {
op_cancel(tid, -ETIMEDOUT); });
}
- return _op_submit(op, lc);
+ return _op_submit(op, sul);
}
void Objecter::_send_op_account(Op *op)
}
}
-ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
+ceph_tid_t Objecter::_op_submit(Op *op, shunique_lock& sul)
{
- assert(rwlock.is_locked());
+ // rwlock is locked
ldout(cct, 10) << __func__ << " op " << op << dendl;
== RECALC_OP_TARGET_POOL_DNE;
// Try to get a session, including a retry if we need to take write lock
- int r = _get_session(op->target.osd, &s, lc);
+ int r = _get_session(op->target.osd, &s, sul);
if (r == -EAGAIN) {
assert(s == NULL);
- lc.promote();
- r = _get_session(op->target.osd, &s, lc);
+ sul.unlock();
+ sul.lock();
+ r = _get_session(op->target.osd, &s, sul);
}
assert(r == 0);
assert(s); // may be homeless
// We may need to take wlock if we will need to _set_op_map_check later.
- if (check_for_latest_map && !lc.is_wlocked()) {
- lc.promote();
+ if (check_for_latest_map && sul.owns_lock_shared()) {
+ sul.unlock();
+ sul.lock();
}
_send_op_account(op);
m = _prepare_osd_op(op);
}
- s->lock.get_write();
+ OSDSession::unique_lock sl(s->lock);
if (op->tid == 0)
op->tid = last_tid.inc();
_session_op_assign(s, op);
}
op = NULL;
- s->lock.unlock();
+ sl.unlock();
put_session(s);
ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read()
{
assert(initialized.read());
- s->lock.get_write();
+ OSDSession::unique_lock sl(s->lock);
map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
if (p == s->ops.end()) {
}
_op_cancel_map_check(op);
_finish_op(op, r);
- s->lock.unlock();
+ sl.unlock();
return 0;
}
{
int ret = 0;
- rwlock.get_write();
+ unique_lock wl(rwlock);
ret = _op_cancel(tid, r);
- rwlock.unlock();
return ret;
}
for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
OSDSession *s = siter->second;
- s->lock.get_read();
+ OSDSession::shared_lock sl(s->lock);
if (s->ops.find(tid) != s->ops.end()) {
- s->lock.unlock();
+ sl.unlock();
ret = op_cancel(s, tid, r);
if (ret == -ENOENT) {
/* oh no! raced, maybe tid moved to another session, restarting */
}
return ret;
}
- s->lock.unlock();
}
ldout(cct, 5) << __func__ << ": tid " << tid
<< " not found in live sessions" << dendl;
// Handle case where the op is in homeless session
- homeless_session->lock.get_read();
+ OSDSession::shared_lock sl(homeless_session->lock);
if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
- homeless_session->lock.unlock();
+ sl.unlock();
ret = op_cancel(homeless_session, tid, r);
if (ret == -ENOENT) {
/* oh no! raced, maybe tid moved to another session, restarting */
return ret;
}
} else {
- homeless_session->lock.unlock();
+ sl.unlock();
}
ldout(cct, 5) << __func__ << ": tid " << tid
epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
{
- rwlock.get_write();
+ unique_lock wl(rwlock);
std::vector<ceph_tid_t> to_cancel;
bool found = false;
for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
OSDSession *s = siter->second;
- s->lock.get_read();
+ OSDSession::shared_lock sl(s->lock);
for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
op_i != s->ops.end(); ++op_i) {
if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
to_cancel.push_back(op_i->first);
}
}
- s->lock.unlock();
+ sl.unlock();
for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
titer != to_cancel.end();
const epoch_t epoch = osdmap->get_epoch();
- rwlock.unlock();
+ wl.unlock();
if (found) {
return epoch;
*/
bool Objecter::osdmap_full_flag() const
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
return _osdmap_full_flag();
}
bool Objecter::osdmap_pool_full(const int64_t pool_id) const
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
if (_osdmap_full_flag()) {
return true;
int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
const string& ns)
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
const pg_pool_t *p = osdmap->get_pg_pool(pool);
if (!p)
return -ENOENT;
int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
const string& ns)
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
const pg_pool_t *p = osdmap->get_pg_pool(pool);
if (!p)
return -ENOENT;
int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend,
bool any_change)
{
- assert(rwlock.is_locked());
+ // rwlock is locked
bool is_read = t->flags & CEPH_OSD_FLAG_READ;
bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
}
int Objecter::_map_session(op_target_t *target, OSDSession **s,
- RWLock::Context& lc)
+ shunique_lock& sul)
{
_calc_target(target);
- return _get_session(target->osd, s, lc);
+ return _get_session(target->osd, s, sul);
}
void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
- assert(to->lock.is_locked());
+ // to->lock is locked
assert(op->session == NULL);
assert(op->tid);
void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
assert(op->session == from);
- assert(from->lock.is_locked());
+ // from->lock is locked
if (from->is_homeless()) {
num_homeless_ops.dec();
void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
- assert(to->lock.is_wlocked());
+ // to lock is locked unique
assert(op->session == NULL);
if (to->is_homeless()) {
void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
assert(from == op->session);
- assert(from->lock.is_wlocked());
+ // from->lock is locked unique
if (from->is_homeless()) {
num_homeless_ops.dec();
void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
{
assert(from == op->session);
- assert(from->lock.is_locked());
+ // from->lock is locked
if (from->is_homeless()) {
num_homeless_ops.dec();
void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
{
- assert(to->lock.is_locked());
+ // to->lock is locked
assert(op->session == NULL);
assert(op->tid);
}
int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
- RWLock::Context& lc)
+ shunique_lock& sul)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
int r = _calc_target(&linger_op->target, &linger_op->last_force_resend,
true);
<< " acting " << linger_op->target.acting << dendl;
OSDSession *s = NULL;
- r = _get_session(linger_op->target.osd, &s, lc);
+ r = _get_session(linger_op->target.osd, &s, sul);
assert(r == 0);
if (linger_op->session != s) {
// same time here is only safe because we are the only one that
// takes two, and we are holding rwlock for write. Disable
// lockdep because it doesn't know that.
- s->lock.get_write(false);
+ OSDSession::unique_lock sl(s->lock);
_session_linger_op_remove(linger_op->session, linger_op);
_session_linger_op_assign(s, linger_op);
- s->lock.unlock(false);
}
put_session(s);
{
ldout(cct, 15) << "finish_op " << op->tid << dendl;
- assert(op->session->lock.is_wlocked());
+ // op->session->lock is locked unique
if (!op->ctx_budgeted && op->budgeted)
put_op_budget(op);
void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
{
ldout(cct, 15) << "finish_op " << tid << dendl;
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
- RWLock::WLocker wl(session->lock);
+ OSDSession::unique_lock wl(session->lock);
map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
if (iter == session->ops.end())
MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
- assert(rwlock.is_locked());
+ // rwlock is locked
int flags = op->target.flags;
flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
void Objecter::_send_op(Op *op, MOSDOp *m)
{
- assert(rwlock.is_locked());
- assert(op->session->lock.is_locked());
+ // rwlock is locked
+ // op->session->lock is locked
if (!m) {
assert(op->tid > 0);
return op_budget;
}
-void Objecter::_throttle_op(Op *op, int op_budget)
+void Objecter::_throttle_op(Op *op,
+ shunique_lock& sul,
+ int op_budget)
{
- assert(rwlock.is_locked());
-
- bool locked_for_write = rwlock.is_wlocked();
+ assert(sul && sul.mutex() == &rwlock);
+ bool locked_for_write = sul.owns_lock();
if (!op_budget)
op_budget = calc_op_budget(op);
if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
- rwlock.unlock();
+ sul.unlock();
op_throttle_bytes.get(op_budget);
- rwlock.get(locked_for_write);
+ if (locked_for_write)
+ sul.lock();
+ else
+ sul.lock_shared();
}
if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
- rwlock.unlock();
+ sul.unlock();
op_throttle_ops.get(1);
- rwlock.get(locked_for_write);
+ if (locked_for_write)
+ sul.lock();
+ else
+ sul.lock_shared();
}
}
void Objecter::unregister_op(Op *op)
{
- op->session->lock.get_write();
+ OSDSession::unique_lock sl(op->session->lock);
op->session->ops.erase(op->tid);
- op->session->lock.unlock();
+ sl.unlock();
put_session(op->session);
op->session = NULL;
int osd_num = (int)m->get_source().num();
- RWLock::RLocker l(rwlock);
+ shunique_lock sul(rwlock, ceph::acquire_shared);
if (!initialized.read()) {
m->put();
return;
}
- RWLock::Context lc(rwlock, RWLock::Context::TakenForRead);
map<int, OSDSession *>::iterator siter = osd_sessions.find(osd_num);
if (siter == osd_sessions.end()) {
OSDSession *s = siter->second;
get_session(s);
- s->lock.get_write();
+ OSDSession::unique_lock sl(s->lock);
map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
if (iter == s->ops.end()) {
<< (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
" onnvram" : " ack"))
<< " ... stray" << dendl;
- s->lock.unlock();
+ sl.unlock();
put_session(s);
m->put();
return;
<< "; last attempt " << (op->attempts - 1) << " sent to "
<< op->session->con->get_peer_addr() << dendl;
m->put();
- s->lock.unlock();
+ sl.unlock();
put_session(s);
return;
}
if (op->oncommit || op->oncommit_sync)
num_uncommitted.dec();
_session_op_remove(s, op);
- s->lock.unlock();
+ sl.unlock();
put_session(s);
// FIXME: two redirects could race and reorder
m->get_redirect().combine_with_locator(op->target.target_oloc,
op->target.target_oid.name);
op->target.flags |= CEPH_OSD_FLAG_REDIRECTED;
- _op_submit(op, lc);
+ _op_submit(op, sul);
m->put();
return;
}
op->tid = last_tid.inc();
_send_op(op);
- s->lock.unlock();
+ sl.unlock();
put_session(s);
m->put();
return;
}
- l.unlock();
- lc.set_state(RWLock::Context::Untaken);
+ sul.unlock();
if (op->objver)
*op->objver = m->get_user_version();
}
/* get it before we call _finish_op() */
- Mutex *completion_lock = (op->target.base_oid.name.size() ?
- s->get_lock(op->target.base_oid) : NULL);
+ auto completion_lock =
+ (op->target.base_oid.name.size() ? s->get_lock(op->target.base_oid) :
+ OSDSession::unique_completion_lock());
// done with this tid?
if (!op->onack && !op->oncommit && !op->oncommit_sync) {
<< " uncommitted" << dendl;
// serialize completions
- if (completion_lock) {
- completion_lock->Lock();
+ if (completion_lock.mutex()) {
+ completion_lock.lock();
}
- s->lock.unlock();
+ sl.unlock();
// do callbacks
if (onack) {
if (oncommit) {
oncommit->complete(rc);
}
- if (completion_lock) {
- completion_lock->Unlock();
+ if (completion_lock.mutex()) {
+ completion_lock.unlock();
}
m->put();
uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
- uint32_t pos)
+ uint32_t pos)
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
ldout(cct, 10) << "list_objects_seek " << list_context
<< " pos " << pos << " -> " << actual << dendl;
return;
}
- rwlock.get_read();
+ shared_lock rl(rwlock);
const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
if (!pool) { // pool is gone
- rwlock.unlock();
+ rl.unlock();
put_nlist_context_budget(list_context);
onfinish->complete(-ENOENT);
return;
}
int pg_num = pool->get_pg_num();
- rwlock.unlock();
+ rl.unlock();
if (list_context->starting_pg_num == 0) { // there can't be zero pgs!
list_context->starting_pg_num = pg_num;
}
void Objecter::_nlist_reply(NListContext *list_context, int r,
- Context *final_finish, epoch_t reply_epoch)
+ Context *final_finish, epoch_t reply_epoch)
{
ldout(cct, 10) << "_list_reply" << dendl;
uint32_t Objecter::list_objects_seek(ListContext *list_context,
uint32_t pos)
{
- RWLock::RLocker rl(rwlock);
+ shared_lock rl(rwlock);
pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
ldout(cct, 10) << "list_objects_seek " << list_context
<< " pos " << pos << " -> " << actual << dendl;
return;
}
- rwlock.get_read();
+ shared_lock rl(rwlock);
const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
if (!pool) { // pool is gone
- rwlock.unlock();
+ rl.unlock();
put_list_context_budget(list_context);
onfinish->complete(-ENOENT);
return;
}
int pg_num = pool->get_pg_num();
- rwlock.unlock();
+ rl.unlock();
if (list_context->starting_pg_num == 0) { // there can't be zero pgs!
list_context->starting_pg_num = pg_num;
int Objecter::create_pool_snap(int64_t pool, string& snap_name,
Context *onfinish)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
<< snap_name << dendl;
int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
Context *onfinish)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
PoolOp *op = new PoolOp;
if (!op) return -ENOMEM;
int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
Context *onfinish)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
<< snap_name << dendl;
int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
Context *onfinish)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
<< snap << dendl;
PoolOp *op = new PoolOp;
int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
int crush_rule)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
ldout(cct, 10) << "create_pool name=" << name << dendl;
if (osdmap->lookup_pg_pool_name(name.c_str()) >= 0)
int Objecter::delete_pool(int64_t pool, Context *onfinish)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
ldout(cct, 10) << "delete_pool " << pool << dendl;
if (!osdmap->have_pg_pool(pool))
int Objecter::delete_pool(const string &pool_name, Context *onfinish)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
ldout(cct, 10) << "delete_pool " << pool_name << dendl;
int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
*/
int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
PoolOp *op = new PoolOp;
if (!op) return -ENOMEM;
void Objecter::pool_op_submit(PoolOp *op)
{
- assert(rwlock.is_locked());
+ // rwlock is locked
if (mon_timeout > timespan(0)) {
op->ontimeout = timer.add_event(mon_timeout,
[this, op]() {
void Objecter::_pool_op_submit(PoolOp *op)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
*/
void Objecter::handle_pool_op_reply(MPoolOpReply *m)
{
- rwlock.get_read();
+ shunique_lock sul(rwlock, ceph::acquire_shared);
if (!initialized.read()) {
- rwlock.put_read();
+ sul.unlock();
m->put();
return;
}
if (m->version > last_seen_osdmap_version)
last_seen_osdmap_version = m->version;
if (osdmap->get_epoch() < m->epoch) {
- rwlock.unlock();
- rwlock.get_write();
+ sul.unlock();
+ sul.lock();
// recheck op existence since we have let go of rwlock
// (for promotion) above.
iter = pool_ops.find(tid);
assert(op->onfinish);
op->onfinish->complete(m->replyCode);
}
- }
- else {
+ } else {
assert(op->onfinish);
op->onfinish->complete(m->replyCode);
}
op->onfinish = NULL;
- if (!rwlock.is_wlocked()) {
- rwlock.unlock();
- rwlock.get_write();
+ if (!sul.owns_lock()) {
+ sul.unlock();
+ sul.lock();
}
iter = pool_ops.find(tid);
if (iter != pool_ops.end()) {
}
done:
- rwlock.unlock();
+ // Not strictly necessary, since we'll release it on return.
+ sul.unlock();
ldout(cct, 10) << "done" << dendl;
m->put();
{
assert(initialized.read());
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
if (it == pool_ops.end()) {
void Objecter::_finish_pool_op(PoolOp *op, int r)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
pool_ops.erase(op->tid);
logger->set(l_osdc_poolop_active, pool_ops.size());
op->ontimeout = 0;
}
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
poolstat_ops[op->tid] = op;
ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
ceph_tid_t tid = m->get_tid();
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
if (!initialized.read()) {
m->put();
return;
{
assert(initialized.read());
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
if (it == poolstat_ops.end()) {
void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
poolstat_ops.erase(op->tid);
logger->set(l_osdc_poolstat_active, poolstat_ops.size());
void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
{
ldout(cct, 10) << "get_fs_stats" << dendl;
- RWLock::WLocker l(rwlock);
+ unique_lock l(rwlock);
StatfsOp *op = new StatfsOp;
op->tid = last_tid.inc();
void Objecter::_fs_stats_submit(StatfsOp *op)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
void Objecter::handle_fs_stats_reply(MStatfsReply *m)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
if (!initialized.read()) {
m->put();
return;
{
assert(initialized.read());
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
if (it == statfs_ops.end()) {
void Objecter::_finish_statfs_op(StatfsOp *op, int r)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
statfs_ops.erase(op->tid);
logger->set(l_osdc_statfs_active, statfs_ops.size());
int osd = osdmap->identify_osd(con->get_peer_addr());
if (osd >= 0) {
ldout(cct, 1) << "ms_handle_reset on osd." << osd << dendl;
- rwlock.get_write();
+ unique_lock wl(rwlock);
if (!initialized.read()) {
- rwlock.put_write();
+ wl.unlock();
return false;
}
map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
if (p != osd_sessions.end()) {
OSDSession *session = p->second;
map<uint64_t, LingerOp *> lresend;
- session->lock.get_write();
+ OSDSession::unique_lock sl(session->lock);
_reopen_session(session);
_kick_requests(session, lresend);
- session->lock.unlock();
- _linger_ops_resend(lresend);
- rwlock.unlock();
+ sl.unlock();
+ _linger_ops_resend(lresend, wl);
+ wl.unlock();
maybe_request_map();
} else {
- rwlock.unlock();
+ wl.unlock();
}
} else {
ldout(cct, 10) << "ms_handle_reset on unknown osd addr "
for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
OSDSession *s = siter->second;
- s->lock.get_read();
+ OSDSession::shared_lock sl(s->lock);
_dump_active(s);
- s->lock.unlock();
+ sl.unlock();
}
_dump_active(homeless_session);
}
void Objecter::dump_active()
{
- rwlock.get_read();
+ shared_lock rl(rwlock);
_dump_active();
- rwlock.unlock();
+ rl.unlock();
}
void Objecter::dump_requests(Formatter *fmt)
{
+ // Read-lock on Objecter held here
fmt->open_object_section("requests");
dump_ops(fmt);
dump_linger_ops(fmt);
void Objecter::dump_ops(Formatter *fmt)
{
+ // Read-lock on Objecter held
fmt->open_array_section("ops");
for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
OSDSession *s = siter->second;
- s->lock.get_read();
+ OSDSession::shared_lock sl(s->lock);
_dump_ops(s, fmt);
- s->lock.unlock();
+ sl.unlock();
}
_dump_ops(homeless_session, fmt);
fmt->close_section(); // ops array
void Objecter::dump_linger_ops(Formatter *fmt)
{
+ // We have a read-lock on the objecter
fmt->open_array_section("linger_ops");
for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
OSDSession *s = siter->second;
- s->lock.get_read();
+ OSDSession::shared_lock sl(s->lock);
_dump_linger_ops(s, fmt);
- s->lock.unlock();
+ sl.unlock();
}
_dump_linger_ops(homeless_session, fmt);
fmt->close_section(); // linger_ops array
void Objecter::dump_command_ops(Formatter *fmt)
{
+ // We have a read-lock on the Objecter here
fmt->open_array_section("command_ops");
for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
OSDSession *s = siter->second;
- s->lock.get_read();
+ OSDSession::shared_lock sl(s->lock);
_dump_command_ops(s, fmt);
- s->lock.unlock();
+ sl.unlock();
}
_dump_command_ops(homeless_session, fmt);
fmt->close_section(); // command_ops array
std::string format, bufferlist& out)
{
Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
- RWLock::RLocker rl(m_objecter->rwlock);
+ shared_lock rl(m_objecter->rwlock);
m_objecter->dump_requests(f);
f->flush(out);
delete f;
{
int osd_num = (int)m->get_source().num();
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
if (!initialized.read()) {
m->put();
return;
OSDSession *s = siter->second;
- s->lock.get_read();
+ OSDSession::shared_lock sl(s->lock);
map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
if (p == s->command_ops.end()) {
ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
<< " not found" << dendl;
m->put();
- s->lock.unlock();
+ sl.unlock();
return;
}
<< m->get_connection() << " " << m->get_source_inst()
<< dendl;
m->put();
- s->lock.unlock();
+ sl.unlock();
return;
}
if (c->poutbl)
c->poutbl->claim(m->get_data());
- s->lock.unlock();
+ sl.unlock();
_finish_command(c, m->r, m->rs);
int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
{
- RWLock::WLocker wl(rwlock);
-
- RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+ shunique_lock sul(rwlock, ceph::acquire_unique);
ceph_tid_t tid = last_tid.inc();
ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
c->tid = tid;
{
- RWLock::WLocker hs_wl(homeless_session->lock);
- _session_command_op_assign(homeless_session, c);
+ OSDSession::unique_lock hs_wl(homeless_session->lock);
+ _session_command_op_assign(homeless_session, c);
}
- (void)_calc_command_target(c);
- _assign_command_session(c);
+ _calc_command_target(c, sul);
+ _assign_command_session(c, sul);
if (osd_timeout > timespan(0)) {
c->ontimeout = timer.add_event(osd_timeout,
[this, c, tid]() {
return 0;
}
-int Objecter::_calc_command_target(CommandOp *c)
+int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
{
- assert(rwlock.is_wlocked());
-
- RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+ assert(sul.owns_lock() && sul.mutex() == &rwlock);
c->map_check_error = 0;
}
OSDSession *s;
- int r = _get_session(c->osd, &s, lc);
+ int r = _get_session(c->osd, &s, sul);
assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
if (c->session != s) {
return RECALC_OP_TARGET_NO_ACTION;
}
-void Objecter::_assign_command_session(CommandOp *c)
+void Objecter::_assign_command_session(CommandOp *c,
+ shunique_lock& sul)
{
- assert(rwlock.is_wlocked());
-
- RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+ assert(sul.owns_lock() && sul.mutex() == &rwlock);
OSDSession *s;
- int r = _get_session(c->osd, &s, lc);
+ int r = _get_session(c->osd, &s, sul);
assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
if (c->session != s) {
if (c->session) {
OSDSession *cs = c->session;
- cs->lock.get_write();
+ OSDSession::unique_lock csl(cs->lock);
_session_command_op_remove(c->session, c);
- cs->lock.unlock();
+ csl.unlock();
}
- s->lock.get_write();
+ OSDSession::unique_lock sl(s->lock);
_session_command_op_assign(s, c);
- s->lock.unlock();
}
put_session(s);
{
assert(initialized.read());
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
if (it == s->command_ops.end()) {
void Objecter::_finish_command(CommandOp *c, int r, string rs)
{
- assert(rwlock.is_wlocked());
+ // rwlock is locked unique
ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
<< rs << dendl;
timer.cancel_event(c->ontimeout);
OSDSession *s = c->session;
- s->lock.get_write();
+ OSDSession::unique_lock sl(s->lock);
_session_command_op_remove(c->session, c);
- s->lock.unlock();
+ sl.unlock();
c->put();
assert(ops.empty());
assert(linger_ops.empty());
assert(command_ops.empty());
-
- for (int i = 0; i < num_locks; i++) {
- delete completion_locks[i];
- }
- delete[] completion_locks;
}
Objecter::~Objecter()
*/
void Objecter::set_epoch_barrier(epoch_t epoch)
{
- RWLock::WLocker wl(rwlock);
+ unique_lock wl(rwlock);
ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
<< epoch_barrier << ") current epoch " << osdmap->get_epoch()
void finish(int r) {
objecter->_enumerate_reply(
- bl, r, end, pool_id, budget, epoch, result, next, on_finish);
+ bl, r, end, pool_id, budget, epoch, result, next, on_finish);
}
};
const hobject_t &start,
const hobject_t &end,
const uint32_t max,
- std::list<librados::ListObjectImpl> *result,
+ std::list<librados::ListObjectImpl> *result,
hobject_t *next,
Context *on_finish)
{
return;
}
- rwlock.get_read();
+ shared_lock rl(rwlock);
assert(osdmap->get_epoch());
if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
- rwlock.unlock();
+ rl.unlock();
lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
on_finish->complete(-EOPNOTSUPP);
return;
}
const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
if (!p) {
- lderr(cct) << __func__ << ": pool " << pool_id << " DNE in"
- "osd epoch " << osdmap->get_epoch() << dendl;
- rwlock.unlock();
+ lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
+ << osdmap->get_epoch() << dendl;
+ rl.unlock();
on_finish->complete(-ENOENT);
return;
} else {
- rwlock.unlock();
+ rl.unlock();
}
ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;
const int64_t pool_id,
int budget,
epoch_t reply_epoch,
- std::list<librados::ListObjectImpl> *result,
+ std::list<librados::ListObjectImpl> *result,
hobject_t *next,
Context *on_finish)
{
<< " handle " << response.handle
<< " reply_epoch " << reply_epoch << dendl;
ldout(cct, 20) << __func__ << ": response.entries.size "
- << response.entries.size() << ", response.entries "
- << response.entries << dendl;
+ << response.entries.size() << ", response.entries "
+ << response.entries << dendl;
if (cmp_bitwise(response.handle, end) <= 0) {
*next = response.handle;
} else {
- ldout(cct, 10) << __func__ << ": adjusted next down to end " << end << dendl;
+ ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
+ << dendl;
*next = end;
// drop anything after 'end'
- rwlock.get_read();
+ shared_lock rl(rwlock);
const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
if (!pool) {
// pool is gone, drop any results which are now meaningless.
- rwlock.put_read();
+ rl.unlock();
on_finish->complete(-ENOENT);
return;
}
<< " >= end " << end << dendl;
response.entries.pop_back();
}
- rwlock.put_read();
+ rl.unlock();
}
if (!response.entries.empty()) {
result->merge(response.entries);
#ifndef CEPH_OBJECTER_H
#define CEPH_OBJECTER_H
+#include <condition_variable>
#include <list>
#include <map>
+#include <mutex>
#include <memory>
#include <sstream>
+#include <type_traits>
-#include "include/types.h"
+#include <boost/thread/shared_mutex.hpp>
+
+#include "include/assert.h"
#include "include/buffer.h"
+#include "include/types.h"
#include "include/rados/rados_types.hpp"
#include "common/admin_socket.h"
#include "common/ceph_time.h"
#include "common/ceph_timer.h"
-#include "common/RWLock.h"
+#include "common/shunique_lock.h"
#include "messages/MOSDOp.h"
#include "osd/OSDMap.h"
version_t last_seen_osdmap_version;
version_t last_seen_pgmap_version;
- RWLock rwlock;
+ mutable boost::shared_mutex rwlock;
+ using lock_guard = std::unique_lock<decltype(rwlock)>;
+ using unique_lock = std::unique_lock<decltype(rwlock)>;
+ using shared_lock = boost::shared_lock<decltype(rwlock)>;
+ using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
ceph::timer<ceph::mono_clock> timer;
PerfCounters *logger;
};
int submit_command(CommandOp *c, ceph_tid_t *ptid);
- int _calc_command_target(CommandOp *c);
- void _assign_command_session(CommandOp *c);
+ int _calc_command_target(CommandOp *c, shunique_lock &sul);
+ void _assign_command_session(CommandOp *c, shunique_lock &sul);
void _send_command(CommandOp *c);
int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
void _finish_command(CommandOp *c, int r, string rs);
bool is_watch;
ceph::mono_time watch_valid_thru; ///< send time for last acked ping
int last_error; ///< error from last failed ping|reconnect, if any
- RWLock watch_lock;
+ boost::shared_mutex watch_lock;
+ using lock_guard = std::unique_lock<decltype(watch_lock)>;
+ using unique_lock = std::unique_lock<decltype(watch_lock)>;
+ using shared_lock = boost::shared_lock<decltype(watch_lock)>;
+ using shunique_lock = ceph::shunique_lock<decltype(watch_lock)>;
// queue of pending async operations, with the timestamp of
// when they were queued.
epoch_t last_force_resend;
void _queued_async() {
- assert(watch_lock.is_locked());
+ // watch_lock must be locked unique
watch_pending_async.push_back(ceph::mono_clock::now());
}
void finished_async() {
- RWLock::WLocker l(watch_lock);
+ unique_lock l(watch_lock);
assert(!watch_pending_async.empty());
watch_pending_async.pop_front();
}
target(object_t(), object_locator_t(), 0),
snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
is_watch(false), last_error(0),
- watch_lock("Objecter::LingerOp::watch_lock"),
register_gen(0),
registered(false),
canceled(false),
// -- osd sessions --
struct OSDSession : public RefCountedObject {
- RWLock lock;
- Mutex **completion_locks;
+ boost::shared_mutex lock;
+ using lock_guard = std::lock_guard<decltype(lock)>;
+ using unique_lock = std::unique_lock<decltype(lock)>;
+ using shared_lock = boost::shared_lock<decltype(lock)>;
+ using shunique_lock = ceph::shunique_lock<decltype(lock)>;
// pending ops
map<ceph_tid_t,Op*> ops;
int osd;
int incarnation;
- int num_locks;
ConnectionRef con;
+ int num_locks;
+ std::unique_ptr<std::mutex[]> completion_locks;
+ using unique_completion_lock = std::unique_lock<
+ decltype(completion_locks)::element_type>;
+
OSDSession(CephContext *cct, int o) :
- lock("OSDSession"),
- osd(o),
- incarnation(0),
- con(NULL) {
- num_locks = cct->_conf->objecter_completion_locks_per_session;
- completion_locks = new Mutex *[num_locks];
- for (int i = 0; i < num_locks; i++) {
- completion_locks[i] = new Mutex("OSDSession::completion_lock");
- }
- }
+ osd(o), incarnation(0), con(NULL),
+ num_locks(cct->_conf->objecter_completion_locks_per_session),
+ completion_locks(new std::mutex[num_locks]) {}
~OSDSession();
bool is_homeless() { return (osd == -1); }
- Mutex *get_lock(object_t& oid);
+ unique_completion_lock get_lock(object_t& oid);
};
map<int,OSDSession*> osd_sessions;
// we use this just to confirm a cookie is valid before dereferencing the ptr
set<LingerOp*> linger_ops_set;
int num_linger_callbacks;
- Mutex linger_callback_lock;
- Cond linger_callback_cond;
+ std::mutex linger_callback_lock;
+ using unique_linger_cb_lock = std::unique_lock<std::mutex>;
+ using linger_cb_lock_guard = std::lock_guard<std::mutex>;
+ std::condition_variable linger_callback_cond;
map<ceph_tid_t,PoolStatOp*> poolstat_ops;
map<ceph_tid_t,StatfsOp*> statfs_ops;
int _calc_target(op_target_t *t, epoch_t *last_force_resend = 0,
bool any_change = false);
int _map_session(op_target_t *op, OSDSession **s,
- RWLock::Context& lc);
+ shunique_lock& lc);
void _session_op_assign(OSDSession *s, Op *op);
void _session_op_remove(OSDSession *s, Op *op);
void _session_command_op_assign(OSDSession *to, CommandOp *op);
void _session_command_op_remove(OSDSession *from, CommandOp *op);
- int _assign_op_target_session(Op *op, RWLock::Context& lc,
+ int _assign_op_target_session(Op *op, shunique_lock& lc,
bool src_session_locked,
bool dst_session_locked);
- int _recalc_linger_op_target(LingerOp *op, RWLock::Context& lc);
+ int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc);
- void _linger_submit(LingerOp *info);
- void _send_linger(LingerOp *info);
+ void _linger_submit(LingerOp *info, shunique_lock& sul);
+ void _send_linger(LingerOp *info, shunique_lock& sul);
void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
void _linger_reconnect(LingerOp *info, int r);
void _send_linger_ping(LingerOp *info);
int _normalize_watch_error(int r);
void _linger_callback_queue() {
- Mutex::Locker l(linger_callback_lock);
+ linger_cb_lock_guard l(linger_callback_lock);
++num_linger_callbacks;
}
void _linger_callback_finish() {
- Mutex::Locker l(linger_callback_lock);
+ linger_cb_lock_guard l(linger_callback_lock);
if (--num_linger_callbacks == 0)
- linger_callback_cond.SignalAll();
+ linger_callback_cond.notify_all();
assert(num_linger_callbacks >= 0);
}
friend class C_DoWatchError;
public:
void linger_callback_flush() {
- Mutex::Locker l(linger_callback_lock);
- while (num_linger_callbacks > 0)
- linger_callback_cond.Wait(linger_callback_lock);
+ unique_linger_cb_lock l(linger_callback_lock);
+ linger_callback_cond.wait(l, [this]() {
+ return num_linger_callbacks <= 0;
+ });
}
private:
- void _check_op_pool_dne(Op *op, bool session_locked);
+ void _check_op_pool_dne(Op *op, unique_lock& sl);
void _send_op_map_check(Op *op);
void _op_cancel_map_check(Op *op);
void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
void kick_requests(OSDSession *session);
void _kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lresend);
- void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend);
+ void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend, unique_lock& ul);
- int _get_session(int osd, OSDSession **session, RWLock::Context& lc);
+ int _get_session(int osd, OSDSession **session, shunique_lock& sul);
void put_session(OSDSession *s);
void get_session(OSDSession *s);
void _reopen_session(OSDSession *session);
* If throttle_op needs to throttle it will unlock client_lock.
*/
int calc_op_budget(Op *op);
- void _throttle_op(Op *op, int op_size=0);
- int _take_op_budget(Op *op) {
- assert(rwlock.is_locked());
+ void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
+ int _take_op_budget(Op *op, shunique_lock& sul) {
+ assert(sul && sul.mutex() == &rwlock);
int op_budget = calc_op_budget(op);
if (keep_balanced_budget) {
- _throttle_op(op, op_budget);
+ _throttle_op(op, sul, op_budget);
} else {
op_throttle_bytes.take(op_budget);
op_throttle_ops.take(1);
max_linger_id(0), num_unacked(0), num_uncommitted(0), global_op_flags(0),
keep_balanced_budget(false), honor_osdmap_full(true),
last_seen_osdmap_version(0), last_seen_pgmap_version(0),
- rwlock("Objecter::rwlock"), logger(NULL), tick_event(0),
- m_request_state_hook(NULL), num_linger_callbacks(0),
- linger_callback_lock("Objecter::linger_callback_lock"),
- num_homeless_ops(0), homeless_session(new OSDSession(cct, -1)),
+ logger(NULL), tick_event(0), m_request_state_hook(NULL),
+ num_linger_callbacks(0), num_homeless_ops(0),
+ homeless_session(new OSDSession(cct, -1)),
mon_timeout(ceph::make_timespan(mon_timeout)),
osd_timeout(ceph::make_timespan(osd_timeout)),
op_throttle_bytes(cct, "objecter_bytes",
void start();
void shutdown();
- const OSDMap *get_osdmap_read() {
- rwlock.get_read();
- return osdmap;
- }
- void put_osdmap_read() {
- rwlock.put_read();
+ // These two templates replace get_osdmap_read()/put_osdmap_read(). Simply wrap
+ // whatever functionality you want to use the OSDMap in a lambda like:
+ //
+ // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
+ //
+ // or
+ //
+ // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
+ //
+ // Do not call into something that will try to lock the OSDMap from
+ // here or you will have great woe and misery.
+
+ template<typename Callback, typename...Args>
+ auto with_osdmap(Callback&& cb, Args&&...args) ->
+ typename std::enable_if<
+ std::is_void<
+ decltype(cb(const_cast<const OSDMap&>(*osdmap),
+ std::forward<Args>(args)...))>::value,
+ void>::type {
+ shared_lock l(rwlock);
+ std::forward<Callback>(cb)(const_cast<const OSDMap&>(*osdmap),
+ std::forward<Args>(args)...);
+ }
+
+ template<typename Callback, typename...Args>
+ auto with_osdmap(Callback&& cb, Args&&... args) ->
+ typename std::enable_if<
+ !std::is_void<
+ decltype(cb(const_cast<const OSDMap&>(*osdmap),
+ std::forward<Args>(args)...))>::value,
+ decltype(cb(const_cast<const OSDMap&>(*osdmap),
+ std::forward<Args>(args)...))>::type {
+ shared_lock l(rwlock);
+ return std::forward<Callback>(cb)(const_cast<const OSDMap&>(*osdmap),
+ std::forward<Args>(args)...);
}
+
/**
* Tell the objecter to throttle outgoing ops according to its
* budget (in _conf). If you do this, ops can block, in
map<int64_t, bool> *pool_full_map,
map<ceph_tid_t, Op*>& need_resend,
list<LingerOp*>& need_resend_linger,
- map<ceph_tid_t, CommandOp*>& need_resend_command);
+ map<ceph_tid_t, CommandOp*>& need_resend_command,
+ shunique_lock& sul);
int64_t get_object_hash_position(int64_t pool, const string& key,
const string& ns);
private:
// low-level
- ceph_tid_t _op_submit(Op *op, RWLock::Context& lc);
- ceph_tid_t _op_submit_with_budget(Op *op, RWLock::Context& lc,
+ ceph_tid_t _op_submit(Op *op, shunique_lock& lc);
+ ceph_tid_t _op_submit_with_budget(Op *op, shunique_lock& lc,
int *ctx_budget = NULL);
inline void unregister_op(Op *op);
public:
ceph_tid_t op_submit(Op *op, int *ctx_budget = NULL);
bool is_active() {
- RWLock::RLocker l(rwlock);
+ shared_lock l(rwlock);
return !((!inflight_ops.read()) && linger_ops.empty() &&
poolstat_ops.empty() && statfs_ops.empty());
}