]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
log: broadcast cond signals
author: Sage Weil <sage@inktank.com>
Fri, 28 Dec 2012 21:07:18 +0000 (13:07 -0800)
committer: Sage Weil <sage@inktank.com>
Fri, 28 Dec 2012 23:08:29 +0000 (15:08 -0800)
We were using a single cond, and only signalling one waiter.  That means
that if the flusher and several logging threads are waiting, and we hit
a limit, the logger could signal another logger instead of the flusher,
and we could deadlock.

Similarly, if the flusher empties the queue, it might signal only a single
logger, and that logger could re-signal the flusher, and the other logger
could wait forever.

Instead, break the single cond into two: one for loggers, and one for the
flusher.  Always signal the (one) flusher, and always broadcast to all
loggers.

Backport: bobtail, argonaut
Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Dan Mick <dan.mick@inktank.com>
src/log/Log.cc
src/log/Log.h

index 51a340ffcb17c2d0d3370ebc5db6799a5fd75754..2912463f6b665104801610282f1492b3880d9685 100644 (file)
@@ -51,7 +51,10 @@ Log::Log(SubsystemMap *s)
   ret = pthread_mutex_init(&m_queue_mutex, NULL);
   assert(ret == 0);
 
-  ret = pthread_cond_init(&m_cond, NULL);
+  ret = pthread_cond_init(&m_cond_loggers, NULL);
+  assert(ret == 0);
+
+  ret = pthread_cond_init(&m_cond_flusher, NULL);
   assert(ret == 0);
 
   // kludge for prealloc testing
@@ -73,7 +76,8 @@ Log::~Log()
   pthread_spin_destroy(&m_lock);
   pthread_mutex_destroy(&m_queue_mutex);
   pthread_mutex_destroy(&m_flush_mutex);
-  pthread_cond_destroy(&m_cond);
+  pthread_cond_destroy(&m_cond_loggers);
+  pthread_cond_destroy(&m_cond_flusher);
 }
 
 
@@ -139,10 +143,10 @@ void Log::submit_entry(Entry *e)
 
   // wait for flush to catch up
   while (m_new.m_len > m_max_new)
-    pthread_cond_wait(&m_cond, &m_queue_mutex);
+    pthread_cond_wait(&m_cond_loggers, &m_queue_mutex);
 
   m_new.enqueue(e);
-  pthread_cond_signal(&m_cond);
+  pthread_cond_signal(&m_cond_flusher);
   pthread_mutex_unlock(&m_queue_mutex);
 }
 
@@ -169,7 +173,7 @@ void Log::flush()
   pthread_mutex_lock(&m_queue_mutex);
   EntryQueue t;
   t.swap(m_new);
-  pthread_cond_signal(&m_cond);
+  pthread_cond_broadcast(&m_cond_loggers);
   pthread_mutex_unlock(&m_queue_mutex);
   _flush(&t, &m_recent, false);
 
@@ -299,7 +303,8 @@ void Log::stop()
   assert(is_started());
   pthread_mutex_lock(&m_queue_mutex);
   m_stop = true;
-  pthread_cond_signal(&m_cond);
+  pthread_cond_signal(&m_cond_flusher);
+  pthread_cond_broadcast(&m_cond_loggers);
   pthread_mutex_unlock(&m_queue_mutex);
   join();
 }
@@ -315,7 +320,7 @@ void *Log::entry()
       continue;
     }
 
-    pthread_cond_wait(&m_cond, &m_queue_mutex);
+    pthread_cond_wait(&m_cond_flusher, &m_queue_mutex);
   }
   pthread_mutex_unlock(&m_queue_mutex);
   flush();
index ab83a7c86e43b289ef3cc63471693c3613529825..f6a27dc5b373fdfe257b4b4ac708d7933fd5535c 100644 (file)
@@ -24,7 +24,8 @@ class Log : private Thread
   pthread_spinlock_t m_lock;
   pthread_mutex_t m_queue_mutex;
   pthread_mutex_t m_flush_mutex;
-  pthread_cond_t m_cond;
+  pthread_cond_t m_cond_loggers;
+  pthread_cond_t m_cond_flusher;
 
   EntryQueue m_new;    ///< new entries
   EntryQueue m_recent; ///< recent (less new) entries we've already written at low detail