common,osd: Mutex -> ceph::mutex
author Kefu Chai <kchai@redhat.com>
Tue, 20 Nov 2018 12:32:23 +0000 (20:32 +0800)
committer Kefu Chai <kchai@redhat.com>
Thu, 22 Nov 2018 03:26:43 +0000 (11:26 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/common/Finisher.h
src/osd/OSD.cc
src/osd/OSD.h
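
The hunks below all apply the same mechanical conversion: the legacy Mutex and Cond wrappers, with their explicit Lock()/Unlock() and Signal()/SignalOne()/Wait() calls, give way to the ceph::mutex and ceph::condition_variable aliases used with standard RAII lock types. The following stand-alone sketch illustrates the before/after style; it uses plain std::mutex and std::condition_variable as stand-ins for the ceph:: aliases (assumed equivalent in release builds), and the WorkQueue type is purely hypothetical.

  #include <condition_variable>
  #include <mutex>
  #include <queue>

  // Illustration only, not Ceph code. The "was:" comments show the legacy
  // Mutex/Cond style that this commit removes.
  struct WorkQueue {
    std::mutex lock;               // was: a named legacy Mutex
    std::condition_variable cond;  // was: Cond
    std::queue<int> items;

    void push(int v) {
      std::lock_guard l{lock};     // was: lock.Lock(); ... lock.Unlock();
      items.push(v);
      cond.notify_one();           // was: cond.SignalOne();
    }

    int pop() {
      std::unique_lock l{lock};
      // was: while (items.empty()) cond.Wait(lock);
      cond.wait(l, [this] { return !items.empty(); });
      int v = items.front();
      items.pop();
      return v;
    }
  };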

diff --git a/src/common/Finisher.h b/src/common/Finisher.h
index 1bcf82573d789fdd1a286d38df3b13a4b832f416..cca3f81c884352509835216ade20856df0011715 100644 (file)
@@ -191,10 +191,12 @@ public:
 class ContextQueue {
   list<Context *> q;
   std::mutex q_mutex;
-  Mutex& mutex;
-  Cond& cond;
+  ceph::mutex& mutex;
+  ceph::condition_variable& cond;
 public:
-  ContextQueue(Mutex& mut, Cond& con) : mutex(mut), cond(con) {}
+  ContextQueue(ceph::mutex& mut,
+              ceph::condition_variable& con)
+    : mutex(mut), cond(con) {}
 
   void queue(list<Context *>& ls) {
     bool empty = false;
@@ -209,9 +211,8 @@ public:
     }
 
     if (empty) {
-      mutex.Lock();
-      cond.Signal();
-      mutex.Unlock();
+      std::scoped_lock l{mutex};
+      cond.notify_all();
     }
 
     ls.clear();
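
The queue() change above keeps the externally owned mutex held only for the duration of the wake-up: a std::scoped_lock replaces the explicit Lock/Signal/Unlock triplet, and the notification still happens with the lock held. A minimal stand-alone sketch of that idiom follows; the Notifier type is hypothetical and mirrors only what the hunk shows.

  #include <condition_variable>
  #include <mutex>

  // Hypothetical stand-in for the ContextQueue wake-up path: the mutex and
  // condition variable are owned by the caller and only borrowed here.
  class Notifier {
    std::mutex& mutex;
    std::condition_variable& cond;
  public:
    Notifier(std::mutex& mut, std::condition_variable& con)
      : mutex(mut), cond(con) {}

    void wake_if(bool was_empty) {
      if (was_empty) {
        std::scoped_lock l{mutex};  // replaces mutex.Lock()/mutex.Unlock()
        cond.notify_all();          // replaces cond.Signal()
      }
    }
  };
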
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 1024631d72c21aa70aace3a844f87b70532a919d..5510ddd12647a5848e03c3c349878ae8fc59c66d 100644 (file)
@@ -222,8 +222,8 @@ OSDService::OSDService(OSD *osd) :
   class_handler(osd->class_handler),
   osd_max_object_size(cct->_conf, "osd_max_object_size"),
   osd_skip_data_digest(cct->_conf, "osd_skip_data_digest"),
-  publish_lock("OSDService::publish_lock"),
-  pre_publish_lock("OSDService::pre_publish_lock"),
+  publish_lock{ceph::make_mutex("OSDService::publish_lock")},
+  pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
   max_oldest_map(0),
   peer_map_epoch_lock("OSDService::peer_map_epoch_lock"),
   sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
@@ -9847,7 +9847,7 @@ void OSDShard::_detach_pg(OSDShardPGSlot *slot)
   pg_slots_by_epoch.erase(pg_slots_by_epoch.iterator_to(*slot));
   slot->epoch = 0;
   if (waiting_for_min_pg_epoch) {
-    min_pg_epoch_cond.Signal();
+    min_pg_epoch_cond.notify_all();
   }
 }
 
@@ -9863,7 +9863,7 @@ void OSDShard::update_pg_epoch(OSDShardPGSlot *slot, epoch_t e)
   dout(30) << "min is now " << pg_slots_by_epoch.begin()->epoch
           << " on " << pg_slots_by_epoch.begin()->pg->pg_id << dendl;
   if (waiting_for_min_pg_epoch) {
-    min_pg_epoch_cond.Signal();
+    min_pg_epoch_cond.notify_all();
   }
 }
 
@@ -9879,14 +9879,19 @@ epoch_t OSDShard::get_min_pg_epoch()
 
 void OSDShard::wait_min_pg_epoch(epoch_t need)
 {
-  std::lock_guard l(shard_lock);
+  std::unique_lock l{shard_lock};
   ++waiting_for_min_pg_epoch;
-  while (!pg_slots_by_epoch.empty() &&
-        pg_slots_by_epoch.begin()->epoch < need) {
-    dout(10) << need << " waiting on "
-            << pg_slots_by_epoch.begin()->epoch << dendl;
-    min_pg_epoch_cond.Wait(shard_lock);
-  }
+  min_pg_epoch_cond.wait(l, [need, this] {
+    if (pg_slots_by_epoch.empty()) {
+      return true;
+    } else if (pg_slots_by_epoch.begin()->epoch >= need) {
+      return true;
+    } else {
+      dout(10) << need << " waiting on "
+              << pg_slots_by_epoch.begin()->epoch << dendl;
+      return false;
+    }
+  });
   --waiting_for_min_pg_epoch;
 }
 
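wait_min_pg_epoch() above replaces a hand-written wait loop with the predicate overload of condition_variable::wait(), which re-evaluates the condition after every wakeup and therefore tolerates spurious wakeups exactly as the old loop did; the predicate runs with the lock held, so the dout() inside the lambda reads shared state safely. A reduced stand-alone sketch of the pattern, with a hypothetical epoch container standing in for pg_slots_by_epoch:

  #include <condition_variable>
  #include <mutex>
  #include <set>

  struct EpochWaiter {
    std::mutex lock;
    std::condition_variable cond;
    std::set<unsigned> epochs;   // stand-in for pg_slots_by_epoch

    // Block until every tracked epoch is >= need (or nothing is tracked).
    void wait_min_epoch(unsigned need) {
      std::unique_lock l{lock};  // wait() needs a unique_lock, not lock_guard
      cond.wait(l, [&] {
        // Evaluated under the lock, so shared state can be read (or logged).
        return epochs.empty() || *epochs.begin() >= need;
      });
    }

    void update(unsigned e) {
      std::lock_guard l{lock};
      epochs.insert(e);
      cond.notify_all();         // wake all waiters to re-check the predicate
    }
  };
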
@@ -9983,9 +9988,8 @@ void OSDShard::consume_map(
     ++p;
   }
   if (queued) {
-    sdata_wait_lock.Lock();
-    sdata_cond.SignalOne();
-    sdata_wait_lock.Unlock();
+    std::lock_guard l{sdata_wait_lock};
+    sdata_cond.notify_one();
   }
 }
 
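Note that the commit preserves the old signalling semantics: Cond::SignalOne() becomes notify_one(), waking a single sharded worker as in consume_map() above, while Cond::Signal() becomes notify_all(). A small hypothetical sketch of why the distinction matters when several threads sleep on the same condition variable:

  #include <condition_variable>
  #include <mutex>
  #include <queue>

  // Hypothetical shard: many worker threads wait on the same condvar.
  struct Shard {
    std::mutex wait_lock;
    std::condition_variable cond;
    std::queue<int> work;
    bool stopping = false;

    void enqueue(int item) {
      std::lock_guard l{wait_lock};
      work.push(item);
      cond.notify_one();    // one item -> waking one worker is enough
    }

    void shutdown() {
      std::lock_guard l{wait_lock};
      stopping = true;
      cond.notify_all();    // every worker must observe the stop flag
    }
  };
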
@@ -10187,9 +10191,8 @@ void OSDShard::register_and_wake_split_child(PG *pg)
        epoch,
        NullEvt())));
 
-  sdata_wait_lock.Lock();
-  sdata_cond.SignalOne();
-  sdata_wait_lock.Unlock();
+  std::lock_guard l{sdata_wait_lock};
+  sdata_cond.notify_one();
 }
 
 void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num)
@@ -10253,31 +10256,31 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   bool is_smallest_thread_index = thread_index < osd->num_shards;
 
   // peek at spg_t
-  sdata->shard_lock.Lock();
+  sdata->shard_lock.lock();
   if (sdata->pqueue->empty() &&
       (!is_smallest_thread_index || sdata->context_queue.empty())) {
-    sdata->sdata_wait_lock.Lock();
+    std::unique_lock wait_lock{sdata->sdata_wait_lock};
     if (is_smallest_thread_index && !sdata->context_queue.empty()) {
       // we raced with a context_queue addition, don't wait
-      sdata->sdata_wait_lock.Unlock();
+      wait_lock.unlock();
     } else if (!sdata->stop_waiting) {
       dout(20) << __func__ << " empty q, waiting" << dendl;
       osd->cct->get_heartbeat_map()->clear_timeout(hb);
-      sdata->shard_lock.Unlock();
-      sdata->sdata_cond.Wait(sdata->sdata_wait_lock);
-      sdata->sdata_wait_lock.Unlock();
-      sdata->shard_lock.Lock();
+      sdata->shard_lock.unlock();
+      sdata->sdata_cond.wait(wait_lock);
+      wait_lock.unlock();
+      sdata->shard_lock.lock();
       if (sdata->pqueue->empty() &&
          !(is_smallest_thread_index && !sdata->context_queue.empty())) {
-       sdata->shard_lock.Unlock();
+       sdata->shard_lock.unlock();
        return;
       }
       osd->cct->get_heartbeat_map()->reset_timeout(hb,
          osd->cct->_conf->threadpool_default_timeout, 0);
     } else {
       dout(20) << __func__ << " need return immediately" << dendl;
-      sdata->sdata_wait_lock.Unlock();
-      sdata->shard_lock.Unlock();
+      wait_lock.unlock();
+      sdata->shard_lock.unlock();
       return;
     }
   }
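
The worker path above is the consumer half of the shard's two-lock design: when both queues look empty it drops shard_lock, sleeps on sdata_cond under sdata_wait_lock, then reacquires shard_lock and rechecks before proceeding. A reduced stand-alone sketch of that shape, keeping the recheck after the wakeup and leaving out the heartbeat and context-queue handling (all names here are hypothetical):

  #include <condition_variable>
  #include <deque>
  #include <mutex>
  #include <optional>

  struct ShardWorker {
    std::mutex shard_lock;          // protects pqueue
    std::mutex wait_lock;           // pairs with cond for sleeping workers
    std::condition_variable cond;
    std::deque<int> pqueue;
    bool stop_waiting = false;

    std::optional<int> dequeue_or_sleep() {
      shard_lock.lock();
      if (pqueue.empty()) {
        std::unique_lock wl{wait_lock};
        if (stop_waiting) {
          shard_lock.unlock();
          return std::nullopt;      // asked to return immediately
        }
        shard_lock.unlock();        // never sleep while holding the data lock
        cond.wait(wl);              // woken by a producer or by shutdown
        wl.unlock();
        shard_lock.lock();
        if (pqueue.empty()) {       // recheck: the wakeup may be stale
          shard_lock.unlock();
          return std::nullopt;
        }
      }
      int item = pqueue.front();
      pqueue.pop_front();
      shard_lock.unlock();
      return item;
    }
  };
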
@@ -10289,17 +10292,17 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
 
   if (sdata->pqueue->empty()) {
     if (osd->is_stopping()) {
-      sdata->shard_lock.Unlock();
+      sdata->shard_lock.unlock();
       return;    // OSD shutdown, discard.
     }
-    sdata->shard_lock.Unlock();
+    sdata->shard_lock.unlock();
     handle_oncommits(oncommits);
     return;
   }
 
   OpQueueItem item = sdata->pqueue->dequeue();
   if (osd->is_stopping()) {
-    sdata->shard_lock.Unlock();
+    sdata->shard_lock.unlock();
     return;    // OSD shutdown, discard.
   }
 
@@ -10328,18 +10331,18 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     uint64_t requeue_seq = slot->requeue_seq;
     ++slot->num_running;
 
-    sdata->shard_lock.Unlock();
+    sdata->shard_lock.unlock();
     osd->service.maybe_inject_dispatch_delay();
     pg->lock();
     osd->service.maybe_inject_dispatch_delay();
-    sdata->shard_lock.Lock();
+    sdata->shard_lock.lock();
 
     auto q = sdata->pg_slots.find(token);
     if (q == sdata->pg_slots.end()) {
       // this can happen if we race with pg removal.
       dout(20) << __func__ << " slot " << token << " no longer there" << dendl;
       pg->unlock();
-      sdata->shard_lock.Unlock();
+      sdata->shard_lock.unlock();
       handle_oncommits(oncommits);
       return;
     }
@@ -10351,7 +10354,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       dout(20) << __func__ << " " << token
               << " nothing queued" << dendl;
       pg->unlock();
-      sdata->shard_lock.Unlock();
+      sdata->shard_lock.unlock();
       handle_oncommits(oncommits);
       return;
     }
@@ -10361,7 +10364,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
               << requeue_seq << ", we raced with _wake_pg_slot"
               << dendl;
       pg->unlock();
-      sdata->shard_lock.Unlock();
+      sdata->shard_lock.unlock();
       handle_oncommits(oncommits);
       return;
     }
@@ -10460,13 +10463,13 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       }
       unsigned pushes_to_free = qi.get_reserved_pushes();
       if (pushes_to_free > 0) {
-       sdata->shard_lock.Unlock();
+       sdata->shard_lock.unlock();
        osd->service.release_reserved_pushes(pushes_to_free);
        handle_oncommits(oncommits);
        return;
       }
     }
-    sdata->shard_lock.Unlock();
+    sdata->shard_lock.unlock();
     handle_oncommits(oncommits);
     return;
   }
@@ -10474,13 +10477,13 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     OSDMapRef osdmap = sdata->shard_osdmap;
     if (qi.get_map_epoch() > osdmap->get_epoch()) {
       _add_slot_waiter(token, slot, std::move(qi));
-      sdata->shard_lock.Unlock();
+      sdata->shard_lock.unlock();
       pg->unlock();
       handle_oncommits(oncommits);
       return;
     }
   }
-  sdata->shard_lock.Unlock();
+  sdata->shard_lock.unlock();
 
   if (!new_children.empty()) {
     for (auto shard : osd->shards) {
@@ -10535,7 +10538,7 @@ void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
   assert (NULL != sdata);
   unsigned priority = item.get_priority();
   unsigned cost = item.get_cost();
-  sdata->shard_lock.Lock();
+  sdata->shard_lock.lock();
 
   dout(20) << __func__ << " " << item << dendl;
   if (priority >= osd->op_prio_cutoff)
@@ -10544,12 +10547,10 @@ void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
   else
     sdata->pqueue->enqueue(
       item.get_owner(), priority, cost, std::move(item));
-  sdata->shard_lock.Unlock();
-
-  sdata->sdata_wait_lock.Lock();
-  sdata->sdata_cond.SignalOne();
-  sdata->sdata_wait_lock.Unlock();
+  sdata->shard_lock.unlock();
 
+  std::lock_guard l{sdata->sdata_wait_lock};
+  sdata->sdata_cond.notify_one();
 }
 
 void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
@@ -10557,7 +10558,7 @@ void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
   auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size());
   auto& sdata = osd->shards[shard_index];
   ceph_assert(sdata);
-  sdata->shard_lock.Lock();
+  sdata->shard_lock.lock();
   auto p = sdata->pg_slots.find(item.get_ordering_token());
   if (p != sdata->pg_slots.end() &&
       !p->second->to_process.empty()) {
@@ -10575,10 +10576,9 @@ void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
     dout(20) << __func__ << " " << item << dendl;
   }
   sdata->_enqueue_front(std::move(item), osd->op_prio_cutoff);
-  sdata->shard_lock.Unlock();
-  sdata->sdata_wait_lock.Lock();
-  sdata->sdata_cond.SignalOne();
-  sdata->sdata_wait_lock.Unlock();
+  sdata->shard_lock.unlock();
+  std::lock_guard l{sdata->sdata_wait_lock};
+  sdata->sdata_cond.notify_one();
 }
 
 namespace ceph { 
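
The two _enqueue paths above follow the producer half of the same design: the item is queued under shard_lock, that lock is released, and sdata_wait_lock is then taken only long enough to notify_one() a sleeping worker. With the std::lock_guard placed at the end of the function, the wait lock is now released by its destructor rather than by an explicit Unlock(). A stand-alone sketch of that shape (the commit itself still uses explicit lock()/unlock() on shard_lock; the names below are hypothetical):

  #include <condition_variable>
  #include <deque>
  #include <mutex>

  struct ShardQueue {
    std::mutex shard_lock;          // protects pqueue
    std::mutex wait_lock;           // pairs with cond for sleeping workers
    std::condition_variable cond;
    std::deque<int> pqueue;

    void enqueue(int item) {
      {
        std::lock_guard l{shard_lock};
        pqueue.push_back(item);
      }                             // data lock released before notifying
      std::lock_guard l{wait_lock};
      cond.notify_one();            // guard released on return, as in _enqueue()
    }
  };
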
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index c8e0cea6a81d18f101d271ec908cb9cbfbb7bd90..a53e3c1490d874897f749cb886e6784958f88e3f 100644 (file)
@@ -296,7 +296,7 @@ public:
 
 private:
   // -- superblock --
-  Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
+  ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
   OSDSuperblock superblock;
 
 public:
@@ -345,7 +345,7 @@ public:
    */
 private:
   OSDMapRef next_osdmap;
-  Cond pre_publish_cond;
+  ceph::condition_variable pre_publish_cond;
 
 public:
   void pre_publish_map(OSDMapRef map) {
@@ -378,20 +378,17 @@ public:
     if (--(i->second) == 0) {
       map_reservations.erase(i);
     }
-    pre_publish_cond.Signal();
+    pre_publish_cond.notify_all();
   }
   /// blocks until there are no reserved maps prior to next_osdmap
   void await_reserved_maps() {
-    std::lock_guard l(pre_publish_lock);
+    std::unique_lock l{pre_publish_lock};
     ceph_assert(next_osdmap);
-    while (true) {
-      map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
-      if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
-       break;
-      } else {
-       pre_publish_cond.Wait(pre_publish_lock);
-      }
-    }
+    pre_publish_cond.wait(l, [this] {
+      auto i = map_reservations.cbegin();
+      return (i == map_reservations.cend() ||
+             i->first >= next_osdmap->get_epoch());
+    });
   }
   OSDMapRef get_next_osdmap() {
     std::lock_guard l(pre_publish_lock);
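
await_reserved_maps() above also switches from std::lock_guard to std::unique_lock, because condition_variable::wait() needs a lock it can release while sleeping and re-acquire on wakeup, which lock_guard does not allow. A reduced stand-alone sketch of the reservation barrier it implements, with a simplified epoch-to-refcount map and a hypothetical next_epoch field:

  #include <condition_variable>
  #include <map>
  #include <mutex>

  struct MapReservations {
    std::mutex lock;
    std::condition_variable cond;
    std::map<unsigned, unsigned> reservations;  // epoch -> refcount
    unsigned next_epoch = 0;

    void release(unsigned epoch) {
      std::lock_guard l{lock};
      auto i = reservations.find(epoch);
      if (i != reservations.end() && --i->second == 0)
        reservations.erase(i);
      cond.notify_all();                        // waiters re-check the predicate
    }

    // Block until no reservation is older than next_epoch.
    void await() {
      std::unique_lock l{lock};                 // wait() requires a unique_lock
      cond.wait(l, [this] {
        auto i = reservations.cbegin();
        return i == reservations.cend() || i->first >= next_epoch;
      });
    }
  };
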
@@ -1131,11 +1128,11 @@ struct OSDShard {
   string shard_name;
 
   string sdata_wait_lock_name;
-  Mutex sdata_wait_lock;
-  Cond sdata_cond;
+  ceph::mutex sdata_wait_lock;
+  ceph::condition_variable sdata_cond;
 
   string osdmap_lock_name;
-  Mutex osdmap_lock;  ///< protect shard_osdmap updates vs users w/o shard_lock
+  ceph::mutex osdmap_lock;  ///< protect shard_osdmap updates vs users w/o shard_lock
   OSDMapRef shard_osdmap;
 
   OSDMapRef get_osdmap() {
@@ -1144,7 +1141,7 @@ struct OSDShard {
   }
 
   string shard_lock_name;
-  Mutex shard_lock;   ///< protects remaining members below
+  ceph::mutex shard_lock;   ///< protects remaining members below
 
   /// map of slots for each spg_t.  maintains ordering of items dequeued
   /// from pqueue while _process thread drops shard lock to acquire the
@@ -1166,7 +1163,7 @@ struct OSDShard {
       &OSDShardPGSlot::pg_epoch_item>,
     boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
   int waiting_for_min_pg_epoch = 0;
-  Cond min_pg_epoch_cond;
+  ceph::condition_variable min_pg_epoch_cond;
 
   /// priority queue
   std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
@@ -1228,11 +1225,11 @@ struct OSDShard {
       osd(osd),
       shard_name(string("OSDShard.") + stringify(id)),
       sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
-      sdata_wait_lock(sdata_wait_lock_name.c_str(), false, true, false),
+      sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
       osdmap_lock_name(shard_name + "::osdmap_lock"),
-      osdmap_lock(osdmap_lock_name.c_str(), false, false),
+      osdmap_lock{make_mutex(osdmap_lock_name)},
       shard_lock_name(shard_name + "::shard_lock"),
-      shard_lock(shard_lock_name.c_str(), false, true, false),
+      shard_lock{make_mutex(shard_lock_name)},
       context_queue(sdata_wait_lock, sdata_cond) {
     if (opqueue == io_queue::weightedpriority) {
       pqueue = std::make_unique<
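
The constructor hunk above replaces the old positional Mutex(name, ...) arguments with make_mutex(name). The name is presumably kept for debug-build lock diagnostics and dropped when ceph::mutex collapses to a plain std::mutex in release builds; the factory below is a hypothetical stand-in with that shape, and relies on C++17 guaranteed copy elision since std::mutex is neither copyable nor movable.

  #include <mutex>
  #include <string>

  // Hypothetical stand-in for ceph::make_mutex() in a release-style build:
  // the name is discarded and a plain std::mutex is returned. Guaranteed
  // copy elision (C++17) lets the non-movable mutex be returned by value.
  inline std::mutex make_named_mutex(const std::string& /*name*/) {
    return std::mutex{};
  }

  struct Shard {
    std::string shard_name;
    std::mutex sdata_wait_lock;

    explicit Shard(int id)
      : shard_name("OSDShard." + std::to_string(id)),
        sdata_wait_lock{make_named_mutex(shard_name + "::sdata_wait_lock")} {}
  };
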
@@ -1253,7 +1250,7 @@ struct OSDShard {
 class OSD : public Dispatcher,
            public md_config_obs_t {
   /** OSD **/
-  Mutex osd_lock;                      // global lock
+  Mutex osd_lock;          // global lock
   SafeTimer tick_timer;    // safe timer (osd_lock)
 
   // Tick timer for those stuff that do not need osd_lock
@@ -1753,11 +1750,10 @@ protected:
     void return_waiting_threads() override {
       for(uint32_t i = 0; i < osd->num_shards; i++) {
        OSDShard* sdata = osd->shards[i];
-       assert (NULL != sdata); 
-       sdata->sdata_wait_lock.Lock();
+       assert (NULL != sdata);
+       std::scoped_lock l{sdata->sdata_wait_lock};
        sdata->stop_waiting = true;
-       sdata->sdata_cond.Signal();
-       sdata->sdata_wait_lock.Unlock();
+       sdata->sdata_cond.notify_all();
       }
     }
 
@@ -1765,9 +1761,8 @@ protected:
       for(uint32_t i = 0; i < osd->num_shards; i++) {
        OSDShard* sdata = osd->shards[i];
        assert (NULL != sdata);
-       sdata->sdata_wait_lock.Lock();
+       std::scoped_lock l{sdata->sdata_wait_lock};
        sdata->stop_waiting = false;
-       sdata->sdata_wait_lock.Unlock();
       }
     }
 
@@ -1779,11 +1774,10 @@ protected:
        snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
        ceph_assert(NULL != sdata);
 
-       sdata->shard_lock.Lock();
+       std::scoped_lock l{sdata->shard_lock};
        f->open_object_section(queue_name);
        sdata->pqueue->dump(f);
        f->close_section();
-       sdata->shard_lock.Unlock();
       }
     }
 
@@ -2105,14 +2099,14 @@ protected:
       return c;
     }
     void _process(Command *c, ThreadPool::TPHandle &) override {
-      osd->osd_lock.Lock();
+      osd->osd_lock.lock();
       if (osd->is_stopping()) {
-       osd->osd_lock.Unlock();
+       osd->osd_lock.unlock();
        delete c;
        return;
       }
       osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
-      osd->osd_lock.Unlock();
+      osd->osd_lock.unlock();
       delete c;
     }
     void _clear() override {