]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: md_log_replay thread blocks waiting to be woken up 49672/head
authorzhikuodu <duzhk@qq.com>
Fri, 28 Oct 2022 08:57:43 +0000 (16:57 +0800)
committerVenky Shankar <vshankar@redhat.com>
Mon, 9 Jan 2023 10:33:25 +0000 (16:03 +0530)
Fixes: https://tracker.ceph.com/issues/57764
Signed-off-by: zhikuodu <duzhk@qq.com>
(cherry picked from commit aeed04497ae8dbdf0e57ac72702a8d73a5e3c481)

src/mds/MDLog.cc
src/osdc/Journaler.cc
src/osdc/Journaler.h

index eea60b48ae527d475d22aaf8a69166d1c24e4971..337c1025a3cbc7586ee683ee6fa0cabde186cc45 100644 (file)
@@ -1159,17 +1159,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
   // state doesn't change in between.
   uint32_t events_transcribed = 0;
   while (1) {
-    while (!old_journal->is_readable() &&
-          old_journal->get_read_pos() < old_journal->get_write_pos() &&
-          !old_journal->get_error()) {
-
-      // Issue a journal prefetch
-      C_SaferCond readable_waiter;
-      old_journal->wait_for_readable(&readable_waiter);
-
-      // Wait for a journal prefetch to complete
-      readable_waiter.wait();
-    }
+    old_journal->check_isreadable();
     if (old_journal->get_error()) {
       r = old_journal->get_error();
       dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
@@ -1309,13 +1299,7 @@ void MDLog::_replay_thread()
   int r = 0;
   while (1) {
     // wait for read?
-    while (!journaler->is_readable() &&
-          journaler->get_read_pos() < journaler->get_write_pos() &&
-          !journaler->get_error()) {
-      C_SaferCond readable_waiter;
-      journaler->wait_for_readable(&readable_waiter);
-      r = readable_waiter.wait();
-    }
+    journaler->check_isreadable(); 
     if (journaler->get_error()) {
       r = journaler->get_error();
       dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
index 8084a661d7d34be5cc04986dea37cc83a27d2fb6..04b90fb5952aaf2b430fa0cd97fa0bbc4dd49192 100644 (file)
@@ -991,7 +991,7 @@ void Journaler::_assimilate_prefetch()
 
     // Update readability (this will also hit any decode errors resulting
     // from bad data)
-    readable = _is_readable();
+    readable = _have_next_entry();
   }
 
   if ((got_any && !was_readable && readable) || read_pos == write_pos) {
@@ -1115,9 +1115,9 @@ void Journaler::_prefetch()
 
 
 /*
- * _is_readable() - return true if next entry is ready.
+ * _have_next_entry() - return true if next entry is ready.
  */
-bool Journaler::_is_readable()
+bool Journaler::_have_next_entry()
 {
   // anything to read?
   if (read_pos == write_pos)
@@ -1129,13 +1129,13 @@ bool Journaler::_is_readable()
     return true;
   }
 
-  ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
+  ldout (cct, 10) << "_have_next_entry read_buf.length() == " << read_buf.length()
                  << ", but need " << need << " for next entry; fetch_len is "
                  << fetch_len << dendl;
 
   // partial fragment at the end?
   if (received_pos == write_pos) {
-    ldout(cct, 10) << "is_readable() detected partial entry at tail, "
+    ldout(cct, 10) << "_have_next_entry() detected partial entry at tail, "
       "adjusting write_pos to " << read_pos << dendl;
 
     // adjust write_pos
@@ -1154,11 +1154,11 @@ bool Journaler::_is_readable()
 
   if (need > fetch_len) {
     temp_fetch_len = need;
-    ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
+    ldout(cct, 10) << "_have_next_entry noting temp_fetch_len " << temp_fetch_len
                   << dendl;
   }
 
-  ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
+  ldout(cct, 10) << "_have_next_entry: not readable, returning false" << dendl;
   return false;
 }
 
@@ -1168,7 +1168,11 @@ bool Journaler::_is_readable()
 bool Journaler::is_readable()
 {
   lock_guard l(lock);
+  return _is_readable();
+}
 
+bool Journaler::_is_readable()
+{
   if (error != 0) {
     return false;
   }
@@ -1264,9 +1268,9 @@ bool Journaler::try_read_entry(bufferlist& bl)
   read_pos += consumed;
   try {
     // We were readable, we might not be any more
-    readable = _is_readable();
+    readable = _have_next_entry();
   } catch (const buffer::error &e) {
-    lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
+    lderr(cct) << __func__ << ": decode error from _have_next_entry" << dendl;
     error = -EINVAL;
     return false;
   }
@@ -1286,6 +1290,11 @@ bool Journaler::try_read_entry(bufferlist& bl)
 void Journaler::wait_for_readable(Context *onreadable)
 {
   lock_guard l(lock);
+  _wait_for_readable(onreadable); 
+}
+
+void Journaler::_wait_for_readable(Context *onreadable)
+{
   if (is_stopping()) {
     finisher->queue(onreadable, -EAGAIN);
     return;
@@ -1606,3 +1615,17 @@ void Journaler::shutdown()
   waitfor_safe.clear();
 }
 
+void Journaler::check_isreadable()
+{
+  std::unique_lock l(lock);
+  while (!_is_readable() &&
+      get_read_pos() < get_write_pos() &&
+      !get_error()) {
+    C_SaferCond readable_waiter;
+    _wait_for_readable(&readable_waiter);
+    l.unlock();
+    readable_waiter.wait();
+    l.lock();
+  }
+  return ;
+}
index 2dcc1197e61119a841228edd6aaca5fad157b5eb..1b40eadec1696d89397c69cf26ee073c58f75999 100644 (file)
@@ -383,7 +383,7 @@ private:
    */
   void handle_write_error(int r);
 
-  bool _is_readable();
+  bool _have_next_entry();
 
   void _finish_erase(int data_result, C_OnFinisher *completion);
   class C_EraseFinish;
@@ -459,6 +459,7 @@ public:
   void wait_for_flush(Context *onsafe = 0);
   void flush(Context *onsafe = 0);
   void wait_for_readable(Context *onfinish);
+  void _wait_for_readable(Context *onfinish);
   bool have_waiter() const;
   void wait_for_prezero(Context *onfinish);
 
@@ -527,6 +528,7 @@ public:
   int get_error() { return error; }
   bool is_readonly() { return readonly; }
   bool is_readable();
+  bool _is_readable();
   bool try_read_entry(bufferlist& bl);
   uint64_t get_write_pos() const { return write_pos; }
   uint64_t get_write_safe_pos() const { return safe_pos; }
@@ -536,6 +538,7 @@ public:
   size_t get_journal_envelope_size() const { 
     return journal_stream.get_envelope_size(); 
   }
+  void check_isreadable();
 };
 WRITE_CLASS_ENCODER(Journaler::Header)