client: s/Mutex/ceph::mutex/
author    Kefu Chai <kchai@redhat.com>
          Sun, 7 Jul 2019 03:06:13 +0000 (11:06 +0800)
committer Kefu Chai <kchai@redhat.com>
          Sat, 3 Aug 2019 01:34:50 +0000 (09:34 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
13 files changed:
src/client/Client.cc
src/client/Client.h
src/client/Delegation.cc
src/client/Fh.h
src/client/Inode.h
src/client/MetaRequest.h
src/client/ObjecterWriteback.h
src/client/SyntheticClient.cc
src/client/Trace.cc
src/client/barrier.cc
src/client/barrier.h
src/client/fuse_ll.cc
src/librados/RadosClient.cc

diff --git a/src/client/Client.cc b/src/client/Client.cc
index 310f1667f4d57369842f50fa85eb87d914813190..7210b745420e3bcb819ae251c9641234aa955963 100644
--- a/src/client/Client.cc
+++ b/src/client/Client.cc
@@ -70,7 +70,6 @@
 #include "osdc/Filer.h"
 
 #include "common/Cond.h"
-#include "common/Mutex.h"
 #include "common/perf_counters.h"
 #include "common/admin_socket.h"
 #include "common/errno.h"
@@ -145,20 +144,21 @@ bool Client::CommandHook::call(std::string_view command,
 {
   std::unique_ptr<Formatter> f(Formatter::create(format));
   f->open_object_section("result");
-  m_client->client_lock.Lock();
-  if (command == "mds_requests")
-    m_client->dump_mds_requests(f.get());
-  else if (command == "mds_sessions")
-    m_client->dump_mds_sessions(f.get());
-  else if (command == "dump_cache")
-    m_client->dump_cache(f.get());
-  else if (command == "kick_stale_sessions")
-    m_client->_kick_stale_sessions();
-  else if (command == "status")
-    m_client->dump_status(f.get());
-  else
-    ceph_abort_msg("bad command registered");
-  m_client->client_lock.Unlock();
+  {
+    std::lock_guard l{m_client->client_lock};
+    if (command == "mds_requests")
+      m_client->dump_mds_requests(f.get());
+    else if (command == "mds_sessions")
+      m_client->dump_mds_sessions(f.get());
+    else if (command == "dump_cache")
+      m_client->dump_cache(f.get());
+    else if (command == "kick_stale_sessions")
+      m_client->_kick_stale_sessions();
+    else if (command == "status")
+      m_client->dump_status(f.get());
+    else
+      ceph_abort_msg("bad command registered");
+  }
   f->close_section();
   f->flush(out);
   return true;
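
This hunk shows the commit's basic recipe: a matched client_lock.Lock()/Unlock() pair becomes a std::lock_guard inside its own scope, so the mutex is released on every exit path, exceptions included. A minimal sketch of the idiom, using plain std::mutex (which, to my understanding, ceph::mutex aliases in release builds); dump() is a hypothetical stand-in for the dump_* helpers:

    #include <mutex>

    std::mutex client_lock;              // stands in for ceph::mutex

    void dump() { /* hypothetical critical-section work */ }

    void call_with_scoped_lock() {
      // before: client_lock.Lock(); dump(); client_lock.Unlock();
      {
        std::lock_guard l{client_lock};  // acquires the mutex here
        dump();                          // runs under the lock
      }                                  // releases it here, even on throw
      // f->close_section()/flush() style work continues unlocked
    }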
@@ -261,7 +261,6 @@ vinodeno_t Client::map_faked_ino(ino_t ino)
 Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
   : Dispatcher(m->cct),
     timer(m->cct, client_lock),
-    client_lock("Client::client_lock"),
     messenger(m),
     monclient(mc),
     objecter(objecter_),
@@ -309,14 +308,13 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
 
 Client::~Client()
 {
-  ceph_assert(!client_lock.is_locked());
+  ceph_assert(ceph_mutex_is_not_locked(client_lock));
 
   // It is necessary to hold client_lock, because any inode destruction
 // may call into ObjectCacher, which asserts that its lock (which is
   // client_lock) is held.
-  client_lock.Lock();
+  std::lock_guard l{client_lock};
   tear_down_cache();
-  client_lock.Unlock();
 }
 
 void Client::tear_down_cache()
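
The old Mutex carried is_locked()/is_locked_by_me() member checks; with ceph::mutex those become the free-standing ceph_mutex_is_locked()/ceph_mutex_is_locked_by_me()/ceph_mutex_is_not_locked() forms used above and below. One caveat, stated here as an assumption based on common/ceph_mutex.h rather than anything in this diff: the ownership check is only real in mutex-debugging builds; in release builds, where ceph::mutex is a bare std::mutex with no owner tracking, the expression is defined to evaluate to true and the surrounding ceph_assert can never fire. Roughly:

    // Sketch of the assumed mechanism -- not a copy of the real header.
    #ifdef CEPH_DEBUG_MUTEX
    # define ceph_mutex_is_locked_by_me(m) ((m).is_locked_by_me())
    #else
    # define ceph_mutex_is_locked_by_me(m) true   // assertion compiles away
    #endif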
@@ -443,7 +441,7 @@ void Client::dump_cache(Formatter *f)
 
 void Client::dump_status(Formatter *f)
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   ldout(cct, 1) << __func__ << dendl;
 
@@ -476,31 +474,29 @@ int Client::init()
 {
   timer.init();
   objectcacher->start();
-
-  client_lock.Lock();
-  ceph_assert(!initialized);
-
-  messenger->add_dispatcher_tail(this);
-  client_lock.Unlock();
-
+  {
+    std::lock_guard l{client_lock};
+    ceph_assert(!initialized);
+    messenger->add_dispatcher_tail(this);
+  }
   _finish_init();
   return 0;
 }
 
 void Client::_finish_init()
 {
-  client_lock.Lock();
-  // logger
-  PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
-  plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
-  plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
-  plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
-  plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
-  plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
-  logger.reset(plb.create_perf_counters());
-  cct->get_perfcounters_collection()->add(logger.get());
-
-  client_lock.Unlock();
+  {
+    std::lock_guard l{client_lock};
+    // logger
+    PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
+    plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
+    plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
+    plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
+    plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
+    plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
+    logger.reset(plb.create_perf_counters());
+    cct->get_perfcounters_collection()->add(logger.get());
+  }
 
   cct->_conf.add_observer(this);
 
@@ -546,9 +542,8 @@ void Client::_finish_init()
               << cpp_strerror(-ret) << dendl;
   }
 
-  client_lock.Lock();
+  std::lock_guard l{client_lock};
   initialized = true;
-  client_lock.Unlock();
 }
 
 void Client::shutdown() 
@@ -557,10 +552,10 @@ void Client::shutdown()
 
   // If we were not mounted, but were being used for sending
   // MDS commands, we may have sessions that need closing.
-  client_lock.Lock();
-  _close_sessions();
-  client_lock.Unlock();
-
+  {
+    std::lock_guard l{client_lock};
+    _close_sessions();
+  }
   cct->_conf.remove_observer(this);
 
   cct->get_admin_socket()->unregister_commands(&m_command_hook);
@@ -590,13 +585,12 @@ void Client::shutdown()
   }
 
   objectcacher->stop();  // outside of client_lock! this does a join.
-
-  client_lock.Lock();
-  ceph_assert(initialized);
-  initialized = false;
-  timer.shutdown();
-  client_lock.Unlock();
-
+  {
+    std::lock_guard l{client_lock};
+    ceph_assert(initialized);
+    initialized = false;
+    timer.shutdown();
+  }
   objecter_finisher.wait_for_empty();
   objecter_finisher.stop();
 
@@ -1704,7 +1698,7 @@ int Client::make_request(MetaRequest *request,
     }
 
     // set up wait cond
-    Cond caller_cond;
+    ceph::condition_variable caller_cond;
     request->caller_cond = &caller_cond;
 
     // choose mds
@@ -1756,11 +1750,14 @@ int Client::make_request(MetaRequest *request,
     // wait for signal
     ldout(cct, 20) << "awaiting reply|forward|kick on " << &caller_cond << dendl;
     request->kick = false;
-    while (!request->reply &&         // reply
-          request->resend_mds < 0 && // forward
-          !request->kick)
-      caller_cond.Wait(client_lock);
-    request->caller_cond = NULL;
+    std::unique_lock l{client_lock, std::adopt_lock};
+    caller_cond.wait(l, [request] {
+      return (request->reply ||                  // reply
+             request->resend_mds >= 0 || // forward
+             request->kick);
+    });
+    l.release();
+    request->caller_cond = nullptr;
 
     // did we get a reply?
     if (request->reply) 
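
This is the subtlest conversion in the commit. make_request() is entered with client_lock already held in the old "raw" style, but ceph::condition_variable::wait() needs a std::unique_lock. The bridge is std::adopt_lock (wrap the already-held mutex without locking it again) plus release() (drop ownership without unlocking), so the function still exits with the lock held, as its callers expect. Note also that the predicate form wait(l, pred) inverts the old loop's polarity: it is equivalent to while (!pred()) wait(l), which is why resend_mds < 0 becomes resend_mds >= 0. A self-contained sketch of the idiom, with a hypothetical ready flag standing in for the reply/forward/kick conditions:

    #include <condition_variable>
    #include <mutex>

    std::mutex client_lock;
    std::condition_variable caller_cond;
    bool ready = false;                   // hypothetical wait condition

    void wait_while_raw_locked() {
      client_lock.lock();                 // legacy convention: raw lock held

      std::unique_lock l{client_lock, std::adopt_lock}; // adopt, don't re-lock
      caller_cond.wait(l, [] { return ready; });        // predicate re-checked
                                                        // after every wakeup
      l.release();                        // give ownership back, stay locked

      // client_lock is still held here, as callers expect.
      client_lock.unlock();
    }

The release() is load-bearing: if control leaves the scope while the unique_lock still owns the adopted mutex, its destructor unlocks it behind the caller's back. That is why every adopt_lock block in this diff pairs with an explicit l.release().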
@@ -1785,7 +1782,7 @@ int Client::make_request(MetaRequest *request,
 
   // kick dispatcher (we've got it!)
   ceph_assert(request->dispatch_cond);
-  request->dispatch_cond->Signal();
+  request->dispatch_cond->notify_all();
   ldout(cct, 20) << "sendrecv kickback on tid " << tid << " " << request->dispatch_cond << dendl;
   request->dispatch_cond = 0;
   
@@ -2073,7 +2070,7 @@ void Client::_closed_mds_session(MetaSession *s)
   s->state = MetaSession::STATE_CLOSED;
   s->con->mark_down();
   signal_context_list(s->waiting_for_open);
-  mount_cond.Signal();
+  mount_cond.notify_all();
   remove_session_caps(s);
   kick_requests_closed(s);
   mds_sessions.erase(s->mds_num);
@@ -2108,7 +2105,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
       renew_caps(session);
       session->state = MetaSession::STATE_OPEN;
       if (unmounting)
-       mount_cond.Signal();
+       mount_cond.notify_all();
       else
        connect_mds_targets(from);
       signal_context_list(session->waiting_for_open);
@@ -2175,7 +2172,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
 
 bool Client::_any_stale_sessions() const
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   for (const auto &p : mds_sessions) {
     if (p.second.state == MetaSession::STATE_STALE) {
@@ -2319,7 +2316,7 @@ void Client::handle_client_request_forward(const MConstRef<MClientRequestForward
   request->item.remove_myself();
   request->num_fwd = fwd->get_num_fwd();
   request->resend_mds = fwd->get_dest_mds();
-  request->caller_cond->Signal();
+  request->caller_cond->notify_all();
 }
 
 bool Client::is_dir_operation(MetaRequest *req)
@@ -2375,7 +2372,7 @@ void Client::handle_client_reply(const MConstRef<MClientReply>& reply)
          request->sent_on_mseq == it->second.mseq)) {
       ldout(cct, 20) << "have to return ESTALE" << dendl;
     } else {
-      request->caller_cond->Signal();
+      request->caller_cond->notify_all();
       return;
     }
   }
@@ -2402,18 +2399,23 @@ void Client::handle_client_reply(const MConstRef<MClientReply>& reply)
   // Only signal the caller once (on the first reply):
 // Either it's an unsafe reply, or it's a safe reply and no unsafe reply was sent.
   if (!is_safe || !request->got_unsafe) {
-    Cond cond;
+    ceph::condition_variable cond;
     request->dispatch_cond = &cond;
 
     // wake up waiter
     ldout(cct, 20) << __func__ << " signalling caller " << (void*)request->caller_cond << dendl;
-    request->caller_cond->Signal();
+    request->caller_cond->notify_all();
 
     // wake for kick back
-    while (request->dispatch_cond) {
-      ldout(cct, 20) << __func__ << " awaiting kickback on tid " << tid << " " << &cond << dendl;
-      cond.Wait(client_lock);
-    }
+    std::unique_lock l{client_lock, std::adopt_lock};
+    cond.wait(l, [tid, request, &cond, this] {
+      if (request->dispatch_cond) {
+        ldout(cct, 20) << "handle_client_reply awaiting kickback on tid "
+                      << tid << " " << &cond << dendl;
+      }
+      return !request->dispatch_cond;
+    });
+    l.release();
   }
 
   if (is_safe) {
@@ -2429,7 +2431,7 @@ void Client::handle_client_reply(const MConstRef<MClientReply>& reply)
     unregister_request(request);
   }
   if (unmounting)
-    mount_cond.Signal();
+    mount_cond.notify_all();
 }
 
 void Client::_handle_full_flag(int64_t pool)
@@ -2632,7 +2634,7 @@ bool Client::ms_dispatch2(const MessageRef &m)
     trim_cache();
     if (size < lru.lru_get_size() + inode_map.size()) {
       ldout(cct, 10) << "unmounting: trim pass, cache shrank, poking unmount()" << dendl;
-      mount_cond.Signal();
+      mount_cond.notify_all();
     } else {
       ldout(cct, 10) << "unmounting: trim pass, size still " << lru.lru_get_size() 
                << "+" << inode_map.size() << dendl;
@@ -2850,7 +2852,7 @@ void Client::send_reconnect(MetaSession *session)
     m->set_encoding_version(0); // use connection features to choose encoding
   session->con->send_message2(std::move(m));
 
-  mount_cond.Signal();
+  mount_cond.notify_all();
 
   if (session->reclaim_state == MetaSession::RECLAIMING)
     signal_cond_list(waiting_for_reclaim);
@@ -2869,7 +2871,7 @@ void Client::kick_requests(MetaSession *session)
     if (req->aborted()) {
       if (req->caller_cond) {
        req->kick = true;
-       req->caller_cond->Signal();
+       req->caller_cond->notify_all();
       }
       continue;
     }
@@ -2937,7 +2939,7 @@ void Client::kick_requests_closed(MetaSession *session)
     if (req->mds == session->mds_num) {
       if (req->caller_cond) {
        req->kick = true;
-       req->caller_cond->Signal();
+       req->caller_cond->notify_all();
       }
       req->item.remove_myself();
       if (req->got_unsafe) {
@@ -3136,7 +3138,7 @@ private:
 public:
   C_Client_FlushComplete(Client *c, Inode *in) : client(c), inode(in) { }
   void finish(int r) override {
-    ceph_assert(client->client_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked_by_me(client->client_lock));
     if (r != 0) {
       client_t const whoami = client->whoami;  // For the benefit of ldout prefix
       ldout(client->cct, 1) << "I/O error from flush on inode " << inode
@@ -3772,28 +3774,32 @@ void Client::flush_snaps(Inode *in)
   }
 }
 
-void Client::wait_on_list(list<Cond*>& ls)
+void Client::wait_on_list(list<ceph::condition_variable*>& ls)
 {
-  Cond cond;
+  ceph::condition_variable cond;
   ls.push_back(&cond);
-  cond.Wait(client_lock);
+  std::unique_lock l{client_lock, std::adopt_lock};
+  cond.wait(l);
+  l.release();
   ls.remove(&cond);
 }
 
-void Client::signal_cond_list(list<Cond*>& ls)
+void Client::signal_cond_list(list<ceph::condition_variable*>& ls)
 {
-  for (list<Cond*>::iterator it = ls.begin(); it != ls.end(); ++it)
-    (*it)->Signal();
+  for (auto cond : ls) {
+    cond->notify_all();
+  }
 }
 
 void Client::wait_on_context_list(list<Context*>& ls)
 {
-  Cond cond;
+  ceph::condition_variable cond;
   bool done = false;
   int r;
-  ls.push_back(new C_Cond(&cond, &done, &r));
-  while (!done)
-    cond.Wait(client_lock);
+  ls.push_back(new C_Cond(cond, &done, &r));
+  std::unique_lock l{client_lock, std::adopt_lock};
+  cond.wait(l, [&done] { return done;});
+  l.release();
 }
 
 void Client::signal_context_list(list<Context*>& ls)
@@ -3842,7 +3848,7 @@ public:
   }
   void finish(int r) override {
     // _async_invalidate takes the lock when it needs to, call this back from outside of lock.
-    ceph_assert(!client->client_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
     client->_async_invalidate(ino, offset, length);
   }
 };
@@ -3924,7 +3930,7 @@ bool Client::_flush(Inode *in, Context *onfinish)
 
 void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
 {
-  ceph_assert(client_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(client_lock));
   if (!in->oset.dirty_or_tx) {
     ldout(cct, 10) << " nothing to flush" << dendl;
     return;
@@ -3935,16 +3941,16 @@ void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
                                      offset, size, &onflush);
   if (!ret) {
     // wait for flush
-    client_lock.Unlock();
+    client_lock.unlock();
     onflush.wait();
-    client_lock.Lock();
+    client_lock.lock();
   }
 }
 
 void Client::flush_set_callback(ObjectCacher::ObjectSet *oset)
 {
   //  std::lock_guard l(client_lock);
-  ceph_assert(client_lock.is_locked());   // will be called via dispatch() -> objecter -> ...
+  ceph_assert(ceph_mutex_is_locked(client_lock));   // will be called via dispatch() -> objecter -> ...
   Inode *in = static_cast<Inode *>(oset->parent);
   ceph_assert(in);
   _flushed(in);
@@ -4144,7 +4150,7 @@ void Client::remove_session_caps(MetaSession *s)
     signal_cond_list(in->waitfor_caps);
   }
   s->flushing_caps_tids.clear();
-  sync_cond.Signal();
+  sync_cond.notify_all();
 }
 
 int Client::_do_remount(bool retry_on_error)
@@ -4419,7 +4425,9 @@ void Client::wait_sync_caps(ceph_tid_t want)
     if (oldest_tid <= want) {
       ldout(cct, 10) << " waiting on mds." << p.first << " tid " << oldest_tid
                     << " (want " << want << ")" << dendl;
-      sync_cond.Wait(client_lock);
+      std::unique_lock l{client_lock, std::adopt_lock};
+      sync_cond.wait(l);
+      l.release();
       goto retry;
     }
   }
@@ -5001,7 +5009,7 @@ void Client::handle_cap_flush_ack(MetaSession *session, Inode *in, Cap *cap, con
     signal_cond_list(in->waitfor_caps);
     if (session->flushing_caps_tids.empty() ||
        *session->flushing_caps_tids.begin() > flush_ack_tid)
-      sync_cond.Signal();
+      sync_cond.notify_all();
   }
 
   if (!dirty) {
@@ -5053,7 +5061,7 @@ void Client::handle_cap_flushsnap_ack(MetaSession *session, Inode *in, const MCo
       signal_cond_list(in->waitfor_caps);
       if (session->flushing_caps_tids.empty() ||
          *session->flushing_caps_tids.begin() > flush_ack_tid)
-       sync_cond.Signal();
+       sync_cond.notify_all();
     }
   } else {
     ldout(cct, 5) << __func__ << " DUP(?) mds." << mds << " flushed snap follows " << follows
@@ -5085,7 +5093,7 @@ public:
   }
   void finish(int r) override {
     // _async_dentry_invalidate is responsible for its own locking
-    ceph_assert(!client->client_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
     client->_async_dentry_invalidate(dirino, ino, name);
   }
 };
@@ -5617,15 +5625,15 @@ int Client::resolve_mds(
  */
 int Client::authenticate()
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   if (monclient->is_authenticated()) {
     return 0;
   }
 
-  client_lock.Unlock();
+  client_lock.unlock();
   int r = monclient->authenticate(cct->_conf->client_mount_timeout);
-  client_lock.Lock();
+  client_lock.lock();
   if (r < 0) {
     return r;
   }
@@ -5646,9 +5654,9 @@ int Client::fetch_fsmap(bool user)
   do {
     C_SaferCond cond;
     monclient->get_version("fsmap", &fsmap_latest, NULL, &cond);
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cond.wait();
-    client_lock.Lock();
+    client_lock.lock();
   } while (r == -EAGAIN);
 
   if (r < 0) {
@@ -5941,7 +5949,9 @@ void Client::_close_sessions()
 
     // wait for sessions to close
     ldout(cct, 2) << "waiting for " << mds_sessions.size() << " mds sessions to close" << dendl;
-    mount_cond.Wait(client_lock);
+    std::unique_lock l{client_lock, std::adopt_lock};
+    mount_cond.wait(l);
+    l.release();
   }
 }
 
@@ -5978,7 +5988,7 @@ void Client::_abort_mds_sessions(int err)
     req->abort(err);
     if (req->caller_cond) {
       req->kick = true;
-      req->caller_cond->Signal();
+      req->caller_cond->notify_all();
     }
   }
 
@@ -5996,6 +6006,7 @@ void Client::_abort_mds_sessions(int err)
 
 void Client::_unmount(bool abort)
 {
+  std::unique_lock lock{client_lock, std::adopt_lock};
   if (unmounting)
     return;
 
@@ -6018,11 +6029,13 @@ void Client::_unmount(bool abort)
     flush_mdlog_sync();
   }
 
-  while (!mds_requests.empty()) {
-    ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests" << dendl;
-    mount_cond.Wait(client_lock);
-  }
-
+  mount_cond.wait(lock, [this] {
+    if (!mds_requests.empty()) {
+      ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests"
+                    << dendl;
+    }
+    return mds_requests.empty();
+  });
   if (tick_event)
     timer.cancel_event(tick_event);
   tick_event = 0;
@@ -6053,10 +6066,13 @@ void Client::_unmount(bool abort)
 
   _ll_drop_pins();
 
-  while (unsafe_sync_write > 0) {
-    ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting"  << dendl;
-    mount_cond.Wait(client_lock);
-  }
+  mount_cond.wait(lock, [this] {
+    if (unsafe_sync_write > 0) {
+      ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting"
+                   << dendl;
+    }
+    return unsafe_sync_write <= 0;
+  });
 
   if (cct->_conf->client_oc) {
     // flush/release all buffered data
@@ -6104,9 +6120,8 @@ void Client::_unmount(bool abort)
             << "+" << inode_map.size() << " items"
            << ", waiting (for caps to release?)"
             << dendl;
-    utime_t until = ceph_clock_now() + utime_t(5, 0);
-    int r = mount_cond.WaitUntil(client_lock, until);
-    if (r == ETIMEDOUT) {
+    if (auto r = mount_cond.wait_for(lock, ceph::make_timespan(5));
+       r == std::cv_status::timeout) {
       dump_cache(NULL);
     }
   }
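
Timed waits change shape too: Mutex-era WaitUntil() took an absolute utime_t deadline and returned an errno value (ETIMEDOUT), while condition_variable::wait_for() takes a relative duration and reports std::cv_status::timeout; ceph::make_timespan(5) supplies the five-second duration. A small sketch with the std equivalents:

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    std::mutex client_lock;
    std::condition_variable mount_cond;

    // Returns true if we gave up after ~5s rather than being signalled.
    bool wait_up_to_five_seconds(std::unique_lock<std::mutex>& lock) {
      // before: utime_t until = ceph_clock_now() + utime_t(5, 0);
      //         if (mount_cond.WaitUntil(client_lock, until) == ETIMEDOUT) ...
      return mount_cond.wait_for(lock, std::chrono::seconds(5))
          == std::cv_status::timeout;
    }

A bare wait_for() can also return before the timeout on a spurious wakeup, which is tolerable as long as the surrounding code re-checks its condition before acting on the result.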
@@ -6123,6 +6138,7 @@ void Client::_unmount(bool abort)
 
   mounted = false;
 
+  lock.release();
   ldout(cct, 2) << "unmounted." << dendl;
 }
 
@@ -6168,7 +6184,7 @@ void Client::tick()
     cct->_conf->client_tick_interval,
     new FunctionContext([this](int) {
        // Called back via Timer, which takes client_lock for us
-       ceph_assert(client_lock.is_locked_by_me());
+       ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
        tick();
       }));
   utime_t now = ceph_clock_now();
@@ -6179,7 +6195,7 @@ void Client::tick()
       req->abort(-ETIMEDOUT);
       if (req->caller_cond) {
        req->kick = true;
-       req->caller_cond->Signal();
+       req->caller_cond->notify_all();
       }
       signal_cond_list(waiting_for_mdsmap);
       for (auto &p : mds_sessions) {
@@ -7963,7 +7979,7 @@ struct dentry_off_lt {
 int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
                              int caps, bool getref)
 {
-  ceph_assert(client_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(client_lock));
   ldout(cct, 10) << __func__ << " " << dirp << " on " << dirp->inode->ino
           << " last_name " << dirp->last_name << " offset " << hex << dirp->offset << dec
           << dendl;
@@ -8019,9 +8035,9 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
 
     dn_name = dn->name; // fill in name while we have lock
 
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cb(p, &de, &stx, next_off, in);  // _next_ offset
-    client_lock.Lock();
+    client_lock.lock();
     ldout(cct, 15) << " de " << de.d_name << " off " << hex << dn->offset << dec
                   << " = " << r << dendl;
     if (r < 0) {
@@ -8089,9 +8105,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
       _ll_get(inode);
     }
 
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cb(p, &de, &stx, next_off, inode);
-    client_lock.Lock();
+    client_lock.lock();
     if (r < 0)
       return r;
 
@@ -8122,9 +8138,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
       _ll_get(inode);
     }
 
-    client_lock.Unlock();
+    client_lock.unlock();
     r = cb(p, &de, &stx, next_off, inode);
-    client_lock.Lock();
+    client_lock.lock();
     if (r < 0)
       return r;
 
@@ -8189,9 +8205,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
        _ll_get(inode);
       }
 
-      client_lock.Unlock();
+      client_lock.unlock();
       r = cb(p, &de, &stx, next_off, inode);  // _next_ offset
-      client_lock.Lock();
+      client_lock.lock();
 
       ldout(cct, 15) << " de " << de.d_name << " off " << hex << next_off - 1 << dec
                     << " = " << r << dendl;
@@ -8913,11 +8929,14 @@ void Client::lock_fh_pos(Fh *f)
   ldout(cct, 10) << __func__ << " " << f << dendl;
 
   if (f->pos_locked || !f->pos_waiters.empty()) {
-    Cond cond;
+    ceph::condition_variable cond;
     f->pos_waiters.push_back(&cond);
     ldout(cct, 10) << __func__ << " BLOCKING on " << f << dendl;
-    while (f->pos_locked || f->pos_waiters.front() != &cond)
-      cond.Wait(client_lock);
+    std::unique_lock l{client_lock, std::adopt_lock};
+    cond.wait(l, [f, me=&cond] {
+      return !f->pos_locked && f->pos_waiters.front() == me;
+    });
+    l.release();
     ldout(cct, 10) << __func__ << " UNBLOCKING on " << f << dendl;
     ceph_assert(f->pos_waiters.front() == &cond);
     f->pos_waiters.pop_front();
@@ -9138,9 +9157,9 @@ done:
   // done!
   
   if (onuninline) {
-    client_lock.Unlock();
+    client_lock.unlock();
     int ret = onuninline->wait();
-    client_lock.Lock();
+    client_lock.lock();
     if (ret >= 0 || ret == -ECANCELED) {
       in->inline_data.clear();
       in->inline_version = CEPH_INLINE_NONE;
@@ -9201,9 +9220,9 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
                              off, len, bl, 0, &onfinish);
   if (r == 0) {
     get_cap_ref(in, CEPH_CAP_FILE_CACHE);
-    client_lock.Unlock();
+    client_lock.unlock();
     r = onfinish.wait();
-    client_lock.Lock();
+    client_lock.lock();
     put_cap_ref(in, CEPH_CAP_FILE_CACHE);
   }
 
@@ -9239,8 +9258,8 @@ int Client::_read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
 
   ldout(cct, 10) << __func__ << " " << *in << " " << off << "~" << len << dendl;
 
-  Mutex flock("Client::_read_sync flock");
-  Cond cond;
+  ceph::mutex flock = ceph::make_mutex("Client::_read_sync flock");
+  ceph::condition_variable cond;
   while (left > 0) {
     C_SaferCond onfinish("Client::_read_sync flock");
     bufferlist tbl;
@@ -9250,9 +9269,9 @@ int Client::_read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
                      pos, left, &tbl, 0,
                      in->truncate_size, in->truncate_seq,
                      &onfinish);
-    client_lock.Unlock();
+    client_lock.unlock();
     int r = onfinish.wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     // if we get ENOENT from OSD, assume 0 bytes returned
     if (r == -ENOENT)
@@ -9306,7 +9325,7 @@ void Client::_sync_write_commit(Inode *in)
   ldout(cct, 15) << __func__ << " unsafe_sync_write = " << unsafe_sync_write << dendl;
   if (unsafe_sync_write == 0 && unmounting) {
     ldout(cct, 10) << __func__ << " -- no more unsafe writes, unmount can proceed" << dendl;
-    mount_cond.Signal();
+    mount_cond.notify_all();
   }
 }
 
@@ -9577,9 +9596,9 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
                       offset, size, bl, ceph::real_clock::now(), 0,
                       in->truncate_size, in->truncate_seq,
                       &onfinish);
-    client_lock.Unlock();
+    client_lock.unlock();
     onfinish.wait();
-    client_lock.Lock();
+    client_lock.lock();
     _sync_write_commit(in);
   }
 
@@ -9622,9 +9641,9 @@ success:
 done:
 
   if (nullptr != onuninline) {
-    client_lock.Unlock();
+    client_lock.unlock();
     int uninline_ret = onuninline->wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
       in->inline_data.clear();
@@ -9754,10 +9773,10 @@ int Client::_fsync(Inode *in, bool syncdataonly)
   }
 
   if (nullptr != object_cacher_completion) { // wait on a real reply instead of guessing
-    client_lock.Unlock();
+    client_lock.unlock();
     ldout(cct, 15) << "waiting on data to flush" << dendl;
     r = object_cacher_completion->wait();
-    client_lock.Lock();
+    client_lock.lock();
     ldout(cct, 15) << "got " << r << " from flush writeback" << dendl;
   } else {
     // FIXME: this can starve
@@ -9933,11 +9952,11 @@ int Client::statfs(const char *path, struct statvfs *stbuf,
     objecter->get_fs_stats(stats, boost::optional<int64_t>(), &cond);
   }
 
-  client_lock.Unlock();
+  client_lock.unlock();
   int rval = cond.wait();
   assert(root);
   total_files_on_fs = root->rstat.rfiles + root->rstat.rsubdirs;
-  client_lock.Lock();
+  client_lock.lock();
 
   if (rval < 0) {
     ldout(cct, 1) << "underlying call to statfs returned error: "
@@ -10396,11 +10415,11 @@ int Client::_sync_fs()
   wait_sync_caps(flush_tid);
 
   if (nullptr != cond) {
-    client_lock.Unlock();
+    client_lock.unlock();
     ldout(cct, 15) << __func__ << " waiting on data to flush" << dendl;
     cond->wait();
     ldout(cct, 15) << __func__ << " flush finished" << dendl;
-    client_lock.Lock();
+    client_lock.lock();
   }
 
   return 0;
@@ -13188,9 +13207,9 @@ int Client::ll_read_block(Inode *in, uint64_t blockid,
                 CEPH_OSD_FLAG_READ,
                  &onfinish);
 
-  client_lock.Unlock();
+  client_lock.unlock();
   int r = onfinish.wait();
-  client_lock.Lock();
+  client_lock.lock();
 
   if (r >= 0) {
       bl.copy(0, bl.length(), buf);
@@ -13233,9 +13252,9 @@ int Client::ll_write_block(Inode *in, uint64_t blockid,
   fakesnap.seq = snapseq;
 
   /* lock just in time */
-  client_lock.Lock();
+  client_lock.lock();
   if (unmounting) {
-    client_lock.Unlock();
+    client_lock.unlock();
     return -ENOTCONN;
   }
 
@@ -13249,7 +13268,7 @@ int Client::ll_write_block(Inode *in, uint64_t blockid,
                  0,
                  onsafe.get());
 
-  client_lock.Unlock();
+  client_lock.unlock();
   if (nullptr != onsafe) {
     r = onsafe->wait();
   }
@@ -13449,9 +13468,9 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
       in->change_attr++;
       in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
-      client_lock.Unlock();
+      client_lock.unlock();
       onfinish.wait();
-      client_lock.Lock();
+      client_lock.lock();
       _sync_write_commit(in);
     }
   } else if (!(mode & FALLOC_FL_KEEP_SIZE)) {
@@ -13471,9 +13490,9 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
   }
 
   if (nullptr != onuninline) {
-    client_lock.Unlock();
+    client_lock.unlock();
     int ret = onuninline->wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     if (ret >= 0 || ret == -ECANCELED) {
       in->inline_data.clear();
@@ -14117,10 +14136,10 @@ int Client::check_pool_perm(Inode *in, int need)
     objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
                     nullsnapc, ceph::real_clock::now(), 0, &wr_cond);
 
-    client_lock.Unlock();
+    client_lock.unlock();
     int rd_ret = rd_cond.wait();
     int wr_ret = wr_cond.wait();
-    client_lock.Lock();
+    client_lock.lock();
 
     bool errored = false;
 
@@ -14371,9 +14390,9 @@ int Client::start_reclaim(const std::string& uuid, unsigned flags,
   C_SaferCond cond;
   if (!objecter->wait_for_map(reclaim_osd_epoch, &cond)) {
     ldout(cct, 10) << __func__ << ": waiting for OSD epoch " << reclaim_osd_epoch << dendl;
-    client_lock.Unlock();
+    client_lock.unlock();
     cond.wait();
-    client_lock.Lock();
+    client_lock.lock();
   }
 
   bool blacklisted = objecter->with_osdmap(
@@ -14484,7 +14503,7 @@ void intrusive_ptr_release(Inode *in)
 
 mds_rank_t Client::_get_random_up_mds() const
 {
-  ceph_assert(client_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   std::set<mds_rank_t> up;
   mdsmap->get_up_mds_set(up);
@@ -14517,7 +14536,7 @@ int StandaloneClient::init()
   objectcacher->start();
   objecter->init();
 
-  client_lock.Lock();
+  client_lock.lock();
   ceph_assert(!is_initialized());
 
   messenger->add_dispatcher_tail(objecter);
@@ -14528,7 +14547,7 @@ int StandaloneClient::init()
   if (r < 0) {
     // need to do cleanup because we're in an intermediate init state
     timer.shutdown();
-    client_lock.Unlock();
+    client_lock.unlock();
     objecter->shutdown();
     objectcacher->stop();
     monclient->shutdown();
@@ -14536,7 +14555,7 @@ int StandaloneClient::init()
   }
   objecter->start();
 
-  client_lock.Unlock();
+  client_lock.unlock();
   _finish_init();
 
   return 0;
diff --git a/src/client/Client.h b/src/client/Client.h
index 58ae8bb7b4fb14ad6ee9fc6bcc015fc627b33407..d95ffe3c6409ee8483b5051471ded940ecf0bdb9 100644
--- a/src/client/Client.h
+++ b/src/client/Client.h
@@ -18,8 +18,8 @@
 
 #include "common/CommandTable.h"
 #include "common/Finisher.h"
-#include "common/Mutex.h"
 #include "common/Timer.h"
+#include "common/ceph_mutex.h"
 #include "common/cmdparse.h"
 #include "common/compiler_extensions.h"
 #include "include/cephfs/ceph_statx.h"
@@ -611,8 +611,8 @@ public:
   int ll_delegation(Fh *fh, unsigned cmd, ceph_deleg_cb_t cb, void *priv);
 
   entity_name_t get_myname() { return messenger->get_myname(); }
-  void wait_on_list(list<Cond*>& ls);
-  void signal_cond_list(list<Cond*>& ls);
+  void wait_on_list(std::list<ceph::condition_variable*>& ls);
+  void signal_cond_list(std::list<ceph::condition_variable*>& ls);
 
   void set_filer_flags(int flags);
   void clear_filer_flags(int flags);
@@ -959,7 +959,7 @@ protected:
 
   // global client lock
   //  - protects Client and buffer cache both!
-  Mutex                  client_lock;
+  ceph::mutex client_lock = ceph::make_mutex("Client::client_lock");
 
   std::map<snapid_t, int> ll_snap_ref;
 
@@ -1221,10 +1222,10 @@ private:
 
   // mds sessions
   map<mds_rank_t, MetaSession> mds_sessions;  // mds -> push seq
-  list<Cond*> waiting_for_mdsmap;
+  std::list<ceph::condition_variable*> waiting_for_mdsmap;
 
   // FSMap, for when using mds_command
-  list<Cond*> waiting_for_fsmap;
+  std::list<ceph::condition_variable*> waiting_for_fsmap;
   std::unique_ptr<FSMap> fsmap;
   std::unique_ptr<FSMapUser> fsmap_user;
 
@@ -1281,15 +1282,15 @@ private:
   // trace generation
   ofstream traceout;
 
-  Cond mount_cond, sync_cond;
+  ceph::condition_variable mount_cond, sync_cond;
 
   std::map<std::pair<int64_t,std::string>, int> pool_perms;
-  list<Cond*> waiting_for_pool_perm;
+  std::list<ceph::condition_variable*> waiting_for_pool_perm;
 
   uint64_t retries_on_invalidate = 0;
 
   // state reclaim
-  list<Cond*> waiting_for_reclaim;
+  std::list<ceph::condition_variable*> waiting_for_reclaim;
   int reclaim_errno = 0;
   epoch_t reclaim_osd_epoch = 0;
   entity_addrvec_t reclaim_target_addrs;
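
On the declaration side, the mutex name moves from the constructor initializer list (the client_lock("Client::client_lock") entry deleted earlier in Client.cc) into a default member initializer via ceph::make_mutex(). The factory keeps the lockdep-visible name in debug builds and, as far as I can tell, degenerates to an unnamed std::mutex in release builds. Before and after, with a hypothetical stripped-down class:

    #include "common/ceph_mutex.h"   // provides ceph::mutex, ceph::make_mutex

    class ClientSketch {             // hypothetical, for illustration only
      // before:
      //   Mutex client_lock;                                 // in the header
      //   Client(...) : client_lock("Client::client_lock")   // in the .cc
      //
      // after: declared, named and initialized in one place; the
      // constructor's initializer list no longer mentions it.
      ceph::mutex client_lock = ceph::make_mutex("Client::client_lock");
    };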
diff --git a/src/client/Delegation.cc b/src/client/Delegation.cc
index 58d8aaf9d089ad3a2c4b98b7ed33603da9e982c3..52339c5d816065f8cbafaaa72f7a61a9f11c6b1e 100644
--- a/src/client/Delegation.cc
+++ b/src/client/Delegation.cc
@@ -17,7 +17,7 @@ public:
     Client *client = in->client;
 
     // Called back via Timer, which takes client_lock for us
-    ceph_assert(client->client_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked_by_me(client->client_lock));
 
     lsubdout(client->cct, client, 0) << __func__ <<
          ": delegation return timeout for inode 0x" <<
diff --git a/src/client/Fh.h b/src/client/Fh.h
index 6bc4d1a38fcb0fa750fb0fd8f92aa9a92a89b0f4..eae96037e74325d5af7db2f5d6f5ca79de4e8891 100644
--- a/src/client/Fh.h
+++ b/src/client/Fh.h
@@ -7,7 +7,6 @@
 #include "UserPerm.h"
 #include "mds/flock.h"
 
-class Cond;
 class Inode;
 
 // file handle for any open file state
@@ -21,7 +20,7 @@ struct Fh {
 
   int flags;
   bool pos_locked;           // pos is currently in use
-  list<Cond*> pos_waiters;   // waiters for pos
+  std::list<ceph::condition_variable*> pos_waiters;   // waiters for pos
 
   UserPerm actor_perms; // perms I opened the file with
 
diff --git a/src/client/Inode.h b/src/client/Inode.h
index 0e6586cb6697682a74e33ca13750123a323bc049..da2c9225ac5a666943c4aadcfc307bded349095a 100644
--- a/src/client/Inode.h
+++ b/src/client/Inode.h
@@ -228,9 +228,9 @@ struct Inode {
   map<string,bufferptr> xattrs;
   map<frag_t,int> fragmap;  // known frag -> mds mappings
 
-  list<Cond*>       waitfor_caps;
-  list<Cond*>       waitfor_commit;
-  list<Cond*>      waitfor_deleg;
+  std::list<ceph::condition_variable*> waitfor_caps;
+  std::list<ceph::condition_variable*> waitfor_commit;
+  std::list<ceph::condition_variable*> waitfor_deleg;
 
   Dentry *get_first_parent() {
     ceph_assert(!dentries.empty());
diff --git a/src/client/MetaRequest.h b/src/client/MetaRequest.h
index 202be43baeec3af1d425c2be7265297ece216cd9..611d8e5d95397b3798e00bda0c5d58d1cc00d5bf 100644
--- a/src/client/MetaRequest.h
+++ b/src/client/MetaRequest.h
@@ -64,9 +64,9 @@ public:
   xlist<MetaRequest*>::item unsafe_dir_item;
   xlist<MetaRequest*>::item unsafe_target_item;
 
-  Cond  *caller_cond;          // who to take up
-  Cond  *dispatch_cond;        // who to kick back
-  list<Cond*> waitfor_safe;
+  ceph::condition_variable *caller_cond;          // who to take up
+  ceph::condition_variable *dispatch_cond;        // who to kick back
+  list<ceph::condition_variable*> waitfor_safe;
 
   InodeRef target;
   UserPerm perms;
diff --git a/src/client/ObjecterWriteback.h b/src/client/ObjecterWriteback.h
index 8928437646ce2ee8d6a605f43e4be095811b058d..7a159e80e9de0598cf77323531b1149cf232968a 100644
--- a/src/client/ObjecterWriteback.h
+++ b/src/client/ObjecterWriteback.h
@@ -8,7 +8,7 @@
 
 class ObjecterWriteback : public WritebackHandler {
  public:
-  ObjecterWriteback(Objecter *o, Finisher *fin, Mutex *lock)
+  ObjecterWriteback(Objecter *o, Finisher *fin, ceph::mutex *lock)
     : m_objecter(o),
       m_finisher(fin),
       m_lock(lock) { }
@@ -66,7 +66,7 @@ class ObjecterWriteback : public WritebackHandler {
  private:
   Objecter *m_objecter;
   Finisher *m_finisher;
-  Mutex *m_lock;
+  ceph::mutex *m_lock;
 };
 
 #endif
diff --git a/src/client/SyntheticClient.cc b/src/client/SyntheticClient.cc
index f511168386f749b764488baf755c9eee749f7863..a6d05ff3ec998421c74b892cc58381f6962b293c 100644
--- a/src/client/SyntheticClient.cc
+++ b/src/client/SyntheticClient.cc
@@ -1045,8 +1045,8 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
   int n = 0;
 
   // for object traces
-  Mutex lock("synclient foo");
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("synclient foo");
+  ceph::condition_variable cond;
   bool ack;
 
   while (!t.end()) {
@@ -1450,14 +1450,13 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
       int64_t oh = t.get_int();
       int64_t ol = t.get_int();
       object_t oid = file_object_t(oh, ol);
-      lock.Lock();
+      std::unique_lock locker{lock};
       object_locator_t oloc(SYNCLIENT_FIRST_POOL);
       uint64_t size;
       ceph::real_time mtime;
       client->objecter->stat(oid, oloc, CEPH_NOSNAP, &size, &mtime, 0,
-                            new C_SafeCond(&lock, &cond, &ack));
-      while (!ack) cond.Wait(lock);
-      lock.Unlock();
+                            new C_SafeCond(lock, cond, &ack));
+      cond.wait(locker, [&ack] { return ack; });
     }
     else if (strcmp(op, "o_read") == 0) {
       int64_t oh = t.get_int();
@@ -1466,12 +1465,11 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
       int64_t len = t.get_int();
       object_t oid = file_object_t(oh, ol);
       object_locator_t oloc(SYNCLIENT_FIRST_POOL);
-      lock.Lock();
+      std::unique_lock locker{lock};
       bufferlist bl;
       client->objecter->read(oid, oloc, off, len, CEPH_NOSNAP, &bl, 0,
-                            new C_SafeCond(&lock, &cond, &ack));
-      while (!ack) cond.Wait(lock);
-      lock.Unlock();
+                            new C_SafeCond(lock, cond, &ack));
+      cond.wait(locker, [&ack] { return ack; });
     }
     else if (strcmp(op, "o_write") == 0) {
       int64_t oh = t.get_int();
@@ -1480,16 +1478,15 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
       int64_t len = t.get_int();
       object_t oid = file_object_t(oh, ol);
       object_locator_t oloc(SYNCLIENT_FIRST_POOL);
-      lock.Lock();
+      std::unique_lock locker{lock};
       bufferptr bp(len);
       bufferlist bl;
       bl.push_back(bp);
       SnapContext snapc;
       client->objecter->write(oid, oloc, off, len, snapc, bl,
                              ceph::real_clock::now(), 0,
-                             new C_SafeCond(&lock, &cond, &ack));
-      while (!ack) cond.Wait(lock);
-      lock.Unlock();
+                             new C_SafeCond(lock, cond, &ack));
+      cond.wait(locker, [&ack] { return ack; });
     }
     else if (strcmp(op, "o_zero") == 0) {
       int64_t oh = t.get_int();
@@ -1498,13 +1495,12 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
       int64_t len = t.get_int();
       object_t oid = file_object_t(oh, ol);
       object_locator_t oloc(SYNCLIENT_FIRST_POOL);
-      lock.Lock();
+      std::unique_lock locker{lock};
       SnapContext snapc;
       client->objecter->zero(oid, oloc, off, len, snapc,
                             ceph::real_clock::now(), 0,
-                            new C_SafeCond(&lock, &cond, &ack));
-      while (!ack) cond.Wait(lock);
-      lock.Unlock();
+                            new C_SafeCond(lock, cond, &ack));
+      cond.wait(locker, [&ack] { return ack; });
     }
 
 
@@ -2214,20 +2210,19 @@ int SyntheticClient::read_file(const std::string& fn, int size,
 
 
 class C_Ref : public Context {
-  Mutex& lock;
-  Cond& cond;
+  ceph::mutex& lock;
+  ceph::condition_variable& cond;
   int *ref;
 public:
-  C_Ref(Mutex &l, Cond &c, int *r) : lock(l), cond(c), ref(r) {
-    lock.Lock();
+  C_Ref(ceph::mutex &l, ceph::condition_variable &c, int *r)
+    : lock(l), cond(c), ref(r) {
+    lock_guard locker{lock};
     (*ref)++;
-    lock.Unlock();
   }
   void finish(int) override {
-    lock.Lock();
+    lock_guard locker{lock};
     (*ref)--;
-    cond.Signal();
-    lock.Unlock();
+    cond.notify_all();
   }
 };
 
@@ -2259,8 +2254,8 @@ int SyntheticClient::create_objects(int nobj, int osize, int inflight)
   bufferlist bl;
   bl.push_back(bp);
 
-  Mutex lock("create_objects lock");
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("create_objects lock");
+  ceph::condition_variable cond;
   
   int unsafe = 0;
   
@@ -2279,31 +2274,34 @@ int SyntheticClient::create_objects(int nobj, int osize, int inflight)
     dout(10) << "writing " << oid << dendl;
 
     starts.push_back(ceph_clock_now());
-    client->client_lock.Lock();
-    client->objecter->write(oid, oloc, 0, osize, snapc, bl,
-                           ceph::real_clock::now(), 0,
-                           new C_Ref(lock, cond, &unsafe));
-    client->client_lock.Unlock();
-
-    lock.Lock();
-    while (unsafe > inflight) {
-      dout(20) << "waiting for " << unsafe << " unsafe" << dendl;
-      cond.Wait(lock);
+    {
+      std::lock_guard locker{client->client_lock};
+      client->objecter->write(oid, oloc, 0, osize, snapc, bl,
+                             ceph::real_clock::now(), 0,
+                             new C_Ref(lock, cond, &unsafe));
+    }
+    {
+      std::unique_lock locker{lock};
+      cond.wait(locker, [&unsafe, inflight, this] {
+        if (unsafe > inflight) {
+         dout(20) << "waiting for " << unsafe << " unsafe" << dendl;
+       }
+       return unsafe <= inflight;
+      });
     }
-    lock.Unlock();
-
     utime_t lat = ceph_clock_now();
     lat -= starts.front();
     starts.pop_front();
   }
-
-  lock.Lock();
-  while (unsafe > 0) {
-    dout(10) << "waiting for " << unsafe << " unsafe" << dendl;
-    cond.Wait(lock);
+  {
+    std::unique_lock locker{lock};
+    cond.wait(locker, [&unsafe, this] {
+      if (unsafe > 0) {
+       dout(10) << "waiting for " << unsafe << " unsafe" << dendl;
+      }
+      return unsafe <= 0;
+    });
   }
-  lock.Unlock();
-
   dout(5) << "create_objects done" << dendl;
   return 0;
 }
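
The C_Ref completion above plus this loop form a simple in-flight throttle: each submitted write bumps a counter in the C_Ref constructor, each completion decrements it and notifies, and the submitter blocks while the counter exceeds the allowed depth, then drains it to zero at the end. A condensed sketch of the same flow-control pattern with std primitives (issue_op() is a hypothetical stand-in for objecter->write()):

    #include <condition_variable>
    #include <functional>
    #include <mutex>

    void throttled_submit(int total, int inflight,
                          const std::function<void(std::function<void()>)>& issue_op) {
      std::mutex lock;
      std::condition_variable cond;
      int unsafe = 0;                       // submitted but not yet acked

      for (int i = 0; i < total; ++i) {
        {
          std::lock_guard l{lock};          // C_Ref's constructor does this
          ++unsafe;
        }
        issue_op([&] {                      // completion == C_Ref::finish()
          std::lock_guard l{lock};
          --unsafe;
          cond.notify_all();
        });
        std::unique_lock l{lock};
        cond.wait(l, [&] { return unsafe <= inflight; });  // throttle
      }
      std::unique_lock l{lock};
      cond.wait(l, [&] { return unsafe == 0; });           // final drain
    }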
@@ -2341,8 +2339,8 @@ int SyntheticClient::object_rw(int nobj, int osize, int wrpc,
     prime += 2;
   }
 
-  Mutex lock("lock");
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("lock");
+  ceph::condition_variable cond;
 
   int unack = 0;
 
@@ -2367,7 +2365,7 @@ int SyntheticClient::object_rw(int nobj, int osize, int wrpc,
     object_locator_t oloc(SYNCLIENT_FIRST_POOL);
     SnapContext snapc;
     
-    client->client_lock.Lock();
+    client->client_lock.lock();
     utime_t start = ceph_clock_now();
     if (write) {
       dout(10) << "write to " << oid << dendl;
@@ -2388,14 +2386,17 @@ int SyntheticClient::object_rw(int nobj, int osize, int wrpc,
       client->objecter->read(oid, oloc, 0, osize, CEPH_NOSNAP, &inbl, 0,
                             new C_Ref(lock, cond, &unack));
     }
-    client->client_lock.Unlock();
+    client->client_lock.unlock();
 
-    lock.Lock();
-    while (unack > 0) {
-      dout(20) << "waiting for " << unack << " unack" << dendl;
-      cond.Wait(lock);
+    {
+      std::unique_lock locker{lock};
+      cond.wait(locker, [&unack, this] {
+       if (unack > 0) {
+         dout(20) << "waiting for " << unack << " unack" << dendl;
+       }
+       return unack <= 0;
+      });
     }
-    lock.Unlock();
 
     utime_t lat = ceph_clock_now();
     lat -= start;
@@ -3366,19 +3367,17 @@ int SyntheticClient::chunk_file(string &filename)
   while (pos < size) {
     int get = std::min<int>(size - pos, 1048576);
 
-    Mutex flock("synclient chunk_file lock");
-    Cond cond;
+    ceph::mutex flock = ceph::make_mutex("synclient chunk_file lock");
+    ceph::condition_variable cond;
-    bool done;
+    bool done = false;
     bufferlist bl;
-
-    flock.Lock();
-    Context *onfinish = new C_SafeCond(&flock, &cond, &done);
-    client->filer->read(inode.ino, &inode.layout, CEPH_NOSNAP, pos, get, &bl, 0,
-                       onfinish);
-    while (!done)
-      cond.Wait(flock);
-    flock.Unlock();
-
+    {
+      std::unique_lock locker{flock};
+      Context *onfinish = new C_SafeCond(flock, cond, &done);
+      client->filer->read(inode.ino, &inode.layout, CEPH_NOSNAP, pos, get, &bl, 0,
+                         onfinish);
+      cond.wait(locker, [&done] { return done; });
+    }
     dout(0) << "got " << bl.length() << " bytes at " << pos << dendl;
     
     if (from_before.length()) {
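
C_SafeCond's interface evidently changed in step with this commit: the call sites now pass the mutex and condition variable by reference (C_SafeCond(flock, cond, &done)) instead of by pointer, and the hand-rolled while (!done) loop becomes a predicate wait on the done flag. A runnable sketch of the waiting side, with a detached thread standing in for the Filer completion (the read_async() helper is invented for illustration):

    #include <condition_variable>
    #include <functional>
    #include <mutex>
    #include <thread>

    // Hypothetical async API: fires on_finish from another thread,
    // standing in for client->filer->read(..., onfinish).
    void read_async(std::function<void()> on_finish) {
      std::thread([cb = std::move(on_finish)] { cb(); }).detach();
    }

    void wait_for_read() {
      std::mutex flock;                // ceph::mutex in the real code
      std::condition_variable cond;    // ceph::condition_variable likewise
      bool done = false;               // must start false before arming the wait

      std::unique_lock locker{flock};
      read_async([&] {                 // mirrors what C_SafeCond::finish() does:
        std::lock_guard l{flock};      //   take the lock,
        done = true;                   //   flip the flag,
        cond.notify_all();             //   wake the waiter
      });
      cond.wait(locker, [&done] { return done; });
    }                                  // completion has run by the time wait() returns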
diff --git a/src/client/Trace.cc b/src/client/Trace.cc
index 43494cd4e7e0c68d0a0705ac8d331d9ff3931cce..0d07da66c10e06db81767204e385e7ca51bb2a1c 100644
--- a/src/client/Trace.cc
+++ b/src/client/Trace.cc
@@ -20,8 +20,6 @@
 #include <iostream>
 #include <map>
 
-#include "common/Mutex.h"
-
 #include "common/config.h"
 
 #include <sys/types.h>
diff --git a/src/client/barrier.cc b/src/client/barrier.cc
index dd87471035257da4627145c3ae23014031b511e6..7a0c08868439c9b071450183fc617f79a23a3ae2 100644
--- a/src/client/barrier.cc
+++ b/src/client/barrier.cc
@@ -80,7 +80,7 @@ Barrier::~Barrier()
 
 /* BarrierContext */
 BarrierContext::BarrierContext(Client *c, uint64_t ino) :
-  cl(c), ino(ino), lock("BarrierContext")
+  cl(c), ino(ino)
 { };
 
 void BarrierContext::write_nobarrier(C_Block_Sync &cbs)
@@ -92,7 +92,7 @@ void BarrierContext::write_nobarrier(C_Block_Sync &cbs)
 
 void BarrierContext::write_barrier(C_Block_Sync &cbs)
 {
-  std::lock_guard locker(lock);
+  std::unique_lock locker(lock);
   barrier_interval &iv = cbs.iv;
 
   { /* find blocking commit--intrusive no help here */
@@ -104,7 +104,7 @@ void BarrierContext::write_barrier(C_Block_Sync &cbs)
       Barrier &barrier = *iter;
       while (boost::icl::intersects(barrier.span, iv)) {
        /*  wait on this */
-       barrier.cond.Wait(lock);
+       barrier.cond.wait(locker);
        done = true;
       }
     }
@@ -117,7 +117,7 @@ void BarrierContext::write_barrier(C_Block_Sync &cbs)
 
 void BarrierContext::commit_barrier(barrier_interval &civ)
 {
-    std::lock_guard locker(lock);
+    std::unique_lock locker(lock);
 
     /* we commit outstanding writes--if none exist, we don't care */
     if (outstanding_writes.size() == 0)
@@ -152,7 +152,7 @@ void BarrierContext::commit_barrier(barrier_interval &civ)
     if (barrier) {
       active_commits.push_back(*barrier);
       /* and wait on this */
-      barrier->cond.Wait(lock);
+      barrier->cond.wait(locker);
     }
 
 } /* commit_barrier */
@@ -173,7 +173,7 @@ void BarrierContext::complete(C_Block_Sync &cbs)
       Barrier *barrier = iter->barrier;
       barrier->write_list.erase(iter);
       /* signal waiters */
-      barrier->cond.Signal();
+      barrier->cond.notify_all();
        /* dispose cleared barrier */
       if (barrier->write_list.size() == 0) {
        BarrierList::iterator iter2 =
diff --git a/src/client/barrier.h b/src/client/barrier.h
index f94a48acd05c328f84f2d367a915ddae426b9970..289a3a5212f3508df21a37bb31cddc2f6db85ab8 100644
--- a/src/client/barrier.h
+++ b/src/client/barrier.h
@@ -18,8 +18,7 @@
 #include <boost/intrusive/list.hpp>
 #define BOOST_ICL_USE_STATIC_BOUNDED_INTERVALS
 #include <boost/icl/interval_set.hpp>
-#include "common/Mutex.h"
-#include "common/Cond.h"
+#include "common/ceph_mutex.h"
 
 class Client;
 
@@ -55,7 +54,7 @@ typedef boost::intrusive::list< C_Block_Sync,
 class Barrier
 {
 private:
-  Cond cond;
+  ceph::condition_variable cond;
   boost::icl::interval_set<uint64_t> span;
   BlockSyncList write_list;
 
@@ -80,7 +79,7 @@ class BarrierContext
 private:
   Client *cl;
   uint64_t ino;
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("BarrierContext");
 
   // writes not claimed by a commit
   BlockSyncList outstanding_writes;
diff --git a/src/client/fuse_ll.cc b/src/client/fuse_ll.cc
index 63b993bebc72bf6176ea2427b680a4a8388eb7ce..547ce8611b5f5ed95af470c07f63f05d425fb08e 100644
--- a/src/client/fuse_ll.cc
+++ b/src/client/fuse_ll.cc
@@ -89,7 +89,7 @@ public:
   struct fuse_session *se;
   char *mountpoint;
 
-  Mutex stag_lock;
+  ceph::mutex stag_lock = ceph::make_mutex("fuse_ll.cc stag_lock");
   int last_stag;
 
   ceph::unordered_map<uint64_t,int> snap_stag_map;
@@ -1048,7 +1048,6 @@ CephFuse::Handle::Handle(Client *c, int fd) :
   ch(NULL),
   se(NULL),
   mountpoint(NULL),
-  stag_lock("fuse_ll.cc stag_lock"),
   last_stag(0)
 {
   snap_stag_map[CEPH_NOSNAP] = 0;
diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc
index 2fc2012c906455da8a4d204f6fa901e00c5ff400..125235ff7cd7013ad95c6390bf07641cdd76d870 100644
--- a/src/librados/RadosClient.cc
+++ b/src/librados/RadosClient.cc
@@ -413,7 +413,7 @@ struct C_aio_watch_flush_Complete : public Context {
   }
 
   void finish(int r) override {
-    c->lock.Lock();
+    c->lock.lock();
     c->rval = r;
     c->complete = true;
     c->cond.Signal();