From: Sage Weil Date: Sat, 27 Jan 2018 16:22:12 +0000 (-0600) Subject: os/filestore: track objects with writes being applied X-Git-Tag: v13.0.2~222^2~30 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=28d63a4d8e5ae1cc8c2dd5117cc6bc7aa6e0179f;p=ceph.git os/filestore: track objects with writes being applied Note that this is *slight* overkill in that a *source* object of a clone will also appear in the applying map, even though it is not being modified. Given that those clone operations are normally coupled with another transaction that does write (which is why we are cloning in the first place) this should not make any difference. Signed-off-by: Sage Weil --- diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 69e63ef4636c..bb81a1d4ba65 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -498,6 +498,11 @@ public: Transaction(const Transaction& other) = default; Transaction& operator=(const Transaction& other) = default; + // expose object_index for FileStore::Op's benefit + const map& get_object_index() const { + return object_index; + } + /* Operations on callback contexts */ void register_on_applied(Context *c) { if (!c) return; diff --git a/src/os/filestore/FileStore.cc b/src/os/filestore/FileStore.cc index d3f4e0949555..02f41f4f3c00 100644 --- a/src/os/filestore/FileStore.cc +++ b/src/os/filestore/FileStore.cc @@ -2242,7 +2242,7 @@ int FileStore::queue_transactions(CollectionHandle& ch, vector& tls } else if (m_filestore_journal_writeahead) { dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl; - osr->queue_journal(o->op); + osr->queue_journal(o); trace.keyval("journal mode", "writeahead"); trace.event("journal started"); diff --git a/src/os/filestore/FileStore.h b/src/os/filestore/FileStore.h index 944120708c5f..88cadabc5efb 100644 --- a/src/os/filestore/FileStore.h +++ b/src/os/filestore/FileStore.h @@ -221,6 +221,7 @@ private: uint64_t ops, bytes; TrackedOpRef osd_op; ZTracer::Trace trace; + bool registered_apply = false; }; class OpSequencer : public CollectionImpl { CephContext *cct; @@ -230,6 +231,7 @@ private: list > flush_commit_waiters; Cond cond; string osr_name_str; + map applying; public: Mutex apply_lock; // for apply mutual exclusion int id; @@ -284,9 +286,10 @@ private: } } - void queue_journal(uint64_t s) { + void queue_journal(Op *o) { Mutex::Locker l(qlock); - jq.push_back(s); + jq.push_back(o->op); + _register_apply(o); } void dequeue_journal(list *to_queue) { Mutex::Locker l(qlock); @@ -297,8 +300,37 @@ private: void queue(Op *o) { Mutex::Locker l(qlock); q.push_back(o); + _register_apply(o); o->trace.keyval("queue depth", q.size()); } + void _register_apply(Op *o) { + if (o->registered_apply) + return; + o->registered_apply = true; + for (auto& t : o->tls) { + for (auto& i : t.get_object_index()) { + ++applying[i.first]; + } + } + } + void _unregister_apply(Op *o) { + assert(o->registered_apply); + for (auto& t : o->tls) { + for (auto& i : t.get_object_index()) { + auto p = applying.find(i.first); + assert(p != applying.end()); + if (--p->second == 0) { + applying.erase(p); + } + } + } + } + void wait_for_apply(const ghobject_t& oid) { + Mutex::Locker l(qlock); + while (applying.count(oid)) { + cond.Wait(qlock); + } + } Op *peek_queue() { Mutex::Locker l(qlock); assert(apply_lock.is_locked()); @@ -312,7 +344,7 @@ private: Op *o = q.front(); q.pop_front(); cond.Signal(); - + _unregister_apply(o); _wake_flush_waiters(to_queue); return o; }