From: zhikuodu Date: Fri, 28 Oct 2022 08:57:43 +0000 (+0800) Subject: mds: md_log_replay thread blocks waiting to be woken up X-Git-Tag: v17.2.6~166^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c82406ebbf3702283e3134b5bbf22a4ffd0717e6;p=ceph.git mds: md_log_replay thread blocks waiting to be woken up Fixes: https://tracker.ceph.com/issues/57764 Signed-off-by: zhikuodu (cherry picked from commit aeed04497ae8dbdf0e57ac72702a8d73a5e3c481) --- diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index eea60b48ae52..337c1025a3cb 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -1159,17 +1159,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa // state doesn't change in between. uint32_t events_transcribed = 0; while (1) { - while (!old_journal->is_readable() && - old_journal->get_read_pos() < old_journal->get_write_pos() && - !old_journal->get_error()) { - - // Issue a journal prefetch - C_SaferCond readable_waiter; - old_journal->wait_for_readable(&readable_waiter); - - // Wait for a journal prefetch to complete - readable_waiter.wait(); - } + old_journal->check_isreadable(); if (old_journal->get_error()) { r = old_journal->get_error(); dout(0) << "_replay journaler got error " << r << ", aborting" << dendl; @@ -1309,13 +1299,7 @@ void MDLog::_replay_thread() int r = 0; while (1) { // wait for read? - while (!journaler->is_readable() && - journaler->get_read_pos() < journaler->get_write_pos() && - !journaler->get_error()) { - C_SaferCond readable_waiter; - journaler->wait_for_readable(&readable_waiter); - r = readable_waiter.wait(); - } + journaler->check_isreadable(); if (journaler->get_error()) { r = journaler->get_error(); dout(0) << "_replay journaler got error " << r << ", aborting" << dendl; diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 8084a661d7d3..04b90fb5952a 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -991,7 +991,7 @@ void Journaler::_assimilate_prefetch() // Update readability (this will also hit any decode errors resulting // from bad data) - readable = _is_readable(); + readable = _have_next_entry(); } if ((got_any && !was_readable && readable) || read_pos == write_pos) { @@ -1115,9 +1115,9 @@ void Journaler::_prefetch() /* - * _is_readable() - return true if next entry is ready. + * _have_next_entry() - return true if next entry is ready. */ -bool Journaler::_is_readable() +bool Journaler::_have_next_entry() { // anything to read? if (read_pos == write_pos) @@ -1129,13 +1129,13 @@ bool Journaler::_is_readable() return true; } - ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length() + ldout (cct, 10) << "_have_next_entry read_buf.length() == " << read_buf.length() << ", but need " << need << " for next entry; fetch_len is " << fetch_len << dendl; // partial fragment at the end? if (received_pos == write_pos) { - ldout(cct, 10) << "is_readable() detected partial entry at tail, " + ldout(cct, 10) << "_have_next_entry() detected partial entry at tail, " "adjusting write_pos to " << read_pos << dendl; // adjust write_pos @@ -1154,11 +1154,11 @@ bool Journaler::_is_readable() if (need > fetch_len) { temp_fetch_len = need; - ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len + ldout(cct, 10) << "_have_next_entry noting temp_fetch_len " << temp_fetch_len << dendl; } - ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl; + ldout(cct, 10) << "_have_next_entry: not readable, returning false" << dendl; return false; } @@ -1168,7 +1168,11 @@ bool Journaler::_is_readable() bool Journaler::is_readable() { lock_guard l(lock); + return _is_readable(); +} +bool Journaler::_is_readable() +{ if (error != 0) { return false; } @@ -1264,9 +1268,9 @@ bool Journaler::try_read_entry(bufferlist& bl) read_pos += consumed; try { // We were readable, we might not be any more - readable = _is_readable(); + readable = _have_next_entry(); } catch (const buffer::error &e) { - lderr(cct) << __func__ << ": decode error from _is_readable" << dendl; + lderr(cct) << __func__ << ": decode error from _have_next_entry" << dendl; error = -EINVAL; return false; } @@ -1286,6 +1290,11 @@ bool Journaler::try_read_entry(bufferlist& bl) void Journaler::wait_for_readable(Context *onreadable) { lock_guard l(lock); + _wait_for_readable(onreadable); +} + +void Journaler::_wait_for_readable(Context *onreadable) +{ if (is_stopping()) { finisher->queue(onreadable, -EAGAIN); return; @@ -1606,3 +1615,17 @@ void Journaler::shutdown() waitfor_safe.clear(); } +void Journaler::check_isreadable() +{ + std::unique_lock l(lock); + while (!_is_readable() && + get_read_pos() < get_write_pos() && + !get_error()) { + C_SaferCond readable_waiter; + _wait_for_readable(&readable_waiter); + l.unlock(); + readable_waiter.wait(); + l.lock(); + } + return ; +} diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 2dcc1197e611..1b40eadec169 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -383,7 +383,7 @@ private: */ void handle_write_error(int r); - bool _is_readable(); + bool _have_next_entry(); void _finish_erase(int data_result, C_OnFinisher *completion); class C_EraseFinish; @@ -459,6 +459,7 @@ public: void wait_for_flush(Context *onsafe = 0); void flush(Context *onsafe = 0); void wait_for_readable(Context *onfinish); + void _wait_for_readable(Context *onfinish); bool have_waiter() const; void wait_for_prezero(Context *onfinish); @@ -527,6 +528,7 @@ public: int get_error() { return error; } bool is_readonly() { return readonly; } bool is_readable(); + bool _is_readable(); bool try_read_entry(bufferlist& bl); uint64_t get_write_pos() const { return write_pos; } uint64_t get_write_safe_pos() const { return safe_pos; } @@ -536,6 +538,7 @@ public: size_t get_journal_envelope_size() const { return journal_stream.get_envelope_size(); } + void check_isreadable(); }; WRITE_CLASS_ENCODER(Journaler::Header)