From: Adam C. Emerson Date: Mon, 14 Sep 2015 17:35:10 +0000 (-0400) Subject: osdc: Update to use C++11 concurrency X-Git-Tag: v10.0.4~16^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f4b083a5e3a7727c097ae8be0bb39bae9e6723a5;p=ceph.git osdc: Update to use C++11 concurrency The only externally visible change is that Objecter::get_osdmap_read and Objecter::put_osdmap_read have been removed in favor of Objecter::with_osdmap. It can be used like: objecter->with_osdmap([&](const OSDMap& o) { o.do_stuff(); }); int x = objecter->with_osdmap([&](const OSDMap& o) { return get_thing(id); }); objecter->with_osdmap(std::mem_fn(&OSDMap::do_stuff)); int x = objecter->with_osdmap(std::mem_fn(&OSDMap::get_thing), id); The choice between the style of the first two examples or the second two is arbitrary and depends on what you prefer. Signed-off-by: Adam C. Emerson --- diff --git a/src/client/Client.cc b/src/client/Client.cc index d6b08bb07399..1e1c9af64f35 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -443,18 +443,13 @@ void Client::dump_status(Formatter *f) ldout(cct, 1) << __func__ << dendl; - const OSDMap *osdmap = objecter->get_osdmap_read(); - const epoch_t osd_epoch = osdmap->get_epoch(); - objecter->put_osdmap_read(); + const epoch_t osd_epoch + = objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)); if (f) { f->open_object_section("metadata"); - { - for (std::map::const_iterator i = metadata.begin(); - i != metadata.end(); ++i) { - f->dump_string(i->first.c_str(), i->second); - } - } + for (const auto& kv : metadata) + f->dump_string(kv.first.c_str(), kv.second); f->close_section(); f->dump_int("dentry_count", lru.lru_get_size()); @@ -2050,9 +2045,9 @@ void Client::send_request(MetaRequest *request, MetaSession *session, } r->set_mdsmap_epoch(mdsmap->get_epoch()); if (r->head.op == CEPH_MDS_OP_SETXATTR) { - const OSDMap *osdmap = objecter->get_osdmap_read(); - r->set_osdmap_epoch(osdmap->get_epoch()); - 
objecter->put_osdmap_read(); + objecter->with_osdmap([r](const OSDMap& o) { + r->set_osdmap_epoch(o.get_epoch()); + }); } if (request->mds == -1) { @@ -2315,21 +2310,16 @@ void Client::handle_osd_map(MOSDMap *m) // cancel_writes std::vector full_pools; - const OSDMap *osd_map = objecter->get_osdmap_read(); - const map& pools = osd_map->get_pools(); - for (map::const_iterator i = pools.begin(); - i != pools.end(); ++i) { - if (i->second.has_flag(pg_pool_t::FLAG_FULL)) { - full_pools.push_back(i->first); - } - } - - objecter->put_osdmap_read(); + objecter->with_osdmap([&full_pools](const OSDMap &o) { + for (const auto& kv : o.get_pools()) { + if (kv.second.has_flag(pg_pool_t::FLAG_FULL)) { + full_pools.push_back(kv.first); + } + } + }); - for (std::vector::iterator i = full_pools.begin(); - i != full_pools.end(); ++i) { - _handle_full_flag(*i); - } + for (auto p : full_pools) + _handle_full_flag(p); // Subscribe to subsequent maps to watch for the full flag going // away. For the global full flag objecter does this for us, but @@ -9866,9 +9856,9 @@ int Client::ll_setxattr(Inode *in, const char *name, const void *value, strcmp(name, "ceph.file.layout") == 0 || strcmp(name, "ceph.dir.layout") == 0) { string rest(strstr(name, "layout")); string v((const char*)value); - const OSDMap *osdmap = objecter->get_osdmap_read(); - int r = check_data_pool_exist(rest, v, osdmap); - objecter->put_osdmap_read(); + int r = objecter->with_osdmap([&](const OSDMap& o) { + return check_data_pool_exist(rest, v, &o); + }); if (r == -ENOENT) { C_SaferCond ctx; @@ -9992,14 +9982,14 @@ size_t Client::_vxattrcb_layout(Inode *in, char *val, size_t size) (unsigned long long)in->layout.fl_stripe_unit, (unsigned long long)in->layout.fl_stripe_count, (unsigned long long)in->layout.fl_object_size); - const OSDMap *osdmap = objecter->get_osdmap_read(); - if (osdmap->have_pg_pool(in->layout.fl_pg_pool)) - r += snprintf(val + r, size - r, "%s", - osdmap->get_pool_name(in->layout.fl_pg_pool).c_str()); - 
else - r += snprintf(val + r, size - r, "%lld", - (unsigned long long)in->layout.fl_pg_pool); - objecter->put_osdmap_read(); + objecter->with_osdmap([&](const OSDMap& o) { + if (o.have_pg_pool(in->layout.fl_pg_pool)) + r += snprintf(val + r, size - r, "%s", + o.get_pool_name(in->layout.fl_pg_pool).c_str()); + else + r += snprintf(val + r, size - r, "%" PRIu64, + (uint64_t)in->layout.fl_pg_pool); + }); return r; } size_t Client::_vxattrcb_layout_stripe_unit(Inode *in, char *val, size_t size) @@ -10017,12 +10007,13 @@ size_t Client::_vxattrcb_layout_object_size(Inode *in, char *val, size_t size) size_t Client::_vxattrcb_layout_pool(Inode *in, char *val, size_t size) { size_t r; - const OSDMap *osdmap = objecter->get_osdmap_read(); - if (osdmap->have_pg_pool(in->layout.fl_pg_pool)) - r = snprintf(val, size, "%s", osdmap->get_pool_name(in->layout.fl_pg_pool).c_str()); - else - r = snprintf(val, size, "%lld", (unsigned long long)in->layout.fl_pg_pool); - objecter->put_osdmap_read(); + objecter->with_osdmap([&](const OSDMap& o) { + if (o.have_pg_pool(in->layout.fl_pg_pool)) + r = snprintf(val, size, "%s", o.get_pool_name( + in->layout.fl_pg_pool).c_str()); + else + r = snprintf(val, size, "%" PRIu64, (uint64_t)in->layout.fl_pg_pool); + }); return r; } size_t Client::_vxattrcb_dir_entries(Inode *in, char *val, size_t size) @@ -10301,9 +10292,8 @@ int Client::_create(Inode *dir, const char *name, int flags, mode_t mode, int64_t pool_id = -1; if (data_pool && *data_pool) { - const OSDMap * osdmap = objecter->get_osdmap_read(); - pool_id = osdmap->lookup_pg_pool_name(data_pool); - objecter->put_osdmap_read(); + pool_id = objecter->with_osdmap( + std::mem_fn(&OSDMap::lookup_pg_pool_name), data_pool); if (pool_id < 0) return -EINVAL; if (pool_id > 0xffffffffll) @@ -10870,29 +10860,25 @@ out: int Client::ll_num_osds(void) { Mutex::Locker lock(client_lock); - const OSDMap *osdmap = objecter->get_osdmap_read(); - int ret = osdmap->get_num_osds(); - objecter->put_osdmap_read(); - 
return ret; + return objecter->with_osdmap(std::mem_fn(&OSDMap::get_num_osds)); } int Client::ll_osdaddr(int osd, uint32_t *addr) { Mutex::Locker lock(client_lock); - const OSDMap *osdmap = objecter->get_osdmap_read(); - bool exists = osdmap->exists(osd); entity_addr_t g; - if (exists) - g = osdmap->get_addr(osd); - objecter->put_osdmap_read(); - if (!exists) { + bool exists = objecter->with_osdmap([&](const OSDMap& o) { + if (!o.exists(osd)) + return false; + g = o.get_addr(osd); + return true; + }); + if (!exists) return -1; - } uint32_t nb_addr = (g.in4_addr()).sin_addr.s_addr; *addr = ntohl(nb_addr); return 0; } - uint32_t Client::ll_stripe_unit(Inode *in) { Mutex::Locker lock(client_lock); @@ -10940,15 +10926,15 @@ int Client::ll_get_stripe_osd(Inode *in, uint64_t blockno, uint64_t objectno = objectsetno * stripe_count + stripepos; // object id object_t oid = file_object_t(ino, objectno); - const OSDMap *osdmap = objecter->get_osdmap_read(); - ceph_object_layout olayout = osdmap->file_to_object_layout(oid, *layout, ""); - objecter->put_osdmap_read(); - - pg_t pg = (pg_t)olayout.ol_pgid; - vector osds; - int primary; - osdmap->pg_to_osds(pg, &osds, &primary); - return osds[0]; + return objecter->with_osdmap([&](const OSDMap& o) { + ceph_object_layout olayout = + o.file_to_object_layout(oid, *layout, string()); + pg_t pg = (pg_t)olayout.ol_pgid; + vector osds; + int primary; + o.pg_to_osds(pg, &osds, &primary); + return osds[0]; + }); } /* Return the offset of the block, internal to the object */ @@ -11608,34 +11594,24 @@ int Client::fdescribe_layout(int fd, ceph_file_layout *lp) int64_t Client::get_pool_id(const char *pool_name) { Mutex::Locker lock(client_lock); - const OSDMap *osdmap = objecter->get_osdmap_read(); - int64_t pool = osdmap->lookup_pg_pool_name(pool_name); - objecter->put_osdmap_read(); - return pool; + return objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name), + pool_name); } string Client::get_pool_name(int64_t pool) { 
Mutex::Locker lock(client_lock); - const OSDMap *osdmap = objecter->get_osdmap_read(); - string ret; - if (osdmap->have_pg_pool(pool)) - ret = osdmap->get_pool_name(pool); - objecter->put_osdmap_read(); - return ret; + return objecter->with_osdmap([pool](const OSDMap& o) { + return o.have_pg_pool(pool) ? o.get_pool_name(pool) : string(); + }); } int Client::get_pool_replication(int64_t pool) { Mutex::Locker lock(client_lock); - const OSDMap *osdmap = objecter->get_osdmap_read(); - int ret; - if (!osdmap->have_pg_pool(pool)) - ret = -ENOENT; - else - ret = osdmap->get_pg_pool(pool)->get_size(); - objecter->put_osdmap_read(); - return ret; + return objecter->with_osdmap([pool](const OSDMap& o) { + return o.have_pg_pool(pool) ? o.get_pg_pool(pool)->get_size() : -ENOENT; + }); } int Client::get_file_extent_osds(int fd, loff_t off, loff_t *len, vector& osds) @@ -11651,10 +11627,10 @@ int Client::get_file_extent_osds(int fd, loff_t off, loff_t *len, vector& o Striper::file_to_extents(cct, in->ino, &in->layout, off, 1, in->truncate_size, extents); assert(extents.size() == 1); - const OSDMap *osdmap = objecter->get_osdmap_read(); - pg_t pg = osdmap->object_locator_to_pg(extents[0].oid, extents[0].oloc); - osdmap->pg_to_acting_osds(pg, osds); - objecter->put_osdmap_read(); + objecter->with_osdmap([&](const OSDMap& o) { + pg_t pg = o.object_locator_to_pg(extents[0].oid, extents[0].oloc); + o.pg_to_acting_osds(pg, osds); + }); if (osds.empty()) return -EINVAL; @@ -11686,13 +11662,13 @@ int Client::get_osd_crush_location(int id, vector >& path) Mutex::Locker lock(client_lock); if (id < 0) return -EINVAL; - const OSDMap *osdmap = objecter->get_osdmap_read(); - int ret = osdmap->crush->get_full_location_ordered(id, path); - objecter->put_osdmap_read(); - return ret; + return objecter->with_osdmap([&](const OSDMap& o) { + return o.crush->get_full_location_ordered(id, path); + }); } -int Client::get_file_stripe_address(int fd, loff_t offset, vector& address) +int 
Client::get_file_stripe_address(int fd, loff_t offset, + vector& address) { Mutex::Locker lock(client_lock); @@ -11703,38 +11679,35 @@ int Client::get_file_stripe_address(int fd, loff_t offset, vector // which object? vector extents; - Striper::file_to_extents(cct, in->ino, &in->layout, offset, 1, in->truncate_size, extents); + Striper::file_to_extents(cct, in->ino, &in->layout, offset, 1, + in->truncate_size, extents); assert(extents.size() == 1); // now we have the object and its 'layout' - const OSDMap *osdmap = objecter->get_osdmap_read(); - pg_t pg = osdmap->object_locator_to_pg(extents[0].oid, extents[0].oloc); - vector osds; - osdmap->pg_to_acting_osds(pg, osds); - int ret = 0; - if (!osds.empty()) { - ret = -EINVAL; - } else { - for (unsigned i = 0; i < osds.size(); i++) { - entity_addr_t addr = osdmap->get_addr(osds[i]); - address.push_back(addr); - } - } - objecter->put_osdmap_read(); - return ret; + return objecter->with_osdmap([&](const OSDMap& o) { + pg_t pg = o.object_locator_to_pg(extents[0].oid, extents[0].oloc); + vector osds; + o.pg_to_acting_osds(pg, osds); + if (osds.empty()) + return -EINVAL; + for (unsigned i = 0; i < osds.size(); i++) { + entity_addr_t addr = o.get_addr(osds[i]); + address.push_back(addr); + } + return 0; + }); } int Client::get_osd_addr(int osd, entity_addr_t& addr) { Mutex::Locker lock(client_lock); - const OSDMap *osdmap = objecter->get_osdmap_read(); - int ret = 0; - if (!osdmap->exists(osd)) - ret = -ENOENT; - else - addr = osdmap->get_addr(osd); - objecter->put_osdmap_read(); - return ret; + return objecter->with_osdmap([&](const OSDMap& o) { + if (!o.exists(osd)) + return -ENOENT; + + addr = o.get_addr(osd); + return 0; + }); } int Client::enumerate_layout(int fd, vector& result, @@ -11761,12 +11734,12 @@ int Client::enumerate_layout(int fd, vector& result, int Client::get_local_osd() { Mutex::Locker lock(client_lock); - const OSDMap *osdmap = objecter->get_osdmap_read(); - if (osdmap->get_epoch() != local_osd_epoch) { 
- local_osd = osdmap->find_osd_on_ip(messenger->get_myaddr()); - local_osd_epoch = osdmap->get_epoch(); - } - objecter->put_osdmap_read(); + objecter->with_osdmap([this](const OSDMap& o) { + if (o.get_epoch() != local_osd_epoch) { + local_osd = o.find_osd_on_ip(messenger->get_myaddr()); + local_osd_epoch = o.get_epoch(); + } + }); return local_osd; } diff --git a/src/client/SyntheticClient.cc b/src/client/SyntheticClient.cc index 9d1888b019e8..8a84a48b602b 100644 --- a/src/client/SyntheticClient.cc +++ b/src/client/SyntheticClient.cc @@ -1714,19 +1714,15 @@ int SyntheticClient::dump_placement(string& fn) { // run through all the object extents dout(0) << "file size is " << filesize << dendl; dout(0) << "(osd, start, length) tuples for file " << fn << dendl; - for (vector::iterator i = extents.begin(); - i != extents.end(); ++i) { - - const OSDMap *osdmap = client->objecter->get_osdmap_read(); - int osd = osdmap->get_pg_acting_primary(osdmap->object_locator_to_pg(i->oid, i->oloc)); - client->objecter->put_osdmap_read(); + for (const auto& x : extents) { + int osd = client->objecter->with_osdmap([&](const OSDMap& o) { + return o.get_pg_acting_primary(o.object_locator_to_pg(x.oid, x.oloc)); + }); // run through all the buffer extents - for (vector >::iterator j = i->buffer_extents.begin(); - j != i->buffer_extents.end(); ++j) { - dout(0) << "OSD " << osd << ", offset " << (*j).first - << ", length " << (*j).second << dendl; - } + for (const auto& be : x.buffer_extents) + dout(0) << "OSD " << osd << ", offset " << be.first + << ", length " << be.second << dendl; } return 0; } @@ -1999,12 +1995,11 @@ int SyntheticClient::overload_osd_0(int n, int size, int wrsize) { int SyntheticClient::check_first_primary(int fh) { vector extents; - client->enumerate_layout(fh, extents, 1, 0); - const OSDMap *osdmap = client->objecter->get_osdmap_read(); - int primary = osdmap->get_pg_acting_primary(osdmap->object_locator_to_pg(extents.begin()->oid, - extents.begin()->oloc)); - 
client->objecter->put_osdmap_read(); - return primary; + client->enumerate_layout(fh, extents, 1, 0); + return client->objecter->with_osdmap([&](const OSDMap& o) { + return o.get_pg_acting_primary( + o.object_locator_to_pg(extents.begin()->oid, extents.begin()->oloc)); + }); } int SyntheticClient::rm_file(string& fn) diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index a77f7719895d..899439524fcd 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -86,10 +86,8 @@ int64_t librados::RadosClient::lookup_pool(const char *name) return r; } - const OSDMap *osdmap = objecter->get_osdmap_read(); - int64_t ret = osdmap->lookup_pg_pool_name(name); - objecter->put_osdmap_read(); - return ret; + return objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name), + name); } bool librados::RadosClient::pool_requires_alignment(int64_t pool_id) @@ -107,7 +105,7 @@ bool librados::RadosClient::pool_requires_alignment(int64_t pool_id) // a safer version of pool_requires_alignment int librados::RadosClient::pool_requires_alignment2(int64_t pool_id, - bool *requires) + bool *requires) { if (!requires) return -EINVAL; @@ -117,14 +115,13 @@ int librados::RadosClient::pool_requires_alignment2(int64_t pool_id, return r; } - const OSDMap *osdmap = objecter->get_osdmap_read(); - if (!osdmap->have_pg_pool(pool_id)) { - objecter->put_osdmap_read(); - return -ENOENT; - } - *requires = osdmap->get_pg_pool(pool_id)->requires_aligned_append(); - objecter->put_osdmap_read(); - return 0; + return objecter->with_osdmap([requires, pool_id](const OSDMap& o) { + if (!o.have_pg_pool(pool_id)) { + return -ENOENT; + } + *requires = o.get_pg_pool(pool_id)->requires_aligned_append(); + return 0; + }); } uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id) @@ -140,7 +137,7 @@ uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id) // a safer version of pool_required_alignment int 
librados::RadosClient::pool_required_alignment2(int64_t pool_id, - uint64_t *alignment) + uint64_t *alignment) { if (!alignment) return -EINVAL; @@ -150,30 +147,30 @@ int librados::RadosClient::pool_required_alignment2(int64_t pool_id, return r; } - const OSDMap *osdmap = objecter->get_osdmap_read(); - if (!osdmap->have_pg_pool(pool_id)) { - objecter->put_osdmap_read(); - return -ENOENT; - } - *alignment = osdmap->get_pg_pool(pool_id)->required_alignment(); - objecter->put_osdmap_read(); - return 0; + return objecter->with_osdmap([alignment, pool_id](const OSDMap &o) { + if (!o.have_pg_pool(pool_id)) { + return -ENOENT; + } + *alignment = o.get_pg_pool(pool_id)->required_alignment(); + return 0; + }); } -int librados::RadosClient::pool_get_auid(uint64_t pool_id, unsigned long long *auid) +int librados::RadosClient::pool_get_auid(uint64_t pool_id, + unsigned long long *auid) { int r = wait_for_osdmap(); if (r < 0) return r; - const OSDMap *osdmap = objecter->get_osdmap_read(); - const pg_pool_t *pg = osdmap->get_pg_pool(pool_id); - if (!pg) { - r = -ENOENT; - } else { - r = 0; - *auid = pg->auid; - } - objecter->put_osdmap_read(); + objecter->with_osdmap([&](const OSDMap& o) { + const pg_pool_t *pg = o.get_pg_pool(pool_id); + if (!pg) { + r = -ENOENT; + } else { + r = 0; + *auid = pg->auid; + } + }); return r; } @@ -182,14 +179,14 @@ int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s) int r = wait_for_osdmap(); if (r < 0) return r; - const OSDMap *osdmap = objecter->get_osdmap_read(); - if (!osdmap->have_pg_pool(pool_id)) { - r = -ENOENT; - } else { - r = 0; - *s = osdmap->get_pool_name(pool_id); - } - objecter->put_osdmap_read(); + objecter->with_osdmap([&](const OSDMap& o) { + if (!o.have_pg_pool(pool_id)) { + r = -ENOENT; + } else { + r = 0; + *s = o.get_pool_name(pool_id); + } + }); return r; } @@ -468,11 +465,11 @@ int librados::RadosClient::wait_for_osdmap() } bool need_map = false; - const OSDMap *osdmap = objecter->get_osdmap_read(); - 
if (osdmap->get_epoch() == 0) { - need_map = true; - } - objecter->put_osdmap_read(); + objecter->with_osdmap([&](const OSDMap& o) { + if (o.get_epoch() == 0) { + need_map = true; + } + }); if (need_map) { Mutex::Locker l(lock); @@ -481,23 +478,20 @@ int librados::RadosClient::wait_for_osdmap() if (cct->_conf->rados_mon_op_timeout > 0) timeout.set_from_double(cct->_conf->rados_mon_op_timeout); - const OSDMap *osdmap = objecter->get_osdmap_read(); - if (osdmap->get_epoch() == 0) { + if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) { ldout(cct, 10) << __func__ << " waiting" << dendl; utime_t start = ceph_clock_now(cct); - while (osdmap->get_epoch() == 0) { - objecter->put_osdmap_read(); - cond.WaitInterval(cct, lock, timeout); - utime_t elapsed = ceph_clock_now(cct) - start; - if (!timeout.is_zero() && elapsed > timeout) { - lderr(cct) << "timed out waiting for first osdmap from monitors" << dendl; - return -ETIMEDOUT; - } - osdmap = objecter->get_osdmap_read(); + while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) { + cond.WaitInterval(cct, lock, timeout); + utime_t elapsed = ceph_clock_now(cct) - start; + if (!timeout.is_zero() && elapsed > timeout) { + lderr(cct) << "timed out waiting for first osdmap from monitors" + << dendl; + return -ETIMEDOUT; + } } ldout(cct, 10) << __func__ << " done waiting" << dendl; } - objecter->put_osdmap_read(); return 0; } else { return 0; @@ -526,12 +520,11 @@ int librados::RadosClient::pool_list(std::list >& v) int r = wait_for_osdmap(); if (r < 0) return r; - const OSDMap *osdmap = objecter->get_osdmap_read(); - for (map::const_iterator p = osdmap->get_pools().begin(); - p != osdmap->get_pools().end(); - ++p) - v.push_back(std::make_pair(p->first, osdmap->get_pool_name(p->first))); - objecter->put_osdmap_read(); + + objecter->with_osdmap([&](const OSDMap& o) { + for (auto p : o.get_pools()) + v.push_back(std::make_pair(p.first, o.get_pool_name(p.first))); + }); return 0; } @@ -634,21 +627,19 @@ int 
librados::RadosClient::pool_get_base_tier(int64_t pool_id, int64_t* base_tie return r; } - const OSDMap *osdmap = objecter->get_osdmap_read(); - - const pg_pool_t* pool = osdmap->get_pg_pool(pool_id); - if (pool) { - if (pool->tier_of < 0) { - *base_tier = pool_id; - } else { - *base_tier = pool->tier_of; - } - r = 0; - } else { - r = -ENOENT; - } - - objecter->put_osdmap_read(); + objecter->with_osdmap([&](const OSDMap& o) { + const pg_pool_t* pool = o.get_pg_pool(pool_id); + if (pool) { + if (pool->tier_of < 0) { + *base_tier = pool_id; + } else { + *base_tier = pool->tier_of; + } + r = 0; + } else { + r = -ENOENT; + } + }); return r; } diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index 62d37c979a92..35d9a51d00b4 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -191,9 +191,8 @@ bool MDSDaemon::asok_command(string command, cmdmap_t& cmdmap, string format, void MDSDaemon::dump_status(Formatter *f) { - const OSDMap *osdmap = objecter->get_osdmap_read(); - const epoch_t osd_epoch = osdmap->get_epoch(); - objecter->put_osdmap_read(); + const epoch_t osd_epoch = objecter->with_osdmap( + std::mem_fn(&OSDMap::get_epoch)); f->open_object_section("status"); f->dump_stream("cluster_fsid") << monc->get_fsid(); @@ -490,18 +489,20 @@ int MDSDaemon::init(MDSMap::DaemonState wanted_state) while (true) { objecter->maybe_request_map(); objecter->wait_for_osd_map(); - const OSDMap *osdmap = objecter->get_osdmap_read(); - uint64_t osd_features = osdmap->get_up_osd_features(); - if (osd_features & CEPH_FEATURE_OSD_TMAP2OMAP) { - objecter->put_osdmap_read(); + if (objecter->with_osdmap([&](const OSDMap& o) { + uint64_t osd_features = o.get_up_osd_features(); + if (osd_features & CEPH_FEATURE_OSD_TMAP2OMAP) + return true; + if (o.get_num_up_osds() > 0) { + derr << "*** one or more OSDs do not support TMAP2OMAP; upgrade " + << "OSDs before starting MDS (or downgrade MDS) ***" << dendl; + } else { + derr << "*** no OSDs are up as of epoch " << o.get_epoch() + 
<< ", waiting" << dendl; + } + return false; + })) break; - } - if (osdmap->get_num_up_osds() > 0) { - derr << "*** one or more OSDs do not support TMAP2OMAP; upgrade OSDs before starting MDS (or downgrade MDS) ***" << dendl; - } else { - derr << "*** no OSDs are up as of epoch " << osdmap->get_epoch() << ", waiting" << dendl; - } - objecter->put_osdmap_read(); sleep(10); } diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index 090982ecf0a9..831cfd93b7fa 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -1606,10 +1606,8 @@ void MDSRankDispatcher::handle_mds_map( // Before going active, set OSD epoch barrier to latest (so that // we don't risk handing out caps to clients with old OSD maps that // might not include barriers from the previous incarnation of this MDS) - const OSDMap *osdmap = objecter->get_osdmap_read(); - const epoch_t osd_epoch = osdmap->get_epoch(); - objecter->put_osdmap_read(); - set_osd_epoch_barrier(osd_epoch); + set_osd_epoch_barrier(objecter->with_osdmap( + std::mem_fn(&OSDMap::get_epoch))); } if (is_active()) { diff --git a/src/mds/Server.cc b/src/mds/Server.cc index fd764cab5b60..b9f5b0c361a0 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -1450,10 +1450,11 @@ void Server::handle_osd_map() /* Note that we check the OSDMAP_FULL flag directly rather than * using osdmap_full_flag(), because we want to know "is the flag set" * rather than "does the flag apply to us?" 
*/ - const OSDMap *osdmap = mds->objecter->get_osdmap_read(); - is_full = osdmap->test_flag(CEPH_OSDMAP_FULL); - dout(7) << __func__ << ": full = " << is_full << " epoch = " << osdmap->get_epoch() << dendl; - mds->objecter->put_osdmap_read(); + mds->objecter->with_osdmap([this](const OSDMap& o) { + is_full = o.test_flag(CEPH_OSDMAP_FULL); + dout(7) << __func__ << ": full = " << is_full << " epoch = " + << o.get_epoch() << dendl; + }); } void Server::dispatch_client_request(MDRequestRef& mdr) @@ -3965,7 +3966,7 @@ void Server::handle_client_setdirlayout(MDRequestRef& mdr) // XATTRS -int Server::parse_layout_vxattr(string name, string value, const OSDMap *osdmap, +int Server::parse_layout_vxattr(string name, string value, const OSDMap& osdmap, ceph_file_layout *layout, bool validate) { dout(20) << "parse_layout_vxattr name " << name << " value '" << value << "'" << dendl; @@ -4000,7 +4001,7 @@ int Server::parse_layout_vxattr(string name, string value, const OSDMap *osdmap, try { layout->fl_pg_pool = boost::lexical_cast(value); } catch (boost::bad_lexical_cast const&) { - int64_t pool = osdmap->lookup_pg_pool_name(value); + int64_t pool = osdmap.lookup_pg_pool_name(value); if (pool < 0) { dout(10) << " unknown pool " << value << dendl; return -ENOENT; @@ -4104,10 +4105,12 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur, layout = mdcache->default_file_layout; rest = name.substr(name.find("layout")); - const OSDMap *osdmap = mds->objecter->get_osdmap_read(); - int r = parse_layout_vxattr(rest, value, osdmap, &layout); - epoch_t epoch = osdmap->get_epoch(); - mds->objecter->put_osdmap_read(); + epoch_t epoch; + int r; + mds->objecter->with_osdmap([&](const OSDMap& osdmap) { + r = parse_layout_vxattr(rest, value, osdmap, &layout); + epoch = osdmap.get_epoch(); + }); if (r < 0) { if (r == -ENOENT) { epoch_t req_epoch = req->get_osdmap_epoch(); @@ -4153,10 +4156,12 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur, } ceph_file_layout layout = 
cur->get_projected_inode()->layout; rest = name.substr(name.find("layout")); - const OSDMap *osdmap = mds->objecter->get_osdmap_read(); - int r = parse_layout_vxattr(rest, value, osdmap, &layout); - epoch_t epoch = osdmap->get_epoch(); - mds->objecter->put_osdmap_read(); + int r; + epoch_t epoch; + mds->objecter->with_osdmap([&](const OSDMap& osdmap) { + r = parse_layout_vxattr(rest, value, osdmap, &layout); + epoch = osdmap.get_epoch(); + }); if (r < 0) { if (r == -ENOENT) { epoch_t req_epoch = req->get_osdmap_epoch(); diff --git a/src/mds/Server.h b/src/mds/Server.h index d194a2f0574e..40c71f76fc87 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -172,7 +172,7 @@ public: void handle_client_setlayout(MDRequestRef& mdr); void handle_client_setdirlayout(MDRequestRef& mdr); - int parse_layout_vxattr(string name, string value, const OSDMap *osdmap, + int parse_layout_vxattr(string name, string value, const OSDMap& osdmap, ceph_file_layout *layout, bool validate=true); int parse_quota_vxattr(string name, string value, quota_info_t *quota); void handle_set_vxattr(MDRequestRef& mdr, CInode *cur, diff --git a/src/mds/SnapServer.cc b/src/mds/SnapServer.cc index 70069092c411..45dd9e4aac8f 100644 --- a/src/mds/SnapServer.cc +++ b/src/mds/SnapServer.cc @@ -41,21 +41,19 @@ void SnapServer::reset_state() // find any removed snapshot in data pools if (mds) { // only if I'm running in a live MDS snapid_t first_free = 0; - const OSDMap *osdmap = mds->objecter->get_osdmap_read(); - for (set::const_iterator p = mds->mdsmap->get_data_pools().begin(); - p != mds->mdsmap->get_data_pools().end(); - ++p) { - const pg_pool_t *pi = osdmap->get_pg_pool(*p); - if (!pi) { - // If pool isn't in OSDMap yet then can't have any snaps needing - // removal, skip. 
- continue; - } - if (!pi->removed_snaps.empty() && - pi->removed_snaps.range_end() > first_free) - first_free = pi->removed_snaps.range_end(); - } - mds->objecter->put_osdmap_read(); + mds->objecter->with_osdmap([&](const OSDMap& o) { + for (const auto p : mds->mdsmap->get_data_pools()) { + const pg_pool_t *pi = o.get_pg_pool(p); + if (!pi) { + // If pool isn't in OSDMap yet then can't have any snaps + // needing removal, skip. + continue; + } + if (!pi->removed_snaps.empty() && + pi->removed_snaps.range_end() > first_free) + first_free = pi->removed_snaps.range_end(); + } + }); if (first_free > last_snap) last_snap = first_free; } @@ -259,30 +257,28 @@ void SnapServer::check_osd_map(bool force) map > all_purge; map > all_purged; - const OSDMap *osdmap = mds->objecter->get_osdmap_read(); - for (map >::iterator p = need_to_purge.begin(); - p != need_to_purge.end(); - ++p) { - int id = p->first; - const pg_pool_t *pi = osdmap->get_pg_pool(id); - if (pi == NULL) { - // The pool is gone. So are the snapshots. - all_purged[id] = std::vector(p->second.begin(), p->second.end()); - continue; - } - - for (set::iterator q = p->second.begin(); - q != p->second.end(); - ++q) { - if (pi->is_removed_snap(*q)) { - dout(10) << " osdmap marks " << *q << " as removed" << dendl; - all_purged[id].push_back(*q); - } else { - all_purge[id].push_back(*q); + mds->objecter->with_osdmap( + [this, &all_purged, &all_purge](const OSDMap& osdmap) { + for (const auto& p : need_to_purge) { + int id = p.first; + const pg_pool_t *pi = osdmap.get_pg_pool(id); + if (pi == NULL) { + // The pool is gone. So are the snapshots. 
+ all_purged[id] = std::vector(p.second.begin(), + p.second.end()); + continue; + } + + for (const auto& q : p.second) { + if (pi->is_removed_snap(q)) { + dout(10) << " osdmap marks " << q << " as removed" << dendl; + all_purged[id].push_back(q); + } else { + all_purge[id].push_back(q); + } + } } - } - } - mds->objecter->put_osdmap_read(); + }); if (!all_purged.empty()) { // prepare to remove from need_to_purge list diff --git a/src/mds/StrayManager.cc b/src/mds/StrayManager.cc index 6c30b33e5539..6547a264120c 100644 --- a/src/mds/StrayManager.cc +++ b/src/mds/StrayManager.cc @@ -849,32 +849,29 @@ void StrayManager::_truncate_stray_logged(CDentry *dn, LogSegment *ls) void StrayManager::update_op_limit() { - const OSDMap *osdmap = mds->objecter->get_osdmap_read(); - assert(osdmap != NULL); - - // Number of PGs across all data pools uint64_t pg_count = 0; - const std::set &data_pools = mds->mdsmap->get_data_pools(); - for (std::set::iterator i = data_pools.begin(); - i != data_pools.end(); ++i) { - if (osdmap->get_pg_pool(*i) == NULL) { - // It is possible that we have an older OSDMap than MDSMap, because - // we don't start watching every OSDMap until after MDSRank is - // initialized - dout(4) << __func__ << " data pool " << *i - << " not found in OSDMap" << dendl; - continue; - } - pg_count += osdmap->get_pg_num(*i); - } - - mds->objecter->put_osdmap_read(); + mds->objecter->with_osdmap([&](const OSDMap& o) { + // Number of PGs across all data pools + const std::set &data_pools = mds->mdsmap->get_data_pools(); + for (const auto dp : data_pools) { + if (o.get_pg_pool(dp) == NULL) { + // It is possible that we have an older OSDMap than MDSMap, + // because we don't start watching every OSDMap until after + // MDSRank is initialized + dout(4) << __func__ << " data pool " << dp + << " not found in OSDMap" << dendl; + continue; + } + pg_count += o.get_pg_num(dp); + } + }); uint64_t mds_count = mds->mdsmap->get_max_mds(); // Work out a limit based on n_pgs / n_mdss, 
multiplied by the user's // preference for how many ops per PG - max_purge_ops = uint64_t(((double)pg_count / (double)mds_count) * g_conf->mds_max_purge_ops_per_pg); + max_purge_ops = uint64_t(((double)pg_count / (double)mds_count) * + g_conf->mds_max_purge_ops_per_pg); // User may also specify a hard limit, apply this if so. if (g_conf->mds_max_purge_ops) { diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index 80e4db4dd2c4..f7de0dedffb5 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -16,6 +16,7 @@ #ifndef CEPH_DISPATCHER_H #define CEPH_DISPATCHER_H +#include "include/assert.h" #include "include/buffer_fwd.h" #include "include/assert.h" diff --git a/src/osdc/Filer.cc b/src/osdc/Filer.cc index 0a9d6e9a47cc..bf5f23c131b1 100644 --- a/src/osdc/Filer.cc +++ b/src/osdc/Filer.cc @@ -13,6 +13,8 @@ */ +#include + #include "Filer.h" #include "osd/OSDMap.h" #include "Striper.h" @@ -49,13 +51,13 @@ public: bool probe_complete; { - probe->lock.Lock(); + Probe::unique_lock pl(probe->lock); if (r != 0) { probe->err = r; } - probe_complete = filer->_probed(probe, oid, size, mtime); - assert(!probe->lock.is_locked_by_me()); + probe_complete = filer->_probed(probe, oid, size, mtime, pl); + assert(!pl.owns_lock()); } if (probe_complete) { probe->onfinish->complete(probe->err); @@ -128,9 +130,9 @@ int Filer::probe_impl(Probe* probe, ceph_file_layout *layout, probe->probing_off -= probe->probing_len; } - probe->lock.Lock(); - _probe(probe); - assert(!probe->lock.is_locked_by_me()); + Probe::unique_lock pl(probe->lock); + _probe(probe, pl); + assert(!pl.owns_lock()); return 0; } @@ -140,9 +142,9 @@ int Filer::probe_impl(Probe* probe, ceph_file_layout *layout, /** * probe->lock must be initially locked, this function will release it */ -void Filer::_probe(Probe *probe) +void Filer::_probe(Probe *probe, Probe::unique_lock& pl) { - assert(probe->lock.is_locked_by_me()); + assert(pl.owns_lock() && pl.mutex() == &probe->lock); ldout(cct, 10) << "_probe " << hex << 
probe->ino << dec << " " << probe->probing_off << "~" << probe->probing_len @@ -163,7 +165,7 @@ void Filer::_probe(Probe *probe) stat_extents.push_back(*p); } - probe->lock.Unlock(); + pl.unlock(); for (std::vector::iterator i = stat_extents.begin(); i != stat_extents.end(); ++i) { C_Probe *c = new C_Probe(this, probe, i->oid); @@ -179,9 +181,9 @@ void Filer::_probe(Probe *probe) * @return true if probe is complete and Probe object may be freed. */ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, - ceph::real_time mtime) + ceph::real_time mtime, Probe::unique_lock& pl) { - assert(probe->lock.is_locked_by_me()); + assert(pl.owns_lock() && pl.mutex() == &probe->lock); ldout(cct, 10) << "_probed " << probe->ino << " object " << oid << " has size " << size << " mtime " << mtime << dendl; @@ -194,12 +196,12 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, probe->ops.erase(oid); if (!probe->ops.empty()) { - probe->lock.Unlock(); + pl.unlock(); return false; // waiting for more! } if (probe->err) { // we hit an error, propagate back up - probe->lock.Unlock(); + pl.unlock(); return true; } @@ -278,8 +280,8 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, probe->probing_len = period; probe->probing_off -= period; } - _probe(probe); - assert(!probe->lock.is_locked_by_me()); + _probe(probe, pl); + assert(!pl.owns_lock()); return false; } else if (probe->pmtime) { ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl; @@ -289,7 +291,7 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, *probe->pumtime = ceph::real_clock::to_ceph_timespec(probe->max_mtime); } // done! 
- probe->lock.Unlock(); + pl.unlock(); return true; } @@ -297,7 +299,9 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, // ----------------------- struct PurgeRange { - Mutex lock; + std::mutex lock; + typedef std::lock_guard lock_guard; + typedef std::unique_lock unique_lock; inodeno_t ino; ceph_file_layout layout; SnapContext snapc; @@ -309,9 +313,8 @@ struct PurgeRange { PurgeRange(inodeno_t i, ceph_file_layout& l, const SnapContext& sc, uint64_t fo, uint64_t no, ceph::real_time t, int fl, Context *fin) - : lock("Filer::PurgeRange"), ino(i), layout(l), snapc(sc), - first(fo), num(no), mtime(t), flags(fl), oncommit(fin), - uncommitted(0) {} + : ino(i), layout(l), snapc(sc), first(fo), num(no), mtime(t), flags(fl), + oncommit(fin), uncommitted(0) {} }; int Filer::purge_range(inodeno_t ino, @@ -327,9 +330,7 @@ int Filer::purge_range(inodeno_t ino, // single object? easy! if (num_obj == 1) { object_t oid = file_object_t(ino, first_obj); - const OSDMap *osdmap = objecter->get_osdmap_read(); - object_locator_t oloc = osdmap->file_to_object_locator(*layout); - objecter->put_osdmap_read(); + object_locator_t oloc = OSDMap::file_to_object_locator(*layout); objecter->remove(oid, oloc, snapc, mtime, flags, NULL, oncommit); return 0; } @@ -352,7 +353,7 @@ struct C_PurgeRange : public Context { void Filer::_do_purge_range(PurgeRange *pr, int fin) { - pr->lock.Lock(); + PurgeRange::unique_lock prl(pr->lock); pr->uncommitted -= fin; ldout(cct, 10) << "_do_purge_range " << pr->ino << " objects " << pr->first << "~" << pr->num << " uncommitted " << pr->uncommitted @@ -360,7 +361,7 @@ void Filer::_do_purge_range(PurgeRange *pr, int fin) if (pr->num == 0 && pr->uncommitted == 0) { pr->oncommit->complete(0); - pr->lock.Unlock(); + prl.unlock(); delete pr; return; } @@ -375,15 +376,11 @@ void Filer::_do_purge_range(PurgeRange *pr, int fin) pr->num--; max--; } - pr->lock.Unlock(); + prl.unlock(); // Issue objecter ops outside pr->lock to avoid lock dependency 
loop - for (std::vector::iterator i = remove_oids.begin(); - i != remove_oids.end(); ++i) { - const object_t oid = *i; - const OSDMap *osdmap = objecter->get_osdmap_read(); - const object_locator_t oloc = osdmap->file_to_object_locator(pr->layout); - objecter->put_osdmap_read(); + for (const auto& oid : remove_oids) { + object_locator_t oloc = OSDMap::file_to_object_locator(pr->layout); objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags, NULL, new C_OnFinisher(new C_PurgeRange(this, pr), finisher)); } diff --git a/src/osdc/Filer.h b/src/osdc/Filer.h index c2359c7cabd5..0c7b862de033 100644 --- a/src/osdc/Filer.h +++ b/src/osdc/Filer.h @@ -27,6 +27,8 @@ */ +#include + #include "include/types.h" #include "common/ceph_time.h" @@ -50,7 +52,9 @@ class Filer { // probes struct Probe { - Mutex lock; + std::mutex lock; + typedef std::lock_guard lock_guard; + typedef std::unique_lock unique_lock; inodeno_t ino; ceph_file_layout layout; snapid_t snapid; @@ -79,7 +83,7 @@ class Filer { Probe(inodeno_t i, ceph_file_layout &l, snapid_t sn, uint64_t f, uint64_t *e, ceph::real_time *m, int fl, bool fw, Context *c) : - lock("Filer::Probe"), ino(i), layout(l), snapid(sn), + ino(i), layout(l), snapid(sn), psize(e), pmtime(m), pumtime(nullptr), flags(fl), fwd(fw), onfinish(c), probing_off(f), probing_len(0), err(0), found_size(false) {} @@ -87,7 +91,7 @@ class Filer { Probe(inodeno_t i, ceph_file_layout &l, snapid_t sn, uint64_t f, uint64_t *e, utime_t *m, int fl, bool fw, Context *c) : - lock("Filer::Probe"), ino(i), layout(l), snapid(sn), + ino(i), layout(l), snapid(sn), psize(e), pmtime(nullptr), pumtime(m), flags(fl), fwd(fw), onfinish(c), probing_off(f), probing_len(0), err(0), found_size(false) {} @@ -95,9 +99,9 @@ class Filer { class C_Probe; - void _probe(Probe *p); + void _probe(Probe *p, Probe::unique_lock& pl); bool _probed(Probe *p, const object_t& oid, uint64_t size, - ceph::real_time mtime); + ceph::real_time mtime, Probe::unique_lock& pl); public: Filer(const 
Filer& other); diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index f2dd275aff08..487bb60b5a63 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -31,7 +31,7 @@ using std::chrono::seconds; void Journaler::set_readonly() { - Mutex::Locker l(lock); + lock_guard l(lock); ldout(cct, 1) << "set_readonly" << dendl; readonly = true; @@ -39,7 +39,7 @@ void Journaler::set_readonly() void Journaler::set_writeable() { - Mutex::Locker l(lock); + lock_guard l(lock); ldout(cct, 1) << "set_writeable" << dendl; readonly = false; @@ -47,7 +47,7 @@ void Journaler::set_writeable() void Journaler::create(ceph_file_layout *l, stream_format_t const sf) { - Mutex::Locker lk(lock); + lock_guard lk(lock); assert(!readonly); state = STATE_ACTIVE; @@ -67,7 +67,7 @@ void Journaler::create(ceph_file_layout *l, stream_format_t const sf) void Journaler::set_layout(ceph_file_layout const *l) { - Mutex::Locker lk(lock); + lock_guard lk(lock); _set_layout(l); } @@ -145,7 +145,7 @@ public: void Journaler::recover(Context *onread) { - Mutex::Locker l(lock); + lock_guard l(lock); if (stopping) { onread->complete(-EAGAIN); return; @@ -171,7 +171,7 @@ void Journaler::recover(Context *onread) void Journaler::_read_head(Context *on_finish, bufferlist *bl) { - assert(lock.is_locked_by_me()); + // lock is locked assert(state == STATE_READHEAD || state == STATE_REREADHEAD); object_t oid = file_object_t(ino, 0); @@ -181,7 +181,7 @@ void Journaler::_read_head(Context *on_finish, bufferlist *bl) void Journaler::reread_head(Context *onfinish) { - Mutex::Locker l(lock); + lock_guard l(lock); _reread_head(wrap_finisher(onfinish)); } @@ -205,7 +205,7 @@ void Journaler::_reread_head(Context *onfinish) void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) { - Mutex::Locker l(lock); + lock_guard l(lock); //read on-disk header into assert(bl.length() || r < 0 ); @@ -233,7 +233,7 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) void 
Journaler::_finish_read_head(int r, bufferlist& bl) { - Mutex::Locker l(lock); + lock_guard l(lock); assert(state == STATE_READHEAD); @@ -301,7 +301,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) void Journaler::_probe(Context *finish, uint64_t *end) { - assert(lock.is_locked_by_me()); + // lock is locked ldout(cct, 1) << "probing for end of the log" << dendl; assert(state == STATE_PROBING || state == STATE_REPROBING); // probe the log @@ -323,7 +323,7 @@ void Journaler::_reprobe(C_OnFinisher *finish) void Journaler::_finish_reprobe(int r, uint64_t new_end, C_OnFinisher *onfinish) { - Mutex::Locker l(lock); + lock_guard l(lock); assert(new_end >= write_pos || r < 0); ldout(cct, 1) << "_finish_reprobe new_end = " << new_end @@ -336,7 +336,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, void Journaler::_finish_probe_end(int r, uint64_t end) { - Mutex::Locker l(lock); + lock_guard l(lock); assert(state == STATE_PROBING); if (r < 0) { // error in probing @@ -379,7 +379,7 @@ public: void Journaler::reread_head_and_probe(Context *onfinish) { - Mutex::Locker l(lock); + lock_guard l(lock); assert(state == STATE_ACTIVE); _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish))); @@ -388,7 +388,7 @@ void Journaler::reread_head_and_probe(Context *onfinish) void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish) { // Expect to be called back from finish_reread_head, which already takes lock - assert(lock.is_locked_by_me()); + // lock is locked assert(!r); //if we get an error, we're boned _reprobe(onfinish); @@ -411,7 +411,7 @@ public: void Journaler::write_head(Context *oncommit) { - Mutex::Locker l(lock); + lock_guard l(lock); _write_head(oncommit); } @@ -449,7 +449,7 @@ void Journaler::_write_head(Context *oncommit) void Journaler::_finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit) { - Mutex::Locker l(lock); + lock_guard l(lock); if (r < 0) { lderr(cct) << "_finish_write_head got " << cpp_strerror(r) 
<< dendl; @@ -483,7 +483,7 @@ public: void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) { - Mutex::Locker l(lock); + lock_guard l(lock); assert(!readonly); if (r < 0) { @@ -529,7 +529,7 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) uint64_t Journaler::append_entry(bufferlist& bl) { - Mutex::Locker l(lock); + lock_guard l(lock); assert(!readonly); uint32_t s = bl.length(); @@ -659,7 +659,7 @@ void Journaler::_do_flush(unsigned amount) void Journaler::wait_for_flush(Context *onsafe) { - Mutex::Locker l(lock); + lock_guard l(lock); if (stopping) { onsafe->complete(-EAGAIN); return; @@ -692,7 +692,7 @@ void Journaler::_wait_for_flush(Context *onsafe) void Journaler::flush(Context *onsafe) { - Mutex::Locker l(lock); + lock_guard l(lock); _flush(wrap_finisher(onsafe)); } @@ -794,7 +794,7 @@ void Journaler::_issue_prezero() // lock before calling into objecter to do I/O. void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) { - Mutex::Locker l(lock); + lock_guard l(lock); ldout(cct, 10) << "_prezeroed to " << start << "~" << len << ", prezeroing/prezero was " << prezeroing_pos << "/" @@ -853,7 +853,7 @@ public: void finish(int r) { // Should only be called from waitfor_safe i.e. 
already inside lock - assert(ls->lock.is_locked_by_me()); + // (ls->lock is locked ls->_prefetch(); } }; @@ -861,7 +861,7 @@ public: void Journaler::_finish_read(int r, uint64_t offset, uint64_t length, bufferlist& bl) { - Mutex::Locker l(lock); + lock_guard l(lock); if (r < 0) { ldout(cct, 0) << "_finish_read got error " << r << dendl; @@ -1086,7 +1086,7 @@ bool Journaler::_is_readable() */ bool Journaler::is_readable() { - Mutex::Locker l(lock); + lock_guard l(lock); if (error != 0) { return false; @@ -1113,7 +1113,7 @@ class Journaler::C_EraseFinish : public Context { */ void Journaler::erase(Context *completion) { - Mutex::Locker l(lock); + lock_guard l(lock); // Async delete the journal data uint64_t first = trimmed_pos / get_layout_period(); @@ -1131,7 +1131,7 @@ void Journaler::erase(Context *completion) void Journaler::_finish_erase(int data_result, C_OnFinisher *completion) { - Mutex::Locker l(lock); + lock_guard l(lock); if (data_result == 0) { // Async delete the journal header @@ -1150,7 +1150,7 @@ void Journaler::_finish_erase(int data_result, C_OnFinisher *completion) */ bool Journaler::try_read_entry(bufferlist& bl) { - Mutex::Locker l(lock); + lock_guard l(lock); if (!readable) { ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable" @@ -1192,7 +1192,7 @@ bool Journaler::try_read_entry(bufferlist& bl) void Journaler::wait_for_readable(Context *onreadable) { - Mutex::Locker l(lock); + lock_guard l(lock); if (stopping) { onreadable->complete(-EAGAIN); return; @@ -1227,7 +1227,7 @@ public: void Journaler::trim() { - Mutex::Locker l(lock); + lock_guard l(lock); _trim(); } @@ -1273,7 +1273,7 @@ void Journaler::_trim() void Journaler::_finish_trim(int r, uint64_t to) { - Mutex::Locker l(lock); + lock_guard l(lock); assert(!readonly); ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos @@ -1295,7 +1295,7 @@ void Journaler::_finish_trim(int r, uint64_t to) void Journaler::handle_write_error(int r) { - 
assert(lock.is_locked_by_me()); + // lock is locked lderr(cct) << "handle_write_error " << cpp_strerror(r) << dendl; if (on_write_error) { @@ -1455,7 +1455,7 @@ size_t JournalStream::write(bufferlist &entry, bufferlist *to, * @param c callback/context to trigger on error */ void Journaler::set_write_error_handler(Context *c) { - Mutex::Locker l(lock); + lock_guard l(lock); assert(!on_write_error); on_write_error = wrap_finisher(c); called_write_error = false; @@ -1479,7 +1479,7 @@ C_OnFinisher *Journaler::wrap_finisher(Context *c) void Journaler::shutdown() { - Mutex::Locker l(lock); + lock_guard l(lock); ldout(cct, 1) << __func__ << dendl; diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index c12bc784e6fb..da397f6391b9 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -215,7 +215,9 @@ public: private: // me CephContext *cct; - Mutex lock; + std::mutex lock; + typedef std::lock_guard lock_guard; + typedef std::unique_lock unique_lock; Finisher *finisher; Header last_written; inodeno_t ino; @@ -252,7 +254,7 @@ private: void _do_delayed_flush() { assert(delay_flush_event != NULL); - Mutex::Locker l(lock); + lock_guard l(lock); delay_flush_event = NULL; _do_flush(); } @@ -399,8 +401,7 @@ public: Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim, Finisher *f) : last_committed(mag), - cct(obj->cct), lock("Journaler"), finisher(f), - last_written(mag), + cct(obj->cct), finisher(f), last_written(mag), ino(ino_), pg_pool(pool), readonly(true), stream_format(-1), journal_stream(-1), magic(mag), @@ -425,7 +426,7 @@ public: * "erase" method. 
*/ void reset() { - Mutex::Locker l(lock); + lock_guard l(lock); assert(state == STATE_ACTIVE); readonly = true; @@ -466,11 +467,11 @@ public: void set_readonly(); void set_writeable(); void set_write_pos(int64_t p) { - Mutex::Locker l(lock); + lock_guard l(lock); prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p; } void set_read_pos(int64_t p) { - Mutex::Locker l(lock); + lock_guard l(lock); // we can't cope w/ in-progress read right now. assert(requested_pos == received_pos); read_pos = requested_pos = received_pos = p; @@ -478,17 +479,17 @@ public: } uint64_t append_entry(bufferlist& bl); void set_expire_pos(int64_t ep) { - Mutex::Locker l(lock); + lock_guard l(lock); expire_pos = ep; } void set_trimmed_pos(int64_t p) { - Mutex::Locker l(lock); + lock_guard l(lock); trimming_pos = trimmed_pos = p; } void trim(); void trim_tail() { - Mutex::Locker l(lock); + lock_guard l(lock); assert(!readonly); _issue_prezero(); diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index cff49d488773..9ace054ca0c0 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -153,13 +153,18 @@ static const char *config_keys[] = { NULL }; -Mutex *Objecter::OSDSession::get_lock(object_t& oid) +Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock( + object_t& oid) { -#define HASH_PRIME 1021 + if (oid.name.empty()) + return unique_completion_lock(); + + static constexpr uint32_t HASH_PRIME = 1021; uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size()) % HASH_PRIME; - return completion_locks[h % num_locks]; + return unique_completion_lock(completion_locks[h % num_locks], + std::defer_lock); } const char** Objecter::get_tracked_conf_keys() const @@ -178,7 +183,7 @@ void Objecter::handle_conf_change(const struct md_config_t *conf, void Objecter::update_crush_location() { - RWLock::WLocker rwlocker(rwlock); + unique_lock wl(rwlock); std::multimap new_crush_location; vector lvec; get_str_vec(cct->_conf->crush_location, ";, \t", 
lvec); @@ -355,7 +360,7 @@ void Objecter::init() */ void Objecter::start() { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); start_tick(); if (osdmap->get_epoch() == 0) { @@ -367,7 +372,7 @@ void Objecter::shutdown() { assert(initialized.read()); - rwlock.get_write(); + unique_lock wl(rwlock); initialized.set(0); @@ -423,7 +428,7 @@ void Objecter::shutdown() ldout(cct, 10) << " linger_op " << i->first << dendl; LingerOp *lop = i->second; { - RWLock::WLocker wl(homeless_session->lock); + OSDSession::unique_lock swl(homeless_session->lock); _session_linger_op_remove(homeless_session, lop); } linger_ops.erase(lop->linger_id); @@ -436,7 +441,7 @@ void Objecter::shutdown() ldout(cct, 10) << " op " << i->first << dendl; Op *op = i->second; { - RWLock::WLocker wl(homeless_session->lock); + OSDSession::unique_lock swl(homeless_session->lock); _session_op_remove(homeless_session, op); } op->put(); @@ -448,7 +453,7 @@ void Objecter::shutdown() ldout(cct, 10) << " command_op " << i->first << dendl; CommandOp *cop = i->second; { - RWLock::WLocker wl(homeless_session->lock); + OSDSession::unique_lock swl(homeless_session->lock); _session_command_op_remove(homeless_session, cop); } cop->put(); @@ -475,18 +480,17 @@ void Objecter::shutdown() } // Let go of Objecter write lock so timer thread can shutdown - rwlock.unlock(); + wl.unlock(); } -void Objecter::_send_linger(LingerOp *info) +void Objecter::_send_linger(LingerOp *info, + shunique_lock& sul) { - assert(rwlock.is_wlocked()); - - RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); + assert(sul.owns_lock() && sul.mutex() == &rwlock); vector opv; Context *oncommit = NULL; - info->watch_lock.get_read(); // just to read registered status + LingerOp::shared_lock watchl(info->watch_lock); bufferlist *poutbl = NULL; if (info->registered && info->is_watch) { ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" @@ -508,7 +512,7 @@ void Objecter::_send_linger(LingerOp *info) } oncommit = c; } - 
info->watch_lock.put_read(); + watchl.unlock(); Op *o = new Op(info->target.base_oid, info->target.base_oloc, opv, info->target.flags | CEPH_OSD_FLAG_READ, NULL, NULL, @@ -527,18 +531,18 @@ void Objecter::_send_linger(LingerOp *info) if (info->register_tid) { // repeat send. cancel old registeration op, if any. - info->session->lock.get_write(); + OSDSession::unique_lock sl(info->session->lock); if (info->session->ops.count(info->register_tid)) { Op *o = info->session->ops[info->register_tid]; _op_cancel_map_check(o); _cancel_linger_op(o); } - info->session->lock.unlock(); + sl.unlock(); - info->register_tid = _op_submit(o, lc); + info->register_tid = _op_submit(o, sul); } else { // first send - info->register_tid = _op_submit_with_budget(o, lc); + info->register_tid = _op_submit_with_budget(o, sul); } logger->inc(l_osdc_linger_send); @@ -546,7 +550,7 @@ void Objecter::_send_linger(LingerOp *info) void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl) { - RWLock::WLocker wl(info->watch_lock); + LingerOp::unique_lock wl(info->watch_lock); ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl; if (info->on_reg_commit) { info->on_reg_commit->complete(r); @@ -580,9 +584,9 @@ struct C_DoWatchError : public Context { info->_queued_async(); } void finish(int r) { - objecter->rwlock.get_read(); + Objecter::unique_lock wl(objecter->rwlock); bool canceled = info->canceled; - objecter->rwlock.put_read(); + wl.unlock(); if (!canceled) { info->watch_context->handle_error(info->get_cookie(), err); @@ -609,7 +613,7 @@ void Objecter::_linger_reconnect(LingerOp *info, int r) ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r << " (last_error " << info->last_error << ")" << dendl; if (r < 0) { - info->watch_lock.get_write(); + LingerOp::unique_lock wl(info->watch_lock); if (!info->last_error) { r = _normalize_watch_error(r); info->last_error = r; @@ -618,14 +622,14 @@ void Objecter::_linger_reconnect(LingerOp *info, int r) 
_linger_callback_queue(); } } - info->watch_lock.put_write(); + wl.unlock(); } } void Objecter::_send_linger_ping(LingerOp *info) { - assert(rwlock.is_locked()); - assert(info->session->lock.is_locked()); + // rwlock is locked unique + // info->session->lock is locked if (cct->_conf->objecter_inject_no_watch_ping) { ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING" @@ -667,7 +671,7 @@ void Objecter::_send_linger_ping(LingerOp *info) void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent, uint32_t register_gen) { - RWLock::WLocker l(info->watch_lock); + LingerOp::unique_lock l(info->watch_lock); ldout(cct, 10) << __func__ << " " << info->linger_id << " sent " << sent << " gen " << register_gen << " = " << r << " (last_error " << info->last_error @@ -690,7 +694,7 @@ void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent, int Objecter::linger_check(LingerOp *info) { - RWLock::RLocker l(info->watch_lock); + LingerOp::shared_lock l(info->watch_lock); mono_time stamp = info->watch_valid_thru; if (!info->watch_pending_async.empty()) @@ -707,20 +711,20 @@ int Objecter::linger_check(LingerOp *info) void Objecter::linger_cancel(LingerOp *info) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); _linger_cancel(info); info->put(); } void Objecter::_linger_cancel(LingerOp *info) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl; if (!info->canceled) { OSDSession *s = info->session; - s->lock.get_write(); + OSDSession::unique_lock sl(s->lock); _session_linger_op_remove(s, info); - s->lock.unlock(); + sl.unlock(); linger_ops.erase(info->linger_id); linger_ops_set.erase(info); @@ -747,7 +751,7 @@ Objecter::LingerOp *Objecter::linger_register(const object_t& oid, info->target.flags = flags; info->watch_valid_thru = mono_clock::now(); - RWLock::WLocker l(rwlock); + unique_lock l(rwlock); // Acquire linger ID info->linger_id = ++max_linger_id; @@ -781,8 
+785,8 @@ ceph_tid_t Objecter::linger_watch(LingerOp *info, info->pobjver = objver; info->on_reg_commit = oncommit; - RWLock::WLocker wl(rwlock); - _linger_submit(info); + shunique_lock sul(rwlock, ceph::acquire_unique); + _linger_submit(info, sul); logger->inc(l_osdc_linger_active); return info->linger_id; @@ -803,18 +807,16 @@ ceph_tid_t Objecter::linger_notify(LingerOp *info, info->pobjver = objver; info->on_reg_commit = onfinish; - RWLock::WLocker wl(rwlock); - _linger_submit(info); + shunique_lock sul(rwlock, ceph::acquire_unique); + _linger_submit(info, sul); logger->inc(l_osdc_linger_active); return info->linger_id; } -void Objecter::_linger_submit(LingerOp *info) +void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul) { - assert(rwlock.is_wlocked()); - RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); - + assert(sul.owns_lock() && sul.mutex() == &rwlock); assert(info->linger_id); // Populate Op::target @@ -822,14 +824,14 @@ void Objecter::_linger_submit(LingerOp *info) _calc_target(&info->target, &info->last_force_resend); // Create LingerOp<->OSDSession relation - int r = _get_session(info->target.osd, &s, lc); + int r = _get_session(info->target.osd, &s, sul); assert(r == 0); - s->lock.get_write(); + OSDSession::unique_lock sl(s->lock); _session_linger_op_assign(s, info); - s->lock.unlock(); + sl.unlock(); put_session(s); - _send_linger(info); + _send_linger(info, sul); } struct C_DoWatchNotify : public Context { @@ -849,7 +851,7 @@ struct C_DoWatchNotify : public Context { void Objecter::handle_watch_notify(MWatchNotify *m) { - RWLock::RLocker l(rwlock); + shared_lock l(rwlock); if (!initialized.read()) { return; } @@ -859,7 +861,7 @@ void Objecter::handle_watch_notify(MWatchNotify *m) ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl; return; } - RWLock::WLocker wl(info->watch_lock); + LingerOp::unique_lock wl(info->watch_lock); if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) { if (!info->last_error) { 
info->last_error = -ENOTCONN; @@ -894,11 +896,11 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m) { ldout(cct, 10) << __func__ << " " << *m << dendl; - rwlock.get_read(); + shared_lock l(rwlock); assert(initialized.read()); if (info->canceled) { - rwlock.put_read(); + l.unlock(); goto out; } @@ -907,7 +909,7 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m) assert(info->watch_context); assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT); - rwlock.put_read(); + l.unlock(); switch (m->opcode) { case CEPH_WATCH_EVENT_NOTIFY: @@ -976,15 +978,14 @@ void Objecter::_scan_requests(OSDSession *s, map *pool_full_map, map& need_resend, list& need_resend_linger, - map& need_resend_command) + map& need_resend_command, + shunique_lock& sul) { - assert(rwlock.is_wlocked()); + assert(sul.owns_lock() && sul.mutex() == &rwlock); list unregister_lingers; - RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); - - s->lock.get_write(); + OSDSession::unique_lock sl(s->lock); // check for changed linger mappings (_before_ regular ops) map::iterator lp = s->linger_ops.begin(); @@ -996,7 +997,7 @@ void Objecter::_scan_requests(OSDSession *s, ++lp; ldout(cct, 10) << " checking linger op " << op->linger_id << dendl; bool unregister, force_resend_writes = cluster_full; - int r = _recalc_linger_op_target(op, lc); + int r = _recalc_linger_op_target(op, sul); if (pool_full_map) force_resend_writes = force_resend_writes || (*pool_full_map)[op->target.base_oloc.pool]; @@ -1046,7 +1047,7 @@ void Objecter::_scan_requests(OSDSession *s, _op_cancel_map_check(op); break; case RECALC_OP_TARGET_POOL_DNE: - _check_op_pool_dne(op, true); + _check_op_pool_dne(op, sl); break; } } @@ -1061,7 +1062,7 @@ void Objecter::_scan_requests(OSDSession *s, if (pool_full_map) force_resend_writes = force_resend_writes || (*pool_full_map)[c->target_pg.pool()]; - int r = _calc_command_target(c); + int r = _calc_command_target(c, sul); switch (r) { case RECALC_OP_TARGET_NO_ACTION: // 
resend if skipped map; otherwise do nothing. @@ -1083,7 +1084,7 @@ void Objecter::_scan_requests(OSDSession *s, } } - s->lock.unlock(); + sl.unlock(); for (list::iterator iter = unregister_lingers.begin(); iter != unregister_lingers.end(); @@ -1095,7 +1096,7 @@ void Objecter::_scan_requests(OSDSession *s, void Objecter::handle_osd_map(MOSDMap *m) { - RWLock::WLocker wl(rwlock); + shunique_lock sul(rwlock, acquire_unique); if (!initialized.read()) return; @@ -1171,7 +1172,7 @@ void Objecter::handle_osd_map(MOSDMap *m) update_pool_full_map(pool_full_map); _scan_requests(homeless_session, skipped_map, cluster_full, &pool_full_map, need_resend, - need_resend_linger, need_resend_command); + need_resend_linger, need_resend_command, sul); // osd addr changes? for (map::iterator p = osd_sessions.begin(); @@ -1179,7 +1180,7 @@ void Objecter::handle_osd_map(MOSDMap *m) OSDSession *s = p->second; _scan_requests(s, skipped_map, cluster_full, &pool_full_map, need_resend, - need_resend_linger, need_resend_command); + need_resend_linger, need_resend_command, sul); ++p; if (!osdmap->is_up(s->osd) || (s->con && @@ -1198,7 +1199,7 @@ void Objecter::handle_osd_map(MOSDMap *m) p != osd_sessions.end(); ++p) { OSDSession *s = p->second; _scan_requests(s, false, false, NULL, need_resend, - need_resend_linger, need_resend_command); + need_resend_linger, need_resend_command, sul); } ldout(cct, 3) << "handle_osd_map decoding full epoch " << m->get_last() << dendl; @@ -1206,7 +1207,7 @@ void Objecter::handle_osd_map(MOSDMap *m) _scan_requests(homeless_session, false, false, NULL, need_resend, need_resend_linger, - need_resend_command); + need_resend_command, sul); } else { ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting" << dendl; @@ -1226,8 +1227,6 @@ void Objecter::handle_osd_map(MOSDMap *m) _maybe_request_map(); } - RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); - // resend requests for (map::iterator p = need_resend.begin(); p != need_resend.end(); ++p) { 
@@ -1235,13 +1234,13 @@ void Objecter::handle_osd_map(MOSDMap *m) OSDSession *s = op->session; bool mapped_session = false; if (!s) { - int r = _map_session(&op->target, &s, lc); + int r = _map_session(&op->target, &s, sul); assert(r == 0); mapped_session = true; } else { get_session(s); } - s->lock.get_write(); + OSDSession::unique_lock sl(s->lock); if (mapped_session) { _session_op_assign(s, op); } @@ -1254,7 +1253,7 @@ void Objecter::handle_osd_map(MOSDMap *m) _op_cancel_map_check(op); _cancel_linger_op(op); } - s->lock.unlock(); + sl.unlock(); put_session(s); } for (list::iterator p = need_resend_linger.begin(); @@ -1263,7 +1262,7 @@ void Objecter::handle_osd_map(MOSDMap *m) if (!op->session) { _calc_target(&op->target, &op->last_force_resend); OSDSession *s = NULL; - int const r = _get_session(op->target.osd, &s, lc); + int const r = _get_session(op->target.osd, &s, sul); assert(r == 0); assert(s != NULL); op->session = s; @@ -1271,13 +1270,13 @@ void Objecter::handle_osd_map(MOSDMap *m) } if (!op->session->is_homeless()) { logger->inc(l_osdc_linger_resend); - _send_linger(op); + _send_linger(op, sul); } } for (map::iterator p = need_resend_command.begin(); p != need_resend_command.end(); ++p) { CommandOp *c = p->second; - _assign_command_session(c); + _assign_command_session(c, sul); if (c->session && !c->session->is_homeless()) { _send_command(c); } @@ -1316,7 +1315,7 @@ void Objecter::C_Op_Map_Latest::finish(int r) << "op_map_latest r=" << r << " tid=" << tid << " latest " << latest << dendl; - RWLock::WLocker wl(objecter->rwlock); + Objecter::unique_lock wl(objecter->rwlock); map::iterator iter = objecter->check_latest_map_ops.find(tid); @@ -1335,7 +1334,8 @@ void Objecter::C_Op_Map_Latest::finish(int r) if (op->map_dne_bound == 0) op->map_dne_bound = latest; - objecter->_check_op_pool_dne(op, false); + OSDSession::unique_lock sl(op->session->lock, defer_lock); + objecter->_check_op_pool_dne(op, sl); op->put(); } @@ -1343,7 +1343,7 @@ void 
Objecter::C_Op_Map_Latest::finish(int r) int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t *snap) { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); const map& pools = osdmap->get_pools(); map::const_iterator iter = pools.find(poolid); @@ -1366,7 +1366,7 @@ int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap, pool_snap_info_t *info) { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); const map& pools = osdmap->get_pools(); map::const_iterator iter = pools.find(poolid); @@ -1384,7 +1384,7 @@ int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap, int Objecter::pool_snap_list(int64_t poolid, vector *snaps) { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); const pg_pool_t *pi = osdmap->get_pg_pool(poolid); if (!pi) @@ -1397,9 +1397,10 @@ int Objecter::pool_snap_list(int64_t poolid, vector *snaps) return 0; } -void Objecter::_check_op_pool_dne(Op *op, bool session_locked) +// sl may be unlocked. 
+void Objecter::_check_op_pool_dne(Op *op, unique_lock& sl) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique if (op->attempts) { // we send a reply earlier, which means that previously the pool @@ -1432,13 +1433,15 @@ void Objecter::_check_op_pool_dne(Op *op, bool session_locked) OSDSession *s = op->session; assert(s != NULL); + assert(sl.mutex() == &s->lock); + bool session_locked = sl.owns_lock(); if (!session_locked) { - s->lock.get_write(); + sl.lock(); } _finish_op(op, 0); if (!session_locked) { - s->lock.unlock(); + sl.unlock(); } } } else { @@ -1448,7 +1451,7 @@ void Objecter::_check_op_pool_dne(Op *op, bool session_locked) void Objecter::_send_op_map_check(Op *op) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique // ask the monitor if (check_latest_map_ops.count(op->tid) == 0) { op->get(); @@ -1460,7 +1463,7 @@ void Objecter::_send_op_map_check(Op *op) void Objecter::_op_cancel_map_check(Op *op) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique map::iterator iter = check_latest_map_ops.find(op->tid); if (iter != check_latest_map_ops.end()) { @@ -1479,7 +1482,7 @@ void Objecter::C_Linger_Map_Latest::finish(int r) return; } - RWLock::WLocker wl(objecter->rwlock); + unique_lock wl(objecter->rwlock); map::iterator iter = objecter->check_latest_map_lingers.find(linger_id); @@ -1505,7 +1508,7 @@ void Objecter::C_Linger_Map_Latest::finish(int r) void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique *need_unregister = false; @@ -1545,7 +1548,7 @@ void Objecter::_send_linger_map_check(LingerOp *op) void Objecter::_linger_cancel_map_check(LingerOp *op) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique map::iterator iter = check_latest_map_lingers.find(op->linger_id); @@ -1565,7 +1568,7 @@ void Objecter::C_Command_Map_Latest::finish(int r) return; } - RWLock::WLocker wl(objecter->rwlock); + unique_lock wl(objecter->rwlock); map::iterator 
iter = objecter->check_latest_map_commands.find(tid); @@ -1586,7 +1589,7 @@ void Objecter::_check_command_map_dne(CommandOp *c) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique ldout(cct, 10) << "_check_command_map_dne tid " << c->tid << " current " << osdmap->get_epoch() @@ -1603,7 +1606,7 @@ void Objecter::_check_command_map_dne(CommandOp *c) void Objecter::_send_command_map_check(CommandOp *c) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique // ask the monitor if (check_latest_map_commands.count(c->tid) == 0) { @@ -1616,7 +1619,7 @@ void Objecter::_send_command_map_check(CommandOp *c) void Objecter::_command_cancel_map_check(CommandOp *c) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique map::iterator iter = check_latest_map_commands.find(c->tid); @@ -1634,9 +1637,9 @@ void Objecter::_command_cancel_map_check(CommandOp *c) * @returns 0 on success, or -EAGAIN if the lock context requires * promotion to write. */ -int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc) +int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul) { - assert(rwlock.is_locked()); + assert(sul && sul.mutex() == &rwlock); if (osd < 0) { *session = homeless_session; @@ -1654,7 +1657,7 @@ int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc) << s->get_nref() << dendl; return 0; } - if (!lc.is_wlocked()) { + if (!sul.owns_lock()) { return -EAGAIN; } OSDSession *s = new OSDSession(cct, osd); @@ -1691,7 +1694,7 @@ void Objecter::get_session(Objecter::OSDSession *s) void Objecter::_reopen_session(OSDSession *s) { - assert(s->lock.is_locked()); + // s->lock is locked entity_inst_t inst = osdmap->get_inst(s->osd); ldout(cct, 10) << "reopen_session osd." 
<< s->osd << " session, addr now " @@ -1707,14 +1710,14 @@ void Objecter::_reopen_session(OSDSession *s) void Objecter::close_session(OSDSession *s) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique ldout(cct, 10) << "close_session for osd." << s->osd << dendl; if (s->con) { s->con->mark_down(); logger->inc(l_osdc_osd_session_close); } - s->lock.get_write(); + OSDSession::unique_lock sl(s->lock); std::list homeless_lingers; std::list homeless_commands; @@ -1742,12 +1745,12 @@ void Objecter::close_session(OSDSession *s) } osd_sessions.erase(s->osd); - s->lock.unlock(); + sl.unlock(); put_session(s); // Assign any leftover ops to the homeless session { - RWLock::WLocker wl(homeless_session->lock); + OSDSession::unique_lock hsl(homeless_session->lock); for (std::list::iterator i = homeless_lingers.begin(); i != homeless_lingers.end(); ++i) { _session_linger_op_assign(homeless_session, *i); @@ -1767,19 +1770,20 @@ void Objecter::close_session(OSDSession *s) void Objecter::wait_for_osd_map() { - rwlock.get_write(); + unique_lock l(rwlock); if (osdmap->get_epoch()) { - rwlock.put_write(); + l.unlock(); return; } + // Leave this since it goes with C_SafeCond Mutex lock(""); Cond cond; bool done; lock.Lock(); C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL); waiting_for_map[0].push_back(pair(context, 0)); - rwlock.put_write(); + l.unlock(); while (!done) cond.Wait(lock); lock.Unlock(); @@ -1812,14 +1816,14 @@ void Objecter::wait_for_latest_osdmap(Context *fin) void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); _get_latest_version(oldest, newest, fin); } void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest, Context *fin) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique if (osdmap->get_epoch() >= newest) { ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl; if (fin) @@ -1833,13 +1837,13 @@ void 
Objecter::_get_latest_version(epoch_t oldest, epoch_t newest, void Objecter::maybe_request_map() { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); _maybe_request_map(); } void Objecter::_maybe_request_map() { - assert(rwlock.is_locked()); + // rwlock is locked int flag = 0; if (_osdmap_full_flag() || osdmap->test_flag(CEPH_OSDMAP_PAUSERD) @@ -1859,7 +1863,7 @@ void Objecter::_maybe_request_map() void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique waiting_for_map[epoch].push_back(pair(c, err)); _maybe_request_map(); } @@ -1877,7 +1881,7 @@ void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err) */ bool Objecter::have_map(const epoch_t epoch) { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); if (osdmap->get_epoch() >= epoch) { return true; } else { @@ -1887,7 +1891,7 @@ bool Objecter::have_map(const epoch_t epoch) bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); if (osdmap->get_epoch() >= epoch) { return true; } @@ -1900,19 +1904,19 @@ void Objecter::kick_requests(OSDSession *session) ldout(cct, 10) << "kick_requests for osd." 
<< session->osd << dendl; map lresend; - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); - session->lock.get_write(); + OSDSession::unique_lock sl(session->lock); _kick_requests(session, lresend); - session->lock.unlock(); + sl.unlock(); - _linger_ops_resend(lresend); + _linger_ops_resend(lresend, wl); } void Objecter::_kick_requests(OSDSession *session, map& lresend) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique // resend ops map resend; // resend in tid order @@ -1958,18 +1962,20 @@ void Objecter::_kick_requests(OSDSession *session, } } -void Objecter::_linger_ops_resend(map& lresend) +void Objecter::_linger_ops_resend(map& lresend, + unique_lock& ul) { - assert(rwlock.is_wlocked()); - + assert(ul.owns_lock()); + shunique_lock sul(std::move(ul)); while (!lresend.empty()) { LingerOp *op = lresend.begin()->second; if (!op->canceled) { - _send_linger(op); + _send_linger(op, sul); } op->put(); lresend.erase(lresend.begin()); } + ul = unique_lock(sul.release_to_unique()); } void Objecter::start_tick() @@ -1982,7 +1988,7 @@ void Objecter::start_tick() void Objecter::tick() { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); ldout(cct, 10) << "tick" << dendl; @@ -2007,7 +2013,7 @@ void Objecter::tick() for (map::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; - RWLock::WLocker l(s->lock); + OSDSession::lock_guard l(s->lock); bool found = false; for (map::iterator p = s->ops.begin(); p != s->ops.end(); @@ -2025,7 +2031,7 @@ void Objecter::tick() p != s->linger_ops.end(); ++p) { LingerOp *op = p->second; - RWLock::WLocker wl(op->watch_lock); + LingerOp::unique_lock wl(op->watch_lock); assert(op->session); ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first << " (osd." 
<< op->session->osd << ")" << dendl; @@ -2071,7 +2077,7 @@ void Objecter::tick() void Objecter::resend_mon_ops() { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); ldout(cct, 10) << "resend_mon_ops" << dendl; @@ -2124,12 +2130,11 @@ void Objecter::resend_mon_ops() ceph_tid_t Objecter::op_submit(Op *op, int *ctx_budget) { - RWLock::RLocker rl(rwlock); - RWLock::Context lc(rwlock, RWLock::Context::TakenForRead); - return _op_submit_with_budget(op, lc, ctx_budget); + shunique_lock rl(rwlock, ceph::acquire_shared); + return _op_submit_with_budget(op, rl, ctx_budget); } -ceph_tid_t Objecter::_op_submit_with_budget(Op *op, RWLock::Context& lc, +ceph_tid_t Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul, int *ctx_budget) { assert(initialized.read()); @@ -2141,7 +2146,7 @@ ceph_tid_t Objecter::_op_submit_with_budget(Op *op, RWLock::Context& lc, // throttle. before we look at any state, because // _take_op_budget() may drop our lock while it blocks. if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) { - int op_budget = _take_op_budget(op); + int op_budget = _take_op_budget(op, sul); // take and pass out the budget for the first OP // in the context session if (ctx_budget && (*ctx_budget == -1)) { @@ -2158,7 +2163,7 @@ ceph_tid_t Objecter::_op_submit_with_budget(Op *op, RWLock::Context& lc, op_cancel(tid, -ETIMEDOUT); }); } - return _op_submit(op, lc); + return _op_submit(op, sul); } void Objecter::_send_op_account(Op *op) @@ -2240,9 +2245,9 @@ void Objecter::_send_op_account(Op *op) } } -ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) +ceph_tid_t Objecter::_op_submit(Op *op, shunique_lock& sul) { - assert(rwlock.is_locked()); + // rwlock is locked ldout(cct, 10) << __func__ << " op " << op << dendl; @@ -2255,18 +2260,20 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) == RECALC_OP_TARGET_POOL_DNE; // Try to get a session, including a retry if we need to take write lock - int r = _get_session(op->target.osd, &s, lc); 
+ int r = _get_session(op->target.osd, &s, sul); if (r == -EAGAIN) { assert(s == NULL); - lc.promote(); - r = _get_session(op->target.osd, &s, lc); + sul.unlock(); + sul.lock(); + r = _get_session(op->target.osd, &s, sul); } assert(r == 0); assert(s); // may be homeless // We may need to take wlock if we will need to _set_op_map_check later. - if (check_for_latest_map && !lc.is_wlocked()) { - lc.promote(); + if (check_for_latest_map && sul.owns_lock_shared()) { + sul.unlock(); + sul.lock(); } _send_op_account(op); @@ -2314,7 +2321,7 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) m = _prepare_osd_op(op); } - s->lock.get_write(); + OSDSession::unique_lock sl(s->lock); if (op->tid == 0) op->tid = last_tid.inc(); _session_op_assign(s, op); @@ -2331,7 +2338,7 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) } op = NULL; - s->lock.unlock(); + sl.unlock(); put_session(s); ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() @@ -2344,7 +2351,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) { assert(initialized.read()); - s->lock.get_write(); + OSDSession::unique_lock sl(s->lock); map::iterator p = s->ops.find(tid); if (p == s->ops.end()) { @@ -2379,7 +2386,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) } _op_cancel_map_check(op); _finish_op(op, r); - s->lock.unlock(); + sl.unlock(); return 0; } @@ -2388,9 +2395,8 @@ int Objecter::op_cancel(ceph_tid_t tid, int r) { int ret = 0; - rwlock.get_write(); + unique_lock wl(rwlock); ret = _op_cancel(tid, r); - rwlock.unlock(); return ret; } @@ -2407,9 +2413,9 @@ start: for (map::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; - s->lock.get_read(); + OSDSession::shared_lock sl(s->lock); if (s->ops.find(tid) != s->ops.end()) { - s->lock.unlock(); + sl.unlock(); ret = op_cancel(s, tid, r); if (ret == -ENOENT) { /* oh no! 
raced, maybe tid moved to another session, restarting */ @@ -2417,16 +2423,15 @@ start: } return ret; } - s->lock.unlock(); } ldout(cct, 5) << __func__ << ": tid " << tid << " not found in live sessions" << dendl; // Handle case where the op is in homeless session - homeless_session->lock.get_read(); + OSDSession::shared_lock sl(homeless_session->lock); if (homeless_session->ops.find(tid) != homeless_session->ops.end()) { - homeless_session->lock.unlock(); + sl.unlock(); ret = op_cancel(homeless_session, tid, r); if (ret == -ENOENT) { /* oh no! raced, maybe tid moved to another session, restarting */ @@ -2435,7 +2440,7 @@ start: return ret; } } else { - homeless_session->lock.unlock(); + sl.unlock(); } ldout(cct, 5) << __func__ << ": tid " << tid @@ -2447,7 +2452,7 @@ start: epoch_t Objecter::op_cancel_writes(int r, int64_t pool) { - rwlock.get_write(); + unique_lock wl(rwlock); std::vector to_cancel; bool found = false; @@ -2455,7 +2460,7 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool) for (map::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; - s->lock.get_read(); + OSDSession::shared_lock sl(s->lock); for (map::iterator op_i = s->ops.begin(); op_i != s->ops.end(); ++op_i) { if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE @@ -2463,7 +2468,7 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool) to_cancel.push_back(op_i->first); } } - s->lock.unlock(); + sl.unlock(); for (std::vector::iterator titer = to_cancel.begin(); titer != to_cancel.end(); @@ -2480,7 +2485,7 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool) const epoch_t epoch = osdmap->get_epoch(); - rwlock.unlock(); + wl.unlock(); if (found) { return epoch; @@ -2524,14 +2529,14 @@ bool Objecter::target_should_be_paused(op_target_t *t) */ bool Objecter::osdmap_full_flag() const { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); return _osdmap_full_flag(); } bool Objecter::osdmap_pool_full(const int64_t pool_id) 
const { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); if (_osdmap_full_flag()) { return true; @@ -2593,7 +2598,7 @@ void Objecter::update_pool_full_map(map& pool_full_map) int64_t Objecter::get_object_hash_position(int64_t pool, const string& key, const string& ns) { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); const pg_pool_t *p = osdmap->get_pg_pool(pool); if (!p) return -ENOENT; @@ -2603,7 +2608,7 @@ int64_t Objecter::get_object_hash_position(int64_t pool, const string& key, int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key, const string& ns) { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); const pg_pool_t *p = osdmap->get_pg_pool(pool); if (!p) return -ENOENT; @@ -2613,7 +2618,7 @@ int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key, int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend, bool any_change) { - assert(rwlock.is_locked()); + // rwlock is locked bool is_read = t->flags & CEPH_OSD_FLAG_READ; bool is_write = t->flags & CEPH_OSD_FLAG_WRITE; @@ -2775,15 +2780,15 @@ int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend, } int Objecter::_map_session(op_target_t *target, OSDSession **s, - RWLock::Context& lc) + shunique_lock& sul) { _calc_target(target); - return _get_session(target->osd, s, lc); + return _get_session(target->osd, s, sul); } void Objecter::_session_op_assign(OSDSession *to, Op *op) { - assert(to->lock.is_locked()); + // to->lock is locked assert(op->session == NULL); assert(op->tid); @@ -2801,7 +2806,7 @@ void Objecter::_session_op_assign(OSDSession *to, Op *op) void Objecter::_session_op_remove(OSDSession *from, Op *op) { assert(op->session == from); - assert(from->lock.is_locked()); + // from->lock is locked if (from->is_homeless()) { num_homeless_ops.dec(); @@ -2816,7 +2821,7 @@ void Objecter::_session_op_remove(OSDSession *from, Op *op) void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op) { - 
assert(to->lock.is_wlocked()); + // to->lock is locked unique assert(op->session == NULL); if (to->is_homeless()) { @@ -2834,7 +2839,7 @@ void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op) void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op) { assert(from == op->session); - assert(from->lock.is_wlocked()); + // from->lock is locked unique if (from->is_homeless()) { num_homeless_ops.dec(); @@ -2851,7 +2856,7 @@ void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op) void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op) { assert(from == op->session); - assert(from->lock.is_locked()); + // from->lock is locked if (from->is_homeless()) { num_homeless_ops.dec(); @@ -2866,7 +2871,7 @@ void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op) void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op) { - assert(to->lock.is_locked()); + // to->lock is locked assert(op->session == NULL); assert(op->tid); @@ -2882,9 +2887,9 @@ void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op) } int Objecter::_recalc_linger_op_target(LingerOp *linger_op, - RWLock::Context& lc) + shunique_lock& sul) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique int r = _calc_target(&linger_op->target, &linger_op->last_force_resend, true); @@ -2894,7 +2899,7 @@ int Objecter::_recalc_linger_op_target(LingerOp *linger_op, << " acting " << linger_op->target.acting << dendl; OSDSession *s = NULL; - r = _get_session(linger_op->target.osd, &s, lc); + r = _get_session(linger_op->target.osd, &s, sul); assert(r == 0); if (linger_op->session != s) { @@ -2902,10 +2907,9 @@ int Objecter::_recalc_linger_op_target(LingerOp *linger_op, // same time here is only safe because we are the only one that // takes two, and we are holding rwlock for write. Disable // lockdep because it doesn't know that. 
- s->lock.get_write(false); + OSDSession::unique_lock sl(s->lock); _session_linger_op_remove(linger_op->session, linger_op); _session_linger_op_assign(s, linger_op); - s->lock.unlock(false); } put_session(s); @@ -2936,7 +2940,7 @@ void Objecter::_finish_op(Op *op, int r) { ldout(cct, 15) << "finish_op " << op->tid << dendl; - assert(op->session->lock.is_wlocked()); + // op->session->lock is locked unique if (!op->ctx_budgeted && op->budgeted) put_op_budget(op); @@ -2958,9 +2962,9 @@ void Objecter::_finish_op(Op *op, int r) void Objecter::finish_op(OSDSession *session, ceph_tid_t tid) { ldout(cct, 15) << "finish_op " << tid << dendl; - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); - RWLock::WLocker wl(session->lock); + OSDSession::unique_lock wl(session->lock); map::iterator iter = session->ops.find(tid); if (iter == session->ops.end()) @@ -2973,7 +2977,7 @@ void Objecter::finish_op(OSDSession *session, ceph_tid_t tid) MOSDOp *Objecter::_prepare_osd_op(Op *op) { - assert(rwlock.is_locked()); + // rwlock is locked int flags = op->target.flags; flags |= CEPH_OSD_FLAG_KNOWN_REDIR; @@ -3022,8 +3026,8 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op) void Objecter::_send_op(Op *op, MOSDOp *m) { - assert(rwlock.is_locked()); - assert(op->session->lock.is_locked()); + // rwlock is locked + // op->session->lock is locked if (!m) { assert(op->tid > 0); @@ -3078,31 +3082,38 @@ int Objecter::calc_op_budget(Op *op) return op_budget; } -void Objecter::_throttle_op(Op *op, int op_budget) +void Objecter::_throttle_op(Op *op, + shunique_lock& sul, + int op_budget) { - assert(rwlock.is_locked()); - - bool locked_for_write = rwlock.is_wlocked(); + assert(sul && sul.mutex() == &rwlock); + bool locked_for_write = sul.owns_lock(); if (!op_budget) op_budget = calc_op_budget(op); if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now - rwlock.unlock(); + sul.unlock(); op_throttle_bytes.get(op_budget); - rwlock.get(locked_for_write); + if (locked_for_write) + 
sul.lock(); + else + sul.lock_shared(); } if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now - rwlock.unlock(); + sul.unlock(); op_throttle_ops.get(1); - rwlock.get(locked_for_write); + if (locked_for_write) + sul.lock(); + else + sul.lock_shared(); } } void Objecter::unregister_op(Op *op) { - op->session->lock.get_write(); + OSDSession::unique_lock sl(op->session->lock); op->session->ops.erase(op->tid); - op->session->lock.unlock(); + sl.unlock(); put_session(op->session); op->session = NULL; @@ -3119,12 +3130,11 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) int osd_num = (int)m->get_source().num(); - RWLock::RLocker l(rwlock); + shunique_lock sul(rwlock, ceph::acquire_shared); if (!initialized.read()) { m->put(); return; } - RWLock::Context lc(rwlock, RWLock::Context::TakenForRead); map::iterator siter = osd_sessions.find(osd_num); if (siter == osd_sessions.end()) { @@ -3139,7 +3149,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) OSDSession *s = siter->second; get_session(s); - s->lock.get_write(); + OSDSession::unique_lock sl(s->lock); map::iterator iter = s->ops.find(tid); if (iter == s->ops.end()) { @@ -3147,7 +3157,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ? " onnvram" : " ack")) << " ... 
stray" << dendl; - s->lock.unlock(); + sl.unlock(); put_session(s); m->put(); return; @@ -3171,7 +3181,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) << "; last attempt " << (op->attempts - 1) << " sent to " << op->session->con->get_peer_addr() << dendl; m->put(); - s->lock.unlock(); + sl.unlock(); put_session(s); return; } @@ -3193,7 +3203,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) if (op->oncommit || op->oncommit_sync) num_uncommitted.dec(); _session_op_remove(s, op); - s->lock.unlock(); + sl.unlock(); put_session(s); // FIXME: two redirects could race and reorder @@ -3202,7 +3212,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) m->get_redirect().combine_with_locator(op->target.target_oloc, op->target.target_oid.name); op->target.flags |= CEPH_OSD_FLAG_REDIRECTED; - _op_submit(op, lc); + _op_submit(op, sul); m->put(); return; } @@ -3215,14 +3225,13 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) op->tid = last_tid.inc(); _send_op(op); - s->lock.unlock(); + sl.unlock(); put_session(s); m->put(); return; } - l.unlock(); - lc.set_state(RWLock::Context::Untaken); + sul.unlock(); if (op->objver) *op->objver = m->get_user_version(); @@ -3299,8 +3308,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) } /* get it before we call _finish_op() */ - Mutex *completion_lock = (op->target.base_oid.name.size() ? - s->get_lock(op->target.base_oid) : NULL); + auto completion_lock = + (op->target.base_oid.name.size() ? s->get_lock(op->target.base_oid) : + OSDSession::unique_completion_lock()); // done with this tid? 
if (!op->onack && !op->oncommit && !op->oncommit_sync) { @@ -3312,10 +3322,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) << " uncommitted" << dendl; // serialize completions - if (completion_lock) { - completion_lock->Lock(); + if (completion_lock.mutex()) { + completion_lock.lock(); } - s->lock.unlock(); + sl.unlock(); // do callbacks if (onack) { @@ -3324,8 +3334,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) if (oncommit) { oncommit->complete(rc); } - if (completion_lock) { - completion_lock->Unlock(); + if (completion_lock.mutex()) { + completion_lock.unlock(); } m->put(); @@ -3334,9 +3344,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) uint32_t Objecter::list_nobjects_seek(NListContext *list_context, - uint32_t pos) + uint32_t pos) { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id)); ldout(cct, 10) << "list_objects_seek " << list_context << " pos " << pos << " -> " << actual << dendl; @@ -3380,16 +3390,16 @@ void Objecter::list_nobjects(NListContext *list_context, Context *onfinish) return; } - rwlock.get_read(); + shared_lock rl(rwlock); const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id); if (!pool) { // pool is gone - rwlock.unlock(); + rl.unlock(); put_nlist_context_budget(list_context); onfinish->complete(-ENOENT); return; } int pg_num = pool->get_pg_num(); - rwlock.unlock(); + rl.unlock(); if (list_context->starting_pg_num == 0) { // there can't be zero pgs! 
list_context->starting_pg_num = pg_num; @@ -3426,7 +3436,7 @@ void Objecter::list_nobjects(NListContext *list_context, Context *onfinish) } void Objecter::_nlist_reply(NListContext *list_context, int r, - Context *final_finish, epoch_t reply_epoch) + Context *final_finish, epoch_t reply_epoch) { ldout(cct, 10) << "_list_reply" << dendl; @@ -3492,7 +3502,7 @@ void Objecter::put_nlist_context_budget(NListContext *list_context) { uint32_t Objecter::list_objects_seek(ListContext *list_context, uint32_t pos) { - RWLock::RLocker rl(rwlock); + shared_lock rl(rwlock); pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id)); ldout(cct, 10) << "list_objects_seek " << list_context << " pos " << pos << " -> " << actual << dendl; @@ -3536,16 +3546,16 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) return; } - rwlock.get_read(); + shared_lock rl(rwlock); const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id); if (!pool) { // pool is gone - rwlock.unlock(); + rl.unlock(); put_list_context_budget(list_context); onfinish->complete(-ENOENT); return; } int pg_num = pool->get_pg_num(); - rwlock.unlock(); + rl.unlock(); if (list_context->starting_pg_num == 0) { // there can't be zero pgs! 
list_context->starting_pg_num = pg_num; @@ -3650,7 +3660,7 @@ void Objecter::put_list_context_budget(ListContext *list_context) { int Objecter::create_pool_snap(int64_t pool, string& snap_name, Context *onfinish) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: " << snap_name << dendl; @@ -3692,7 +3702,7 @@ struct C_SelfmanagedSnap : public Context { int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, Context *onfinish) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl; PoolOp *op = new PoolOp; if (!op) return -ENOMEM; @@ -3711,7 +3721,7 @@ int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, int Objecter::delete_pool_snap(int64_t pool, string& snap_name, Context *onfinish) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: " << snap_name << dendl; @@ -3739,7 +3749,7 @@ int Objecter::delete_pool_snap(int64_t pool, string& snap_name, int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: " << snap << dendl; PoolOp *op = new PoolOp; @@ -3759,7 +3769,7 @@ int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap, int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid, int crush_rule) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); ldout(cct, 10) << "create_pool name=" << name << dendl; if (osdmap->lookup_pg_pool_name(name.c_str()) >= 0) @@ -3784,7 +3794,7 @@ int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid, int Objecter::delete_pool(int64_t pool, Context *onfinish) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); ldout(cct, 10) << "delete_pool " << pool << dendl; if 
(!osdmap->have_pg_pool(pool)) @@ -3796,7 +3806,7 @@ int Objecter::delete_pool(int64_t pool, Context *onfinish) int Objecter::delete_pool(const string &pool_name, Context *onfinish) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); ldout(cct, 10) << "delete_pool " << pool_name << dendl; int64_t pool = osdmap->lookup_pg_pool_name(pool_name); @@ -3827,7 +3837,7 @@ void Objecter::_do_delete_pool(int64_t pool, Context *onfinish) */ int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl; PoolOp *op = new PoolOp; if (!op) return -ENOMEM; @@ -3847,7 +3857,7 @@ int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid) void Objecter::pool_op_submit(PoolOp *op) { - assert(rwlock.is_locked()); + // rwlock is locked if (mon_timeout > timespan(0)) { op->ontimeout = timer.add_event(mon_timeout, [this, op]() { @@ -3858,7 +3868,7 @@ void Objecter::pool_op_submit(PoolOp *op) void Objecter::_pool_op_submit(PoolOp *op) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique ldout(cct, 10) << "pool_op_submit " << op->tid << dendl; MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool, @@ -3881,9 +3891,9 @@ void Objecter::_pool_op_submit(PoolOp *op) */ void Objecter::handle_pool_op_reply(MPoolOpReply *m) { - rwlock.get_read(); + shunique_lock sul(rwlock, acquire_shared); if (!initialized.read()) { - rwlock.put_read(); + sul.unlock(); m->put(); return; } @@ -3900,8 +3910,8 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) if (m->version > last_seen_osdmap_version) last_seen_osdmap_version = m->version; if (osdmap->get_epoch() < m->epoch) { - rwlock.unlock(); - rwlock.get_write(); + sul.unlock(); + sul.lock(); // recheck op existence since we have let go of rwlock // (for promotion) above. 
iter = pool_ops.find(tid); @@ -3918,15 +3928,14 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) assert(op->onfinish); op->onfinish->complete(m->replyCode); } - } - else { + } else { assert(op->onfinish); op->onfinish->complete(m->replyCode); } op->onfinish = NULL; - if (!rwlock.is_wlocked()) { - rwlock.unlock(); - rwlock.get_write(); + if (!sul.owns_lock()) { + sul.unlock(); + sul.lock(); } iter = pool_ops.find(tid); if (iter != pool_ops.end()) { @@ -3937,7 +3946,8 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) } done: - rwlock.unlock(); + // Not strictly necessary, since we'll release it on return. + sul.unlock(); ldout(cct, 10) << "done" << dendl; m->put(); @@ -3947,7 +3957,7 @@ int Objecter::pool_op_cancel(ceph_tid_t tid, int r) { assert(initialized.read()); - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); map::iterator it = pool_ops.find(tid); if (it == pool_ops.end()) { @@ -3967,7 +3977,7 @@ int Objecter::pool_op_cancel(ceph_tid_t tid, int r) void Objecter::_finish_pool_op(PoolOp *op, int r) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique pool_ops.erase(op->tid); logger->set(l_osdc_poolop_active, pool_ops.size()); @@ -4000,7 +4010,7 @@ void Objecter::get_pool_stats(list& pools, op->ontimeout = 0; } - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); poolstat_ops[op->tid] = op; @@ -4025,7 +4035,7 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl; ceph_tid_t tid = m->get_tid(); - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); if (!initialized.read()) { m->put(); return; @@ -4052,7 +4062,7 @@ int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r) { assert(initialized.read()); - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); map::iterator it = poolstat_ops.find(tid); if (it == poolstat_ops.end()) { @@ -4071,7 +4081,7 @@ int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r) void Objecter::_finish_pool_stat_op(PoolStatOp 
*op, int r) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique poolstat_ops.erase(op->tid); logger->set(l_osdc_poolstat_active, poolstat_ops.size()); @@ -4085,7 +4095,7 @@ void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r) void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish) { ldout(cct, 10) << "get_fs_stats" << dendl; - RWLock::WLocker l(rwlock); + unique_lock l(rwlock); StatfsOp *op = new StatfsOp; op->tid = last_tid.inc(); @@ -4108,7 +4118,7 @@ void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish) void Objecter::_fs_stats_submit(StatfsOp *op) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl; monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid, @@ -4120,7 +4130,7 @@ void Objecter::_fs_stats_submit(StatfsOp *op) void Objecter::handle_fs_stats_reply(MStatfsReply *m) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); if (!initialized.read()) { m->put(); return; @@ -4148,7 +4158,7 @@ int Objecter::statfs_op_cancel(ceph_tid_t tid, int r) { assert(initialized.read()); - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); map::iterator it = statfs_ops.find(tid); if (it == statfs_ops.end()) { @@ -4167,7 +4177,7 @@ int Objecter::statfs_op_cancel(ceph_tid_t tid, int r) void Objecter::_finish_statfs_op(StatfsOp *op, int r) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique statfs_ops.erase(op->tid); logger->set(l_osdc_statfs_active, statfs_ops.size()); @@ -4230,24 +4240,24 @@ bool Objecter::ms_handle_reset(Connection *con) int osd = osdmap->identify_osd(con->get_peer_addr()); if (osd >= 0) { ldout(cct, 1) << "ms_handle_reset on osd." 
<< osd << dendl; - rwlock.get_write(); + unique_lock wl(rwlock); if (!initialized.read()) { - rwlock.put_write(); + wl.unlock(); return false; } map::iterator p = osd_sessions.find(osd); if (p != osd_sessions.end()) { OSDSession *session = p->second; map lresend; - session->lock.get_write(); + OSDSession::unique_lock sl(session->lock); _reopen_session(session); _kick_requests(session, lresend); - session->lock.unlock(); - _linger_ops_resend(lresend); - rwlock.unlock(); + sl.unlock(); + _linger_ops_resend(lresend, wl); + wl.unlock(); maybe_request_map(); } else { - rwlock.unlock(); + wl.unlock(); } } else { ldout(cct, 10) << "ms_handle_reset on unknown osd addr " @@ -4312,22 +4322,23 @@ void Objecter::_dump_active() for (map::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; - s->lock.get_read(); + OSDSession::shared_lock sl(s->lock); _dump_active(s); - s->lock.unlock(); + sl.unlock(); } _dump_active(homeless_session); } void Objecter::dump_active() { - rwlock.get_read(); + shared_lock rl(rwlock); _dump_active(); - rwlock.unlock(); + rl.unlock(); } void Objecter::dump_requests(Formatter *fmt) { + // Read-lock on Objecter held here fmt->open_object_section("requests"); dump_ops(fmt); dump_linger_ops(fmt); @@ -4367,13 +4378,14 @@ void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt) void Objecter::dump_ops(Formatter *fmt) { + // Read-lock on Objecter held fmt->open_array_section("ops"); for (map::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; - s->lock.get_read(); + OSDSession::shared_lock sl(s->lock); _dump_ops(s, fmt); - s->lock.unlock(); + sl.unlock(); } _dump_ops(homeless_session, fmt); fmt->close_section(); // ops array @@ -4396,13 +4408,14 @@ void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt) void Objecter::dump_linger_ops(Formatter *fmt) { + // We have a read-lock on the objecter 
fmt->open_array_section("linger_ops"); for (map::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; - s->lock.get_read(); + OSDSession::shared_lock sl(s->lock); _dump_linger_ops(s, fmt); - s->lock.unlock(); + sl.unlock(); } _dump_linger_ops(homeless_session, fmt); fmt->close_section(); // linger_ops array @@ -4432,13 +4445,14 @@ void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt) void Objecter::dump_command_ops(Formatter *fmt) { + // We have a read-lock on the Objecter here fmt->open_array_section("command_ops"); for (map::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; - s->lock.get_read(); + OSDSession::shared_lock sl(s->lock); _dump_command_ops(s, fmt); - s->lock.unlock(); + sl.unlock(); } _dump_command_ops(homeless_session, fmt); fmt->close_section(); // command_ops array @@ -4513,7 +4527,7 @@ bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap, std::string format, bufferlist& out) { Formatter *f = Formatter::create(format, "json-pretty", "json-pretty"); - RWLock::RLocker rl(m_objecter->rwlock); + shared_lock rl(m_objecter->rwlock); m_objecter->dump_requests(f); f->flush(out); delete f; @@ -4546,7 +4560,7 @@ void Objecter::handle_command_reply(MCommandReply *m) { int osd_num = (int)m->get_source().num(); - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); if (!initialized.read()) { m->put(); return; @@ -4562,13 +4576,13 @@ void Objecter::handle_command_reply(MCommandReply *m) OSDSession *s = siter->second; - s->lock.get_read(); + OSDSession::shared_lock sl(s->lock); map::iterator p = s->command_ops.find(m->get_tid()); if (p == s->command_ops.end()) { ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() << " not found" << dendl; m->put(); - s->lock.unlock(); + sl.unlock(); return; } @@ -4580,13 +4594,13 @@ void Objecter::handle_command_reply(MCommandReply *m) << 
m->get_connection() << " " << m->get_source_inst() << dendl; m->put(); - s->lock.unlock(); + sl.unlock(); return; } if (c->poutbl) c->poutbl->claim(m->get_data()); - s->lock.unlock(); + sl.unlock(); _finish_command(c, m->r, m->rs); @@ -4595,21 +4609,19 @@ void Objecter::handle_command_reply(MCommandReply *m) int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid) { - RWLock::WLocker wl(rwlock); - - RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); + shunique_lock sul(rwlock, ceph::acquire_unique); ceph_tid_t tid = last_tid.inc(); ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl; c->tid = tid; { - RWLock::WLocker hs_wl(homeless_session->lock); - _session_command_op_assign(homeless_session, c); + OSDSession::unique_lock hs_wl(homeless_session->lock); + _session_command_op_assign(homeless_session, c); } - (void)_calc_command_target(c); - _assign_command_session(c); + _calc_command_target(c, sul); + _assign_command_session(c, sul); if (osd_timeout > timespan(0)) { c->ontimeout = timer.add_event(osd_timeout, [this, c, tid]() { @@ -4631,11 +4643,9 @@ int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid) return 0; } -int Objecter::_calc_command_target(CommandOp *c) +int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul) { - assert(rwlock.is_wlocked()); - - RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); + assert(sul.owns_lock() && sul.mutex() == &rwlock); c->map_check_error = 0; @@ -4662,7 +4672,7 @@ int Objecter::_calc_command_target(CommandOp *c) } OSDSession *s; - int r = _get_session(c->osd, &s, lc); + int r = _get_session(c->osd, &s, sul); assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */ if (c->session != s) { @@ -4678,26 +4688,24 @@ int Objecter::_calc_command_target(CommandOp *c) return RECALC_OP_TARGET_NO_ACTION; } -void Objecter::_assign_command_session(CommandOp *c) +void Objecter::_assign_command_session(CommandOp *c, + shunique_lock& sul) { - 
assert(rwlock.is_wlocked()); - - RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); + assert(sul.owns_lock() && sul.mutex() == &rwlock); OSDSession *s; - int r = _get_session(c->osd, &s, lc); + int r = _get_session(c->osd, &s, sul); assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */ if (c->session != s) { if (c->session) { OSDSession *cs = c->session; - cs->lock.get_write(); + OSDSession::unique_lock csl(cs->lock); _session_command_op_remove(c->session, c); - cs->lock.unlock(); + csl.unlock(); } - s->lock.get_write(); + OSDSession::unique_lock sl(s->lock); _session_command_op_assign(s, c); - s->lock.unlock(); } put_session(s); @@ -4720,7 +4728,7 @@ int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r) { assert(initialized.read()); - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); map::iterator it = s->command_ops.find(tid); if (it == s->command_ops.end()) { @@ -4738,7 +4746,7 @@ int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r) void Objecter::_finish_command(CommandOp *c, int r, string rs) { - assert(rwlock.is_wlocked()); + // rwlock is locked unique ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " " << rs << dendl; @@ -4751,9 +4759,9 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs) timer.cancel_event(c->ontimeout); OSDSession *s = c->session; - s->lock.get_write(); + OSDSession::unique_lock sl(s->lock); _session_command_op_remove(c->session, c); - s->lock.unlock(); + sl.unlock(); c->put(); @@ -4767,11 +4775,6 @@ Objecter::OSDSession::~OSDSession() assert(ops.empty()); assert(linger_ops.empty()); assert(command_ops.empty()); - - for (int i = 0; i < num_locks; i++) { - delete completion_locks[i]; - } - delete[] completion_locks; } Objecter::~Objecter() @@ -4805,7 +4808,7 @@ Objecter::~Objecter() */ void Objecter::set_epoch_barrier(epoch_t epoch) { - RWLock::WLocker wl(rwlock); + unique_lock wl(rwlock); ldout(cct, 7) << __func__ << ": barrier " << epoch << " 
(was " << epoch_barrier << ") current epoch " << osdmap->get_epoch() @@ -4851,7 +4854,7 @@ struct C_EnumerateReply : public Context { void finish(int r) { objecter->_enumerate_reply( - bl, r, end, pool_id, budget, epoch, result, next, on_finish); + bl, r, end, pool_id, budget, epoch, result, next, on_finish); } }; @@ -4861,7 +4864,7 @@ void Objecter::enumerate_objects( const hobject_t &start, const hobject_t &end, const uint32_t max, - std::list *result, + std::list *result, hobject_t *next, Context *on_finish) { @@ -4884,23 +4887,23 @@ void Objecter::enumerate_objects( return; } - rwlock.get_read(); + shared_lock rl(rwlock); assert(osdmap->get_epoch()); if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { - rwlock.unlock(); + rl.unlock(); lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl; on_finish->complete(-EOPNOTSUPP); return; } const pg_pool_t *p = osdmap->get_pg_pool(pool_id); if (!p) { - lderr(cct) << __func__ << ": pool " << pool_id << " DNE in" - "osd epoch " << osdmap->get_epoch() << dendl; - rwlock.unlock(); + lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch " + << osdmap->get_epoch() << dendl; + rl.unlock(); on_finish->complete(-ENOENT); return; } else { - rwlock.unlock(); + rl.unlock(); } ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl; @@ -4928,7 +4931,7 @@ void Objecter::_enumerate_reply( const int64_t pool_id, int budget, epoch_t reply_epoch, - std::list *result, + std::list *result, hobject_t *next, Context *on_finish) { @@ -4959,20 +4962,21 @@ void Objecter::_enumerate_reply( << " handle " << response.handle << " reply_epoch " << reply_epoch << dendl; ldout(cct, 20) << __func__ << ": response.entries.size " - << response.entries.size() << ", response.entries " - << response.entries << dendl; + << response.entries.size() << ", response.entries " + << response.entries << dendl; if (cmp_bitwise(response.handle, end) <= 0) { *next = response.handle; } else { - ldout(cct, 10) << 
__func__ << ": adjusted next down to end " << end << dendl; + ldout(cct, 10) << __func__ << ": adjusted next down to end " << end + << dendl; *next = end; // drop anything after 'end' - rwlock.get_read(); + shared_lock rl(rwlock); const pg_pool_t *pool = osdmap->get_pg_pool(pool_id); if (!pool) { // pool is gone, drop any results which are now meaningless. - rwlock.put_read(); + rl.unlock(); on_finish->complete(-ENOENT); return; } @@ -4994,7 +4998,7 @@ void Objecter::_enumerate_reply( << " >= end " << end << dendl; response.entries.pop_back(); } - rwlock.put_read(); + rl.unlock(); } if (!response.entries.empty()) { result->merge(response.entries); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index a8bfbdb031a9..8e156a00b6af 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -15,19 +15,25 @@ #ifndef CEPH_OBJECTER_H #define CEPH_OBJECTER_H +#include #include #include +#include #include #include +#include -#include "include/types.h" +#include + +#include "include/assert.h" #include "include/buffer.h" +#include "include/types.h" #include "include/rados/rados_types.hpp" #include "common/admin_socket.h" #include "common/ceph_time.h" #include "common/ceph_timer.h" -#include "common/RWLock.h" +#include "common/shunique_lock.h" #include "messages/MOSDOp.h" #include "osd/OSDMap.h" @@ -1128,7 +1134,11 @@ private: version_t last_seen_osdmap_version; version_t last_seen_pgmap_version; - RWLock rwlock; + mutable boost::shared_mutex rwlock; + using lock_guard = std::unique_lock; + using unique_lock = std::unique_lock; + using shared_lock = boost::shared_lock; + using shunique_lock = ceph::shunique_lock; ceph::timer timer; PerfCounters *logger; @@ -1566,8 +1576,8 @@ public: }; int submit_command(CommandOp *c, ceph_tid_t *ptid); - int _calc_command_target(CommandOp *c); - void _assign_command_session(CommandOp *c); + int _calc_command_target(CommandOp *c, shunique_lock &sul); + void _assign_command_session(CommandOp *c, shunique_lock &sul); void 
_send_command(CommandOp *c); int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r); void _finish_command(CommandOp *c, int r, string rs); @@ -1603,7 +1613,11 @@ public: bool is_watch; ceph::mono_time watch_valid_thru; ///< send time for last acked ping int last_error; ///< error from last failed ping|reconnect, if any - RWLock watch_lock; + boost::shared_mutex watch_lock; + using lock_guard = std::unique_lock; + using unique_lock = std::unique_lock; + using shared_lock = boost::shared_lock; + using shunique_lock = ceph::shunique_lock; // queue of pending async operations, with the timestamp of // when they were queued. @@ -1630,11 +1644,11 @@ public: epoch_t last_force_resend; void _queued_async() { - assert(watch_lock.is_locked()); + // watch_lock must be locked unique watch_pending_async.push_back(ceph::mono_clock::now()); } void finished_async() { - RWLock::WLocker l(watch_lock); + unique_lock l(watch_lock); assert(!watch_pending_async.empty()); watch_pending_async.pop_front(); } @@ -1643,7 +1657,6 @@ public: target(object_t(), object_locator_t(), 0), snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL), is_watch(false), last_error(0), - watch_lock("Objecter::LingerOp::watch_lock"), register_gen(0), registered(false), canceled(false), @@ -1729,8 +1742,11 @@ public: // -- osd sessions -- struct OSDSession : public RefCountedObject { - RWLock lock; - Mutex **completion_locks; + boost::shared_mutex lock; + using lock_guard = std::lock_guard; + using unique_lock = std::unique_lock; + using shared_lock = boost::shared_lock; + using shunique_lock = ceph::shunique_lock; // pending ops map ops; @@ -1739,26 +1755,23 @@ public: int osd; int incarnation; - int num_locks; ConnectionRef con; + int num_locks; + std::unique_ptr completion_locks; + using unique_completion_lock = std::unique_lock< + decltype(completion_locks)::element_type>; + OSDSession(CephContext *cct, int o) : - lock("OSDSession"), - osd(o), - incarnation(0), - con(NULL) { - num_locks = 
cct->_conf->objecter_completion_locks_per_session; - completion_locks = new Mutex *[num_locks]; - for (int i = 0; i < num_locks; i++) { - completion_locks[i] = new Mutex("OSDSession::completion_lock"); - } - } + osd(o), incarnation(0), con(NULL), + num_locks(cct->_conf->objecter_completion_locks_per_session), + completion_locks(new std::mutex[num_locks]) {} ~OSDSession(); bool is_homeless() { return (osd == -1); } - Mutex *get_lock(object_t& oid); + unique_completion_lock get_lock(object_t& oid); }; map osd_sessions; @@ -1781,8 +1794,10 @@ public: // we use this just to confirm a cookie is valid before dereferencing the ptr set linger_ops_set; int num_linger_callbacks; - Mutex linger_callback_lock; - Cond linger_callback_cond; + std::mutex linger_callback_lock; + typedef std::unique_lock unique_linger_cb_lock; + typedef std::lock_guard linger_cb_lock_guard; + std::condition_variable linger_callback_cond; map poolstat_ops; map statfs_ops; @@ -1828,7 +1843,7 @@ public: int _calc_target(op_target_t *t, epoch_t *last_force_resend = 0, bool any_change = false); int _map_session(op_target_t *op, OSDSession **s, - RWLock::Context& lc); + shunique_lock& lc); void _session_op_assign(OSDSession *s, Op *op); void _session_op_remove(OSDSession *s, Op *op); @@ -1837,13 +1852,13 @@ public: void _session_command_op_assign(OSDSession *to, CommandOp *op); void _session_command_op_remove(OSDSession *from, CommandOp *op); - int _assign_op_target_session(Op *op, RWLock::Context& lc, + int _assign_op_target_session(Op *op, shunique_lock& lc, bool src_session_locked, bool dst_session_locked); - int _recalc_linger_op_target(LingerOp *op, RWLock::Context& lc); + int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc); - void _linger_submit(LingerOp *info); - void _send_linger(LingerOp *info); + void _linger_submit(LingerOp *info, shunique_lock& sul); + void _send_linger(LingerOp *info, shunique_lock& sul); void _linger_commit(LingerOp *info, int r, bufferlist& outbl); void 
_linger_reconnect(LingerOp *info, int r); void _send_linger_ping(LingerOp *info); @@ -1852,25 +1867,26 @@ public: int _normalize_watch_error(int r); void _linger_callback_queue() { - Mutex::Locker l(linger_callback_lock); + linger_cb_lock_guard l(linger_callback_lock); ++num_linger_callbacks; } void _linger_callback_finish() { - Mutex::Locker l(linger_callback_lock); + linger_cb_lock_guard l(linger_callback_lock); if (--num_linger_callbacks == 0) - linger_callback_cond.SignalAll(); + linger_callback_cond.notify_all(); assert(num_linger_callbacks >= 0); } friend class C_DoWatchError; public: void linger_callback_flush() { - Mutex::Locker l(linger_callback_lock); - while (num_linger_callbacks > 0) - linger_callback_cond.Wait(linger_callback_lock); + unique_linger_cb_lock l(linger_callback_lock); + linger_callback_cond.wait(l, [this]() { + return num_linger_callbacks <= 0; + }); } private: - void _check_op_pool_dne(Op *op, bool session_locked); + void _check_op_pool_dne(Op *op, unique_lock& sl); void _send_op_map_check(Op *op); void _op_cancel_map_check(Op *op); void _check_linger_pool_dne(LingerOp *op, bool *need_unregister); @@ -1882,9 +1898,9 @@ private: void kick_requests(OSDSession *session); void _kick_requests(OSDSession *session, map& lresend); - void _linger_ops_resend(map& lresend); + void _linger_ops_resend(map& lresend, unique_lock& ul); - int _get_session(int osd, OSDSession **session, RWLock::Context& lc); + int _get_session(int osd, OSDSession **session, shunique_lock& sul); void put_session(OSDSession *s); void get_session(OSDSession *s); void _reopen_session(OSDSession *session); @@ -1904,12 +1920,12 @@ private: * If throttle_op needs to throttle it will unlock client_lock. 
*/ int calc_op_budget(Op *op); - void _throttle_op(Op *op, int op_size=0); - int _take_op_budget(Op *op) { - assert(rwlock.is_locked()); + void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0); + int _take_op_budget(Op *op, shunique_lock& sul) { + assert(sul && sul.mutex() == &rwlock); int op_budget = calc_op_budget(op); if (keep_balanced_budget) { - _throttle_op(op, op_budget); + _throttle_op(op, sul, op_budget); } else { op_throttle_bytes.take(op_budget); op_throttle_ops.take(1); @@ -1941,10 +1957,9 @@ private: max_linger_id(0), num_unacked(0), num_uncommitted(0), global_op_flags(0), keep_balanced_budget(false), honor_osdmap_full(true), last_seen_osdmap_version(0), last_seen_pgmap_version(0), - rwlock("Objecter::rwlock"), logger(NULL), tick_event(0), - m_request_state_hook(NULL), num_linger_callbacks(0), - linger_callback_lock("Objecter::linger_callback_lock"), - num_homeless_ops(0), homeless_session(new OSDSession(cct, -1)), + logger(NULL), tick_event(0), m_request_state_hook(NULL), + num_linger_callbacks(0), num_homeless_ops(0), + homeless_session(new OSDSession(cct, -1)), mon_timeout(ceph::make_timespan(mon_timeout)), osd_timeout(ceph::make_timespan(osd_timeout)), op_throttle_bytes(cct, "objecter_bytes", @@ -1958,14 +1973,44 @@ private: void start(); void shutdown(); - const OSDMap *get_osdmap_read() { - rwlock.get_read(); - return osdmap; - } - void put_osdmap_read() { - rwlock.put_read(); + // These two templates replace osdmap_(get)|(put)_read. Simply wrap + // whatever functionality you want to use the OSDMap in a lambda like: + // + // with_osdmap([](const OSDMap& o) { o.do_stuff(); }); + // + // or + // + // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); }); + // + // Do not call into something that will try to lock the OSDMap from + // here or you will have great woe and misery. 
+ + template + auto with_osdmap(Callback&& cb, Args&&...args) -> + typename std::enable_if< + std::is_void< + decltype(cb(const_cast(*osdmap), + std::forward(args)...))>::value, + void>::type { + shared_lock l(rwlock); + std::forward(cb)(const_cast(*osdmap), + std::forward(args)...); + } + + template + auto with_osdmap(Callback&& cb, Args&&... args) -> + typename std::enable_if< + !std::is_void< + decltype(cb(const_cast(*osdmap), + std::forward(args)...))>::value, + decltype(cb(const_cast(*osdmap), + std::forward(args)...))>::type { + shared_lock l(rwlock); + return std::forward(cb)(const_cast(*osdmap), + std::forward(args)...); } + /** * Tell the objecter to throttle outgoing ops according to its * budget (in _conf). If you do this, ops can block, in @@ -1985,7 +2030,8 @@ private: map *pool_full_map, map& need_resend, list& need_resend_linger, - map& need_resend_command); + map& need_resend_command, + shunique_lock& sul); int64_t get_object_hash_position(int64_t pool, const string& key, const string& ns); @@ -2023,8 +2069,8 @@ private: private: // low-level - ceph_tid_t _op_submit(Op *op, RWLock::Context& lc); - ceph_tid_t _op_submit_with_budget(Op *op, RWLock::Context& lc, + ceph_tid_t _op_submit(Op *op, shunique_lock& lc); + ceph_tid_t _op_submit_with_budget(Op *op, shunique_lock& lc, int *ctx_budget = NULL); inline void unregister_op(Op *op); @@ -2032,7 +2078,7 @@ private: public: ceph_tid_t op_submit(Op *op, int *ctx_budget = NULL); bool is_active() { - RWLock::RLocker l(rwlock); + shared_lock l(rwlock); return !((!inflight_ops.read()) && linger_ops.empty() && poolstat_ops.empty() && statfs_ops.empty()); }