]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: s/Mutex/ceph::mutex/
authorKefu Chai <kchai@redhat.com>
Sun, 7 Jul 2019 04:35:23 +0000 (12:35 +0800)
committerKefu Chai <kchai@redhat.com>
Sat, 3 Aug 2019 03:27:19 +0000 (11:27 +0800)
* FutureImpl::m_lock is an exception. as, before this change, the lock
  was initialized like `m_lock("FutureImpl::m_lock", false, false)`, see
  the declaration of
  `Mutex(const std::string &n, bool r = false, bool ld=true, bool bt=false)`
  so `m_lock` is actually not using the extra features offered by
  `Mutex` like runtime lockdeps check. and `mutex_debugging_base` does
  not allow us to disable lockdeps individually. but it does use the `is_locked()`
  method. so instead of using `ceph::mutex` directly, a cutomized
  `ceph::mutex` is added for `CEPH_DEBUG_MUTEX` build.

Signed-off-by: Kefu Chai <kchai@redhat.com>
16 files changed:
src/journal/FutureImpl.cc
src/journal/FutureImpl.h
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h
src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h
src/journal/JournalTrimmer.cc
src/journal/JournalTrimmer.h
src/journal/Journaler.cc
src/journal/Journaler.h
src/journal/ObjectPlayer.cc
src/journal/ObjectPlayer.h
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h

index eac3fc39ba31f228f5ca2af5a2100b374f19f683..474c025c608bf813b55dc4890eea94e98f93ca15 100644 (file)
@@ -10,7 +10,7 @@ FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid,
                        uint64_t commit_tid)
   : RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid),
     m_commit_tid(commit_tid),
-    m_lock("FutureImpl::m_lock", false, false), m_safe(false),
+    m_safe(false),
     m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
     m_consistent_ack(this) {
 }
