// state doesn't change in between.
uint32_t events_transcribed = 0;
while (1) {
- while (!old_journal->is_readable() &&
- old_journal->get_read_pos() < old_journal->get_write_pos() &&
- !old_journal->get_error()) {
-
- // Issue a journal prefetch
- C_SaferCond readable_waiter;
- old_journal->wait_for_readable(&readable_waiter);
-
- // Wait for a journal prefetch to complete
- readable_waiter.wait();
- }
+ old_journal->check_isreadable();
if (old_journal->get_error()) {
r = old_journal->get_error();
dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
int r = 0;
while (1) {
// wait for read?
- while (!journaler->is_readable() &&
- journaler->get_read_pos() < journaler->get_write_pos() &&
- !journaler->get_error()) {
- C_SaferCond readable_waiter;
- journaler->wait_for_readable(&readable_waiter);
- r = readable_waiter.wait();
- }
+ journaler->check_isreadable();
if (journaler->get_error()) {
r = journaler->get_error();
dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
// Update readability (this will also hit any decode errors resulting
// from bad data)
- readable = _is_readable();
+ readable = _have_next_entry();
}
if ((got_any && !was_readable && readable) || read_pos == write_pos) {
/*
- * _is_readable() - return true if next entry is ready.
+ * _have_next_entry() - return true if next entry is ready.
*/
-bool Journaler::_is_readable()
+bool Journaler::_have_next_entry()
{
// anything to read?
if (read_pos == write_pos)
return true;
}
- ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
+ ldout (cct, 10) << "_have_next_entry read_buf.length() == " << read_buf.length()
<< ", but need " << need << " for next entry; fetch_len is "
<< fetch_len << dendl;
// partial fragment at the end?
if (received_pos == write_pos) {
- ldout(cct, 10) << "is_readable() detected partial entry at tail, "
+ ldout(cct, 10) << "_have_next_entry() detected partial entry at tail, "
"adjusting write_pos to " << read_pos << dendl;
// adjust write_pos
if (need > fetch_len) {
temp_fetch_len = need;
- ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
+ ldout(cct, 10) << "_have_next_entry noting temp_fetch_len " << temp_fetch_len
<< dendl;
}
- ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
+ ldout(cct, 10) << "_have_next_entry: not readable, returning false" << dendl;
return false;
}
bool Journaler::is_readable()
{
lock_guard l(lock);
+ return _is_readable();
+}
+bool Journaler::_is_readable()
+{
if (error != 0) {
return false;
}
read_pos += consumed;
try {
// We were readable, we might not be any more
- readable = _is_readable();
+ readable = _have_next_entry();
} catch (const buffer::error &e) {
- lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
+ lderr(cct) << __func__ << ": decode error from _have_next_entry" << dendl;
error = -EINVAL;
return false;
}
void Journaler::wait_for_readable(Context *onreadable)
{
lock_guard l(lock);
+ _wait_for_readable(onreadable);
+}
+
+void Journaler::_wait_for_readable(Context *onreadable)
+{
if (is_stopping()) {
finisher->queue(onreadable, -EAGAIN);
return;
waitfor_safe.clear();
}
+void Journaler::check_isreadable()
+{
+ std::unique_lock l(lock);
+ while (!_is_readable() &&
+ get_read_pos() < get_write_pos() &&
+ !get_error()) {
+ C_SaferCond readable_waiter;
+ _wait_for_readable(&readable_waiter);
+ l.unlock();
+ readable_waiter.wait();
+ l.lock();
+ }
+ return ;
+}
*/
void handle_write_error(int r);
- bool _is_readable();
+ bool _have_next_entry();
void _finish_erase(int data_result, C_OnFinisher *completion);
class C_EraseFinish;
void wait_for_flush(Context *onsafe = 0);
void flush(Context *onsafe = 0);
void wait_for_readable(Context *onfinish);
+ void _wait_for_readable(Context *onfinish);
bool have_waiter() const;
void wait_for_prezero(Context *onfinish);
int get_error() { return error; }
bool is_readonly() { return readonly; }
bool is_readable();
+ bool _is_readable();
bool try_read_entry(bufferlist& bl);
uint64_t get_write_pos() const { return write_pos; }
uint64_t get_write_safe_pos() const { return safe_pos; }
size_t get_journal_envelope_size() const {
return journal_stream.get_envelope_size();
}
+ void check_isreadable();
};
WRITE_CLASS_ENCODER(Journaler::Header)