]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/filestore: track objects with writes being applied
authorSage Weil <sage@redhat.com>
Sat, 27 Jan 2018 16:22:12 +0000 (10:22 -0600)
committerSage Weil <sage@redhat.com>
Mon, 12 Feb 2018 19:56:43 +0000 (13:56 -0600)
Note that this is *slight* overkill in that a *source* object of a clone
will also appear in the applying map, even though it is not being
modified.  Given that those clone operations are normally coupled with
another transaction that does write (which is why we are cloning in the
first place) this should not make any difference.

Signed-off-by: Sage Weil <sage@redhat.com>
src/os/ObjectStore.h
src/os/filestore/FileStore.cc
src/os/filestore/FileStore.h

index 69e63ef4636cb50c82054b0e5bdd58b09b7adaa8..bb81a1d4ba650a066f6e66898b15e667c328cbfd 100644 (file)
@@ -498,6 +498,11 @@ public:
     Transaction(const Transaction& other) = default;
     Transaction& operator=(const Transaction& other) = default;
 
+    // expose object_index for FileStore::Op's benefit
+    const map<ghobject_t, __le32>& get_object_index() const {
+      return object_index;
+    }
+
     /* Operations on callback contexts */
     void register_on_applied(Context *c) {
       if (!c) return;
index d3f4e09495552e065e94b38ba2c631aac60b1ee8..02f41f4f3c00364934ea682ec51b1238f9dd9ac6 100644 (file)
@@ -2242,7 +2242,7 @@ int FileStore::queue_transactions(CollectionHandle& ch, vector<Transaction>& tls
     } else if (m_filestore_journal_writeahead) {
       dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
 
-      osr->queue_journal(o->op);
+      osr->queue_journal(o);
 
       trace.keyval("journal mode", "writeahead");
       trace.event("journal started");
index 944120708c5f4c6f1a1fe54f2d1c87e0b7b60447..88cadabc5efb3fec85bd3b50bbe1e33acc57d0aa 100644 (file)
@@ -221,6 +221,7 @@ private:
     uint64_t ops, bytes;
     TrackedOpRef osd_op;
     ZTracer::Trace trace;
+    bool registered_apply = false;
   };
   class OpSequencer : public CollectionImpl {
     CephContext *cct;
@@ -230,6 +231,7 @@ private:
     list<pair<uint64_t, Context*> > flush_commit_waiters;
     Cond cond;
     string osr_name_str;
+    map<ghobject_t,int> applying;
   public:
     Mutex apply_lock;  // for apply mutual exclusion
     int id;
@@ -284,9 +286,10 @@ private:
       }
     }
 
-    void queue_journal(uint64_t s) {
+    void queue_journal(Op *o) {
       Mutex::Locker l(qlock);
-      jq.push_back(s);
+      jq.push_back(o->op);
+      _register_apply(o);
     }
     void dequeue_journal(list<Context*> *to_queue) {
       Mutex::Locker l(qlock);
@@ -297,8 +300,37 @@ private:
     void queue(Op *o) {
       Mutex::Locker l(qlock);
       q.push_back(o);
+      _register_apply(o);
       o->trace.keyval("queue depth", q.size());
     }
+    void _register_apply(Op *o) {
+      if (o->registered_apply)
+       return;
+      o->registered_apply = true;
+      for (auto& t : o->tls) {
+       for (auto& i : t.get_object_index()) {
+         ++applying[i.first];
+       }
+      }
+    }
+    void _unregister_apply(Op *o) {
+      assert(o->registered_apply);
+      for (auto& t : o->tls) {
+       for (auto& i : t.get_object_index()) {
+         auto p = applying.find(i.first);
+         assert(p != applying.end());
+         if (--p->second == 0) {
+           applying.erase(p);
+         }
+       }
+      }
+    }
+    void wait_for_apply(const ghobject_t& oid) {
+      Mutex::Locker l(qlock);
+      while (applying.count(oid)) {
+       cond.Wait(qlock);
+      }
+    }
     Op *peek_queue() {
       Mutex::Locker l(qlock);
       assert(apply_lock.is_locked());
@@ -312,7 +344,7 @@ private:
       Op *o = q.front();
       q.pop_front();
       cond.Signal();
-
+      _unregister_apply(o);
       _wake_flush_waiters(to_queue);
       return o;
     }