/// -----------------------------
-void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable)
+void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync)
{
__u64 bytes = 0, ops = 0;
for (list<Transaction*>::iterator p = tls.begin();
o->op = op_seq;
o->tls.swap(tls);
o->onreadable = onreadable;
+ o->onreadable_sync = onreadable_sync;
o->ops = ops;
o->bytes = bytes;
int r = do_transactions(o->tls, o->op);
op_apply_finish();
dout(10) << "_do_op " << o << " " << o->op << " r = " << r
- << ", finisher " << o->onreadable << dendl;
+ << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
/*dout(10) << "op_entry finished " << o->bytes << " bytes, queue now "
<< op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl;
if (next_finish == o->op) {
dout(10) << "_finish_op " << o->op << " next_finish " << next_finish
- << " queueing " << o->onreadable << dendl;
+ << " queueing " << o->onreadable << " doing " << o->onreadable_sync << dendl;
next_finish++;
+ if (o->onreadable_sync) {
+ o->onreadable_sync->finish(0);
+ delete o->onreadable_sync;
+ }
op_finisher.queue(o->onreadable);
while (finish_queue.begin()->first == next_finish) {
- Context *c = finish_queue.begin()->second;
+ Context *c = finish_queue.begin()->second.first;
+ Context *s = finish_queue.begin()->second.second;
finish_queue.erase(finish_queue.begin());
dout(10) << "_finish_op " << o->op << " next_finish " << next_finish
- << " queueing delayed " << c << dendl;
+ << " queueing delayed " << c << " doing " << s << dendl;
+ if (s) {
+ s->finish(0);
+ delete s;
+ }
op_finisher.queue(c);
next_finish++;
}
} else {
dout(10) << "_finish_op " << o->op << " next_finish " << next_finish
<< ", delaying " << o->onreadable << dendl;
- finish_queue[o->op] = o->onreadable;
+ finish_queue[o->op] = pair<Context*,Context*>(o->onreadable, o->onreadable_sync);
}
delete o;
FileStore *fs;
__u64 op;
list<ObjectStore::Transaction*> tls;
- Context *onreadable;
+ Context *onreadable, *onreadable_sync;
Context *ondisk;
C_JournaledAhead(FileStore *f, __u64 o, list<ObjectStore::Transaction*>& t,
- Context *onr, Context *ond) :
- fs(f), op(o), tls(t), onreadable(onr), ondisk(ond) { }
+ Context *onr, Context *ond, Context *onrsync) :
+ fs(f), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
void finish(int r) {
- fs->_journaled_ahead(op, tls, onreadable, ondisk);
+ fs->_journaled_ahead(op, tls, onreadable, ondisk, onreadable_sync);
}
};
}
int FileStore::queue_transactions(list<Transaction*> &tls,
- Context *onreadable,
- Context *ondisk)
+ Context *onreadable, Context *ondisk,
+ Context *onreadable_sync)
{
if (journal && journal->is_writeable()) {
if (g_conf.filestore_journal_parallel) {
journal_transactions(tls, op, ondisk);
// queue inside journal lock, to preserve ordering
- queue_op(op, tls, onreadable);
+ queue_op(op, tls, onreadable, onreadable_sync);
op_journal_finish();
return 0;
else if (g_conf.filestore_journal_writeahead) {
__u64 op = op_journal_start(0);
dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
- journal_transactions(tls, op, new C_JournaledAhead(this, op, tls, onreadable, ondisk));
+ journal_transactions(tls, op,
+ new C_JournaledAhead(this, op, tls, onreadable, ondisk, onreadable_sync));
op_journal_finish();
return 0;
}
// start on_readable finisher after we queue journal item, as on_readable callback
// is allowed to delete the Transaction
+ if (onreadable_sync) {
+ onreadable_sync->finish(r);
+ delete onreadable_sync;
+ }
op_finisher.queue(onreadable, r);
return r;
void FileStore::_journaled_ahead(__u64 op,
list<Transaction*> &tls,
- Context *onreadable,
- Context *ondisk)
+ Context *onreadable, Context *ondisk,
+ Context *onreadable_sync)
{
dout(10) << "_journaled_ahead " << op << " " << tls << dendl;
// this should queue in order because the journal does it's completions in order.
- queue_op(op, tls, onreadable);
+ queue_op(op, tls, onreadable, onreadable_sync);
if (ondisk) {
ondisk->finish(0);
delete ondisk;
struct Op {
__u64 op;
list<Transaction*> tls;
- Context *onreadable;
+ Context *onreadable, *onreadable_sync;
__u64 ops, bytes;
};
deque<Op*> op_queue;
Cond op_throttle_cond;
Finisher op_finisher;
__u64 next_finish;
- map<__u64, Context*> finish_queue;
+ map<__u64, pair<Context*,Context*> > finish_queue;
ThreadPool op_tp;
struct OpWQ : public ThreadPool::WorkQueue<Op> {
void _do_op(Op *o);
void _finish_op(Op *o);
- void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable);
+ void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
void _journaled_ahead(__u64 op, list<Transaction*> &tls,
- Context *onreadable, Context *ondisk);
+ Context *onreadable, Context *ondisk, Context *onreadable_sync);
friend class C_JournaledAhead;
// flusher thread
unsigned _do_transaction(Transaction& t);
int queue_transaction(Transaction* t);
- int queue_transactions(list<Transaction*>& tls, Context *onreadable, Context *ondisk=0);
+ int queue_transactions(list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
+ Context *onreadable_sync=0);
// ------------------
// objects