From: Kefu Chai Date: Sun, 7 Jul 2019 04:35:23 +0000 (+0800) Subject: journal: s/Mutex/ceph::mutex/ X-Git-Tag: v15.1.0~1971^2~39 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f39b32ebfa3b88f43bcc7f30d77511371ac09e1c;p=ceph.git journal: s/Mutex/ceph::mutex/ * FutureImpl::m_lock is an exception. as, before this change, the lock was initialized like `m_lock("FutureImpl::m_lock", false, false)`, see the declaration of `Mutex(const std::string &n, bool r = false, bool ld=true, bool bt=false)` so `m_lock` is actually not using the extra features offered by `Mutex` like runtime lockdeps check. and `mutex_debugging_base` does not allow us to disable lockdeps individually. but it does use the `is_locked()` method. so instead of using `ceph::mutex` directly, a customized `ceph::mutex` is added for `CEPH_DEBUG_MUTEX` build. Signed-off-by: Kefu Chai --- diff --git a/src/journal/FutureImpl.cc b/src/journal/FutureImpl.cc index eac3fc39ba3..474c025c608 100644 --- a/src/journal/FutureImpl.cc +++ b/src/journal/FutureImpl.cc @@ -10,7 +10,7 @@ FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid) : RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid), m_commit_tid(commit_tid), - m_lock("FutureImpl::m_lock", false, false), m_safe(false), + m_safe(false), m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE), m_consistent_ack(this) { } @@ -32,7 +32,7 @@ void FutureImpl::flush(Context *on_safe) { FlushHandlers flush_handlers; FutureImplPtr prev_future; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; complete = (m_safe && m_consistent); if (!complete) { if (on_safe != nullptr) { @@ -61,13 +61,13 @@ void FutureImpl::flush(Context *on_safe) { } FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return prepare_flush(flush_handlers, m_lock); } FutureImplPtr 
FutureImpl::prepare_flush(FlushHandlers *flush_handlers, - Mutex &lock) { - ceph_assert(m_lock.is_locked()); + ceph::mutex &lock) { + ceph_assert(ceph_mutex_is_locked(m_lock)); if (m_flush_state == FLUSH_STATE_NONE) { m_flush_state = FLUSH_STATE_REQUESTED; @@ -82,7 +82,7 @@ FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers, void FutureImpl::wait(Context *on_safe) { ceph_assert(on_safe != NULL); { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (!m_safe || !m_consistent) { m_contexts.push_back(on_safe); return; @@ -93,25 +93,25 @@ void FutureImpl::wait(Context *on_safe) { } bool FutureImpl::is_complete() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_safe && m_consistent; } int FutureImpl::get_return_value() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_safe && m_consistent); return m_return_value; } bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_flush_handler); m_flush_handler = flush_handler; return m_flush_state != FLUSH_STATE_NONE; } void FutureImpl::safe(int r) { - m_lock.Lock(); + m_lock.lock(); ceph_assert(!m_safe); m_safe = true; if (m_return_value == 0) { @@ -122,12 +122,12 @@ void FutureImpl::safe(int r) { if (m_consistent) { finish_unlock(); } else { - m_lock.Unlock(); + m_lock.unlock(); } } void FutureImpl::consistent(int r) { - m_lock.Lock(); + m_lock.lock(); ceph_assert(!m_consistent); m_consistent = true; m_prev_future.reset(); @@ -138,18 +138,18 @@ void FutureImpl::consistent(int r) { if (m_safe) { finish_unlock(); } else { - m_lock.Unlock(); + m_lock.unlock(); } } void FutureImpl::finish_unlock() { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_safe && m_consistent); Contexts contexts; contexts.swap(m_contexts); - m_lock.Unlock(); + m_lock.unlock(); for (Contexts::iterator it = 
contexts.begin(); it != contexts.end(); ++it) { (*it)->complete(m_return_value); diff --git a/src/journal/FutureImpl.h b/src/journal/FutureImpl.h index 2be3eb253ec..b81fba200bc 100644 --- a/src/journal/FutureImpl.h +++ b/src/journal/FutureImpl.h @@ -5,7 +5,6 @@ #define CEPH_JOURNAL_FUTURE_IMPL_H #include "include/int_types.h" -#include "common/Mutex.h" #include "common/RefCountedObj.h" #include "include/Context.h" #include "journal/Future.h" @@ -53,11 +52,11 @@ public: int get_return_value() const; inline bool is_flush_in_progress() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return (m_flush_state == FLUSH_STATE_IN_PROGRESS); } inline void set_flush_in_progress() { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_flush_handler); m_flush_handler.reset(); m_flush_state = FLUSH_STATE_IN_PROGRESS; @@ -65,11 +64,11 @@ public: bool attach(const FlushHandlerPtr &flush_handler); inline void detach() { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_flush_handler.reset(); } inline FlushHandlerPtr get_flush_handler() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_flush_handler; } @@ -101,7 +100,7 @@ private: uint64_t m_entry_tid; uint64_t m_commit_tid; - mutable Mutex m_lock; + mutable ceph::mutex m_lock = ceph::make_mutex("FutureImpl::m_lock", false); FutureImplPtr m_prev_future; bool m_safe; bool m_consistent; @@ -114,7 +113,7 @@ private: Contexts m_contexts; FutureImplPtr prepare_flush(FlushHandlers *flush_handlers); - FutureImplPtr prepare_flush(FlushHandlers *flush_handlers, Mutex &lock); + FutureImplPtr prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock); void consistent(int r); void finish_unlock(); diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index f35eccb131f..7fcad836c19 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -401,7 +401,7 @@ struct C_AssertActiveTag : public 
Context { } // anonymous namespace JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, - Mutex *timer_lock, librados::IoCtx &ioctx, + ceph::mutex *timer_lock, librados::IoCtx &ioctx, const std::string &oid, const std::string &client_id, const Settings &settings) @@ -409,7 +409,7 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, m_client_id(client_id), m_settings(settings), m_order(0), m_splay_width(0), m_pool_id(-1), m_initialized(false), m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock), - m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this), + m_commit_tid(0), m_watch_ctx(this), m_watch_handle(0), m_minimum_set(0), m_active_set(0), m_update_notifications(0), m_commit_position_ctx(NULL), m_commit_position_task_ctx(NULL) { @@ -418,13 +418,13 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, } JournalMetadata::~JournalMetadata() { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_initialized); } void JournalMetadata::init(Context *on_finish) { { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_initialized); m_initialized = true; } @@ -437,7 +437,7 @@ void JournalMetadata::init(Context *on_finish) { if (r < 0) { lderr(m_cct) << __func__ << ": failed to watch journal" << cpp_strerror(r) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_watch_handle = 0; on_finish->complete(r); return; @@ -459,7 +459,7 @@ void JournalMetadata::shut_down(Context *on_finish) { uint64_t watch_handle = 0; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_initialized = false; std::swap(watch_handle, m_watch_handle); } @@ -592,23 +592,23 @@ void JournalMetadata::get_tags(uint64_t start_after_tag_tid, } void JournalMetadata::add_listener(JournalMetadataListener *listener) { - Mutex::Locker locker(m_lock); - while (m_update_notifications > 0) { - m_update_cond.Wait(m_lock); - 
} + std::unique_lock locker{m_lock}; + m_update_cond.wait(locker, [this] { + return m_update_notifications <= 0; + }); m_listeners.push_back(listener); } void JournalMetadata::remove_listener(JournalMetadataListener *listener) { - Mutex::Locker locker(m_lock); - while (m_update_notifications > 0) { - m_update_cond.Wait(m_lock); - } + std::unique_lock locker{m_lock}; + m_update_cond.wait(locker, [this] { + return m_update_notifications <= 0; + }); m_listeners.remove(listener); } void JournalMetadata::set_minimum_set(uint64_t object_set) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set << ", new=" << object_set << dendl; @@ -637,7 +637,7 @@ int JournalMetadata::set_active_set(uint64_t object_set) { } void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ldout(m_cct, 20) << __func__ << ": current=" << m_active_set << ", new=" << object_set << dendl; @@ -661,7 +661,7 @@ void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) { } void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid, m_async_op_tracker, @@ -681,8 +681,7 @@ void JournalMetadata::flush_commit_position() { void JournalMetadata::flush_commit_position(Context *on_safe) { ldout(m_cct, 20) << __func__ << dendl; - Mutex::Locker timer_locker(*m_timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{*m_timer_lock, m_lock}; if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) { // nothing to flush if (on_safe != nullptr) { @@ -703,7 +702,7 @@ void JournalMetadata::flush_commit_position(Context *on_safe) { } void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) { - Mutex::Locker locker(m_lock); + 
std::lock_guard locker{m_lock}; uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid]; if (allocated_entry_tid <= entry_tid) { allocated_entry_tid = entry_tid + 1; @@ -712,7 +711,7 @@ void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) { bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid, uint64_t *entry_tid) const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid); if (it == m_allocated_entry_tids.end()) { @@ -740,7 +739,7 @@ void JournalMetadata::refresh(Context *on_complete) { ldout(m_cct, 10) << "refreshing mutable metadata" << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (on_complete != nullptr) { m_refresh_ctxs.push_back(on_complete); } @@ -755,7 +754,7 @@ void JournalMetadata::refresh(Context *on_complete) { void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl; - m_lock.Lock(); + m_lock.lock(); if (r == 0) { Client client(m_client_id, bufferlist()); RegisteredClients::iterator it = refresh->registered_clients.find(client); @@ -770,14 +769,14 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { m_client = *it; ++m_update_notifications; - m_lock.Unlock(); + m_lock.unlock(); for (Listeners::iterator it = m_listeners.begin(); it != m_listeners.end(); ++it) { (*it)->handle_update(this); } - m_lock.Lock(); + m_lock.lock(); if (--m_update_notifications == 0) { - m_update_cond.Signal(); + m_update_cond.notify_all(); } } else { lderr(m_cct) << "failed to locate client: " << m_client_id << dendl; @@ -791,7 +790,7 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { if (m_refreshes_in_progress == 0) { std::swap(refresh_ctxs, m_refresh_ctxs); } - m_lock.Unlock(); + m_lock.unlock(); for (auto ctx : refresh_ctxs) { ctx->complete(r); @@ -801,8 +800,8 @@ void 
JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { void JournalMetadata::cancel_commit_task() { ldout(m_cct, 20) << __func__ << dendl; - ceph_assert(m_timer_lock->is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_commit_position_ctx != nullptr); ceph_assert(m_commit_position_task_ctx != nullptr); m_timer->cancel_event(m_commit_position_task_ctx); @@ -812,8 +811,8 @@ void JournalMetadata::cancel_commit_task() { void JournalMetadata::schedule_commit_task() { ldout(m_cct, 20) << __func__ << dendl; - ceph_assert(m_timer_lock->is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_commit_position_ctx != nullptr); if (m_commit_position_task_ctx == nullptr) { m_commit_position_task_ctx = @@ -823,8 +822,8 @@ void JournalMetadata::schedule_commit_task() { } void JournalMetadata::handle_commit_position_task() { - ceph_assert(m_timer_lock->is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); ldout(m_cct, 20) << __func__ << ": " << "client_id=" << m_client_id << ", " << "commit_position=" << m_commit_position << dendl; @@ -838,13 +837,13 @@ void JournalMetadata::handle_commit_position_task() { Context* ctx = new FunctionContext([this, commit_position_ctx](int r) { Contexts flush_commit_position_ctxs; - m_lock.Lock(); + m_lock.lock(); ceph_assert(m_flush_commits_in_progress > 0); --m_flush_commits_in_progress; if (m_flush_commits_in_progress == 0) { std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs); } - m_lock.Unlock(); + m_lock.unlock(); commit_position_ctx->complete(0); for (auto ctx : flush_commit_position_ctxs) { @@ -856,9 +855,9 @@ void JournalMetadata::handle_commit_position_task() { ctx = new FunctionContext([this, ctx](int r) { // 
manually kick of a refresh in case the notification is missed // and ignore the next notification that we are about to send - m_lock.Lock(); + m_lock.lock(); ++m_ignore_watch_notifies; - m_lock.Unlock(); + m_lock.unlock(); refresh(ctx); }); @@ -877,12 +876,12 @@ void JournalMetadata::handle_commit_position_task() { } void JournalMetadata::schedule_watch_reset() { - ceph_assert(m_timer_lock->is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); m_timer->add_event_after(1, new C_WatchReset(this)); } void JournalMetadata::handle_watch_reset() { - ceph_assert(m_timer_lock->is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); if (!m_initialized) { return; } @@ -911,7 +910,7 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) { m_ioctx.notify_ack(m_oid, notify_id, cookie, bl); { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_ignore_watch_notifies > 0) { --m_ignore_watch_notifies; return; @@ -930,8 +929,7 @@ void JournalMetadata::handle_watch_error(int err) { lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl; } - Mutex::Locker timer_locker(*m_timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{*m_timer_lock, m_lock}; // release old watch on error if (m_watch_handle != 0) { @@ -947,7 +945,7 @@ void JournalMetadata::handle_watch_error(int err) { uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num, uint64_t tag_tid, uint64_t entry_tid) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; uint64_t commit_tid = ++m_commit_tid; m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid, entry_tid); @@ -962,7 +960,7 @@ uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num, void JournalMetadata::overflow_commit_tid(uint64_t commit_tid, uint64_t object_num) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto it = m_pending_commit_tids.find(commit_tid); ceph_assert(it != 
m_pending_commit_tids.end()); @@ -978,7 +976,7 @@ void JournalMetadata::overflow_commit_tid(uint64_t commit_tid, void JournalMetadata::get_commit_entry(uint64_t commit_tid, uint64_t *object_num, uint64_t *tag_tid, uint64_t *entry_tid) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto it = m_pending_commit_tids.find(commit_tid); ceph_assert(it != m_pending_commit_tids.end()); @@ -995,8 +993,7 @@ void JournalMetadata::committed(uint64_t commit_tid, ObjectSetPosition commit_position; Context *stale_ctx = nullptr; { - Mutex::Locker timer_locker(*m_timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{*m_timer_lock, m_lock}; ceph_assert(commit_tid > m_commit_position_tid); if (!m_commit_position.object_positions.empty()) { @@ -1103,7 +1100,7 @@ void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) { Context *ctx = on_finish; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; for (auto &c : m_registered_clients) { if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED || c.id == m_client_id || @@ -1158,7 +1155,7 @@ std::ostream &operator<<(std::ostream &os, std::ostream &operator<<(std::ostream &os, const JournalMetadata &jm) { - Mutex::Locker locker(jm.m_lock); + std::lock_guard locker{jm.m_lock}; os << "[oid=" << jm.m_oid << ", " << "initialized=" << jm.m_initialized << ", " << "order=" << (int)jm.m_order << ", " diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 393d20297d8..13d9fd44ff1 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -9,7 +9,7 @@ #include "include/rados/librados.hpp" #include "common/AsyncOpTracker.h" #include "common/Cond.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/RefCountedObj.h" #include "common/WorkQueue.h" #include "cls/journal/cls_journal_types.h" @@ -43,7 +43,7 @@ public: typedef std::set RegisteredClients; typedef std::list Tags; - JournalMetadata(ContextWQ 
*work_queue, SafeTimer *timer, Mutex *timer_lock, + JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock, librados::IoCtx &ioctx, const std::string &oid, const std::string &client_id, const Settings &settings); ~JournalMetadata() override; @@ -105,20 +105,20 @@ public: inline SafeTimer &get_timer() { return *m_timer; } - inline Mutex &get_timer_lock() { + inline ceph::mutex &get_timer_lock() { return *m_timer_lock; } void set_minimum_set(uint64_t object_set); inline uint64_t get_minimum_set() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_minimum_set; } int set_active_set(uint64_t object_set); void set_active_set(uint64_t object_set, Context *on_finish); inline uint64_t get_active_set() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_active_set; } @@ -127,17 +127,17 @@ public: void flush_commit_position(); void flush_commit_position(Context *on_safe); void get_commit_position(ObjectSetPosition *commit_position) const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; *commit_position = m_client.commit_position; } void get_registered_clients(RegisteredClients *registered_clients) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; *registered_clients = m_registered_clients; } inline uint64_t allocate_entry_tid(uint64_t tag_tid) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_allocated_entry_tids[tag_tid]++; } void reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid); @@ -216,7 +216,7 @@ private: journal_metadata->m_async_op_tracker.finish_op(); } void finish(int r) override { - Mutex::Locker locker(journal_metadata->m_lock); + std::lock_guard locker{journal_metadata->m_lock}; journal_metadata->handle_commit_position_task(); }; }; @@ -268,7 +268,7 @@ private: C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish) : journal_metadata(_journal_metadata), on_finish(_on_finish) { - 
Mutex::Locker locker(journal_metadata->m_lock); + std::lock_guard locker{journal_metadata->m_lock}; journal_metadata->m_async_op_tracker.start_op(); } ~C_ImmutableMetadata() override { @@ -287,7 +287,7 @@ private: C_Refresh(JournalMetadata *_journal_metadata) : journal_metadata(_journal_metadata), minimum_set(0), active_set(0) { - Mutex::Locker locker(journal_metadata->m_lock); + std::lock_guard locker{journal_metadata->m_lock}; journal_metadata->m_async_op_tracker.start_op(); } ~C_Refresh() override { @@ -311,9 +311,9 @@ private: ContextWQ *m_work_queue; SafeTimer *m_timer; - Mutex *m_timer_lock; + ceph::mutex *m_timer_lock; - mutable Mutex m_lock; + mutable ceph::mutex m_lock = ceph::make_mutex("JournalMetadata::m_lock"); uint64_t m_commit_tid; CommitTids m_pending_commit_tids; @@ -331,7 +331,7 @@ private: AllocatedEntryTids m_allocated_entry_tids; size_t m_update_notifications; - Cond m_update_cond; + ceph::condition_variable m_update_cond; size_t m_ignore_watch_notifies = 0; size_t m_refreshes_in_progress = 0; diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index 5ac4ebc5dc2..02df4dfb7e9 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -59,7 +59,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), m_journal_metadata(journal_metadata), m_replay_handler(replay_handler), m_cache_manager_handler(cache_manager_handler), - m_cache_rebalance_handler(this), m_lock("JournalPlayer::m_lock"), + m_cache_rebalance_handler(this), m_state(STATE_INIT), m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0) { m_replay_handler->get(); @@ -104,7 +104,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, JournalPlayer::~JournalPlayer() { ceph_assert(m_async_op_tracker.empty()); { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_shut_down); ceph_assert(m_fetch_object_numbers.empty()); 
ceph_assert(!m_watch_scheduled); @@ -117,7 +117,7 @@ JournalPlayer::~JournalPlayer() { } void JournalPlayer::prefetch() { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_state == STATE_INIT); if (m_cache_manager_handler != nullptr && m_max_fetch_bytes == 0) { @@ -162,7 +162,7 @@ void JournalPlayer::prefetch() { void JournalPlayer::prefetch_and_watch(double interval) { { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_watch_enabled = true; m_watch_interval = interval; m_watch_step = WATCH_STEP_FETCH_CURRENT; @@ -172,7 +172,7 @@ void JournalPlayer::prefetch_and_watch(double interval) { void JournalPlayer::shut_down(Context *on_finish) { ldout(m_cct, 20) << __func__ << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_shut_down); m_shut_down = true; @@ -200,7 +200,7 @@ void JournalPlayer::shut_down(Context *on_finish) { bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { ldout(m_cct, 20) << __func__ << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_state != STATE_PLAYBACK) { m_handler_notified = false; @@ -248,7 +248,7 @@ void JournalPlayer::process_state(uint64_t object_number, int r) { ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", " << "r=" << r << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (r >= 0) { switch (m_state) { case STATE_PREFETCH: @@ -277,7 +277,7 @@ void JournalPlayer::process_state(uint64_t object_number, int r) { int JournalPlayer::process_prefetch(uint64_t object_number) { ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); uint8_t splay_width = m_journal_metadata->get_splay_width(); uint8_t splay_offset = object_number % splay_width; @@ -382,7 +382,7 @@ int JournalPlayer::process_prefetch(uint64_t object_number) { int 
JournalPlayer::process_playback(uint64_t object_number) { ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (verify_playback_ready()) { notify_entries_available(); @@ -393,7 +393,7 @@ int JournalPlayer::process_playback(uint64_t object_number) { } bool JournalPlayer::is_object_set_ready() const { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (m_watch_scheduled || !m_fetch_object_numbers.empty()) { ldout(m_cct, 20) << __func__ << ": waiting for in-flight fetch" << dendl; return false; @@ -403,7 +403,7 @@ bool JournalPlayer::is_object_set_ready() const { } bool JournalPlayer::verify_playback_ready() { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); while (true) { if (!is_object_set_ready()) { @@ -506,7 +506,7 @@ bool JournalPlayer::verify_playback_ready() { } void JournalPlayer::prune_tag(uint64_t tag_tid) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag " << tag_tid << dendl; @@ -550,7 +550,7 @@ void JournalPlayer::prune_tag(uint64_t tag_tid) { } void JournalPlayer::prune_active_tag(const boost::optional& tag_tid) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_active_tag_tid); uint64_t active_tag_tid = *m_active_tag_tid; @@ -564,7 +564,7 @@ void JournalPlayer::prune_active_tag(const boost::optional& tag_tid) { } ObjectPlayerPtr JournalPlayer::get_object_player() const { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); SplayedObjectPlayers::const_iterator it = m_object_players.find( m_splay_offset); @@ -573,7 +573,7 @@ ObjectPlayerPtr JournalPlayer::get_object_player() const { } ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const { - ceph_assert(m_lock.is_locked()); + 
ceph_assert(ceph_mutex_is_locked(m_lock)); uint8_t splay_width = m_journal_metadata->get_splay_width(); uint8_t splay_offset = object_number % splay_width; @@ -586,7 +586,7 @@ ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const { } void JournalPlayer::advance_splay_object() { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ++m_splay_offset; m_splay_offset %= m_journal_metadata->get_splay_width(); m_watch_step = WATCH_STEP_FETCH_CURRENT; @@ -595,7 +595,7 @@ void JournalPlayer::advance_splay_object() { } bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(!m_watch_scheduled); uint8_t splay_width = m_journal_metadata->get_splay_width(); @@ -629,7 +629,7 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) { } void JournalPlayer::fetch(uint64_t object_num) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ObjectPlayerPtr object_player(new ObjectPlayer( m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(), @@ -642,7 +642,7 @@ void JournalPlayer::fetch(uint64_t object_num) { } void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); uint64_t object_num = object_player->get_object_number(); std::string oid = utils::get_object_name(m_object_oid_prefix, object_num); @@ -660,7 +660,7 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) { << utils::get_object_name(m_object_oid_prefix, object_num) << ": r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_fetch_object_numbers.count(object_num) == 1); m_fetch_object_numbers.erase(object_num); @@ -677,7 +677,7 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) { void JournalPlayer::refetch(bool immediate) { 
ldout(m_cct, 10) << __func__ << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); m_handler_notified = false; // if watching the object, handle the periodic re-fetch @@ -698,7 +698,7 @@ void JournalPlayer::refetch(bool immediate) { void JournalPlayer::schedule_watch(bool immediate) { ldout(m_cct, 10) << __func__ << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (m_watch_scheduled) { return; } @@ -759,7 +759,7 @@ void JournalPlayer::schedule_watch(bool immediate) { void JournalPlayer::handle_watch(uint64_t object_num, int r) { ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_watch_scheduled); m_watch_scheduled = false; @@ -793,7 +793,7 @@ void JournalPlayer::handle_watch(uint64_t object_num, int r) { void JournalPlayer::handle_watch_assert_active(int r) { ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_watch_scheduled); m_watch_scheduled = false; @@ -813,7 +813,7 @@ void JournalPlayer::handle_watch_assert_active(int r) { } void JournalPlayer::notify_entries_available() { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (m_handler_notified) { return; } @@ -825,7 +825,7 @@ void JournalPlayer::notify_entries_available() { } void JournalPlayer::notify_complete(int r) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); m_handler_notified = true; ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl; @@ -834,7 +834,7 @@ void JournalPlayer::notify_complete(int r) { } void JournalPlayer::handle_cache_rebalanced(uint64_t new_cache_bytes) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_state == STATE_ERROR || m_shut_down) { return; diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h index 
6ba54cc8bab..21f215410f7 100644 --- a/src/journal/JournalPlayer.h +++ b/src/journal/JournalPlayer.h @@ -8,7 +8,6 @@ #include "include/Context.h" #include "include/rados/librados.hpp" #include "common/AsyncOpTracker.h" -#include "common/Mutex.h" #include "journal/JournalMetadata.h" #include "journal/ObjectPlayer.h" #include "journal/Types.h" @@ -116,7 +115,7 @@ private: AsyncOpTracker m_async_op_tracker; - mutable Mutex m_lock; + mutable ceph::mutex m_lock = ceph::make_mutex("JournalPlayer::m_lock"); State m_state; uint8_t m_splay_offset; diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index aa90660a01f..977b9b4f394 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -54,23 +54,26 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), m_journal_metadata(journal_metadata), m_max_in_flight_appends(max_in_flight_appends), m_listener(this), - m_object_handler(this), m_lock("JournalerRecorder::m_lock"), - m_current_set(m_journal_metadata->get_active_set()) { - - Mutex::Locker locker(m_lock); + m_object_handler(this), + m_lock(ceph::make_mutex("JournalerRecorder::m_lock")), + m_current_set(m_journal_metadata->get_active_set()), + m_object_locks{ceph::make_lock_container( + journal_metadata->get_splay_width(), [](const size_t splay_offset) { + return ceph::make_mutex("ObjectRecorder::m_lock::" + + std::to_string(splay_offset)); + })} +{ + + std::lock_guard locker{m_lock}; m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); uint8_t splay_width = m_journal_metadata->get_splay_width(); for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { - shared_ptr object_lock(new Mutex( - "ObjectRecorder::m_lock::" + std::to_string(splay_offset))); - m_object_locks.push_back(object_lock); - uint64_t object_number = splay_offset + (m_current_set * splay_width); - Mutex::Locker locker(*object_lock); + std::lock_guard 
locker{m_object_locks[splay_offset]}; m_object_ptrs[splay_offset] = create_object_recorder( - object_number, m_object_locks[splay_offset]); + object_number, &m_object_locks[splay_offset]); } m_journal_metadata->add_listener(&m_listener); @@ -79,7 +82,7 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, JournalRecorder::~JournalRecorder() { m_journal_metadata->remove_listener(&m_listener); - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_in_flight_advance_sets == 0); ceph_assert(m_in_flight_object_closes == 0); } @@ -89,7 +92,7 @@ void JournalRecorder::shut_down(Context *on_safe) { [this, on_safe](int r) { Context *ctx = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_in_flight_advance_sets != 0) { ceph_assert(m_on_object_set_advanced == nullptr); m_on_object_set_advanced = new FunctionContext( @@ -114,14 +117,14 @@ void JournalRecorder::set_append_batch_options(int flush_interval, << "flush_bytes=" << flush_bytes << ", " << "flush_age=" << flush_age << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_flush_interval = flush_interval; m_flush_bytes = flush_bytes; m_flush_age = flush_age; uint8_t splay_width = m_journal_metadata->get_splay_width(); for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { - Mutex::Locker object_locker(*m_object_locks[splay_offset]); + std::lock_guard object_locker{m_object_locks[splay_offset]}; auto object_recorder = get_object(splay_offset); object_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age); @@ -132,7 +135,7 @@ Future JournalRecorder::append(uint64_t tag_tid, const bufferlist &payload_bl) { ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl; - m_lock.Lock(); + m_lock.lock(); uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid); uint8_t splay_width = m_journal_metadata->get_splay_width(); @@ -145,8 +148,8 @@ Future JournalRecorder::append(uint64_t tag_tid, 
future->init(m_prev_future); m_prev_future = future; - m_object_locks[splay_offset]->Lock(); - m_lock.Unlock(); + m_object_locks[splay_offset].lock(); + m_lock.unlock(); bufferlist entry_bl; encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl), @@ -154,12 +157,12 @@ Future JournalRecorder::append(uint64_t tag_tid, ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size()); bool object_full = object_ptr->append({{future, entry_bl}}); - m_object_locks[splay_offset]->Unlock(); + m_object_locks[splay_offset].unlock(); if (object_full) { ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; close_and_advance_object_set(object_ptr->get_object_number() / splay_width); } return Future(future); @@ -170,7 +173,7 @@ void JournalRecorder::flush(Context *on_safe) { C_Flush *ctx; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1); for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); @@ -185,7 +188,7 @@ void JournalRecorder::flush(Context *on_safe) { } ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset]; ceph_assert(object_recoder != NULL); @@ -193,7 +196,7 @@ ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) { } void JournalRecorder::close_and_advance_object_set(uint64_t object_set) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); // entry overflow from open object if (m_current_set != object_set) { @@ -218,7 +221,7 @@ void JournalRecorder::close_and_advance_object_set(uint64_t object_set) { } void JournalRecorder::advance_object_set() { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_in_flight_object_closes == 0); 
ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl; @@ -229,7 +232,7 @@ void JournalRecorder::advance_object_set() { void JournalRecorder::handle_advance_object_set(int r) { Context *on_object_set_advanced = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ldout(m_cct, 20) << __func__ << ": r=" << r << dendl; ceph_assert(m_in_flight_advance_sets > 0); @@ -251,7 +254,7 @@ void JournalRecorder::handle_advance_object_set(int r) { } void JournalRecorder::open_object_set() { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ldout(m_cct, 10) << "opening object set " << m_current_set << dendl; @@ -274,7 +277,7 @@ void JournalRecorder::open_object_set() { bool JournalRecorder::close_object_set(uint64_t active_set) { ldout(m_cct, 10) << "active_set=" << active_set << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); // object recorders will invoke overflow handler as they complete // closing the object to ensure correct order of future appends @@ -300,7 +303,7 @@ bool JournalRecorder::close_object_set(uint64_t active_set) { } ObjectRecorderPtr JournalRecorder::create_object_recorder( - uint64_t object_number, shared_ptr lock) { + uint64_t object_number, ceph::mutex* lock) { ldout(m_cct, 10) << "object_number=" << object_number << dendl; ObjectRecorderPtr object_recorder(new ObjectRecorder( m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), @@ -314,17 +317,17 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder( void JournalRecorder::create_next_object_recorder( ObjectRecorderPtr object_recorder) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); uint64_t object_number = object_recorder->get_object_number(); uint8_t splay_width = m_journal_metadata->get_splay_width(); uint8_t splay_offset = object_number % splay_width; ldout(m_cct, 10) << "object_number=" << object_number << dendl; - 
ceph_assert(m_object_locks[splay_offset]->is_locked()); + ceph_assert(ceph_mutex_is_locked(m_object_locks[splay_offset])); ObjectRecorderPtr new_object_recorder = create_object_recorder( - (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]); + (m_current_set * splay_width) + splay_offset, &m_object_locks[splay_offset]); ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", " << "new oid=" << new_object_recorder->get_oid() << dendl; @@ -343,7 +346,7 @@ void JournalRecorder::create_next_object_recorder( } void JournalRecorder::handle_update() { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; uint64_t active_set = m_journal_metadata->get_active_set(); if (m_current_set < active_set) { @@ -365,7 +368,7 @@ void JournalRecorder::handle_update() { void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { ldout(m_cct, 10) << object_recorder->get_oid() << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; uint64_t object_number = object_recorder->get_object_number(); uint8_t splay_width = m_journal_metadata->get_splay_width(); @@ -393,7 +396,7 @@ void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { ldout(m_cct, 10) << object_recorder->get_oid() << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; uint64_t object_number = object_recorder->get_object_number(); uint8_t splay_width = m_journal_metadata->get_splay_width(); diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index 382f75acef9..7395283325e 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -7,7 +7,8 @@ #include "include/int_types.h" #include "include/Context.h" #include "include/rados/librados.hpp" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" +#include "common/containers.h" #include "journal/Future.h" #include "journal/FutureImpl.h" #include 
"journal/JournalMetadata.h" @@ -90,13 +91,13 @@ private: Listener m_listener; ObjectHandler m_object_handler; - Mutex m_lock; + ceph::mutex m_lock; uint32_t m_in_flight_advance_sets = 0; uint32_t m_in_flight_object_closes = 0; uint64_t m_current_set; ObjectRecorderPtrs m_object_ptrs; - std::vector> m_object_locks; + ceph::containers::tiny_vector m_object_locks; FutureImplPtr m_prev_future; @@ -111,7 +112,7 @@ private: void close_and_advance_object_set(uint64_t object_set); ObjectRecorderPtr create_object_recorder(uint64_t object_number, - std::shared_ptr lock); + ceph::mutex* lock); void create_next_object_recorder(ObjectRecorderPtr object_recorder); void handle_update(); @@ -121,13 +122,13 @@ private: void lock_object_recorders() { for (auto& lock : m_object_locks) { - lock->Lock(); + lock.lock(); } } void unlock_object_recorders() { for (auto& lock : m_object_locks) { - lock->Unlock(); + lock.unlock(); } } }; diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc index 645a62304c5..84bc7e79cae 100644 --- a/src/journal/JournalTrimmer.cc +++ b/src/journal/JournalTrimmer.cc @@ -16,7 +16,7 @@ namespace journal { struct JournalTrimmer::C_RemoveSet : public Context { JournalTrimmer *journal_trimmer; uint64_t object_set; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("JournalTrimmer::m_lock"); uint32_t refs; int return_value; @@ -34,7 +34,7 @@ JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx, const JournalMetadataPtr &journal_metadata) : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), m_journal_metadata(journal_metadata), m_metadata_listener(this), - m_lock("JournalTrimmer::m_lock"), m_remove_set_pending(false), + m_remove_set_pending(false), m_remove_set(0), m_remove_set_ctx(NULL) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); @@ -49,7 +49,7 @@ JournalTrimmer::~JournalTrimmer() { void JournalTrimmer::shut_down(Context *on_finish) { ldout(m_cct, 20) << __func__ << dendl; { - Mutex::Locker locker(m_lock); + 
std::lock_guard locker{m_lock}; ceph_assert(!m_shutdown); m_shutdown = true; } @@ -67,7 +67,7 @@ void JournalTrimmer::remove_objects(bool force, Context *on_finish) { ldout(m_cct, 20) << __func__ << dendl; on_finish = new FunctionContext([this, force, on_finish](int r) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_remove_set_pending) { on_finish->complete(-EBUSY); @@ -103,7 +103,7 @@ void JournalTrimmer::committed(uint64_t commit_tid) { } void JournalTrimmer::trim_objects(uint64_t minimum_set) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ldout(m_cct, 20) << __func__ << ": min_set=" << minimum_set << dendl; if (minimum_set <= m_journal_metadata->get_minimum_set()) { @@ -121,7 +121,7 @@ void JournalTrimmer::trim_objects(uint64_t minimum_set) { } void JournalTrimmer::remove_set(uint64_t object_set) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); m_async_op_tracker.start_op(); uint8_t splay_width = m_journal_metadata->get_splay_width(); @@ -149,7 +149,7 @@ void JournalTrimmer::remove_set(uint64_t object_set) { void JournalTrimmer::handle_metadata_updated() { ldout(m_cct, 20) << __func__ << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; JournalMetadata::RegisteredClients registered_clients; m_journal_metadata->get_registered_clients(®istered_clients); @@ -193,7 +193,7 @@ void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) { ldout(m_cct, 20) << __func__ << ": r=" << r << ", set=" << object_set << ", " << "trim=" << m_remove_set << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_remove_set_pending = false; if (r == -ENOENT) { @@ -223,12 +223,12 @@ JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer, uint64_t _object_set, uint8_t _splay_width) : journal_trimmer(_journal_trimmer), object_set(_object_set), - lock(utils::unique_lock_name("C_RemoveSet::lock", this)), + 
lock(ceph::make_mutex(utils::unique_lock_name("C_RemoveSet::lock", this))), refs(_splay_width), return_value(-ENOENT) { } void JournalTrimmer::C_RemoveSet::complete(int r) { - lock.Lock(); + lock.lock(); if (r < 0 && r != -ENOENT && (return_value == -ENOENT || return_value == 0)) { return_value = r; @@ -238,10 +238,10 @@ void JournalTrimmer::C_RemoveSet::complete(int r) { if (--refs == 0) { finish(return_value); - lock.Unlock(); + lock.unlock(); delete this; } else { - lock.Unlock(); + lock.unlock(); } } diff --git a/src/journal/JournalTrimmer.h b/src/journal/JournalTrimmer.h index 0b27923992d..719be88e77b 100644 --- a/src/journal/JournalTrimmer.h +++ b/src/journal/JournalTrimmer.h @@ -8,7 +8,6 @@ #include "include/rados/librados.hpp" #include "include/Context.h" #include "common/AsyncOpTracker.h" -#include "common/Mutex.h" #include "journal/JournalMetadata.h" #include "cls/journal/cls_journal_types.h" #include @@ -70,7 +69,7 @@ private: AsyncOpTracker m_async_op_tracker; - Mutex m_lock; + ceph::mutex m_lock = ceph::make_mutex("JournalTrimmer::m_lock"); bool m_remove_set_pending; uint64_t m_remove_set; diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 7d9c1409584..51a08d5be81 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -43,8 +43,7 @@ std::string Journaler::object_oid_prefix(int pool_id, return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." 
+ journal_id + "."; } -Journaler::Threads::Threads(CephContext *cct) - : timer_lock("Journaler::timer_lock") { +Journaler::Threads::Threads(CephContext *cct) { thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1); thread_pool->start(); @@ -56,7 +55,7 @@ Journaler::Threads::Threads(CephContext *cct) Journaler::Threads::~Threads() { { - Mutex::Locker timer_locker(timer_lock); + std::lock_guard timer_locker{timer_lock}; timer->shutdown(); } delete timer; @@ -82,7 +81,7 @@ Journaler::Journaler(librados::IoCtx &header_ioctx, } Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer, - Mutex *timer_lock, librados::IoCtx &header_ioctx, + ceph::mutex *timer_lock, librados::IoCtx &header_ioctx, const std::string &journal_id, const std::string &client_id, const Settings &settings, CacheManagerHandler *cache_manager_handler) @@ -92,7 +91,7 @@ Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer, } void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer, - Mutex *timer_lock, librados::IoCtx &header_ioctx, + ceph::mutex *timer_lock, librados::IoCtx &header_ioctx, const std::string &journal_id, const Settings &settings) { m_header_ioctx.dup(header_ioctx); diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index a063a6d43a0..17397d7ec2a 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -41,8 +41,8 @@ public: ThreadPool *thread_pool = nullptr; ContextWQ *work_queue = nullptr; - SafeTimer *timer = nullptr; - Mutex timer_lock; + SafeTimer *timer; + ceph::mutex timer_lock = ceph::make_mutex("Journaler::timer_lock"); }; typedef cls::journal::Tag Tag; @@ -56,7 +56,7 @@ public: Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id, const std::string &client_id, const Settings &settings, CacheManagerHandler *cache_manager_handler); - Journaler(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock, + Journaler(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock, librados::IoCtx 
&header_ioctx, const std::string &journal_id, const std::string &client_id, const Settings &settings, CacheManagerHandler *cache_manager_handler); @@ -154,7 +154,7 @@ private: JournalRecorder *m_recorder = nullptr; JournalTrimmer *m_trimmer = nullptr; - void set_up(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock, + void set_up(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock, librados::IoCtx &header_ioctx, const std::string &journal_id, const Settings &settings); diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc index 35d979c7155..46f615002e0 100644 --- a/src/journal/ObjectPlayer.cc +++ b/src/journal/ObjectPlayer.cc @@ -45,14 +45,14 @@ bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter, ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, uint64_t object_num, SafeTimer &timer, - Mutex &timer_lock, uint8_t order, + ceph::mutex &timer_lock, uint8_t order, uint64_t max_fetch_bytes) : RefCountedObject(NULL, 0), m_object_num(object_num), m_oid(utils::get_object_name(object_oid_prefix, m_object_num)), m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order), m_max_fetch_bytes(max_fetch_bytes > 0 ? 
max_fetch_bytes : 2 << order), m_watch_interval(0), m_watch_task(NULL), - m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)), + m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this))), m_fetch_in_progress(false) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); @@ -60,8 +60,8 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx, ObjectPlayer::~ObjectPlayer() { { - Mutex::Locker timer_locker(m_timer_lock); - Mutex::Locker locker(m_lock); + std::lock_guard timer_locker{m_timer_lock}; + std::lock_guard locker{m_lock}; ceph_assert(!m_fetch_in_progress); ceph_assert(m_watch_ctx == nullptr); } @@ -70,7 +70,7 @@ ObjectPlayer::~ObjectPlayer() { void ObjectPlayer::fetch(Context *on_finish) { ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_fetch_in_progress); m_fetch_in_progress = true; @@ -90,7 +90,7 @@ void ObjectPlayer::fetch(Context *on_finish) { void ObjectPlayer::watch(Context *on_fetch, double interval) { ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl; - Mutex::Locker timer_locker(m_timer_lock); + std::lock_guard timer_locker{m_timer_lock}; m_watch_interval = interval; ceph_assert(m_watch_ctx == nullptr); @@ -103,7 +103,7 @@ void ObjectPlayer::unwatch() { ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl; Context *watch_ctx = nullptr; { - Mutex::Locker timer_locker(m_timer_lock); + std::lock_guard timer_locker{m_timer_lock}; ceph_assert(!m_unwatched); m_unwatched = true; @@ -120,13 +120,13 @@ void ObjectPlayer::unwatch() { } void ObjectPlayer::front(Entry *entry) const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_entries.empty()); *entry = m_entries.front(); } void ObjectPlayer::pop_front() { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_entries.empty()); auto &entry = m_entries.front(); @@ -148,7 +148,7 @@ int 
ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl, return 0; } - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_fetch_in_progress); m_read_off += bl.length(); m_read_bl.append(bl); @@ -274,7 +274,7 @@ void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) { } void ObjectPlayer::schedule_watch() { - ceph_assert(m_timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_timer_lock)); if (m_watch_ctx == NULL) { return; } @@ -289,7 +289,7 @@ void ObjectPlayer::schedule_watch() { } bool ObjectPlayer::cancel_watch() { - ceph_assert(m_timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_timer_lock)); ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl; if (m_watch_task != nullptr) { bool canceled = m_timer.cancel_event(m_watch_task); @@ -302,7 +302,7 @@ bool ObjectPlayer::cancel_watch() { } void ObjectPlayer::handle_watch_task() { - ceph_assert(m_timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_timer_lock)); ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl; ceph_assert(m_watch_ctx != nullptr); @@ -318,7 +318,7 @@ void ObjectPlayer::handle_watch_fetched(int r) { Context *watch_ctx = nullptr; { - Mutex::Locker timer_locker(m_timer_lock); + std::lock_guard timer_locker{m_timer_lock}; std::swap(watch_ctx, m_watch_ctx); if (m_unwatched) { @@ -337,7 +337,7 @@ void ObjectPlayer::C_Fetch::finish(int r) { r = object_player->handle_fetch_complete(r, read_bl, &refetch); { - Mutex::Locker locker(object_player->m_lock); + std::lock_guard locker{object_player->m_lock}; object_player->m_fetch_in_progress = false; } diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h index 3dfad125bc1..568d31b0635 100644 --- a/src/journal/ObjectPlayer.h +++ b/src/journal/ObjectPlayer.h @@ -7,8 +7,7 @@ #include "include/Context.h" #include "include/interval_set.h" #include "include/rados/librados.hpp" -#include "common/Cond.h" -#include "common/Mutex.h" 
+#include "common/ceph_mutex.h" #include "common/RefCountedObj.h" #include "journal/Entry.h" #include @@ -37,7 +36,7 @@ public: }; ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, - uint64_t object_num, SafeTimer &timer, Mutex &timer_lock, + uint64_t object_num, SafeTimer &timer, ceph::mutex &timer_lock, uint8_t order, uint64_t max_fetch_bytes); ~ObjectPlayer() override; @@ -55,16 +54,16 @@ public: void front(Entry *entry) const; void pop_front(); inline bool empty() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_entries.empty(); } inline void get_entries(Entries *entries) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; *entries = m_entries; } inline void get_invalid_ranges(InvalidRanges *invalid_ranges) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; *invalid_ranges = m_invalid_ranges; } @@ -79,7 +78,7 @@ public: } inline void set_max_fetch_bytes(uint64_t max_fetch_bytes) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_max_fetch_bytes = max_fetch_bytes; } @@ -108,7 +107,7 @@ private: CephContext *m_cct; SafeTimer &m_timer; - Mutex &m_timer_lock; + ceph::mutex &m_timer_lock; uint8_t m_order; uint64_t m_max_fetch_bytes; @@ -116,7 +115,7 @@ private: double m_watch_interval; Context *m_watch_task; - mutable Mutex m_lock; + mutable ceph::mutex m_lock; bool m_fetch_in_progress; bufferlist m_read_bl; uint32_t m_read_off = 0; diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index 2366759cc5c..4bb6d03c186 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -20,7 +20,7 @@ using std::shared_ptr; namespace journal { ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, - uint64_t object_number, shared_ptr lock, + uint64_t object_number, ceph::mutex* lock, ContextWQ *work_queue, Handler *handler, uint8_t order, int32_t max_in_flight_appends) : RefCountedObject(NULL, 0), 
m_oid(oid), m_object_number(object_number), @@ -59,7 +59,7 @@ void ObjectRecorder::set_append_batch_options(int flush_interval, << "flush_bytes=" << flush_bytes << ", " << "flush_age=" << flush_age << dendl; - ceph_assert(m_lock->is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_lock)); m_flush_interval = flush_interval; m_flush_bytes = flush_bytes; m_flush_age = flush_age; @@ -68,7 +68,7 @@ void ObjectRecorder::set_append_batch_options(int flush_interval, bool ObjectRecorder::append(AppendBuffers &&append_buffers) { ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl; - ceph_assert(m_lock->is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_lock)); FutureImplPtr last_flushed_future; for (auto& append_buffer : append_buffers) { @@ -91,13 +91,13 @@ void ObjectRecorder::flush(Context *on_safe) { Future future; { - Mutex::Locker locker(*m_lock); + std::unique_lock locker{*m_lock}; // if currently handling flush notifications, wait so that // we notify in the correct order (since lock is dropped on // callback) if (m_in_flight_flushes) { - m_in_flight_flushes_cond.Wait(*(m_lock.get())); + m_in_flight_flushes_cond.wait(locker); } // attach the flush to the most recent append @@ -124,15 +124,15 @@ void ObjectRecorder::flush(Context *on_safe) { void ObjectRecorder::flush(const FutureImplPtr &future) { ldout(m_cct, 20) << "flushing " << *future << dendl; - m_lock->Lock(); + m_lock->lock(); if (future->get_flush_handler().get() != &m_flush_handler) { // if we don't own this future, re-issue the flush so that it hits the // correct journal object owner future->flush(); - m_lock->Unlock(); + m_lock->unlock(); return; } else if (future->is_flush_in_progress()) { - m_lock->Unlock(); + m_lock->unlock(); return; } @@ -140,14 +140,14 @@ void ObjectRecorder::flush(const FutureImplPtr &future) { if (overflowed) { notify_handler_unlock(); } else { - m_lock->Unlock(); + m_lock->unlock(); } } void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { 
ldout(m_cct, 20) << dendl; - ceph_assert(m_lock->is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_lock)); ceph_assert(m_in_flight_tids.empty()); ceph_assert(m_in_flight_appends.empty()); ceph_assert(m_object_closed || m_overflowed); @@ -161,7 +161,7 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { } bool ObjectRecorder::close() { - ceph_assert(m_lock->is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_lock)); ldout(m_cct, 20) << dendl; send_appends(true, {}); @@ -176,7 +176,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { AppendBuffers append_buffers; { - m_lock->Lock(); + m_lock->lock(); auto tid_iter = m_in_flight_tids.find(tid); ceph_assert(tid_iter != m_in_flight_tids.end()); m_in_flight_tids.erase(tid_iter); @@ -193,7 +193,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { append_overflowed(); notify_handler_unlock(); } else { - m_lock->Unlock(); + m_lock->unlock(); } return; } @@ -208,7 +208,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { m_in_flight_appends.erase(iter); m_in_flight_flushes = true; - m_lock->Unlock(); + m_lock->unlock(); } // Flag the associated futures as complete. 
@@ -218,9 +218,9 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { } // wake up any flush requests that raced with a RADOS callback - m_lock->Lock(); + m_lock->lock(); m_in_flight_flushes = false; - m_in_flight_flushes_cond.Signal(); + m_in_flight_flushes_cond.notify_all(); if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) { // all remaining unsent appends should be redirected to new object @@ -230,7 +230,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { if (overflowed) { notify_handler_unlock(); } else { - m_lock->Unlock(); + m_lock->unlock(); } } } @@ -238,7 +238,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { void ObjectRecorder::append_overflowed() { ldout(m_cct, 10) << dendl; - ceph_assert(m_lock->is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_lock)); ceph_assert(!m_in_flight_appends.empty()); InFlightAppends in_flight_appends; @@ -261,7 +261,7 @@ void ObjectRecorder::append_overflowed() { bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) { ldout(m_cct, 20) << dendl; - ceph_assert(m_lock->is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_lock)); if (m_object_closed || m_overflowed) { ldout(m_cct, 20) << "already closed or overflowed" << dendl; return false; @@ -369,13 +369,13 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) { } void ObjectRecorder::notify_handler_unlock() { - ceph_assert(m_lock->is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_lock)); if (m_object_closed) { - m_lock->Unlock(); + m_lock->unlock(); m_handler->closed(this); } else { // TODO need to delay completion until after aio_notify completes - m_lock->Unlock(); + m_lock->unlock(); m_handler->overflow(this); } } diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index 1f6014773ef..30dde76c92b 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -7,8 +7,7 @@ #include "include/utime.h" #include 
"include/Context.h" #include "include/rados/librados.hpp" -#include "common/Cond.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/RefCountedObj.h" #include "common/WorkQueue.h" #include "journal/FutureImpl.h" @@ -39,7 +38,7 @@ public: }; ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, - uint64_t object_number, std::shared_ptr lock, + uint64_t object_number, ceph::mutex* lock, ContextWQ *work_queue, Handler *handler, uint8_t order, int32_t max_in_flight_appends); ~ObjectRecorder() override; @@ -61,7 +60,7 @@ public: void claim_append_buffers(AppendBuffers *append_buffers); bool is_closed() const { - ceph_assert(m_lock->is_locked()); + ceph_assert(ceph_mutex_is_locked(*m_lock)); return (m_object_closed && m_in_flight_appends.empty()); } bool close(); @@ -71,7 +70,7 @@ public: } inline size_t get_pending_appends() const { - Mutex::Locker locker(*m_lock); + std::lock_guard locker{*m_lock}; return m_pending_buffers.size(); } @@ -126,7 +125,7 @@ private: FlushHandler m_flush_handler; - mutable std::shared_ptr m_lock; + mutable ceph::mutex* m_lock; AppendBuffers m_pending_buffers; uint64_t m_pending_bytes = 0; utime_t m_last_flush_time; @@ -142,7 +141,7 @@ private: bufferlist m_prefetch_bl; bool m_in_flight_flushes; - Cond m_in_flight_flushes_cond; + ceph::condition_variable m_in_flight_flushes_cond; uint64_t m_in_flight_bytes = 0; bool send_appends(bool force, FutureImplPtr flush_sentinal);