]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: md_log_replay thread blocks waiting to be woken up 49671/head
authorzhikuodu <duzhk@qq.com>
Fri, 28 Oct 2022 08:57:43 +0000 (16:57 +0800)
committerVenky Shankar <vshankar@redhat.com>
Mon, 9 Jan 2023 10:28:03 +0000 (15:58 +0530)
Fixes: https://tracker.ceph.com/issues/57764
Signed-off-by: zhikuodu <duzhk@qq.com>
(cherry picked from commit aeed04497ae8dbdf0e57ac72702a8d73a5e3c481)

src/mds/MDLog.cc
src/osdc/Journaler.cc
src/osdc/Journaler.h

index 79143021e494715ae36ae2cb96720924b5d9dd3c..05f815c8ef48a3e25a1a5448d0bb907919f6fbb6 100644 (file)
@@ -1158,17 +1158,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
   // state doesn't change in between.
   uint32_t events_transcribed = 0;
   while (1) {
-    while (!old_journal->is_readable() &&
-          old_journal->get_read_pos() < old_journal->get_write_pos() &&
-          !old_journal->get_error()) {
-
-      // Issue a journal prefetch
-      C_SaferCond readable_waiter;
-      old_journal->wait_for_readable(&readable_waiter);
-
-      // Wait for a journal prefetch to complete
-      readable_waiter.wait();
-    }
+    old_journal->check_isreadable();
     if (old_journal->get_error()) {
       r = old_journal->get_error();
       dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
@@ -1308,13 +1298,7 @@ void MDLog::_replay_thread()
   int r = 0;
   while (1) {
     // wait for read?
-    while (!journaler->is_readable() &&
-          journaler->get_read_pos() < journaler->get_write_pos() &&
-          !journaler->get_error()) {
-      C_SaferCond readable_waiter;
-      journaler->wait_for_readable(&readable_waiter);
-      r = readable_waiter.wait();
-    }
+    journaler->check_isreadable(); 
     if (journaler->get_error()) {
       r = journaler->get_error();
       dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
index 6e1cbd930ac2cf4500cc2f2dab8ce24e7095ba69..35641b17665b26ed43bd6738b15fd48d0cade2f1 100644 (file)
@@ -990,7 +990,7 @@ void Journaler::_assimilate_prefetch()
 
     // Update readability (this will also hit any decode errors resulting
     // from bad data)
-    readable = _is_readable();
+    readable = _have_next_entry();
   }
 
   if ((got_any && !was_readable && readable) || read_pos == write_pos) {
@@ -1114,9 +1114,9 @@ void Journaler::_prefetch()
 
 
 /*
- * _is_readable() - return true if next entry is ready.
+ * _have_next_entry() - return true if next entry is ready.
  */
-bool Journaler::_is_readable()
+bool Journaler::_have_next_entry()
 {
   // anything to read?
   if (read_pos == write_pos)
@@ -1128,13 +1128,13 @@ bool Journaler::_is_readable()
     return true;
   }
 
-  ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
+  ldout (cct, 10) << "_have_next_entry read_buf.length() == " << read_buf.length()
                  << ", but need " << need << " for next entry; fetch_len is "
                  << fetch_len << dendl;
 
   // partial fragment at the end?
   if (received_pos == write_pos) {
-    ldout(cct, 10) << "is_readable() detected partial entry at tail, "
+    ldout(cct, 10) << "_have_next_entry() detected partial entry at tail, "
       "adjusting write_pos to " << read_pos << dendl;
 
     // adjust write_pos
@@ -1153,11 +1153,11 @@ bool Journaler::_is_readable()
 
   if (need > fetch_len) {
     temp_fetch_len = need;
-    ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
+    ldout(cct, 10) << "_have_next_entry noting temp_fetch_len " << temp_fetch_len
                   << dendl;
   }
 
-  ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
+  ldout(cct, 10) << "_have_next_entry: not readable, returning false" << dendl;
   return false;
 }
 
@@ -1167,7 +1167,11 @@ bool Journaler::_is_readable()
 bool Journaler::is_readable()
 {
   lock_guard l(lock);
+  return _is_readable();
+}
 
+bool Journaler::_is_readable()
+{
   if (error != 0) {
     return false;
   }
@@ -1263,9 +1267,9 @@ bool Journaler::try_read_entry(bufferlist& bl)
   read_pos += consumed;
   try {
     // We were readable, we might not be any more
-    readable = _is_readable();
+    readable = _have_next_entry();
   } catch (const buffer::error &e) {
-    lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
+    lderr(cct) << __func__ << ": decode error from _have_next_entry" << dendl;
     error = -EINVAL;
     return false;
   }
@@ -1285,6 +1289,11 @@ bool Journaler::try_read_entry(bufferlist& bl)
 void Journaler::wait_for_readable(Context *onreadable)
 {
   lock_guard l(lock);
+  _wait_for_readable(onreadable); 
+}
+
+void Journaler::_wait_for_readable(Context *onreadable)
+{
   if (is_stopping()) {
     finisher->queue(onreadable, -EAGAIN);
     return;
@@ -1605,3 +1614,17 @@ void Journaler::shutdown()
   waitfor_safe.clear();
 }
 
+void Journaler::check_isreadable()
+{
+  std::unique_lock l(lock);
+  while (!_is_readable() &&
+      get_read_pos() < get_write_pos() &&
+      !get_error()) {
+    C_SaferCond readable_waiter;
+    _wait_for_readable(&readable_waiter);
+    l.unlock();
+    readable_waiter.wait();
+    l.lock();
+  }
+  return ;
+}
index 3e8f0f6650b8f01f79e275891f47a4b166b5f402..d41c1ede6788e67f8ccf8bf7a887424810807025 100644 (file)
@@ -383,7 +383,7 @@ private:
    */
   void handle_write_error(int r);
 
-  bool _is_readable();
+  bool _have_next_entry();
 
   void _finish_erase(int data_result, C_OnFinisher *completion);
   class C_EraseFinish;
@@ -459,6 +459,7 @@ public:
   void wait_for_flush(Context *onsafe = 0);
   void flush(Context *onsafe = 0);
   void wait_for_readable(Context *onfinish);
+  void _wait_for_readable(Context *onfinish);
   bool have_waiter() const;
   void wait_for_prezero(Context *onfinish);
 
@@ -527,6 +528,7 @@ public:
   int get_error() { return error; }
   bool is_readonly() { return readonly; }
   bool is_readable();
+  bool _is_readable();
   bool try_read_entry(bufferlist& bl);
   uint64_t get_write_pos() const { return write_pos; }
   uint64_t get_write_safe_pos() const { return safe_pos; }
@@ -536,6 +538,7 @@ public:
   size_t get_journal_envelope_size() const { 
     return journal_stream.get_envelope_size(); 
   }
+  void check_isreadable();
 };
 WRITE_CLASS_ENCODER(Journaler::Header)