// throw out what we have so far
full_state = FULL_FULL;
while (!writeq.empty()) {
- if (writeq.front().fin) {
- writing_seq.push_back(writeq.front().seq);
- writing_fin.push_back(writeq.front().fin);
- }
dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl;
throttle_ops.put(1);
throttle_bytes.put(writeq.front().bl.length());
return 0;
}
+/*
+ * DEAD CODE — retained for reference only. Superseded by the
+ * completions deque + queue_completions_thru() added below, which
+ * replace the writing_seq/writing_fin/waiting_for_notfull lists.
+void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
+{
+ writing_seq.push_back(seq);
+ if (!waiting_for_notfull.empty()) {
+ // make sure previously unjournaled stuff waiting for UNFULL triggers
+ // _before_ newly journaled stuff does
+ dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin
+ << " until after UNFULL" << dendl;
+ C_Gather *g = new C_Gather(writeq.front().fin);
+ writing_fin.push_back(g->new_sub());
+ waiting_for_notfull.push_back(g->new_sub());
+ } else {
+ writing_fin.push_back(writeq.front().fin);
+ dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl;
+ }
+}
+*/
+
+// Hand every queued completion whose journal seq is <= 'seq' to the
+// finisher, oldest first, popping each from the completions deque --
+// so callbacks fire in journal-sequence order.
+// NOTE(review): presumably the caller holds the lock protecting
+// 'completions'; confirm at call sites (not visible in this hunk).
+void FileJournal::queue_completions_thru(uint64_t seq)
+{
+ while (!completions.empty() &&
+ completions.front().first <= seq) {
+ dout(10) << "queue_completions_thru seq " << seq << " queueing seq " << completions.front().first
+ << " " << completions.front().second << dendl;
+ finisher->queue(completions.front().second);
+ completions.pop_front();
+ }
+}
+
int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
{
// grab next item
bl.push_back(bp);
}
bl.append((const char*)&h, sizeof(h));
-
- if (writeq.front().fin) {
- writing_seq.push_back(seq);
- if (!waiting_for_notfull.empty()) {
- // make sure previously unjournaled stuff waiting for UNFULL triggers
- // _before_ newly journaled stuff does
- dout(10) << " will defer seq " << seq << " callback until after UNFULL" << dendl;
- C_Gather *g = new C_Gather(writeq.front().fin);
- writing_fin.push_back(g->new_sub());
- waiting_for_notfull.push_back(g->new_sub());
- } else
- writing_fin.push_back(writeq.front().fin);
- }
// pop from writeq
writeq.pop_front();
journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
+ writing_seq = seq;
queue_pos += size;
if (queue_pos > header.max_size)
write_pos = pos;
assert(write_pos % header.alignment == 0);
+ journaled_seq = writing_seq;
+
// kick finisher?
// only if we haven't filled up recently!
if (full_state != FULL_NOTFULL) {
- dout(10) << "do_write NOT queueing finisher seq " << writing_seq.front()
+ dout(10) << "do_write NOT queueing finisher seq " << journaled_seq
<< ", full_commit_seq|full_restart_seq" << dendl;
} else {
- dout(20) << "do_write queueing finishers " << writing_fin << dendl;
- writing_seq.clear();
- finisher->queue(writing_fin);
+ if (plug_journal_completions) {
+ dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq
+ << " due to completion plug" << dendl;
+ } else {
+ dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl;
+ queue_completions_thru(journaled_seq);
+ }
}
}
<< " len " << e.length()
<< " (" << oncommit << ")" << dendl;
+ if (oncommit)
+ completions.push_back(pair<uint64_t,Context*>(seq, oncommit));
+
if (full_state == FULL_NOTFULL) {
// queue and kick writer thread
dout(30) << "XXX throttle take " << e.length() << dendl;
throttle_ops.take(1);
throttle_bytes.take(e.length());
- writeq.push_back(write_item(seq, e, alignment, oncommit));
+ writeq.push_back(write_item(seq, e, alignment));
write_cond.Signal();
} else {
// not journaling this. restart writing no sooner than seq + 1.
dout(10) << " journal is/was full" << dendl;
- if (oncommit) {
- writing_seq.push_back(seq);
- writing_fin.push_back(oncommit);
- }
}
}
break;
case FULL_WAIT:
- dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active." << dendl;
+ dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl;
full_state = FULL_NOTFULL;
+ plug_journal_completions = true;
break;
}
}
}
must_write_header = true;
print_header();
-
- // recently were full, but aren't now.
- if (!waiting_for_notfull.empty()) {
- dout(10) << " finishing waiting_for_notfull items " << waiting_for_notfull << dendl;
- finisher->queue(waiting_for_notfull);
- }
- // committed but writing
- while (!writing_seq.empty() && writing_seq.front() <= seq) {
- dout(15) << " finishing committed but writing|waiting seq " << writing_seq.front() << dendl;
- finisher->queue(writing_fin.front());
- writing_seq.pop_front();
- writing_fin.pop_front();
+ // completions!
+ queue_completions_thru(seq);
+ if (plug_journal_completions) {
+ dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
+ plug_journal_completions = false;
+ queue_completions_thru(journaled_seq);
}
- if (full_state == FULL_WAIT) {
- // will complete on _next_ commit
- while (!writing_seq.empty()) {
- dout(15) << " queuing seq " << writing_seq.front() << " " << writing_fin.front()
- << " in waiting_for_notfull" << dendl;
- waiting_for_notfull.push_back(writing_fin.front());
- writing_seq.pop_front();
- writing_fin.pop_front();
- }
- }
-
// committed but unjournaled items
while (!writeq.empty() && writeq.front().seq <= seq) {
dout(15) << " dropping committed but unwritten seq " << writeq.front().seq
<< " len " << writeq.front().bl.length()
- << " (" << writeq.front().fin << ")"
<< dendl;
- if (writeq.front().fin)
- finisher->queue(writeq.front().fin);
dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl;
throttle_ops.put(1);
throttle_bytes.put(writeq.front().bl.length());
// in journal
deque<pair<uint64_t, off64_t> > journalq; // track seq offsets, so we can trim later.
+ deque<pair<uint64_t,Context*> > completions; // queued, writing, waiting for commit.
- // currently being journaled and awaiting callback.
- // or, awaiting callback bc journal was full.
- deque<uint64_t> writing_seq;
- deque<Context*> writing_fin;
-
- deque<Context*> waiting_for_notfull;
+ uint64_t writing_seq, journaled_seq;
+ bool plug_journal_completions;
// waiting to be journaled
struct write_item {
uint64_t seq;
bufferlist bl;
int alignment;
- Context *fin;
- write_item(uint64_t s, bufferlist& b, int al, Context *f) :
- seq(s), alignment(al), fin(f) {
+ // the per-item 'fin' Context is gone: completion callbacks are now
+ // tracked, ordered by seq, in the FileJournal::completions deque.
+ write_item(uint64_t s, bufferlist& b, int al) :
+ seq(s), alignment(al) {
bl.claim(b);
}
};
void stop_writer();
void write_thread_entry();
+ void queue_completions_thru(uint64_t seq);
+
int check_for_full(uint64_t seq, off64_t pos, off64_t size);
int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee);
int prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes);
last_committed_seq(0),
full_state(FULL_NOTFULL),
fd(-1),
+ writing_seq(0), journaled_seq(0),
+ plug_journal_completions(false),
write_lock("FileJournal::write_lock"),
write_stop(false),
write_thread(this) { }
void commit_start();
void committed_thru(uint64_t seq);
bool should_commit_now() {
- return full_state != FULL_NOTFULL || !waiting_for_notfull.empty();
+ // commit promptly whenever the journal has left its normal
+ // (FULL_NOTFULL) state; the waiting_for_notfull list no longer exists.
+ return full_state != FULL_NOTFULL;
}
void set_wait_on_full(bool b) { wait_on_full = b; }