@@ -32,7 +32,7 @@ void FutureImpl::flush(Context *on_safe) {
   FlushHandlers flush_handlers;
   FutureImplPtr prev_future;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     complete = (m_safe && m_consistent);
     if (!complete) {
       if (on_safe != nullptr) {
@@ -61,13 +61,13 @@ void FutureImpl::flush(Context *on_safe) {
 }
 
 FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   return prepare_flush(flush_handlers, m_lock);
 }
 
 FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
-                                        Mutex &lock) {
-  ceph_assert(m_lock.is_locked());
+                                        ceph::mutex &lock) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   if (m_flush_state == FLUSH_STATE_NONE) {
     m_flush_state = FLUSH_STATE_REQUESTED;
@@ -82,7 +82,7 @@ FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
 void FutureImpl::wait(Context *on_safe) {
   ceph_assert(on_safe != NULL);
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     if (!m_safe || !m_consistent) {
       m_contexts.push_back(on_safe);
       return;
@@ -93,25 +93,25 @@ void FutureImpl::wait(Context *on_safe) {
 }
 
 bool FutureImpl::is_complete() const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   return m_safe && m_consistent;
 }
 
 int FutureImpl::get_return_value() const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(m_safe && m_consistent);
   return m_return_value;
 }
 
 bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(!m_flush_handler);
   m_flush_handler = flush_handler;
   return m_flush_state != FLUSH_STATE_NONE;
 }
 
 void FutureImpl::safe(int r) {
-  m_lock.Lock();
+  m_lock.lock();
   ceph_assert(!m_safe);
   m_safe = true;
   if (m_return_value == 0) {
@@ -122,12 +122,12 @@ void FutureImpl::safe(int r) {
   if (m_consistent) {
     finish_unlock();
   } else {
-    m_lock.Unlock();
+    m_lock.unlock();
   }
 }
 
 void FutureImpl::consistent(int r) {
-  m_lock.Lock();
+  m_lock.lock();
   ceph_assert(!m_consistent);
   m_consistent = true;
   m_prev_future.reset();
@@ -138,18 +138,18 @@ void FutureImpl::consistent(int r) {
   if (m_safe) {
     finish_unlock();
   } else {
-    m_lock.Unlock();
+    m_lock.unlock();
   }
 }
 
 void FutureImpl::finish_unlock() {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   ceph_assert(m_safe && m_consistent);
 
   Contexts contexts;
   contexts.swap(m_contexts);
 
-  m_lock.Unlock();
+  m_lock.unlock();
   for (Contexts::iterator it = contexts.begin();
        it != contexts.end(); ++it) {
     (*it)->complete(m_return_value);
index 2be3eb253eccf1818cd22852aa91bb556a44e09e..b81fba200bc0021652ea6f257e1f79bad531dd20 100644 (file)
@@ -5,7 +5,6 @@
 #define CEPH_JOURNAL_FUTURE_IMPL_H
 
 #include "include/int_types.h"
-#include "common/Mutex.h"
 #include "common/RefCountedObj.h"
 #include "include/Context.h"
 #include "journal/Future.h"
@@ -53,11 +52,11 @@ public:
   int get_return_value() const;
 
   inline bool is_flush_in_progress() const {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     return (m_flush_state == FLUSH_STATE_IN_PROGRESS);
   }
   inline void set_flush_in_progress() {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     ceph_assert(m_flush_handler);
     m_flush_handler.reset();
     m_flush_state = FLUSH_STATE_IN_PROGRESS;
@@ -65,11 +64,11 @@ public:
 
   bool attach(const FlushHandlerPtr &flush_handler);
   inline void detach() {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     m_flush_handler.reset();
   }
   inline FlushHandlerPtr get_flush_handler() const {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     return m_flush_handler;
   }
 
@@ -101,7 +100,7 @@ private:
   uint64_t m_entry_tid;
   uint64_t m_commit_tid;
 
-  mutable Mutex m_lock;
+  mutable ceph::mutex m_lock = ceph::make_mutex("FutureImpl::m_lock", false);
   FutureImplPtr m_prev_future;
   bool m_safe;
   bool m_consistent;
@@ -114,7 +113,7 @@ private:
   Contexts m_contexts;
 
   FutureImplPtr prepare_flush(FlushHandlers *flush_handlers);
-  FutureImplPtr prepare_flush(FlushHandlers *flush_handlers, Mutex &lock);
+  FutureImplPtr prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock);
 
   void consistent(int r);
   void finish_unlock();
index f35eccb131fff2b67880193815ed851e586dfafe..7fcad836c19c2775020f002fe22456f611e0b5c3 100644 (file)
@@ -401,7 +401,7 @@ struct C_AssertActiveTag : public Context {
 } // anonymous namespace
 
 JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
-                                 Mutex *timer_lock, librados::IoCtx &ioctx,
+                                 ceph::mutex *timer_lock, librados::IoCtx &ioctx,
                                  const std::string &oid,
                                  const std::string &client_id,
                                  const Settings &settings)
@@ -409,7 +409,7 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
       m_client_id(client_id), m_settings(settings), m_order(0),
       m_splay_width(0), m_pool_id(-1), m_initialized(false),
       m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
-      m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
+      m_commit_tid(0), m_watch_ctx(this),
       m_watch_handle(0), m_minimum_set(0), m_active_set(0),
       m_update_notifications(0), m_commit_position_ctx(NULL),
       m_commit_position_task_ctx(NULL) {
@@ -418,13 +418,13 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
 }
 
 JournalMetadata::~JournalMetadata() {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(!m_initialized);
 }
 
 void JournalMetadata::init(Context *on_finish) {
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     ceph_assert(!m_initialized);
     m_initialized = true;
   }
@@ -437,7 +437,7 @@ void JournalMetadata::init(Context *on_finish) {
       if (r < 0) {
         lderr(m_cct) << __func__ << ": failed to watch journal"
                      << cpp_strerror(r) << dendl;
-        Mutex::Locker locker(m_lock);
+       std::lock_guard locker{m_lock};
         m_watch_handle = 0;
         on_finish->complete(r);
         return;
@@ -459,7 +459,7 @@ void JournalMetadata::shut_down(Context *on_finish) {
 
   uint64_t watch_handle = 0;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     m_initialized = false;
     std::swap(watch_handle, m_watch_handle);
   }
@@ -592,23 +592,23 @@ void JournalMetadata::get_tags(uint64_t start_after_tag_tid,
 }
 
 void JournalMetadata::add_listener(JournalMetadataListener *listener) {
-  Mutex::Locker locker(m_lock);
-  while (m_update_notifications > 0) {
-    m_update_cond.Wait(m_lock);
-  }
+  std::unique_lock locker{m_lock};
+  m_update_cond.wait(locker, [this] {
+    return m_update_notifications <= 0;
+  });
   m_listeners.push_back(listener);
 }
 
 void JournalMetadata::remove_listener(JournalMetadataListener *listener) {
-  Mutex::Locker locker(m_lock);
-  while (m_update_notifications > 0) {
-    m_update_cond.Wait(m_lock);
-  }
+  std::unique_lock locker{m_lock};
+  m_update_cond.wait(locker, [this] {
+    return m_update_notifications <= 0;
+  });
   m_listeners.remove(listener);
 }
 
 void JournalMetadata::set_minimum_set(uint64_t object_set) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
                    << ", new=" << object_set << dendl;
@@ -637,7 +637,7 @@ int JournalMetadata::set_active_set(uint64_t object_set) {
 }
 
 void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
                    << ", new=" << object_set << dendl;
@@ -661,7 +661,7 @@ void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
 }
 
 void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid,
                                                  m_async_op_tracker,
@@ -681,8 +681,7 @@ void JournalMetadata::flush_commit_position() {
 void JournalMetadata::flush_commit_position(Context *on_safe) {
   ldout(m_cct, 20) << __func__ << dendl;
 
-  Mutex::Locker timer_locker(*m_timer_lock);
-  Mutex::Locker locker(m_lock);
+  std::scoped_lock locker{*m_timer_lock, m_lock};
   if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
     // nothing to flush
     if (on_safe != nullptr) {
@@ -703,7 +702,7 @@ void JournalMetadata::flush_commit_position(Context *on_safe) {
 }
 
 void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
   if (allocated_entry_tid <= entry_tid) {
     allocated_entry_tid = entry_tid + 1;
@@ -712,7 +711,7 @@ void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
 
 bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid,
                                                    uint64_t *entry_tid) const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid);
   if (it == m_allocated_entry_tids.end()) {
@@ -740,7 +739,7 @@ void JournalMetadata::refresh(Context *on_complete) {
   ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
 
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     if (on_complete != nullptr) {
       m_refresh_ctxs.push_back(on_complete);
     }
@@ -755,7 +754,7 @@ void JournalMetadata::refresh(Context *on_complete) {
 void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
   ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
 
-  m_lock.Lock();
+  m_lock.lock();
   if (r == 0) {
     Client client(m_client_id, bufferlist());
     RegisteredClients::iterator it = refresh->registered_clients.find(client);
@@ -770,14 +769,14 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
       m_client = *it;
 
       ++m_update_notifications;
-      m_lock.Unlock();
+      m_lock.unlock();
       for (Listeners::iterator it = m_listeners.begin();
            it != m_listeners.end(); ++it) {
         (*it)->handle_update(this);
       }
-      m_lock.Lock();
+      m_lock.lock();
       if (--m_update_notifications == 0) {
-        m_update_cond.Signal();
+        m_update_cond.notify_all();
       }
     } else {
       lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
@@ -791,7 +790,7 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
   if (m_refreshes_in_progress == 0) {
     std::swap(refresh_ctxs, m_refresh_ctxs);
   }
-  m_lock.Unlock();
+  m_lock.unlock();
 
   for (auto ctx : refresh_ctxs) {
     ctx->complete(r);
@@ -801,8 +800,8 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
 void JournalMetadata::cancel_commit_task() {
   ldout(m_cct, 20) << __func__ << dendl;
 
-  ceph_assert(m_timer_lock->is_locked());
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   ceph_assert(m_commit_position_ctx != nullptr);
   ceph_assert(m_commit_position_task_ctx != nullptr);
   m_timer->cancel_event(m_commit_position_task_ctx);
@@ -812,8 +811,8 @@ void JournalMetadata::cancel_commit_task() {
 void JournalMetadata::schedule_commit_task() {
   ldout(m_cct, 20) << __func__ << dendl;
 
-  ceph_assert(m_timer_lock->is_locked());
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   ceph_assert(m_commit_position_ctx != nullptr);
   if (m_commit_position_task_ctx == nullptr) {
     m_commit_position_task_ctx =
@@ -823,8 +822,8 @@ void JournalMetadata::schedule_commit_task() {
 }
 
 void JournalMetadata::handle_commit_position_task() {
-  ceph_assert(m_timer_lock->is_locked());
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   ldout(m_cct, 20) << __func__ << ": "
                    << "client_id=" << m_client_id << ", "
                    << "commit_position=" << m_commit_position << dendl;
@@ -838,13 +837,13 @@ void JournalMetadata::handle_commit_position_task() {
 
   Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
       Contexts flush_commit_position_ctxs;
-      m_lock.Lock();
+      m_lock.lock();
       ceph_assert(m_flush_commits_in_progress > 0);
       --m_flush_commits_in_progress;
       if (m_flush_commits_in_progress == 0) {
         std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
       }
-      m_lock.Unlock();
+      m_lock.unlock();
 
       commit_position_ctx->complete(0);
       for (auto ctx : flush_commit_position_ctxs) {
@@ -856,9 +855,9 @@ void JournalMetadata::handle_commit_position_task() {
   ctx = new FunctionContext([this, ctx](int r) {
       // manually kick of a refresh in case the notification is missed
       // and ignore the next notification that we are about to send
-      m_lock.Lock();
+      m_lock.lock();
       ++m_ignore_watch_notifies;
-      m_lock.Unlock();
+      m_lock.unlock();
 
       refresh(ctx);
     });
@@ -877,12 +876,12 @@ void JournalMetadata::handle_commit_position_task() {
 }
 
 void JournalMetadata::schedule_watch_reset() {
-  ceph_assert(m_timer_lock->is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
   m_timer->add_event_after(1, new C_WatchReset(this));
 }
 
 void JournalMetadata::handle_watch_reset() {
-  ceph_assert(m_timer_lock->is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
   if (!m_initialized) {
     return;
   }
@@ -911,7 +910,7 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
   m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
 
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     if (m_ignore_watch_notifies > 0) {
       --m_ignore_watch_notifies;
       return;
@@ -930,8 +929,7 @@ void JournalMetadata::handle_watch_error(int err) {
     lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
   }
 
-  Mutex::Locker timer_locker(*m_timer_lock);
-  Mutex::Locker locker(m_lock);
+  std::scoped_lock locker{*m_timer_lock, m_lock};
 
   // release old watch on error
   if (m_watch_handle != 0) {
@@ -947,7 +945,7 @@ void JournalMetadata::handle_watch_error(int err) {
 uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
                                               uint64_t tag_tid,
                                               uint64_t entry_tid) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   uint64_t commit_tid = ++m_commit_tid;
   m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid,
                                                   entry_tid);
@@ -962,7 +960,7 @@ uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
 
 void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
                                           uint64_t object_num) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   auto it = m_pending_commit_tids.find(commit_tid);
   ceph_assert(it != m_pending_commit_tids.end());
@@ -978,7 +976,7 @@ void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
 void JournalMetadata::get_commit_entry(uint64_t commit_tid,
                                        uint64_t *object_num,
                                        uint64_t *tag_tid, uint64_t *entry_tid) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   auto it = m_pending_commit_tids.find(commit_tid);
   ceph_assert(it != m_pending_commit_tids.end());
@@ -995,8 +993,7 @@ void JournalMetadata::committed(uint64_t commit_tid,
   ObjectSetPosition commit_position;
   Context *stale_ctx = nullptr;
   {
-    Mutex::Locker timer_locker(*m_timer_lock);
-    Mutex::Locker locker(m_lock);
+    std::scoped_lock locker{*m_timer_lock, m_lock};
     ceph_assert(commit_tid > m_commit_position_tid);
 
     if (!m_commit_position.object_positions.empty()) {
@@ -1103,7 +1100,7 @@ void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
 
   Context *ctx = on_finish;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     for (auto &c : m_registered_clients) {
       if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
           c.id == m_client_id ||
@@ -1158,7 +1155,7 @@ std::ostream &operator<<(std::ostream &os,
 
 std::ostream &operator<<(std::ostream &os,
                         const JournalMetadata &jm) {
-  Mutex::Locker locker(jm.m_lock);
+  std::lock_guard locker{jm.m_lock};
   os << "[oid=" << jm.m_oid << ", "
      << "initialized=" << jm.m_initialized << ", "
      << "order=" << (int)jm.m_order << ", "
index 393d20297d834e6ccf981149288f08fc2aa25c98..13d9fd44ff1df738279815cd8a13c2bc87174da6 100644 (file)
@@ -9,7 +9,7 @@
 #include "include/rados/librados.hpp"
 #include "common/AsyncOpTracker.h"
 #include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/RefCountedObj.h"
 #include "common/WorkQueue.h"
 #include "cls/journal/cls_journal_types.h"
@@ -43,7 +43,7 @@ public:
   typedef std::set<Client> RegisteredClients;
   typedef std::list<Tag> Tags;
 
-  JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+  JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
                   librados::IoCtx &ioctx, const std::string &oid,
                   const std::string &client_id, const Settings &settings);
   ~JournalMetadata() override;
@@ -105,20 +105,20 @@ public:
   inline SafeTimer &get_timer() {
     return *m_timer;
   }
-  inline Mutex &get_timer_lock() {
+  inline ceph::mutex &get_timer_lock() {
     return *m_timer_lock;
   }
 
   void set_minimum_set(uint64_t object_set);
   inline uint64_t get_minimum_set() const {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     return m_minimum_set;
   }
 
   int set_active_set(uint64_t object_set);
   void set_active_set(uint64_t object_set, Context *on_finish);
   inline uint64_t get_active_set() const {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     return m_active_set;
   }
 
@@ -127,17 +127,17 @@ public:
   void flush_commit_position();
   void flush_commit_position(Context *on_safe);
   void get_commit_position(ObjectSetPosition *commit_position) const {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     *commit_position = m_client.commit_position;
   }
 
   void get_registered_clients(RegisteredClients *registered_clients) {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     *registered_clients = m_registered_clients;
   }
 
   inline uint64_t allocate_entry_tid(uint64_t tag_tid) {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     return m_allocated_entry_tids[tag_tid]++;
   }
   void reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid);
@@ -216,7 +216,7 @@ private:
       journal_metadata->m_async_op_tracker.finish_op();
     }
     void finish(int r) override {
-      Mutex::Locker locker(journal_metadata->m_lock);
+      std::lock_guard locker{journal_metadata->m_lock};
       journal_metadata->handle_commit_position_task();
     };
   };
@@ -268,7 +268,7 @@ private:
 
     C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish)
       : journal_metadata(_journal_metadata), on_finish(_on_finish) {
-      Mutex::Locker locker(journal_metadata->m_lock);
+      std::lock_guard locker{journal_metadata->m_lock};
       journal_metadata->m_async_op_tracker.start_op();
     }
     ~C_ImmutableMetadata() override {
@@ -287,7 +287,7 @@ private:
 
     C_Refresh(JournalMetadata *_journal_metadata)
       : journal_metadata(_journal_metadata), minimum_set(0), active_set(0) {
-      Mutex::Locker locker(journal_metadata->m_lock);
+      std::lock_guard locker{journal_metadata->m_lock};
       journal_metadata->m_async_op_tracker.start_op();
     }
     ~C_Refresh() override {
@@ -311,9 +311,9 @@ private:
 
   ContextWQ *m_work_queue;
   SafeTimer *m_timer;
-  Mutex *m_timer_lock;
+  ceph::mutex *m_timer_lock;
 
-  mutable Mutex m_lock;
+  mutable ceph::mutex m_lock = ceph::make_mutex("JournalMetadata::m_lock");
 
   uint64_t m_commit_tid;
   CommitTids m_pending_commit_tids;
@@ -331,7 +331,7 @@ private:
   AllocatedEntryTids m_allocated_entry_tids;
 
   size_t m_update_notifications;
-  Cond m_update_cond;
+  ceph::condition_variable m_update_cond;
 
   size_t m_ignore_watch_notifies = 0;
   size_t m_refreshes_in_progress = 0;
index 5ac4ebc5dc2a7d3bd8a8e7922f25d1cc8bde6ac0..02df4dfb7e98f07ed8f643d68a8b76f0a41ba7fc 100644 (file)
@@ -59,7 +59,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
   : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
     m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
     m_cache_manager_handler(cache_manager_handler),
-    m_cache_rebalance_handler(this), m_lock("JournalPlayer::m_lock"),
+    m_cache_rebalance_handler(this),
     m_state(STATE_INIT), m_splay_offset(0), m_watch_enabled(false),
     m_watch_scheduled(false), m_watch_interval(0) {
   m_replay_handler->get();
@@ -104,7 +104,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
 JournalPlayer::~JournalPlayer() {
   ceph_assert(m_async_op_tracker.empty());
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     ceph_assert(m_shut_down);
     ceph_assert(m_fetch_object_numbers.empty());
     ceph_assert(!m_watch_scheduled);
@@ -117,7 +117,7 @@ JournalPlayer::~JournalPlayer() {
 }
 
 void JournalPlayer::prefetch() {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(m_state == STATE_INIT);
 
   if (m_cache_manager_handler != nullptr && m_max_fetch_bytes == 0) {
@@ -162,7 +162,7 @@ void JournalPlayer::prefetch() {
 
 void JournalPlayer::prefetch_and_watch(double interval) {
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     m_watch_enabled = true;
     m_watch_interval = interval;
     m_watch_step = WATCH_STEP_FETCH_CURRENT;
@@ -172,7 +172,7 @@ void JournalPlayer::prefetch_and_watch(double interval) {
 
 void JournalPlayer::shut_down(Context *on_finish) {
   ldout(m_cct, 20) << __func__ << dendl;
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   ceph_assert(!m_shut_down);
   m_shut_down = true;
@@ -200,7 +200,7 @@ void JournalPlayer::shut_down(Context *on_finish) {
 
 bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
   ldout(m_cct, 20) << __func__ << dendl;
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   if (m_state != STATE_PLAYBACK) {
     m_handler_notified = false;
@@ -248,7 +248,7 @@ void JournalPlayer::process_state(uint64_t object_number, int r) {
   ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
                    << "r=" << r << dendl;
 
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   if (r >= 0) {
     switch (m_state) {
     case STATE_PREFETCH:
@@ -277,7 +277,7 @@ void JournalPlayer::process_state(uint64_t object_number, int r) {
 
 int JournalPlayer::process_prefetch(uint64_t object_number) {
   ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   uint8_t splay_offset = object_number % splay_width;
@@ -382,7 +382,7 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
 
 int JournalPlayer::process_playback(uint64_t object_number) {
   ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   if (verify_playback_ready()) {
     notify_entries_available();
@@ -393,7 +393,7 @@ int JournalPlayer::process_playback(uint64_t object_number) {
 }
 
 bool JournalPlayer::is_object_set_ready() const {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   if (m_watch_scheduled || !m_fetch_object_numbers.empty()) {
     ldout(m_cct, 20) << __func__ << ": waiting for in-flight fetch" << dendl;
     return false;
@@ -403,7 +403,7 @@ bool JournalPlayer::is_object_set_ready() const {
 }
 
 bool JournalPlayer::verify_playback_ready() {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   while (true) {
     if (!is_object_set_ready()) {
@@ -506,7 +506,7 @@ bool JournalPlayer::verify_playback_ready() {
 }
 
 void JournalPlayer::prune_tag(uint64_t tag_tid) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag "
                    << tag_tid << dendl;
 
@@ -550,7 +550,7 @@ void JournalPlayer::prune_tag(uint64_t tag_tid) {
 }
 
 void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   ceph_assert(m_active_tag_tid);
 
   uint64_t active_tag_tid = *m_active_tag_tid;
@@ -564,7 +564,7 @@ void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
 }
 
 ObjectPlayerPtr JournalPlayer::get_object_player() const {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   SplayedObjectPlayers::const_iterator it = m_object_players.find(
     m_splay_offset);
@@ -573,7 +573,7 @@ ObjectPlayerPtr JournalPlayer::get_object_player() const {
 }
 
 ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   uint8_t splay_offset = object_number % splay_width;
@@ -586,7 +586,7 @@ ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
 }
 
 void JournalPlayer::advance_splay_object() {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   ++m_splay_offset;
   m_splay_offset %= m_journal_metadata->get_splay_width();
   m_watch_step = WATCH_STEP_FETCH_CURRENT;
@@ -595,7 +595,7 @@ void JournalPlayer::advance_splay_object() {
 }
 
 bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   ceph_assert(!m_watch_scheduled);
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
@@ -629,7 +629,7 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
 }
 
 void JournalPlayer::fetch(uint64_t object_num) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   ObjectPlayerPtr object_player(new ObjectPlayer(
     m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
@@ -642,7 +642,7 @@ void JournalPlayer::fetch(uint64_t object_num) {
 }
 
 void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   uint64_t object_num = object_player->get_object_number();
   std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
@@ -660,7 +660,7 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
                    << utils::get_object_name(m_object_oid_prefix, object_num)
                    << ": r=" << r << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(m_fetch_object_numbers.count(object_num) == 1);
   m_fetch_object_numbers.erase(object_num);
 
@@ -677,7 +677,7 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
 
 void JournalPlayer::refetch(bool immediate) {
   ldout(m_cct, 10) << __func__ << dendl;
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   m_handler_notified = false;
 
   // if watching the object, handle the periodic re-fetch
@@ -698,7 +698,7 @@ void JournalPlayer::refetch(bool immediate) {
 
 void JournalPlayer::schedule_watch(bool immediate) {
   ldout(m_cct, 10) << __func__ << dendl;
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   if (m_watch_scheduled) {
     return;
   }
@@ -759,7 +759,7 @@ void JournalPlayer::schedule_watch(bool immediate) {
 
 void JournalPlayer::handle_watch(uint64_t object_num, int r) {
   ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(m_watch_scheduled);
   m_watch_scheduled = false;
 
@@ -793,7 +793,7 @@ void JournalPlayer::handle_watch(uint64_t object_num, int r) {
 void JournalPlayer::handle_watch_assert_active(int r) {
   ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(m_watch_scheduled);
   m_watch_scheduled = false;
 
@@ -813,7 +813,7 @@ void JournalPlayer::handle_watch_assert_active(int r) {
 }
 
 void JournalPlayer::notify_entries_available() {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   if (m_handler_notified) {
     return;
   }
@@ -825,7 +825,7 @@ void JournalPlayer::notify_entries_available() {
 }
 
 void JournalPlayer::notify_complete(int r) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   m_handler_notified = true;
 
   ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
@@ -834,7 +834,7 @@ void JournalPlayer::notify_complete(int r) {
 }
 
 void JournalPlayer::handle_cache_rebalanced(uint64_t new_cache_bytes) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   if (m_state == STATE_ERROR || m_shut_down) {
     return;
index 6ba54cc8babbf5e9e03a679cf39a0e2506256458..21f215410f719950317b5647df1d61a94ccd5824 100644 (file)
@@ -8,7 +8,6 @@
 #include "include/Context.h"
 #include "include/rados/librados.hpp"
 #include "common/AsyncOpTracker.h"
-#include "common/Mutex.h"
 #include "journal/JournalMetadata.h"
 #include "journal/ObjectPlayer.h"
 #include "journal/Types.h"
@@ -116,7 +115,7 @@ private:
 
   AsyncOpTracker m_async_op_tracker;
 
-  mutable Mutex m_lock;
+  mutable ceph::mutex m_lock = ceph::make_mutex("JournalPlayer::m_lock");
   State m_state;
   uint8_t m_splay_offset;
 
index aa90660a01fa2500a99badeb93fe3186fbfa11d3..977b9b4f3945632ece8b60f0e68794b23da65b1f 100644 (file)
@@ -54,23 +54,26 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
   : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
     m_journal_metadata(journal_metadata),
     m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
-    m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
-    m_current_set(m_journal_metadata->get_active_set()) {
-
-  Mutex::Locker locker(m_lock);
+    m_object_handler(this),
+    m_lock(ceph::make_mutex("JournalerRecorder::m_lock")),
+    m_current_set(m_journal_metadata->get_active_set()),
+    m_object_locks{ceph::make_lock_container<ceph::mutex>(
+      journal_metadata->get_splay_width(), [](const size_t splay_offset) {
+      return ceph::make_mutex("ObjectRecorder::m_lock::" +
+                             std::to_string(splay_offset));
+    })}
+{
+
+  std::lock_guard locker{m_lock};
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
-    shared_ptr<Mutex> object_lock(new Mutex(
-      "ObjectRecorder::m_lock::" + std::to_string(splay_offset)));
-    m_object_locks.push_back(object_lock);
-
     uint64_t object_number = splay_offset + (m_current_set * splay_width);
-    Mutex::Locker locker(*object_lock);
+    std::lock_guard locker{m_object_locks[splay_offset]};
     m_object_ptrs[splay_offset] = create_object_recorder(
-      object_number, m_object_locks[splay_offset]);
+      object_number, &m_object_locks[splay_offset]);
   }
 
   m_journal_metadata->add_listener(&m_listener);
@@ -79,7 +82,7 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
 JournalRecorder::~JournalRecorder() {
   m_journal_metadata->remove_listener(&m_listener);
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(m_in_flight_advance_sets == 0);
   ceph_assert(m_in_flight_object_closes == 0);
 }
@@ -89,7 +92,7 @@ void JournalRecorder::shut_down(Context *on_safe) {
     [this, on_safe](int r) {
       Context *ctx = nullptr;
       {
-        Mutex::Locker locker(m_lock);
+       std::lock_guard locker{m_lock};
         if (m_in_flight_advance_sets != 0) {
           ceph_assert(m_on_object_set_advanced == nullptr);
           m_on_object_set_advanced = new FunctionContext(
@@ -114,14 +117,14 @@ void JournalRecorder::set_append_batch_options(int flush_interval,
                   << "flush_bytes=" << flush_bytes << ", "
                   << "flush_age=" << flush_age << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   m_flush_interval = flush_interval;
   m_flush_bytes = flush_bytes;
   m_flush_age = flush_age;
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
-    Mutex::Locker object_locker(*m_object_locks[splay_offset]);
+    std::lock_guard object_locker{m_object_locks[splay_offset]};
     auto object_recorder = get_object(splay_offset);
     object_recorder->set_append_batch_options(flush_interval, flush_bytes,
                                               flush_age);
@@ -132,7 +135,7 @@ Future JournalRecorder::append(uint64_t tag_tid,
                                const bufferlist &payload_bl) {
   ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl;
 
-  m_lock.Lock();
+  m_lock.lock();
 
   uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
   uint8_t splay_width = m_journal_metadata->get_splay_width();
@@ -145,8 +148,8 @@ Future JournalRecorder::append(uint64_t tag_tid,
   future->init(m_prev_future);
   m_prev_future = future;
 
-  m_object_locks[splay_offset]->Lock();
-  m_lock.Unlock();
+  m_object_locks[splay_offset].lock();
+  m_lock.unlock();
 
   bufferlist entry_bl;
   encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
@@ -154,12 +157,12 @@ Future JournalRecorder::append(uint64_t tag_tid,
   ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size());
 
   bool object_full = object_ptr->append({{future, entry_bl}});
-  m_object_locks[splay_offset]->Unlock();
+  m_object_locks[splay_offset].unlock();
 
   if (object_full) {
     ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
                      << dendl;
-    Mutex::Locker l(m_lock);
+    std::lock_guard l{m_lock};
     close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
   }
   return Future(future);
@@ -170,7 +173,7 @@ void JournalRecorder::flush(Context *on_safe) {
 
   C_Flush *ctx;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
 
     ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
     for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
@@ -185,7 +188,7 @@ void JournalRecorder::flush(Context *on_safe) {
 }
 
 ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset];
   ceph_assert(object_recoder != NULL);
@@ -193,7 +196,7 @@ ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
 }
 
 void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   // entry overflow from open object
   if (m_current_set != object_set) {
@@ -218,7 +221,7 @@ void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
 }
 
 void JournalRecorder::advance_object_set() {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   ceph_assert(m_in_flight_object_closes == 0);
   ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl;
@@ -229,7 +232,7 @@ void JournalRecorder::advance_object_set() {
 void JournalRecorder::handle_advance_object_set(int r) {
   Context *on_object_set_advanced = nullptr;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
 
     ceph_assert(m_in_flight_advance_sets > 0);
@@ -251,7 +254,7 @@ void JournalRecorder::handle_advance_object_set(int r) {
 }
 
 void JournalRecorder::open_object_set() {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   ldout(m_cct, 10) << "opening object set " << m_current_set << dendl;
 
@@ -274,7 +277,7 @@ void JournalRecorder::open_object_set() {
 
 bool JournalRecorder::close_object_set(uint64_t active_set) {
   ldout(m_cct, 10) << "active_set=" << active_set << dendl;
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   // object recorders will invoke overflow handler as they complete
   // closing the object to ensure correct order of future appends
@@ -300,7 +303,7 @@ bool JournalRecorder::close_object_set(uint64_t active_set) {
 }
 
 ObjectRecorderPtr JournalRecorder::create_object_recorder(
-    uint64_t object_number, shared_ptr<Mutex> lock) {
+    uint64_t object_number, ceph::mutex* lock) {
   ldout(m_cct, 10) << "object_number=" << object_number << dendl;
   ObjectRecorderPtr object_recorder(new ObjectRecorder(
     m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
@@ -314,17 +317,17 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder(
 
 void JournalRecorder::create_next_object_recorder(
     ObjectRecorderPtr object_recorder) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   uint64_t object_number = object_recorder->get_object_number();
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   uint8_t splay_offset = object_number % splay_width;
   ldout(m_cct, 10) << "object_number=" << object_number << dendl;
 
-  ceph_assert(m_object_locks[splay_offset]->is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_object_locks[splay_offset]));
 
   ObjectRecorderPtr new_object_recorder = create_object_recorder(
-     (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
+     (m_current_set * splay_width) + splay_offset, &m_object_locks[splay_offset]);
 
   ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", "
                    << "new oid=" << new_object_recorder->get_oid() << dendl;
@@ -343,7 +346,7 @@ void JournalRecorder::create_next_object_recorder(
 }
 
 void JournalRecorder::handle_update() {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   uint64_t active_set = m_journal_metadata->get_active_set();
   if (m_current_set < active_set) {
@@ -365,7 +368,7 @@ void JournalRecorder::handle_update() {
 void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
   ldout(m_cct, 10) << object_recorder->get_oid() << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   uint64_t object_number = object_recorder->get_object_number();
   uint8_t splay_width = m_journal_metadata->get_splay_width();
@@ -393,7 +396,7 @@ void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
 void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
   ldout(m_cct, 10) << object_recorder->get_oid() << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   uint64_t object_number = object_recorder->get_object_number();
   uint8_t splay_width = m_journal_metadata->get_splay_width();
index 382f75acef9c60ad65d0fb2d9cca7353903d3884..7395283325ebb4e4a3a44f55670fd4672bd23a8c 100644 (file)
@@ -7,7 +7,8 @@
 #include "include/int_types.h"
 #include "include/Context.h"
 #include "include/rados/librados.hpp"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
+#include "common/containers.h"
 #include "journal/Future.h"
 #include "journal/FutureImpl.h"
 #include "journal/JournalMetadata.h"
@@ -90,13 +91,13 @@ private:
   Listener m_listener;
   ObjectHandler m_object_handler;
 
-  Mutex m_lock;
+  ceph::mutex m_lock;
 
   uint32_t m_in_flight_advance_sets = 0;
   uint32_t m_in_flight_object_closes = 0;
   uint64_t m_current_set;
   ObjectRecorderPtrs m_object_ptrs;
-  std::vector<std::shared_ptr<Mutex>> m_object_locks;
+  ceph::containers::tiny_vector<ceph::mutex> m_object_locks;
 
   FutureImplPtr m_prev_future;
 
@@ -111,7 +112,7 @@ private:
   void close_and_advance_object_set(uint64_t object_set);
 
   ObjectRecorderPtr create_object_recorder(uint64_t object_number,
-                                           std::shared_ptr<Mutex> lock);
+                                           ceph::mutex* lock);
   void create_next_object_recorder(ObjectRecorderPtr object_recorder);
 
   void handle_update();
@@ -121,13 +122,13 @@ private:
 
   void lock_object_recorders() {
     for (auto& lock : m_object_locks) {
-      lock->Lock();
+      lock.lock();
     }
   }
 
   void unlock_object_recorders() {
     for (auto& lock : m_object_locks) {
-      lock->Unlock();
+      lock.unlock();
     }
   }
 };
index 645a62304c5f99a67a29b3106f88b9bd2ecd100a..84bc7e79cae2693bfe9f385695a72d247b1f9314 100644 (file)
@@ -16,7 +16,7 @@ namespace journal {
 struct JournalTrimmer::C_RemoveSet : public Context {
   JournalTrimmer *journal_trimmer;
   uint64_t object_set;
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("JournalTrimmer::m_lock");
   uint32_t refs;
   int return_value;
 
@@ -34,7 +34,7 @@ JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx,
                                const JournalMetadataPtr &journal_metadata)
     : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
       m_journal_metadata(journal_metadata), m_metadata_listener(this),
-      m_lock("JournalTrimmer::m_lock"), m_remove_set_pending(false),
+      m_remove_set_pending(false),
       m_remove_set(0), m_remove_set_ctx(NULL) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
@@ -49,7 +49,7 @@ JournalTrimmer::~JournalTrimmer() {
 void JournalTrimmer::shut_down(Context *on_finish) {
   ldout(m_cct, 20) << __func__ << dendl;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     ceph_assert(!m_shutdown);
     m_shutdown = true;
   }
@@ -67,7 +67,7 @@ void JournalTrimmer::remove_objects(bool force, Context *on_finish) {
   ldout(m_cct, 20) << __func__ << dendl;
 
   on_finish = new FunctionContext([this, force, on_finish](int r) {
-      Mutex::Locker locker(m_lock);
+                                   std::lock_guard locker{m_lock};
 
       if (m_remove_set_pending) {
         on_finish->complete(-EBUSY);
@@ -103,7 +103,7 @@ void JournalTrimmer::committed(uint64_t commit_tid) {
 }
 
 void JournalTrimmer::trim_objects(uint64_t minimum_set) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   ldout(m_cct, 20) << __func__ << ": min_set=" << minimum_set << dendl;
   if (minimum_set <= m_journal_metadata->get_minimum_set()) {
@@ -121,7 +121,7 @@ void JournalTrimmer::trim_objects(uint64_t minimum_set) {
 }
 
 void JournalTrimmer::remove_set(uint64_t object_set) {
-  ceph_assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   m_async_op_tracker.start_op();
   uint8_t splay_width = m_journal_metadata->get_splay_width();
@@ -149,7 +149,7 @@ void JournalTrimmer::remove_set(uint64_t object_set) {
 void JournalTrimmer::handle_metadata_updated() {
   ldout(m_cct, 20) << __func__ << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   JournalMetadata::RegisteredClients registered_clients;
   m_journal_metadata->get_registered_clients(&registered_clients);
@@ -193,7 +193,7 @@ void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) {
   ldout(m_cct, 20) << __func__ << ": r=" << r << ", set=" << object_set << ", "
                    << "trim=" << m_remove_set << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   m_remove_set_pending = false;
 
   if (r == -ENOENT) {
@@ -223,12 +223,12 @@ JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer,
                                          uint64_t _object_set,
                                          uint8_t _splay_width)
   : journal_trimmer(_journal_trimmer), object_set(_object_set),
-    lock(utils::unique_lock_name("C_RemoveSet::lock", this)),
+    lock(ceph::make_mutex(utils::unique_lock_name("C_RemoveSet::lock", this))),
     refs(_splay_width), return_value(-ENOENT) {
 }
 
 void JournalTrimmer::C_RemoveSet::complete(int r) {
-  lock.Lock();
+  lock.lock();
   if (r < 0 && r != -ENOENT &&
       (return_value == -ENOENT || return_value == 0)) {
     return_value = r;
@@ -238,10 +238,10 @@ void JournalTrimmer::C_RemoveSet::complete(int r) {
 
   if (--refs == 0) {
     finish(return_value);
-    lock.Unlock();
+    lock.unlock();
     delete this;
   } else {
-    lock.Unlock();
+    lock.unlock();
   }
 }
 
index 0b27923992d0d4f5d94662f20def4cda0e738a87..719be88e77b35b8feff6af47806e653f1b8bfc44 100644 (file)
@@ -8,7 +8,6 @@
 #include "include/rados/librados.hpp"
 #include "include/Context.h"
 #include "common/AsyncOpTracker.h"
-#include "common/Mutex.h"
 #include "journal/JournalMetadata.h"
 #include "cls/journal/cls_journal_types.h"
 #include <functional>
@@ -70,7 +69,7 @@ private:
 
   AsyncOpTracker m_async_op_tracker;
 
-  Mutex m_lock;
+  ceph::mutex m_lock = ceph::make_mutex("JournalTrimmer::m_lock");
 
   bool m_remove_set_pending;
   uint64_t m_remove_set;
index 7d9c1409584335bc4e306b783f95d3a65ddb7af7..51a08d5be818e861d784e8559856fd632f0d360e 100644 (file)
@@ -43,8 +43,7 @@ std::string Journaler::object_oid_prefix(int pool_id,
   return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." + journal_id + ".";
 }
 
-Journaler::Threads::Threads(CephContext *cct)
-    : timer_lock("Journaler::timer_lock") {
+Journaler::Threads::Threads(CephContext *cct) {
   thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1);
   thread_pool->start();
 
@@ -56,7 +55,7 @@ Journaler::Threads::Threads(CephContext *cct)
 
 Journaler::Threads::~Threads() {
   {
-    Mutex::Locker timer_locker(timer_lock);
+    std::lock_guard timer_locker{timer_lock};
     timer->shutdown();
   }
   delete timer;
@@ -82,7 +81,7 @@ Journaler::Journaler(librados::IoCtx &header_ioctx,
 }
 
 Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer,
-                     Mutex *timer_lock, librados::IoCtx &header_ioctx,
+                     ceph::mutex *timer_lock, librados::IoCtx &header_ioctx,
                     const std::string &journal_id,
                     const std::string &client_id, const Settings &settings,
                      CacheManagerHandler *cache_manager_handler)
@@ -92,7 +91,7 @@ Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer,
 }
 
 void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer,
-                       Mutex *timer_lock, librados::IoCtx &header_ioctx,
+                       ceph::mutex *timer_lock, librados::IoCtx &header_ioctx,
                        const std::string &journal_id,
                        const Settings &settings) {
   m_header_ioctx.dup(header_ioctx);
index a063a6d43a018c555de8d05dab53ea4a9b220171..17397d7ec2a654829fa08e2cf75e56f07b723cab 100644 (file)
@@ -41,8 +41,8 @@ public:
     ThreadPool *thread_pool = nullptr;
     ContextWQ *work_queue = nullptr;
 
-    SafeTimer *timer = nullptr;
-    Mutex timer_lock;
+    SafeTimer *timer;
+    ceph::mutex timer_lock = ceph::make_mutex("Journaler::timer_lock");
   };
 
   typedef cls::journal::Tag Tag;
@@ -56,7 +56,7 @@ public:
   Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id,
            const std::string &client_id, const Settings &settings,
             CacheManagerHandler *cache_manager_handler);
-  Journaler(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+  Journaler(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
             librados::IoCtx &header_ioctx, const std::string &journal_id,
            const std::string &client_id, const Settings &settings,
             CacheManagerHandler *cache_manager_handler);
@@ -154,7 +154,7 @@ private:
   JournalRecorder *m_recorder = nullptr;
   JournalTrimmer *m_trimmer = nullptr;
 
-  void set_up(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+  void set_up(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
               librados::IoCtx &header_ioctx, const std::string &journal_id,
               const Settings &settings);
 
index 35d979c7155eaa6c5f3e3ba9a96e26c192cd7ad5..46f615002e0f2d119ece3b7a06745e3a457c0c6f 100644 (file)
@@ -45,14 +45,14 @@ bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter,
 ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
                            const std::string &object_oid_prefix,
                            uint64_t object_num, SafeTimer &timer,
-                           Mutex &timer_lock, uint8_t order,
+                           ceph::mutex &timer_lock, uint8_t order,
                            uint64_t max_fetch_bytes)
   : RefCountedObject(NULL, 0), m_object_num(object_num),
     m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
     m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
     m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
     m_watch_interval(0), m_watch_task(NULL),
-    m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
+    m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this))),
     m_fetch_in_progress(false) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
@@ -60,8 +60,8 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
 
 ObjectPlayer::~ObjectPlayer() {
   {
-    Mutex::Locker timer_locker(m_timer_lock);
-    Mutex::Locker locker(m_lock);
+    std::lock_guard timer_locker{m_timer_lock};
+    std::lock_guard locker{m_lock};
     ceph_assert(!m_fetch_in_progress);
     ceph_assert(m_watch_ctx == nullptr);
   }
@@ -70,7 +70,7 @@ ObjectPlayer::~ObjectPlayer() {
 void ObjectPlayer::fetch(Context *on_finish) {
   ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(!m_fetch_in_progress);
   m_fetch_in_progress = true;
 
@@ -90,7 +90,7 @@ void ObjectPlayer::fetch(Context *on_finish) {
 void ObjectPlayer::watch(Context *on_fetch, double interval) {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
 
-  Mutex::Locker timer_locker(m_timer_lock);
+  std::lock_guard timer_locker{m_timer_lock};
   m_watch_interval = interval;
 
   ceph_assert(m_watch_ctx == nullptr);
@@ -103,7 +103,7 @@ void ObjectPlayer::unwatch() {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
   Context *watch_ctx = nullptr;
   {
-    Mutex::Locker timer_locker(m_timer_lock);
+    std::lock_guard timer_locker{m_timer_lock};
     ceph_assert(!m_unwatched);
     m_unwatched = true;
 
@@ -120,13 +120,13 @@ void ObjectPlayer::unwatch() {
 }
 
 void ObjectPlayer::front(Entry *entry) const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(!m_entries.empty());
   *entry = m_entries.front();
 }
 
 void ObjectPlayer::pop_front() {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(!m_entries.empty());
 
   auto &entry = m_entries.front();
@@ -148,7 +148,7 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
     return 0;
   }
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(m_fetch_in_progress);
   m_read_off += bl.length();
   m_read_bl.append(bl);
@@ -274,7 +274,7 @@ void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
 }
 
 void ObjectPlayer::schedule_watch() {
-  ceph_assert(m_timer_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_timer_lock));
   if (m_watch_ctx == NULL) {
     return;
   }
@@ -289,7 +289,7 @@ void ObjectPlayer::schedule_watch() {
 }
 
 bool ObjectPlayer::cancel_watch() {
-  ceph_assert(m_timer_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_timer_lock));
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
   if (m_watch_task != nullptr) {
     bool canceled = m_timer.cancel_event(m_watch_task);
@@ -302,7 +302,7 @@ bool ObjectPlayer::cancel_watch() {
 }
 
 void ObjectPlayer::handle_watch_task() {
-  ceph_assert(m_timer_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_timer_lock));
 
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
   ceph_assert(m_watch_ctx != nullptr);
@@ -318,7 +318,7 @@ void ObjectPlayer::handle_watch_fetched(int r) {
 
   Context *watch_ctx = nullptr;
   {
-    Mutex::Locker timer_locker(m_timer_lock);
+    std::lock_guard timer_locker{m_timer_lock};
     std::swap(watch_ctx, m_watch_ctx);
 
     if (m_unwatched) {
@@ -337,7 +337,7 @@ void ObjectPlayer::C_Fetch::finish(int r) {
   r = object_player->handle_fetch_complete(r, read_bl, &refetch);
 
   {
-    Mutex::Locker locker(object_player->m_lock);
+    std::lock_guard locker{object_player->m_lock};
     object_player->m_fetch_in_progress = false;
   }
 
index 3dfad125bc1f01d9784aa935a510c3cc776c9b8a..568d31b063541e92cc10b680234c7a3e9c6c5e8e 100644 (file)
@@ -7,8 +7,7 @@
 #include "include/Context.h"
 #include "include/interval_set.h"
 #include "include/rados/librados.hpp"
-#include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/RefCountedObj.h"
 #include "journal/Entry.h"
 #include <list>
@@ -37,7 +36,7 @@ public:
   };
 
   ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
-               uint64_t object_num, SafeTimer &timer, Mutex &timer_lock,
+               uint64_t object_num, SafeTimer &timer, ceph::mutex &timer_lock,
                uint8_t order, uint64_t max_fetch_bytes);
   ~ObjectPlayer() override;
 
@@ -55,16 +54,16 @@ public:
   void front(Entry *entry) const;
   void pop_front();
   inline bool empty() const {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     return m_entries.empty();
   }
 
   inline void get_entries(Entries *entries) {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     *entries = m_entries;
   }
   inline void get_invalid_ranges(InvalidRanges *invalid_ranges) {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     *invalid_ranges = m_invalid_ranges;
   }
 
@@ -79,7 +78,7 @@ public:
   }
 
   inline void set_max_fetch_bytes(uint64_t max_fetch_bytes) {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     m_max_fetch_bytes = max_fetch_bytes;
   }
 
@@ -108,7 +107,7 @@ private:
   CephContext *m_cct;
 
   SafeTimer &m_timer;
-  Mutex &m_timer_lock;
+  ceph::mutex &m_timer_lock;
 
   uint8_t m_order;
   uint64_t m_max_fetch_bytes;
@@ -116,7 +115,7 @@ private:
   double m_watch_interval;
   Context *m_watch_task;
 
-  mutable Mutex m_lock;
+  mutable ceph::mutex m_lock;
   bool m_fetch_in_progress;
   bufferlist m_read_bl;
   uint32_t m_read_off = 0;
index 2366759cc5c2bb52d2d6c482623d719f8436083e..4bb6d03c186137706d495c2b97cd5638b9c1975e 100644 (file)
@@ -20,7 +20,7 @@ using std::shared_ptr;
 namespace journal {
 
 ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
-                               uint64_t object_number, shared_ptr<Mutex> lock,
+                               uint64_t object_number, ceph::mutex* lock,
                                ContextWQ *work_queue, Handler *handler,
                                uint8_t order, int32_t max_in_flight_appends)
   : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
@@ -59,7 +59,7 @@ void ObjectRecorder::set_append_batch_options(int flush_interval,
                   << "flush_bytes=" << flush_bytes << ", "
                   << "flush_age=" << flush_age << dendl;
 
-  ceph_assert(m_lock->is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_lock));
   m_flush_interval = flush_interval;
   m_flush_bytes = flush_bytes;
   m_flush_age = flush_age;
@@ -68,7 +68,7 @@ void ObjectRecorder::set_append_batch_options(int flush_interval,
 bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
   ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
 
-  ceph_assert(m_lock->is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_lock));
 
   FutureImplPtr last_flushed_future;
   for (auto& append_buffer : append_buffers) {
@@ -91,13 +91,13 @@ void ObjectRecorder::flush(Context *on_safe) {
 
   Future future;
   {
-    Mutex::Locker locker(*m_lock);
+    std::unique_lock locker{*m_lock};
 
     // if currently handling flush notifications, wait so that
     // we notify in the correct order (since lock is dropped on
     // callback)
     if (m_in_flight_flushes) {
-      m_in_flight_flushes_cond.Wait(*(m_lock.get()));
+      m_in_flight_flushes_cond.wait(locker);
     }
 
     // attach the flush to the most recent append
@@ -124,15 +124,15 @@ void ObjectRecorder::flush(Context *on_safe) {
 void ObjectRecorder::flush(const FutureImplPtr &future) {
   ldout(m_cct, 20) << "flushing " << *future << dendl;
 
-  m_lock->Lock();
+  m_lock->lock();
   if (future->get_flush_handler().get() != &m_flush_handler) {
     // if we don't own this future, re-issue the flush so that it hits the
     // correct journal object owner
     future->flush();
-    m_lock->Unlock();
+    m_lock->unlock();
     return;
   } else if (future->is_flush_in_progress()) {
-    m_lock->Unlock();
+    m_lock->unlock();
     return;
   }
 
@@ -140,14 +140,14 @@ void ObjectRecorder::flush(const FutureImplPtr &future) {
   if (overflowed) {
     notify_handler_unlock();
   } else {
-    m_lock->Unlock();
+    m_lock->unlock();
   }
 }
 
 void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
   ldout(m_cct, 20) << dendl;
 
-  ceph_assert(m_lock->is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_lock));
   ceph_assert(m_in_flight_tids.empty());
   ceph_assert(m_in_flight_appends.empty());
   ceph_assert(m_object_closed || m_overflowed);
@@ -161,7 +161,7 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
 }
 
 bool ObjectRecorder::close() {
-  ceph_assert(m_lock->is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_lock));
 
   ldout(m_cct, 20) << dendl;
   send_appends(true, {});
@@ -176,7 +176,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
 
   AppendBuffers append_buffers;
   {
-    m_lock->Lock();
+    m_lock->lock();
     auto tid_iter = m_in_flight_tids.find(tid);
     ceph_assert(tid_iter != m_in_flight_tids.end());
     m_in_flight_tids.erase(tid_iter);
@@ -193,7 +193,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
         append_overflowed();
         notify_handler_unlock();
       } else {
-        m_lock->Unlock();
+        m_lock->unlock();
       }
       return;
     }
@@ -208,7 +208,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
 
     m_in_flight_appends.erase(iter);
     m_in_flight_flushes = true;
-    m_lock->Unlock();
+    m_lock->unlock();
   }
 
   // Flag the associated futures as complete.
@@ -218,9 +218,9 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
   }
 
   // wake up any flush requests that raced with a RADOS callback
-  m_lock->Lock();
+  m_lock->lock();
   m_in_flight_flushes = false;
-  m_in_flight_flushes_cond.Signal();
+  m_in_flight_flushes_cond.notify_all();
 
   if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) {
     // all remaining unsent appends should be redirected to new object
@@ -230,7 +230,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
     if (overflowed) {
       notify_handler_unlock();
     } else {
-      m_lock->Unlock();
+      m_lock->unlock();
     }
   }
 }
@@ -238,7 +238,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
 void ObjectRecorder::append_overflowed() {
   ldout(m_cct, 10) << dendl;
 
-  ceph_assert(m_lock->is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_lock));
   ceph_assert(!m_in_flight_appends.empty());
 
   InFlightAppends in_flight_appends;
@@ -261,7 +261,7 @@ void ObjectRecorder::append_overflowed() {
 bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
   ldout(m_cct, 20) << dendl;
 
-  ceph_assert(m_lock->is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_lock));
   if (m_object_closed || m_overflowed) {
     ldout(m_cct, 20) << "already closed or overflowed" << dendl;
     return false;
@@ -369,13 +369,13 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
 }
 
 void ObjectRecorder::notify_handler_unlock() {
-  ceph_assert(m_lock->is_locked());
+  ceph_assert(ceph_mutex_is_locked(*m_lock));
   if (m_object_closed) {
-    m_lock->Unlock();
+    m_lock->unlock();
     m_handler->closed(this);
   } else {
     // TODO need to delay completion until after aio_notify completes
-    m_lock->Unlock();
+    m_lock->unlock();
     m_handler->overflow(this);
   }
 }
index 1f6014773ef5d88aa7b4e3c71594422fa8bd6e01..30dde76c92bd5c56e01b2db2669bbed9d2ba551c 100644 (file)
@@ -7,8 +7,7 @@
 #include "include/utime.h"
 #include "include/Context.h"
 #include "include/rados/librados.hpp"
-#include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/RefCountedObj.h"
 #include "common/WorkQueue.h"
 #include "journal/FutureImpl.h"
@@ -39,7 +38,7 @@ public:
   };
 
   ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
-                 uint64_t object_number, std::shared_ptr<Mutex> lock,
+                 uint64_t object_number, ceph::mutex* lock,
                  ContextWQ *work_queue, Handler *handler, uint8_t order,
                  int32_t max_in_flight_appends);
   ~ObjectRecorder() override;
@@ -61,7 +60,7 @@ public:
   void claim_append_buffers(AppendBuffers *append_buffers);
 
   bool is_closed() const {
-    ceph_assert(m_lock->is_locked());
+    ceph_assert(ceph_mutex_is_locked(*m_lock));
     return (m_object_closed && m_in_flight_appends.empty());
   }
   bool close();
@@ -71,7 +70,7 @@ public:
   }
 
   inline size_t get_pending_appends() const {
-    Mutex::Locker locker(*m_lock);
+    std::lock_guard locker{*m_lock};
     return m_pending_buffers.size();
   }
 
@@ -126,7 +125,7 @@ private:
 
   FlushHandler m_flush_handler;
 
-  mutable std::shared_ptr<Mutex> m_lock;
+  mutable ceph::mutex* m_lock;
   AppendBuffers m_pending_buffers;
   uint64_t m_pending_bytes = 0;
   utime_t m_last_flush_time;
@@ -142,7 +141,7 @@ private:
   bufferlist m_prefetch_bl;
 
   bool m_in_flight_flushes;
-  Cond m_in_flight_flushes_cond;
+  ceph::condition_variable m_in_flight_flushes_cond;
   uint64_t m_in_flight_bytes = 0;
 
   bool send_appends(bool force, FutureImplPtr flush_sentinal);