} else if (r == 0) {
Mutex::Locker l(lock);
dout(4) << "open complete" << dendl;
- if (r == 0) {
- journaler.set_writeable();
+
+ // Journaler only guarantees entries before head write_pos have been
+ // fully flushed. Before appending new entries, we need to find and
+ // drop any partial written entry.
+ if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
+ dout(4) << "recovering write_pos" << dendl;
+ journaler.set_read_pos(journaler.last_committed.write_pos);
+ _recover(completion);
+ return;
}
- completion->complete(r);
+
+ journaler.set_writeable();
+ completion->complete(0);
} else {
derr << "Error " << r << " loading Journaler" << dendl;
- on_error->complete(0);
+ on_error->complete(r);
}
}));
}
+
+void PurgeQueue::_recover(Context *completion)
+{
+ assert(lock.is_locked_by_me());
+
+ // Journaler::is_readable() adjusts write_pos if partial entry is encountered
+ while (1) {
+ if (!journaler.is_readable() &&
+ !journaler.get_error() &&
+ journaler.get_read_pos() < journaler.get_write_pos()) {
+ journaler.wait_for_readable(new FunctionContext([this, completion](int r) {
+ Mutex::Locker l(lock);
+ _recover(completion);
+ }));
+ return;
+ }
+
+ if (journaler.get_error()) {
+ int r = journaler.get_error();
+ derr << "Error " << r << " recovering write_pos" << dendl;
+ on_error->complete(r);
+ return;
+ }
+
+ if (journaler.get_read_pos() == journaler.get_write_pos()) {
+ dout(4) << "write_pos recovered" << dendl;
+ // restore original read_pos
+ journaler.set_read_pos(journaler.last_committed.expire_pos);
+ journaler.set_writeable();
+ completion->complete(0);
+ return;
+ }
+
+ bufferlist bl;
+ bool readable = journaler.try_read_entry(bl);
+ assert(readable); // we checked earlier
+ }
+}
+
void PurgeQueue::create(Context *fin)
{
dout(4) << "creating" << dendl;