]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: correct lock ordering issues discovered by lockdep
authorJason Dillaman <dillaman@redhat.com>
Wed, 2 Dec 2015 18:35:21 +0000 (13:35 -0500)
committerJason Dillaman <dillaman@redhat.com>
Wed, 2 Dec 2015 19:05:17 +0000 (14:05 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/FutureImpl.cc
src/journal/FutureImpl.h
src/journal/JournalMetadata.cc
src/journal/ObjectPlayer.cc
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h

index 83657330b9f470aead984e0c00b162f0e7a43a80..0ccb46fa90feef8b8070746f656cee758e589061 100644 (file)
@@ -91,7 +91,7 @@ bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
 }
 
 void FutureImpl::safe(int r) {
-  Mutex::Locker locker(m_lock);
+  m_lock.Lock();
   assert(!m_safe);
   m_safe = true;
   if (m_return_value == 0) {
@@ -100,12 +100,14 @@ void FutureImpl::safe(int r) {
 
   m_flush_handler.reset();
   if (m_consistent) {
-    finish();
+    finish_unlock();
+  } else {
+    m_lock.Unlock();
   }
 }
 
 void FutureImpl::consistent(int r) {
-  Mutex::Locker locker(m_lock);
+  m_lock.Lock();
   assert(!m_consistent);
   m_consistent = true;
   m_prev_future.reset();
@@ -114,11 +116,13 @@ void FutureImpl::consistent(int r) {
   }
 
   if (m_safe) {
-    finish();
+    finish_unlock();
+  } else {
+    m_lock.Unlock();
   }
 }
 
-void FutureImpl::finish() {
+void FutureImpl::finish_unlock() {
   assert(m_lock.is_locked());
   assert(m_safe && m_consistent);
 
@@ -130,7 +134,6 @@ void FutureImpl::finish() {
        it != contexts.end(); ++it) {
     (*it)->complete(m_return_value);
   }
-  m_lock.Lock();
 }
 
 std::ostream &operator<<(std::ostream &os, const FutureImpl &future) {
index d936805acdc0c3ad5f737bb41bbd14689712331b..855c95889389da9a11288c6443211c0bd7088412 100644 (file)
@@ -113,7 +113,7 @@ private:
   Contexts m_contexts;
 
   void consistent(int r);
-  void finish();
+  void finish_unlock();
 };
 
 void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p);
index 409b4b98ed1085ea27777c285ddec8fa67d63e56..90363c25b7dbfccc83d1ca3a7d51c2f1f680b849 100644 (file)
@@ -186,12 +186,12 @@ void JournalMetadata::set_active_set(uint64_t object_set) {
 
 void JournalMetadata::flush_commit_position() {
   {
+    Mutex::Locker timer_locker(m_timer_lock);
     Mutex::Locker locker(m_lock);
     if (m_commit_position_task_ctx == NULL) {
       return;
     }
 
-    Mutex::Locker timer_locker(m_timer_lock);
     m_timer->cancel_event(m_commit_position_task_ctx);
     m_commit_position_task_ctx = NULL;
   }
@@ -202,23 +202,28 @@ void JournalMetadata::set_commit_position(
     const ObjectSetPosition &commit_position, Context *on_safe) {
   assert(on_safe != NULL);
 
-  Mutex::Locker locker(m_lock);
-  ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position
-                   << ", new=" << commit_position << dendl;
-  if (commit_position <= m_client.commit_position ||
-      commit_position <= m_commit_position) {
-    on_safe->complete(-ESTALE);
-    return;
-  }
+  Context *stale_ctx = nullptr;
+  {
+    Mutex::Locker timer_locker(m_timer_lock);
+    Mutex::Locker locker(m_lock);
+    ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position
+                     << ", new=" << commit_position << dendl;
+    if (commit_position <= m_client.commit_position ||
+        commit_position <= m_commit_position) {
+      stale_ctx = on_safe;
+    } else {
+      stale_ctx = m_commit_position_ctx;
 
-  if (m_commit_position_ctx != NULL) {
-    m_commit_position_ctx->complete(-ESTALE);
+      m_client.commit_position = commit_position;
+      m_commit_position = commit_position;
+      m_commit_position_ctx = on_safe;
+      schedule_commit_task();
+    }
   }
 
-  m_client.commit_position = commit_position;
-  m_commit_position = commit_position;
-  m_commit_position_ctx = on_safe;
-  schedule_commit_task();
+  if (stale_ctx != nullptr) {
+    stale_ctx->complete(-ESTALE);
+  }
 }
 
 void JournalMetadata::reserve_tid(const std::string &tag, uint64_t tid) {
@@ -298,9 +303,9 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
 }
 
 void JournalMetadata::schedule_commit_task() {
+  assert(m_timer_lock.is_locked());
   assert(m_lock.is_locked());
 
-  Mutex::Locker timer_locker(m_timer_lock);
   if (m_commit_position_task_ctx == NULL) {
     m_commit_position_task_ctx = new C_CommitPositionTask(this);
     m_timer->add_event_after(m_commit_interval, m_commit_position_task_ctx);
@@ -357,8 +362,8 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
 
 void JournalMetadata::handle_watch_error(int err) {
   lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
-  Mutex::Locker locker(m_lock);
   Mutex::Locker timer_locker(m_timer_lock);
+  Mutex::Locker locker(m_lock);
   if (m_initialized && err != -ENOENT) {
     schedule_watch_reset();
   }
index 939722eb31f7f7a90ef2cd8a2b8cca42a91863a4..8ef5dfeb04c6f65340b05dbffbfe7ed9352add67 100644 (file)
@@ -29,6 +29,7 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
 
 ObjectPlayer::~ObjectPlayer() {
   {
+    Mutex::Locker timer_locker(m_timer_lock);
     Mutex::Locker locker(m_lock);
     assert(!m_fetch_in_progress);
     assert(m_watch_ctx == NULL);
@@ -59,7 +60,6 @@ void ObjectPlayer::watch(Context *on_fetch, double interval) {
   Mutex::Locker timer_locker(m_timer_lock);
   m_watch_interval = interval;
 
-  Mutex::Locker locker(m_lock);
   assert(m_watch_ctx == NULL);
   m_watch_ctx = on_fetch;
 
@@ -69,16 +69,12 @@ void ObjectPlayer::watch(Context *on_fetch, double interval) {
 void ObjectPlayer::unwatch() {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
   Mutex::Locker timer_locker(m_timer_lock);
-  Mutex::Locker locker(m_lock);
-
   cancel_watch();
 
   m_watch_ctx = NULL;
-  m_timer_lock.Unlock();
   while (m_watch_in_progress) {
-    m_watch_in_progress_cond.Wait(m_lock);
+    m_watch_in_progress_cond.Wait(m_timer_lock);
   }
-  m_timer_lock.Lock();
 }
 
 void ObjectPlayer::front(Entry *entry) const {
@@ -170,7 +166,6 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) {
 
 void ObjectPlayer::schedule_watch() {
   assert(m_timer_lock.is_locked());
-  assert(m_lock.is_locked());
   if (m_watch_ctx == NULL) {
     return;
   }
@@ -194,13 +189,10 @@ void ObjectPlayer::handle_watch_task() {
   assert(m_timer_lock.is_locked());
 
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
-  {
-    Mutex::Locker locker(m_lock);
-    assert(m_watch_ctx != NULL);
+  assert(m_watch_ctx != NULL);
 
-    m_watch_in_progress = true;
-    m_watch_task = NULL;
-  }
+  m_watch_in_progress = true;
+  m_watch_task = NULL;
   fetch(new C_WatchFetch(this));
 }
 
@@ -211,7 +203,6 @@ void ObjectPlayer::handle_watch_fetched(int r) {
   Context *on_finish = NULL;
   {
     Mutex::Locker timer_locker(m_timer_lock);
-    Mutex::Locker locker(m_lock);
     assert(m_watch_in_progress);
     if (r == -ENOENT) {
       schedule_watch();
@@ -226,7 +217,7 @@ void ObjectPlayer::handle_watch_fetched(int r) {
   }
 
   {
-    Mutex::Locker locker(m_lock);
+    Mutex::Locker locker(m_timer_lock);
     m_watch_in_progress = false;
     m_watch_in_progress_cond.Signal();
   }
index cf96b9417ecdb3b2f07fa443f4249d7aa75fdd1f..7c0143f025e53977aeaedd897839b25964c21d25 100644 (file)
@@ -44,11 +44,12 @@ ObjectRecorder::~ObjectRecorder() {
 
 bool ObjectRecorder::append(const AppendBuffers &append_buffers) {
   FutureImplPtr last_flushed_future;
+  bool schedule_append = false;
   {
     Mutex::Locker locker(m_lock);
     for (AppendBuffers::const_iterator iter = append_buffers.begin();
          iter != append_buffers.end(); ++iter) {
-      if (append(*iter)) {
+      if (append(*iter, &schedule_append)) {
         last_flushed_future = iter->first;
       }
     }
@@ -56,6 +57,10 @@ bool ObjectRecorder::append(const AppendBuffers &append_buffers) {
 
   if (last_flushed_future) {
     flush(last_flushed_future);
+  } else if (schedule_append) {
+    schedule_append_task();
+  } else {
+    cancel_append_task();
   }
   return (m_size + m_pending_bytes >= m_soft_max_size);
 }
@@ -63,6 +68,7 @@ bool ObjectRecorder::append(const AppendBuffers &append_buffers) {
 void ObjectRecorder::flush(Context *on_safe) {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
 
+  cancel_append_task();
   Future future;
   {
     Mutex::Locker locker(m_lock);
@@ -77,7 +83,6 @@ void ObjectRecorder::flush(Context *on_safe) {
       assert(!append_buffers.empty());
       future = Future(append_buffers.rbegin()->first);
     }
-    cancel_append_task();
   }
 
   if (future.is_valid()) {
@@ -130,11 +135,11 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
 bool ObjectRecorder::close_object() {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
 
+  cancel_append_task();
+
   Mutex::Locker locker(m_lock);
   m_object_closed = true;
-  if (flush_appends(true)) {
-    cancel_append_task();
-  }
+  flush_appends(true);
   return m_in_flight_appends.empty();
 }
 
@@ -162,17 +167,16 @@ void ObjectRecorder::schedule_append_task() {
   }
 }
 
-bool ObjectRecorder::append(const AppendBuffer &append_buffer) {
+bool ObjectRecorder::append(const AppendBuffer &append_buffer,
+                            bool *schedule_append) {
   assert(m_lock.is_locked());
 
   bool flush_requested = append_buffer.first->attach(&m_flush_handler);
   m_append_buffers.push_back(append_buffer);
   m_pending_bytes += append_buffer.second.length();
 
-  if (flush_appends(false)) {
-    cancel_append_task();
-  } else {
-    schedule_append_task();
+  if (!flush_appends(false)) {
+    *schedule_append = true;
   }
   return flush_requested;
 }
@@ -202,21 +206,30 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid
                    << ", r=" << r << dendl;
 
-  Mutex::Locker locker(m_lock);
-  InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
-  if (iter == m_in_flight_appends.end()) {
-    // must have seen an overflow on a previous append op
-    assert(m_overflowed);
-    return;
-  } else if (r == -EOVERFLOW) {
-    m_overflowed = true;
-    append_overflowed(tid);
-    return;
-  }
+  AppendBuffers append_buffers;
+  {
+    Mutex::Locker locker(m_lock);
+    InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
+    if (iter == m_in_flight_appends.end()) {
+      // must have seen an overflow on a previous append op
+      assert(m_overflowed);
+      return;
+    } else if (r == -EOVERFLOW) {
+      m_overflowed = true;
+      append_overflowed(tid);
+      return;
+    }
+
+    assert(!m_overflowed || r != 0);
+    append_buffers.swap(iter->second);
+    assert(!append_buffers.empty());
 
-  assert(!m_overflowed || r != 0);
-  AppendBuffers &append_buffers = iter->second;
-  assert(!append_buffers.empty());
+    m_in_flight_appends.erase(iter);
+    if (m_in_flight_appends.empty() && m_object_closed) {
+      // all remaining unsent appends should be redirected to new object
+      notify_overflow();
+    }
+  }
 
   // Flag the associated futures as complete.
   for (AppendBuffers::iterator buf_it = append_buffers.begin();
@@ -225,12 +238,6 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
                      << dendl;
     buf_it->first->safe(r);
   }
-  m_in_flight_appends.erase(iter);
-
-  if (m_in_flight_appends.empty() && m_object_closed) {
-    // all remaining unsent appends should be redirected to new object
-    notify_overflow();
-  }
 }
 
 void ObjectRecorder::append_overflowed(uint64_t tid) {
index 566c41fd780b12ae2d891b12f74d7b2e66c86f72..f9a776863de312f17561c4ee5a70f112409247b9 100644 (file)
@@ -135,7 +135,7 @@ private:
   void cancel_append_task();
   void schedule_append_task();
 
-  bool append(const AppendBuffer &append_buffer);
+  bool append(const AppendBuffer &append_buffer, bool *schedule_append);
   bool flush_appends(bool force);
   void handle_append_flushed(uint64_t tid, int r);
   void append_overflowed(uint64_t tid);