From e060d7a115ff1477de0024d24fdc0397dc4c97b1 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 13 Jan 2011 13:14:24 -0800 Subject: [PATCH] filejournal: rewrite completion handling, fix ordering on full->notfull Rewriting the completion handling to be simpler, clearer, so that it is easier to maintain a strict completion ordering invariant. This also fixes an ordering bug: When restarting journal, we defer initially until we get a committed_thru from the previous commit and then do all those completions. That same logic needs to also apply to new items submitted during that commit interval. This was broken before, but the simpler structure fixes it. Fixes #666. Tested-by: Jim Schutt Signed-off-by: Sage Weil --- src/os/FileJournal.cc | 106 +++++++++++++++++++++--------------------- src/os/FileJournal.h | 20 ++++---- 2 files changed, 63 insertions(+), 63 deletions(-) diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc index 7f33c84fa395a..54e0f05039050 100644 --- a/src/os/FileJournal.cc +++ b/src/os/FileJournal.cc @@ -510,10 +510,6 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_ // throw out what we have so far full_state = FULL_FULL; while (!writeq.empty()) { - if (writeq.front().fin) { - writing_seq.push_back(writeq.front().seq); - writing_fin.push_back(writeq.front().fin); - } dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl; throttle_ops.put(1); throttle_bytes.put(writeq.front().bl.length()); @@ -544,6 +540,36 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_ return 0; } +/* +void FileJournal::queue_write_fin(uint64_t seq, Context *fin) +{ + writing_seq.push_back(seq); + if (!waiting_for_notfull.empty()) { + // make sure previously unjournaled stuff waiting for UNFULL triggers + // _before_ newly journaled stuff does + dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin + << " until after UNFULL" << dendl; + C_Gather *g = new C_Gather(writeq.front().fin); + writing_fin.push_back(g->new_sub()); + waiting_for_notfull.push_back(g->new_sub()); + } else { + writing_fin.push_back(writeq.front().fin); + dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl; + } +} +*/ + +void FileJournal::queue_completions_thru(uint64_t seq) +{ + while (!completions.empty() && + completions.front().first <= seq) { + dout(10) << "queue_completions_thru seq " << seq << " queueing seq " << completions.front().first + << " " << completions.front().second << dendl; + finisher->queue(completions.front().second); + completions.pop_front(); + } +} + int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes) { // grab next item @@ -593,23 +619,11 @@ int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64 bl.push_back(bp); } bl.append((const char*)&h, sizeof(h)); - - if (writeq.front().fin) { - writing_seq.push_back(seq); - if (!waiting_for_notfull.empty()) { - // make sure previously unjournaled stuff waiting for UNFULL triggers - // _before_ newly journaled stuff does - dout(10) << " will defer seq " << seq << " callback until after UNFULL" << dendl; - C_Gather *g = new C_Gather(writeq.front().fin); - writing_fin.push_back(g->new_sub()); - waiting_for_notfull.push_back(g->new_sub()); - } else - writing_fin.push_back(writeq.front().fin); - } // pop from writeq writeq.pop_front(); journalq.push_back(pair(seq, queue_pos)); + writing_seq = seq; queue_pos += size; if (queue_pos > header.max_size) @@ -731,15 +745,21 @@ void FileJournal::do_write(bufferlist& bl) write_pos = pos; assert(write_pos % header.alignment == 0); + journaled_seq = writing_seq; + // kick finisher? // only if we haven't filled up recently! if (full_state != FULL_NOTFULL) { - dout(10) << "do_write NOT queueing finisher seq " << writing_seq.front() + dout(10) << "do_write NOT queueing finisher seq " << journaled_seq << ", full_commit_seq|full_restart_seq" << dendl; } else { - dout(20) << "do_write queueing finishers " << writing_fin << dendl; - writing_seq.clear(); - finisher->queue(writing_fin); + if (plug_journal_completions) { + dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq + << " due to completion plug" << dendl; + } else { + dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl; + queue_completions_thru(journaled_seq); + } } } @@ -810,21 +830,20 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, Conte << " len " << e.length() << " (" << oncommit << ")" << dendl; + if (oncommit) + completions.push_back(pair(seq, oncommit)); + if (full_state == FULL_NOTFULL) { // queue and kick writer thread dout(30) << "XXX throttle take " << e.length() << dendl; throttle_ops.take(1); throttle_bytes.take(e.length()); - writeq.push_back(write_item(seq, e, alignment, oncommit)); + writeq.push_back(write_item(seq, e, alignment)); write_cond.Signal(); } else { // not journaling this. restart writing no sooner than seq + 1. dout(10) << " journal is/was full" << dendl; - if (oncommit) { - writing_seq.push_back(seq); - writing_fin.push_back(oncommit); - } } } @@ -843,8 +862,9 @@ void FileJournal::commit_start() break; case FULL_WAIT: - dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active." << dendl; + dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl; full_state = FULL_NOTFULL; + plug_journal_completions = true; break; } } @@ -877,40 +897,20 @@ void FileJournal::committed_thru(uint64_t seq) } must_write_header = true; print_header(); - - // recently were full, but aren't now. - if (!waiting_for_notfull.empty()) { - dout(10) << " finishing waiting_for_notfull items " << waiting_for_notfull << dendl; - finisher->queue(waiting_for_notfull); - } - // committed but writing - while (!writing_seq.empty() && writing_seq.front() <= seq) { - dout(15) << " finishing committed but writing|waiting seq " << writing_seq.front() << dendl; - finisher->queue(writing_fin.front()); - writing_seq.pop_front(); - writing_fin.pop_front(); + // completions! + queue_completions_thru(seq); + if (plug_journal_completions) { + dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl; + plug_journal_completions = false; + queue_completions_thru(journaled_seq); } - if (full_state == FULL_WAIT) { - // will complete on _next_ commit - while (!writing_seq.empty()) { - dout(15) << " queuing seq " << writing_seq.front() << " " << writing_fin.front() - << " in waiting_for_notfull" << dendl; - waiting_for_notfull.push_back(writing_fin.front()); - writing_seq.pop_front(); - writing_fin.pop_front(); - } - } - // committed but unjournaled items while (!writeq.empty() && writeq.front().seq <= seq) { dout(15) << " dropping committed but unwritten seq " << writeq.front().seq << " len " << writeq.front().bl.length() - << " (" << writeq.front().fin << ")" << dendl; - if (writeq.front().fin) - finisher->queue(writeq.front().fin); dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl; throttle_ops.put(1); throttle_bytes.put(writeq.front().bl.length()); diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h index 0959f7085fe7f..6ffa3a30c0df7 100644 --- a/src/os/FileJournal.h +++ b/src/os/FileJournal.h @@ -98,22 +98,18 @@ private: // in journal deque > journalq; // track seq offsets, so we can trim later. + deque > completions; // queued, writing, waiting for commit. - // currently being journaled and awaiting callback. - // or, awaiting callback bc journal was full. - deque writing_seq; - deque writing_fin; - - deque waiting_for_notfull; + uint64_t writing_seq, journaled_seq; + bool plug_journal_completions; // waiting to be journaled struct write_item { uint64_t seq; bufferlist bl; int alignment; - Context *fin; - write_item(uint64_t s, bufferlist& b, int al, Context *f) : - seq(s), alignment(al), fin(f) { + write_item(uint64_t s, bufferlist& b, int al) : + seq(s), alignment(al) { bl.claim(b); } }; @@ -140,6 +136,8 @@ private: void stop_writer(); void write_thread_entry(); + void queue_completions_thru(uint64_t seq); + int check_for_full(uint64_t seq, off64_t pos, off64_t size); int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee); int prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes); @@ -173,6 +171,8 @@ private: last_committed_seq(0), full_state(FULL_NOTFULL), fd(-1), + writing_seq(0), journaled_seq(0), + plug_journal_completions(false), write_lock("FileJournal::write_lock"), write_stop(false), write_thread(this) { } @@ -198,7 +198,7 @@ private: void commit_start(); void committed_thru(uint64_t seq); bool should_commit_now() { - return full_state != FULL_NOTFULL || !waiting_for_notfull.empty(); + return full_state != FULL_NOTFULL; } void set_wait_on_full(bool b) { wait_on_full = b; } -- 2.39.5