From: Sage Weil Date: Fri, 26 Feb 2010 20:29:17 +0000 (-0800) Subject: filestore: add onreadable_sync callback X-Git-Tag: v0.20~388 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=429a9ef071582b047a6980051a5db770e44c0710;p=ceph.git filestore: add onreadable_sync callback Add an additional completion context that gets called synchronously when the operation completes, instead of getting shunted to the async finisher thread. This allows us to make sure certain completion events happen without getting 'stuck in line' behind other completions with conflicting locks. --- diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index c6707b13f9e9..8541addf79f7 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -649,7 +649,7 @@ int FileStore::umount() /// ----------------------------- -void FileStore::queue_op(__u64 op_seq, list& tls, Context *onreadable) +void FileStore::queue_op(__u64 op_seq, list& tls, Context *onreadable, Context *onreadable_sync) { __u64 bytes = 0, ops = 0; for (list::iterator p = tls.begin(); @@ -667,6 +667,7 @@ void FileStore::queue_op(__u64 op_seq, list& tls, Context *onreada o->op = op_seq; o->tls.swap(tls); o->onreadable = onreadable; + o->onreadable_sync = onreadable_sync; o->ops = ops; o->bytes = bytes; @@ -691,7 +692,7 @@ void FileStore::_do_op(Op *o) int r = do_transactions(o->tls, o->op); op_apply_finish(); dout(10) << "_do_op " << o << " " << o->op << " r = " << r - << ", finisher " << o->onreadable << dendl; + << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl; /*dout(10) << "op_entry finished " << o->bytes << " bytes, queue now " << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl; @@ -707,22 +708,31 @@ void FileStore::_finish_op(Op *o) if (next_finish == o->op) { dout(10) << "_finish_op " << o->op << " next_finish " << next_finish - << " queueing " << o->onreadable << dendl; + << " queueing " << o->onreadable << " doing " << o->onreadable_sync << dendl; next_finish++; + if (o->onreadable_sync) { + o->onreadable_sync->finish(0); + delete o->onreadable_sync; + } op_finisher.queue(o->onreadable); while (finish_queue.begin()->first == next_finish) { - Context *c = finish_queue.begin()->second; + Context *c = finish_queue.begin()->second.first; + Context *s = finish_queue.begin()->second.second; finish_queue.erase(finish_queue.begin()); dout(10) << "_finish_op " << o->op << " next_finish " << next_finish - << " queueing delayed " << c << dendl; + << " queueing delayed " << c << " doing " << s << dendl; + if (s) { + s->finish(0); + delete s; + } op_finisher.queue(c); next_finish++; } } else { dout(10) << "_finish_op " << o->op << " next_finish " << next_finish << ", delaying " << o->onreadable << dendl; - finish_queue[o->op] = o->onreadable; + finish_queue[o->op] = pair(o->onreadable, o->onreadable_sync); } delete o; @@ -733,14 +743,14 @@ struct C_JournaledAhead : public Context { FileStore *fs; __u64 op; list tls; - Context *onreadable; + Context *onreadable, *onreadable_sync; Context *ondisk; C_JournaledAhead(FileStore *f, __u64 o, list& t, - Context *onr, Context *ond) : - fs(f), op(o), tls(t), onreadable(onr), ondisk(ond) { } + Context *onr, Context *ond, Context *onrsync) : + fs(f), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { } void finish(int r) { - fs->_journaled_ahead(op, tls, onreadable, ondisk); + fs->_journaled_ahead(op, tls, onreadable, ondisk, onreadable_sync); } }; @@ -752,8 +762,8 @@ int FileStore::queue_transaction(Transaction *t) } int FileStore::queue_transactions(list &tls, - Context *onreadable, - Context *ondisk) + Context *onreadable, Context *ondisk, + Context *onreadable_sync) { if (journal && journal->is_writeable()) { if (g_conf.filestore_journal_parallel) { @@ -766,7 +776,7 @@ int FileStore::queue_transactions(list &tls, journal_transactions(tls, op, ondisk); // queue inside journal lock, to preserve ordering - queue_op(op, tls, onreadable); + queue_op(op, tls, onreadable, onreadable_sync); op_journal_finish(); return 0; @@ -774,7 +784,8 @@ int FileStore::queue_transactions(list &tls, else if (g_conf.filestore_journal_writeahead) { __u64 op = op_journal_start(0); dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl; - journal_transactions(tls, op, new C_JournaledAhead(this, op, tls, onreadable, ondisk)); + journal_transactions(tls, op, + new C_JournaledAhead(this, op, tls, onreadable, ondisk, onreadable_sync)); op_journal_finish(); return 0; } @@ -795,6 +806,10 @@ int FileStore::queue_transactions(list &tls, // start on_readable finisher after we queue journal item, as on_readable callback // is allowed to delete the Transaction + if (onreadable_sync) { + onreadable_sync->finish(r); + delete onreadable_sync; + } op_finisher.queue(onreadable, r); return r; @@ -802,12 +817,12 @@ int FileStore::queue_transactions(list &tls, void FileStore::_journaled_ahead(__u64 op, list &tls, - Context *onreadable, - Context *ondisk) + Context *onreadable, Context *ondisk, + Context *onreadable_sync) { dout(10) << "_journaled_ahead " << op << " " << tls << dendl; // this should queue in order because the journal does it's completions in order. - queue_op(op, tls, onreadable); + queue_op(op, tls, onreadable, onreadable_sync); if (ondisk) { ondisk->finish(0); delete ondisk; diff --git a/src/os/FileStore.h b/src/os/FileStore.h index dcff5dbae74e..85527c846653 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -89,7 +89,7 @@ class FileStore : public JournalingObjectStore { struct Op { __u64 op; list tls; - Context *onreadable; + Context *onreadable, *onreadable_sync; __u64 ops, bytes; }; deque op_queue; @@ -97,7 +97,7 @@ class FileStore : public JournalingObjectStore { Cond op_throttle_cond; Finisher op_finisher; __u64 next_finish; - map<__u64, Context*> finish_queue; + map<__u64, pair > finish_queue; ThreadPool op_tp; struct OpWQ : public ThreadPool::WorkQueue { @@ -136,9 +136,9 @@ class FileStore : public JournalingObjectStore { void _do_op(Op *o); void _finish_op(Op *o); - void queue_op(__u64 op, list& tls, Context *onreadable); + void queue_op(__u64 op, list& tls, Context *onreadable, Context *onreadable_sync); void _journaled_ahead(__u64 op, list &tls, - Context *onreadable, Context *ondisk); + Context *onreadable, Context *ondisk, Context *onreadable_sync); friend class C_JournaledAhead; // flusher thread @@ -194,7 +194,8 @@ class FileStore : public JournalingObjectStore { unsigned _do_transaction(Transaction& t); int queue_transaction(Transaction* t); - int queue_transactions(list& tls, Context *onreadable, Context *ondisk=0); + int queue_transactions(list& tls, Context *onreadable, Context *ondisk=0, + Context *onreadable_sync=0); // ------------------ // objects diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 34a6155edc12..409a6db68d54 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -468,13 +468,14 @@ public: virtual unsigned apply_transactions(list& tls, Context *ondisk=0) = 0; virtual int queue_transaction(Transaction* t) = 0; - virtual int queue_transaction(Transaction *t, Context *onreadable, Context *ondisk=0) { + virtual int queue_transaction(Transaction *t, Context *onreadable, Context *ondisk=0, + Context *onreadable_sync=0) { list tls; tls.push_back(t); - return queue_transactions(tls, onreadable, ondisk); + return queue_transactions(tls, onreadable, ondisk, onreadable_sync); } - virtual int queue_transactions(list& tls, Context *onreadable, - Context *ondisk=0) = 0; + virtual int queue_transactions(list& tls, Context *onreadable, Context *ondisk=0, + Context *onreadable_sync=0) = 0;