From 429a9ef071582b047a6980051a5db770e44c0710 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 26 Feb 2010 12:29:17 -0800 Subject: [PATCH] filestore: add onreadable_sync callback Add an additional completion context that gets called synchronously when the operation completes, instead of getting shunted to the async finisher thread. This allows us to make sure certain completion events happen without getting 'stuck in line' behind other completions with conflicting locks. --- src/os/FileStore.cc | 49 +++++++++++++++++++++++++++++--------------- src/os/FileStore.h | 11 +++++----- src/os/ObjectStore.h | 9 ++++---- 3 files changed, 43 insertions(+), 26 deletions(-) diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index c6707b13f9e9b..8541addf79f7c 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -649,7 +649,7 @@ int FileStore::umount() /// ----------------------------- -void FileStore::queue_op(__u64 op_seq, list& tls, Context *onreadable) +void FileStore::queue_op(__u64 op_seq, list& tls, Context *onreadable, Context *onreadable_sync) { __u64 bytes = 0, ops = 0; for (list::iterator p = tls.begin(); @@ -667,6 +667,7 @@ void FileStore::queue_op(__u64 op_seq, list& tls, Context *onreada o->op = op_seq; o->tls.swap(tls); o->onreadable = onreadable; + o->onreadable_sync = onreadable_sync; o->ops = ops; o->bytes = bytes; @@ -691,7 +692,7 @@ void FileStore::_do_op(Op *o) int r = do_transactions(o->tls, o->op); op_apply_finish(); dout(10) << "_do_op " << o << " " << o->op << " r = " << r - << ", finisher " << o->onreadable << dendl; + << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl; /*dout(10) << "op_entry finished " << o->bytes << " bytes, queue now " << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl; @@ -707,22 +708,31 @@ void FileStore::_finish_op(Op *o) if (next_finish == o->op) { dout(10) << "_finish_op " << o->op << " next_finish " << next_finish - << " queueing " << o->onreadable << dendl; + << " queueing " << o->onreadable << " doing " << o->onreadable_sync << dendl; next_finish++; + if (o->onreadable_sync) { + o->onreadable_sync->finish(0); + delete o->onreadable_sync; + } op_finisher.queue(o->onreadable); while (finish_queue.begin()->first == next_finish) { - Context *c = finish_queue.begin()->second; + Context *c = finish_queue.begin()->second.first; + Context *s = finish_queue.begin()->second.second; finish_queue.erase(finish_queue.begin()); dout(10) << "_finish_op " << o->op << " next_finish " << next_finish - << " queueing delayed " << c << dendl; + << " queueing delayed " << c << " doing " << s << dendl; + if (s) { + s->finish(0); + delete s; + } op_finisher.queue(c); next_finish++; } } else { dout(10) << "_finish_op " << o->op << " next_finish " << next_finish << ", delaying " << o->onreadable << dendl; - finish_queue[o->op] = o->onreadable; + finish_queue[o->op] = pair(o->onreadable, o->onreadable_sync); } delete o; @@ -733,14 +743,14 @@ struct C_JournaledAhead : public Context { FileStore *fs; __u64 op; list tls; - Context *onreadable; + Context *onreadable, *onreadable_sync; Context *ondisk; C_JournaledAhead(FileStore *f, __u64 o, list& t, - Context *onr, Context *ond) : - fs(f), op(o), tls(t), onreadable(onr), ondisk(ond) { } + Context *onr, Context *ond, Context *onrsync) : + fs(f), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { } void finish(int r) { - fs->_journaled_ahead(op, tls, onreadable, ondisk); + fs->_journaled_ahead(op, tls, onreadable, ondisk, onreadable_sync); } }; @@ -752,8 +762,8 @@ int FileStore::queue_transaction(Transaction *t) } int FileStore::queue_transactions(list &tls, - Context *onreadable, - Context *ondisk) + Context *onreadable, Context *ondisk, + Context *onreadable_sync) { if (journal && journal->is_writeable()) { if (g_conf.filestore_journal_parallel) { @@ -766,7 +776,7 @@ int FileStore::queue_transactions(list &tls, journal_transactions(tls, op, ondisk); // queue inside journal lock, to preserve ordering - queue_op(op, tls, onreadable); + queue_op(op, tls, onreadable, onreadable_sync); op_journal_finish(); return 0; @@ -774,7 +784,8 @@ int FileStore::queue_transactions(list &tls, else if (g_conf.filestore_journal_writeahead) { __u64 op = op_journal_start(0); dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl; - journal_transactions(tls, op, new C_JournaledAhead(this, op, tls, onreadable, ondisk)); + journal_transactions(tls, op, + new C_JournaledAhead(this, op, tls, onreadable, ondisk, onreadable_sync)); op_journal_finish(); return 0; } @@ -795,6 +806,10 @@ int FileStore::queue_transactions(list &tls, // start on_readable finisher after we queue journal item, as on_readable callback // is allowed to delete the Transaction + if (onreadable_sync) { + onreadable_sync->finish(r); + delete onreadable_sync; + } op_finisher.queue(onreadable, r); return r; @@ -802,12 +817,12 @@ int FileStore::queue_transactions(list &tls, void FileStore::_journaled_ahead(__u64 op, list &tls, - Context *onreadable, - Context *ondisk) + Context *onreadable, Context *ondisk, + Context *onreadable_sync) { dout(10) << "_journaled_ahead " << op << " " << tls << dendl; // this should queue in order because the journal does it's completions in order. - queue_op(op, tls, onreadable); + queue_op(op, tls, onreadable, onreadable_sync); if (ondisk) { ondisk->finish(0); delete ondisk; diff --git a/src/os/FileStore.h b/src/os/FileStore.h index dcff5dbae74ec..85527c8466532 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -89,7 +89,7 @@ class FileStore : public JournalingObjectStore { struct Op { __u64 op; list tls; - Context *onreadable; + Context *onreadable, *onreadable_sync; __u64 ops, bytes; }; deque op_queue; @@ -97,7 +97,7 @@ class FileStore : public JournalingObjectStore { Cond op_throttle_cond; Finisher op_finisher; __u64 next_finish; - map<__u64, Context*> finish_queue; + map<__u64, pair > finish_queue; ThreadPool op_tp; struct OpWQ : public ThreadPool::WorkQueue { @@ -136,9 +136,9 @@ class FileStore : public JournalingObjectStore { void _do_op(Op *o); void _finish_op(Op *o); - void queue_op(__u64 op, list& tls, Context *onreadable); + void queue_op(__u64 op, list& tls, Context *onreadable, Context *onreadable_sync); void _journaled_ahead(__u64 op, list &tls, - Context *onreadable, Context *ondisk); + Context *onreadable, Context *ondisk, Context *onreadable_sync); friend class C_JournaledAhead; // flusher thread @@ -194,7 +194,8 @@ class FileStore : public JournalingObjectStore { unsigned _do_transaction(Transaction& t); int queue_transaction(Transaction* t); - int queue_transactions(list& tls, Context *onreadable, Context *ondisk=0); + int queue_transactions(list& tls, Context *onreadable, Context *ondisk=0, + Context *onreadable_sync=0); // ------------------ // objects diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 34a6155edc122..409a6db68d546 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -468,13 +468,14 @@ public: virtual unsigned apply_transactions(list& tls, Context *ondisk=0) = 0; virtual int queue_transaction(Transaction* t) = 0; - virtual int queue_transaction(Transaction *t, Context *onreadable, Context *ondisk=0) { + virtual int queue_transaction(Transaction *t, Context *onreadable, Context *ondisk=0, + Context *onreadable_sync=0) { list tls; tls.push_back(t); - return queue_transactions(tls, onreadable, ondisk); + return queue_transactions(tls, onreadable, ondisk, onreadable_sync); } - virtual int queue_transactions(list& tls, Context *onreadable, - Context *ondisk=0) = 0; + virtual int queue_transactions(list& tls, Context *onreadable, Context *ondisk=0, + Context *onreadable_sync=0) = 0; -- 2.39.5