From: Kefu Chai Date: Tue, 20 Nov 2018 12:32:23 +0000 (+0800) Subject: common,osd: Mutex -> ceph::mutex X-Git-Tag: v14.1.0~820^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=03029f431b2f8cfbe7f87ec8ec599db1deb2cb21;p=ceph.git common,osd: Mutex -> ceph::mutex Signed-off-by: Kefu Chai --- diff --git a/src/common/Finisher.h b/src/common/Finisher.h index 1bcf82573d78..cca3f81c8843 100644 --- a/src/common/Finisher.h +++ b/src/common/Finisher.h @@ -191,10 +191,12 @@ public: class ContextQueue { list q; std::mutex q_mutex; - Mutex& mutex; - Cond& cond; + ceph::mutex& mutex; + ceph::condition_variable& cond; public: - ContextQueue(Mutex& mut, Cond& con) : mutex(mut), cond(con) {} + ContextQueue(ceph::mutex& mut, + ceph::condition_variable& con) + : mutex(mut), cond(con) {} void queue(list& ls) { bool empty = false; @@ -209,9 +211,8 @@ public: } if (empty) { - mutex.Lock(); - cond.Signal(); - mutex.Unlock(); + std::scoped_lock l{mutex}; + cond.notify_all(); } ls.clear(); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 1024631d72c2..5510ddd12647 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -222,8 +222,8 @@ OSDService::OSDService(OSD *osd) : class_handler(osd->class_handler), osd_max_object_size(cct->_conf, "osd_max_object_size"), osd_skip_data_digest(cct->_conf, "osd_skip_data_digest"), - publish_lock("OSDService::publish_lock"), - pre_publish_lock("OSDService::pre_publish_lock"), + publish_lock{ceph::make_mutex("OSDService::publish_lock")}, + pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")}, max_oldest_map(0), peer_map_epoch_lock("OSDService::peer_map_epoch_lock"), sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0), @@ -9847,7 +9847,7 @@ void OSDShard::_detach_pg(OSDShardPGSlot *slot) pg_slots_by_epoch.erase(pg_slots_by_epoch.iterator_to(*slot)); slot->epoch = 0; if (waiting_for_min_pg_epoch) { - min_pg_epoch_cond.Signal(); + min_pg_epoch_cond.notify_all(); } } @@ -9863,7 +9863,7 @@ void OSDShard::update_pg_epoch(OSDShardPGSlot *slot, epoch_t e) dout(30) << "min is now " << pg_slots_by_epoch.begin()->epoch << " on " << pg_slots_by_epoch.begin()->pg->pg_id << dendl; if (waiting_for_min_pg_epoch) { - min_pg_epoch_cond.Signal(); + min_pg_epoch_cond.notify_all(); } } @@ -9879,14 +9879,19 @@ epoch_t OSDShard::get_min_pg_epoch() void OSDShard::wait_min_pg_epoch(epoch_t need) { - std::lock_guard l(shard_lock); + std::unique_lock l{shard_lock}; ++waiting_for_min_pg_epoch; - while (!pg_slots_by_epoch.empty() && - pg_slots_by_epoch.begin()->epoch < need) { - dout(10) << need << " waiting on " - << pg_slots_by_epoch.begin()->epoch << dendl; - min_pg_epoch_cond.Wait(shard_lock); - } + min_pg_epoch_cond.wait(l, [need, this] { + if (pg_slots_by_epoch.empty()) { + return true; + } else if (pg_slots_by_epoch.begin()->epoch >= need) { + return true; + } else { + dout(10) << need << " waiting on " + << pg_slots_by_epoch.begin()->epoch << dendl; + return false; + } + }); --waiting_for_min_pg_epoch; } @@ -9983,9 +9988,8 @@ void OSDShard::consume_map( ++p; } if (queued) { - sdata_wait_lock.Lock(); - sdata_cond.SignalOne(); - sdata_wait_lock.Unlock(); + std::lock_guard l{sdata_wait_lock}; + sdata_cond.notify_one(); } } @@ -10187,9 +10191,8 @@ void OSDShard::register_and_wake_split_child(PG *pg) epoch, NullEvt()))); - sdata_wait_lock.Lock(); - sdata_cond.SignalOne(); - sdata_wait_lock.Unlock(); + std::lock_guard l{sdata_wait_lock}; + sdata_cond.notify_one(); } void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num) @@ -10253,31 +10256,31 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) bool is_smallest_thread_index = thread_index < osd->num_shards; // peek at spg_t - sdata->shard_lock.Lock(); + sdata->shard_lock.lock(); if (sdata->pqueue->empty() && (!is_smallest_thread_index || sdata->context_queue.empty())) { - sdata->sdata_wait_lock.Lock(); + std::unique_lock wait_lock{sdata->sdata_wait_lock}; if (is_smallest_thread_index && !sdata->context_queue.empty()) { // we raced with a context_queue addition, don't wait - sdata->sdata_wait_lock.Unlock(); + wait_lock.unlock(); } else if (!sdata->stop_waiting) { dout(20) << __func__ << " empty q, waiting" << dendl; osd->cct->get_heartbeat_map()->clear_timeout(hb); - sdata->shard_lock.Unlock(); - sdata->sdata_cond.Wait(sdata->sdata_wait_lock); - sdata->sdata_wait_lock.Unlock(); - sdata->shard_lock.Lock(); + sdata->shard_lock.unlock(); + sdata->sdata_cond.wait(wait_lock); + wait_lock.unlock(); + sdata->shard_lock.lock(); if (sdata->pqueue->empty() && !(is_smallest_thread_index && !sdata->context_queue.empty())) { - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); return; } osd->cct->get_heartbeat_map()->reset_timeout(hb, osd->cct->_conf->threadpool_default_timeout, 0); } else { dout(20) << __func__ << " need return immediately" << dendl; - sdata->sdata_wait_lock.Unlock(); - sdata->shard_lock.Unlock(); + wait_lock.unlock(); + sdata->shard_lock.unlock(); return; } } @@ -10289,17 +10292,17 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) if (sdata->pqueue->empty()) { if (osd->is_stopping()) { - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); return; // OSD shutdown, discard. } - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); handle_oncommits(oncommits); return; } OpQueueItem item = sdata->pqueue->dequeue(); if (osd->is_stopping()) { - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); return; // OSD shutdown, discard. } @@ -10328,18 +10331,18 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) uint64_t requeue_seq = slot->requeue_seq; ++slot->num_running; - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); osd->service.maybe_inject_dispatch_delay(); pg->lock(); osd->service.maybe_inject_dispatch_delay(); - sdata->shard_lock.Lock(); + sdata->shard_lock.lock(); auto q = sdata->pg_slots.find(token); if (q == sdata->pg_slots.end()) { // this can happen if we race with pg removal. dout(20) << __func__ << " slot " << token << " no longer there" << dendl; pg->unlock(); - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); handle_oncommits(oncommits); return; } @@ -10351,7 +10354,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) dout(20) << __func__ << " " << token << " nothing queued" << dendl; pg->unlock(); - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); handle_oncommits(oncommits); return; } @@ -10361,7 +10364,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) << requeue_seq << ", we raced with _wake_pg_slot" << dendl; pg->unlock(); - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); handle_oncommits(oncommits); return; } @@ -10460,13 +10463,13 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) } unsigned pushes_to_free = qi.get_reserved_pushes(); if (pushes_to_free > 0) { - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); osd->service.release_reserved_pushes(pushes_to_free); handle_oncommits(oncommits); return; } } - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); handle_oncommits(oncommits); return; } @@ -10474,13 +10477,13 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) OSDMapRef osdmap = sdata->shard_osdmap; if (qi.get_map_epoch() > osdmap->get_epoch()) { _add_slot_waiter(token, slot, std::move(qi)); - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); pg->unlock(); handle_oncommits(oncommits); return; } } - sdata->shard_lock.Unlock(); + sdata->shard_lock.unlock(); if (!new_children.empty()) { for (auto shard : osd->shards) { @@ -10535,7 +10538,7 @@ void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) { assert (NULL != sdata); unsigned priority = item.get_priority(); unsigned cost = item.get_cost(); - sdata->shard_lock.Lock(); + sdata->shard_lock.lock(); dout(20) << __func__ << " " << item << dendl; if (priority >= osd->op_prio_cutoff) @@ -10544,12 +10547,10 @@ void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) { else sdata->pqueue->enqueue( item.get_owner(), priority, cost, std::move(item)); - sdata->shard_lock.Unlock(); - - sdata->sdata_wait_lock.Lock(); - sdata->sdata_cond.SignalOne(); - sdata->sdata_wait_lock.Unlock(); + sdata->shard_lock.unlock(); + std::lock_guard l{sdata->sdata_wait_lock}; + sdata->sdata_cond.notify_one(); } void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item) @@ -10557,7 +10558,7 @@ void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item) auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size()); auto& sdata = osd->shards[shard_index]; ceph_assert(sdata); - sdata->shard_lock.Lock(); + sdata->shard_lock.lock(); auto p = sdata->pg_slots.find(item.get_ordering_token()); if (p != sdata->pg_slots.end() && !p->second->to_process.empty()) { @@ -10575,10 +10576,9 @@ void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item) dout(20) << __func__ << " " << item << dendl; } sdata->_enqueue_front(std::move(item), osd->op_prio_cutoff); - sdata->shard_lock.Unlock(); - sdata->sdata_wait_lock.Lock(); - sdata->sdata_cond.SignalOne(); - sdata->sdata_wait_lock.Unlock(); + sdata->shard_lock.unlock(); + std::lock_guard l{sdata->sdata_wait_lock}; + sdata->sdata_cond.notify_one(); } namespace ceph { diff --git a/src/osd/OSD.h b/src/osd/OSD.h index c8e0cea6a81d..a53e3c1490d8 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -296,7 +296,7 @@ public: private: // -- superblock -- - Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish + ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish OSDSuperblock superblock; public: @@ -345,7 +345,7 @@ public: */ private: OSDMapRef next_osdmap; - Cond pre_publish_cond; + ceph::condition_variable pre_publish_cond; public: void pre_publish_map(OSDMapRef map) { @@ -378,20 +378,17 @@ public: if (--(i->second) == 0) { map_reservations.erase(i); } - pre_publish_cond.Signal(); + pre_publish_cond.notify_all(); } /// blocks until there are no reserved maps prior to next_osdmap void await_reserved_maps() { - std::lock_guard l(pre_publish_lock); + std::unique_lock l{pre_publish_lock}; ceph_assert(next_osdmap); - while (true) { - map::const_iterator i = map_reservations.cbegin(); - if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) { - break; - } else { - pre_publish_cond.Wait(pre_publish_lock); - } - } + pre_publish_cond.wait(l, [this] { + auto i = map_reservations.cbegin(); + return (i == map_reservations.cend() || + i->first >= next_osdmap->get_epoch()); + }); } OSDMapRef get_next_osdmap() { std::lock_guard l(pre_publish_lock); @@ -1131,11 +1128,11 @@ struct OSDShard { string shard_name; string sdata_wait_lock_name; - Mutex sdata_wait_lock; - Cond sdata_cond; + ceph::mutex sdata_wait_lock; + ceph::condition_variable sdata_cond; string osdmap_lock_name; - Mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock + ceph::mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock OSDMapRef shard_osdmap; OSDMapRef get_osdmap() { @@ -1144,7 +1141,7 @@ struct OSDShard { } string shard_lock_name; - Mutex shard_lock; ///< protects remaining members below + ceph::mutex shard_lock; ///< protects remaining members below /// map of slots for each spg_t. maintains ordering of items dequeued /// from pqueue while _process thread drops shard lock to acquire the @@ -1166,7 +1163,7 @@ struct OSDShard { &OSDShardPGSlot::pg_epoch_item>, boost::intrusive::compare> pg_slots_by_epoch; int waiting_for_min_pg_epoch = 0; - Cond min_pg_epoch_cond; + ceph::condition_variable min_pg_epoch_cond; /// priority queue std::unique_ptr> pqueue; @@ -1228,11 +1225,11 @@ struct OSDShard { osd(osd), shard_name(string("OSDShard.") + stringify(id)), sdata_wait_lock_name(shard_name + "::sdata_wait_lock"), - sdata_wait_lock(sdata_wait_lock_name.c_str(), false, true, false), + sdata_wait_lock{make_mutex(sdata_wait_lock_name)}, osdmap_lock_name(shard_name + "::osdmap_lock"), - osdmap_lock(osdmap_lock_name.c_str(), false, false), + osdmap_lock{make_mutex(osdmap_lock_name)}, shard_lock_name(shard_name + "::shard_lock"), - shard_lock(shard_lock_name.c_str(), false, true, false), + shard_lock{make_mutex(shard_lock_name)}, context_queue(sdata_wait_lock, sdata_cond) { if (opqueue == io_queue::weightedpriority) { pqueue = std::make_unique< @@ -1253,7 +1250,7 @@ struct OSDShard { class OSD : public Dispatcher, public md_config_obs_t { /** OSD **/ - Mutex osd_lock; // global lock + Mutex osd_lock; // global lock SafeTimer tick_timer; // safe timer (osd_lock) // Tick timer for those stuff that do not need osd_lock @@ -1753,11 +1750,10 @@ protected: void return_waiting_threads() override { for(uint32_t i = 0; i < osd->num_shards; i++) { OSDShard* sdata = osd->shards[i]; - assert (NULL != sdata); - sdata->sdata_wait_lock.Lock(); + assert (NULL != sdata); + std::scoped_lock l{sdata->sdata_wait_lock}; sdata->stop_waiting = true; - sdata->sdata_cond.Signal(); - sdata->sdata_wait_lock.Unlock(); + sdata->sdata_cond.notify_all(); } } @@ -1765,9 +1761,8 @@ protected: for(uint32_t i = 0; i < osd->num_shards; i++) { OSDShard* sdata = osd->shards[i]; assert (NULL != sdata); - sdata->sdata_wait_lock.Lock(); + std::scoped_lock l{sdata->sdata_wait_lock}; sdata->stop_waiting = false; - sdata->sdata_wait_lock.Unlock(); } } @@ -1779,11 +1774,10 @@ protected: snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i); ceph_assert(NULL != sdata); - sdata->shard_lock.Lock(); + std::scoped_lock l{sdata->shard_lock}; f->open_object_section(queue_name); sdata->pqueue->dump(f); f->close_section(); - sdata->shard_lock.Unlock(); } } @@ -2105,14 +2099,14 @@ protected: return c; } void _process(Command *c, ThreadPool::TPHandle &) override { - osd->osd_lock.Lock(); + osd->osd_lock.lock(); if (osd->is_stopping()) { - osd->osd_lock.Unlock(); + osd->osd_lock.unlock(); delete c; return; } osd->do_command(c->con.get(), c->tid, c->cmd, c->indata); - osd->osd_lock.Unlock(); + osd->osd_lock.unlock(); delete c; } void _clear() override {