From: Jason Dillaman Date: Wed, 2 Dec 2015 18:35:21 +0000 (-0500) Subject: journal: correct lock ordering issues discovered by lockdep X-Git-Tag: v10.0.2~119^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2a0263fc64958c7f95d04019ce5c5aa26f14b515;p=ceph.git journal: correct lock ordering issues discovered by lockdep Signed-off-by: Jason Dillaman --- diff --git a/src/journal/FutureImpl.cc b/src/journal/FutureImpl.cc index 83657330b9f4..0ccb46fa90fe 100644 --- a/src/journal/FutureImpl.cc +++ b/src/journal/FutureImpl.cc @@ -91,7 +91,7 @@ bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) { } void FutureImpl::safe(int r) { - Mutex::Locker locker(m_lock); + m_lock.Lock(); assert(!m_safe); m_safe = true; if (m_return_value == 0) { @@ -100,12 +100,14 @@ void FutureImpl::safe(int r) { m_flush_handler.reset(); if (m_consistent) { - finish(); + finish_unlock(); + } else { + m_lock.Unlock(); } } void FutureImpl::consistent(int r) { - Mutex::Locker locker(m_lock); + m_lock.Lock(); assert(!m_consistent); m_consistent = true; m_prev_future.reset(); @@ -114,11 +116,13 @@ void FutureImpl::consistent(int r) { } if (m_safe) { - finish(); + finish_unlock(); + } else { + m_lock.Unlock(); } } -void FutureImpl::finish() { +void FutureImpl::finish_unlock() { assert(m_lock.is_locked()); assert(m_safe && m_consistent); @@ -130,7 +134,6 @@ void FutureImpl::finish() { it != contexts.end(); ++it) { (*it)->complete(m_return_value); } - m_lock.Lock(); } std::ostream &operator<<(std::ostream &os, const FutureImpl &future) { diff --git a/src/journal/FutureImpl.h b/src/journal/FutureImpl.h index d936805acdc0..855c95889389 100644 --- a/src/journal/FutureImpl.h +++ b/src/journal/FutureImpl.h @@ -113,7 +113,7 @@ private: Contexts m_contexts; void consistent(int r); - void finish(); + void finish_unlock(); }; void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p); diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index 409b4b98ed10..90363c25b7db 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -186,12 +186,12 @@ void JournalMetadata::set_active_set(uint64_t object_set) { void JournalMetadata::flush_commit_position() { { + Mutex::Locker timer_locker(m_timer_lock); Mutex::Locker locker(m_lock); if (m_commit_position_task_ctx == NULL) { return; } - Mutex::Locker timer_locker(m_timer_lock); m_timer->cancel_event(m_commit_position_task_ctx); m_commit_position_task_ctx = NULL; } @@ -202,23 +202,28 @@ void JournalMetadata::set_commit_position( const ObjectSetPosition &commit_position, Context *on_safe) { assert(on_safe != NULL); - Mutex::Locker locker(m_lock); - ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position - << ", new=" << commit_position << dendl; - if (commit_position <= m_client.commit_position || - commit_position <= m_commit_position) { - on_safe->complete(-ESTALE); - return; - } + Context *stale_ctx = nullptr; + { + Mutex::Locker timer_locker(m_timer_lock); + Mutex::Locker locker(m_lock); + ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position + << ", new=" << commit_position << dendl; + if (commit_position <= m_client.commit_position || + commit_position <= m_commit_position) { + stale_ctx = on_safe; + } else { + stale_ctx = m_commit_position_ctx; - if (m_commit_position_ctx != NULL) { - m_commit_position_ctx->complete(-ESTALE); + m_client.commit_position = commit_position; + m_commit_position = commit_position; + m_commit_position_ctx = on_safe; + schedule_commit_task(); + } } - m_client.commit_position = commit_position; - m_commit_position = commit_position; - m_commit_position_ctx = on_safe; - schedule_commit_task(); + if (stale_ctx != nullptr) { + stale_ctx->complete(-ESTALE); + } } void JournalMetadata::reserve_tid(const std::string &tag, uint64_t tid) { @@ -298,9 +303,9 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { } void JournalMetadata::schedule_commit_task() { + assert(m_timer_lock.is_locked()); assert(m_lock.is_locked()); - Mutex::Locker timer_locker(m_timer_lock); if (m_commit_position_task_ctx == NULL) { m_commit_position_task_ctx = new C_CommitPositionTask(this); m_timer->add_event_after(m_commit_interval, m_commit_position_task_ctx); @@ -357,8 +362,8 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) { void JournalMetadata::handle_watch_error(int err) { lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl; - Mutex::Locker locker(m_lock); Mutex::Locker timer_locker(m_timer_lock); + Mutex::Locker locker(m_lock); if (m_initialized && err != -ENOENT) { schedule_watch_reset(); } diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc index 939722eb31f7..8ef5dfeb04c6 100644 --- a/src/journal/ObjectPlayer.cc +++ b/src/journal/ObjectPlayer.cc @@ -29,6 +29,7 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx, ObjectPlayer::~ObjectPlayer() { { + Mutex::Locker timer_locker(m_timer_lock); Mutex::Locker locker(m_lock); assert(!m_fetch_in_progress); assert(m_watch_ctx == NULL); @@ -59,7 +60,6 @@ void ObjectPlayer::watch(Context *on_fetch, double interval) { Mutex::Locker timer_locker(m_timer_lock); m_watch_interval = interval; - Mutex::Locker locker(m_lock); assert(m_watch_ctx == NULL); m_watch_ctx = on_fetch; @@ -69,16 +69,12 @@ void ObjectPlayer::watch(Context *on_fetch, double interval) { void ObjectPlayer::unwatch() { ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl; Mutex::Locker timer_locker(m_timer_lock); - Mutex::Locker locker(m_lock); - cancel_watch(); m_watch_ctx = NULL; - m_timer_lock.Unlock(); while (m_watch_in_progress) { - m_watch_in_progress_cond.Wait(m_lock); + m_watch_in_progress_cond.Wait(m_timer_lock); } - m_timer_lock.Lock(); } void ObjectPlayer::front(Entry *entry) const { @@ -170,7 +166,6 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) { void ObjectPlayer::schedule_watch() { assert(m_timer_lock.is_locked()); - assert(m_lock.is_locked()); if (m_watch_ctx == NULL) { return; } @@ -194,13 +189,10 @@ void ObjectPlayer::handle_watch_task() { assert(m_timer_lock.is_locked()); ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl; - { - Mutex::Locker locker(m_lock); - assert(m_watch_ctx != NULL); + assert(m_watch_ctx != NULL); - m_watch_in_progress = true; - m_watch_task = NULL; - } + m_watch_in_progress = true; + m_watch_task = NULL; fetch(new C_WatchFetch(this)); } @@ -211,7 +203,6 @@ void ObjectPlayer::handle_watch_fetched(int r) { Context *on_finish = NULL; { Mutex::Locker timer_locker(m_timer_lock); - Mutex::Locker locker(m_lock); assert(m_watch_in_progress); if (r == -ENOENT) { schedule_watch(); @@ -226,7 +217,7 @@ void ObjectPlayer::handle_watch_fetched(int r) { } { - Mutex::Locker locker(m_lock); + Mutex::Locker locker(m_timer_lock); m_watch_in_progress = false; m_watch_in_progress_cond.Signal(); } diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index cf96b9417ecd..7c0143f025e5 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -44,11 +44,12 @@ ObjectRecorder::~ObjectRecorder() { bool ObjectRecorder::append(const AppendBuffers &append_buffers) { FutureImplPtr last_flushed_future; + bool schedule_append = false; { Mutex::Locker locker(m_lock); for (AppendBuffers::const_iterator iter = append_buffers.begin(); iter != append_buffers.end(); ++iter) { - if (append(*iter)) { + if (append(*iter, &schedule_append)) { last_flushed_future = iter->first; } } @@ -56,6 +57,10 @@ bool ObjectRecorder::append(const AppendBuffers &append_buffers) { if (last_flushed_future) { flush(last_flushed_future); + } else if (schedule_append) { + schedule_append_task(); + } else { + cancel_append_task(); } return (m_size + m_pending_bytes >= m_soft_max_size); } @@ -63,6 +68,7 @@ bool ObjectRecorder::append(const AppendBuffers &append_buffers) { void ObjectRecorder::flush(Context *on_safe) { ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; + cancel_append_task(); Future future; { Mutex::Locker locker(m_lock); @@ -77,7 +83,6 @@ void ObjectRecorder::flush(Context *on_safe) { assert(!append_buffers.empty()); future = Future(append_buffers.rbegin()->first); } - cancel_append_task(); } if (future.is_valid()) { @@ -130,11 +135,11 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { bool ObjectRecorder::close_object() { ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; + cancel_append_task(); + Mutex::Locker locker(m_lock); m_object_closed = true; - if (flush_appends(true)) { - cancel_append_task(); - } + flush_appends(true); return m_in_flight_appends.empty(); } @@ -162,17 +167,16 @@ void ObjectRecorder::schedule_append_task() { } } -bool ObjectRecorder::append(const AppendBuffer &append_buffer) { +bool ObjectRecorder::append(const AppendBuffer &append_buffer, + bool *schedule_append) { assert(m_lock.is_locked()); bool flush_requested = append_buffer.first->attach(&m_flush_handler); m_append_buffers.push_back(append_buffer); m_pending_bytes += append_buffer.second.length(); - if (flush_appends(false)) { - cancel_append_task(); - } else { - schedule_append_task(); + if (!flush_appends(false)) { + *schedule_append = true; } return flush_requested; } @@ -202,21 +206,30 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid << ", r=" << r << dendl; - Mutex::Locker locker(m_lock); - InFlightAppends::iterator iter = m_in_flight_appends.find(tid); - if (iter == m_in_flight_appends.end()) { - // must have seen an overflow on a previous append op - assert(m_overflowed); - return; - } else if (r == -EOVERFLOW) { - m_overflowed = true; - append_overflowed(tid); - return; - } + AppendBuffers append_buffers; + { + Mutex::Locker locker(m_lock); + InFlightAppends::iterator iter = m_in_flight_appends.find(tid); + if (iter == m_in_flight_appends.end()) { + // must have seen an overflow on a previous append op + assert(m_overflowed); + return; + } else if (r == -EOVERFLOW) { + m_overflowed = true; + append_overflowed(tid); + return; + } + + assert(!m_overflowed || r != 0); + append_buffers.swap(iter->second); + assert(!append_buffers.empty()); - assert(!m_overflowed || r != 0); - AppendBuffers &append_buffers = iter->second; - assert(!append_buffers.empty()); + m_in_flight_appends.erase(iter); + if (m_in_flight_appends.empty() && m_object_closed) { + // all remaining unsent appends should be redirected to new object + notify_overflow(); + } + } // Flag the associated futures as complete. for (AppendBuffers::iterator buf_it = append_buffers.begin(); @@ -225,12 +238,6 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { << dendl; buf_it->first->safe(r); } - m_in_flight_appends.erase(iter); - - if (m_in_flight_appends.empty() && m_object_closed) { - // all remaining unsent appends should be redirected to new object - notify_overflow(); - } } void ObjectRecorder::append_overflowed(uint64_t tid) { diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index 566c41fd780b..f9a776863de3 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -135,7 +135,7 @@ private: void cancel_append_task(); void schedule_append_task(); - bool append(const AppendBuffer &append_buffer); + bool append(const AppendBuffer &append_buffer, bool *schedule_append); bool flush_appends(bool force); void handle_append_flushed(uint64_t tid, int r); void append_overflowed(uint64_t tid);