From: Samuel Just Date: Mon, 9 Jun 2014 18:00:32 +0000 (-0700) Subject: os/: add async flush_commit() method to Sequencer and implementations X-Git-Tag: v0.84~172^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f7498124325fe2269c33e45fae4c8181c8f7be14;p=ceph.git os/: add async flush_commit() method to Sequencer and implementations Signed-off-by: Samuel Just --- diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 9d6252ca4ca9c..31bd6a6045453 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1703,7 +1703,8 @@ void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle) void FileStore::_finish_op(OpSequencer *osr) { - Op *o = osr->dequeue(); + list to_queue; + Op *o = osr->dequeue(&to_queue); dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << dendl; osr->apply_lock.Unlock(); // locked in _do_op @@ -1721,6 +1722,7 @@ void FileStore::_finish_op(OpSequencer *osr) if (o->onreadable) { op_finisher.queue(o->onreadable); } + op_finisher.queue(to_queue); delete o; } @@ -1836,7 +1838,8 @@ void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk) // this should queue in order because the journal does it's completions in order. queue_op(osr, o); - osr->dequeue_journal(); + list to_queue; + osr->dequeue_journal(&to_queue); // do ondisk completions async, to prevent any onreadable_sync completions // getting blocked behind an ondisk completion. 
@@ -1844,6 +1847,7 @@ void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk) dout(10) << " queueing ondisk " << ondisk << dendl; ondisk_finisher.queue(ondisk); } + ondisk_finisher.queue(to_queue); } int FileStore::_do_transactions( diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 4c9ffdba2f3e8..3fcd89a265091 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -193,19 +193,70 @@ private: Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock) list q; list jq; + list > flush_commit_waiters; Cond cond; public: Sequencer *parent; Mutex apply_lock; // for apply mutual exclusion + /// get_max_uncompleted + bool _get_max_uncompleted( + uint64_t *seq ///< [out] max uncompleted seq + ) { + assert(qlock.is_locked()); + assert(seq); + *seq = 0; + if (q.empty() && jq.empty()) + return true; + + if (!q.empty()) + *seq = q.back()->op; + if (!jq.empty() && jq.back() > *seq) + *seq = jq.back(); + + return false; + } /// @returns true if both queues are empty + + /// get_min_uncompleted + bool _get_min_uncompleted( + uint64_t *seq ///< [out] min uncompleted seq + ) { + assert(qlock.is_locked()); + assert(seq); + *seq = 0; + if (q.empty() && jq.empty()) + return true; + + if (!q.empty()) + *seq = q.front()->op; + if (!jq.empty() && jq.front() < *seq) + *seq = jq.front(); + + return false; + } /// @returns true if both queues are empty + + void _wake_flush_waiters(list *to_queue) { + uint64_t seq; + if (_get_min_uncompleted(&seq)) + seq = -1; + + for (list >::iterator i = + flush_commit_waiters.begin(); + i != flush_commit_waiters.end() && i->first < seq; + flush_commit_waiters.erase(i++)) { + to_queue->push_back(i->second); + } + } + void queue_journal(uint64_t s) { Mutex::Locker l(qlock); jq.push_back(s); } - void dequeue_journal() { + void dequeue_journal(list *to_queue) { Mutex::Locker l(qlock); jq.pop_front(); cond.Signal(); + _wake_flush_waiters(to_queue); } void queue(Op *o) { Mutex::Locker l(qlock); @@ 
-215,20 +266,26 @@ private: assert(apply_lock.is_locked()); return q.front(); } - Op *dequeue() { + + Op *dequeue(list *to_queue) { + assert(to_queue); assert(apply_lock.is_locked()); Mutex::Locker l(qlock); Op *o = q.front(); q.pop_front(); cond.Signal(); + + _wake_flush_waiters(to_queue); return o; } + void flush() { Mutex::Locker l(qlock); while (g_conf->filestore_blackhole) cond.Wait(qlock); // wait forever + // get max for journal _or_ op queues uint64_t seq = 0; if (!q.empty()) @@ -243,6 +300,17 @@ private: cond.Wait(qlock); } } + bool flush_commit(Context *c) { + Mutex::Locker l(qlock); + uint64_t seq = 0; + if (_get_max_uncompleted(&seq)) { + delete c; + return true; + } else { + flush_commit_waiters.push_back(make_pair(seq, c)); + return false; + } + } OpSequencer() : qlock("FileStore::OpSequencer::qlock", false, false), diff --git a/src/os/KeyValueStore.cc b/src/os/KeyValueStore.cc index cc117fa3a6718..3255ad4ed06c9 100644 --- a/src/os/KeyValueStore.cc +++ b/src/os/KeyValueStore.cc @@ -1085,7 +1085,8 @@ void KeyValueStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle) void KeyValueStore::_finish_op(OpSequencer *osr) { - Op *o = osr->dequeue(); + list to_queue; + Op *o = osr->dequeue(&to_queue); dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << dendl; osr->apply_lock.Unlock(); // locked in _do_op @@ -1099,6 +1100,7 @@ void KeyValueStore::_finish_op(OpSequencer *osr) o->onreadable_sync->complete(0); } op_finisher.queue(o->onreadable); + op_finisher.queue(to_queue); delete o; } diff --git a/src/os/KeyValueStore.h b/src/os/KeyValueStore.h index d7b9c0a11644a..402b4cba92b65 100644 --- a/src/os/KeyValueStore.h +++ b/src/os/KeyValueStore.h @@ -259,9 +259,53 @@ class KeyValueStore : public ObjectStore, list q; list jq; Cond cond; + list > flush_commit_waiters; public: Sequencer *parent; Mutex apply_lock; // for apply mutual exclusion + + /// get_max_uncompleted + bool _get_max_uncompleted( + uint64_t *seq ///< 
[out] max uncompleted seq + ) { + assert(qlock.is_locked()); + assert(seq); + *seq = 0; + if (q.empty()) { + return true; + } else { + *seq = q.back()->op; + return false; + } + } /// @returns true if the queue is empty + + /// get_min_uncompleted + bool _get_min_uncompleted( + uint64_t *seq ///< [out] min uncompleted seq + ) { + assert(qlock.is_locked()); + assert(seq); + *seq = 0; + if (q.empty()) { + return true; + } else { + *seq = q.front()->op; + return false; + } + } /// @returns true if the queue is empty + + void _wake_flush_waiters(list *to_queue) { + uint64_t seq; + if (_get_min_uncompleted(&seq)) + seq = -1; + + for (list >::iterator i = + flush_commit_waiters.begin(); + i != flush_commit_waiters.end() && i->first < seq; + flush_commit_waiters.erase(i++)) { + to_queue->push_back(i->second); + } + } void queue(Op *o) { Mutex::Locker l(qlock); @@ -271,14 +315,19 @@ class KeyValueStore : public ObjectStore, assert(apply_lock.is_locked()); return q.front(); } - Op *dequeue() { + + Op *dequeue(list *to_queue) { + assert(to_queue); assert(apply_lock.is_locked()); Mutex::Locker l(qlock); Op *o = q.front(); q.pop_front(); cond.Signal(); + + _wake_flush_waiters(to_queue); return o; } + void flush() { Mutex::Locker l(qlock); @@ -297,6 +346,17 @@ class KeyValueStore : public ObjectStore, cond.Wait(qlock); } } + bool flush_commit(Context *c) { + Mutex::Locker l(qlock); + uint64_t seq = 0; + if (_get_max_uncompleted(&seq)) { + delete c; + return true; + } else { + flush_commit_waiters.push_back(make_pair(seq, c)); + return false; + } + } OpSequencer() : qlock("KeyValueStore::OpSequencer::qlock", false, false), diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 15ed31f0311ae..a5f5fcb0b4418 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -128,6 +128,22 @@ public: */ struct Sequencer_impl { virtual void flush() = 0; + + /** + * Async flush_commit + * + * There are two cases: + * 1) sequencer is currently idle: the method returns true and
+ * c is deleted + * 2) sequencer is not idle: the method returns false and c is + * called asynchronously with a value of 0 once all transactions + * queued on this sequencer prior to the call have been applied + * and committed. + */ + virtual bool flush_commit( + Context *c ///< [in] context to call upon flush/commit + ) = 0; ///< @return true if idle, false otherwise + virtual ~Sequencer_impl() {} }; @@ -153,6 +169,16 @@ public: if (p) p->flush(); } + + /// @see Sequencer_impl::flush_commit() + bool flush_commit(Context *c) { + if (!p) { + delete c; + return true; + } else { + return p->flush_commit(c); + } + } }; /*********************************