]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
os/bluestore: fix OpSequencer/Sequencer lifecycle
authorSage Weil <sage@redhat.com>
Fri, 10 Mar 2017 03:56:28 +0000 (22:56 -0500)
committerSage Weil <sage@redhat.com>
Tue, 21 Mar 2017 18:56:28 +0000 (13:56 -0500)
Make osr_set refcounts so that it can tolerate a Sequencer destruction
racing with flush or a Sequencer that outlives the BlueStore instance
itself.

Signed-off-by: Sage Weil <sage@redhat.com>
src/os/ObjectStore.h
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index 7f4a00733e307db110baf12eb2b25e86c314d9a1..2a9c28777471a2223acbb9a1360edce9f0e330fd 100644 (file)
@@ -134,8 +134,15 @@ public:
    */
   struct Sequencer_impl : public RefCountedObject {
     CephContext* cct;
+
+    // block until any previous transactions are visible.  specifically,
+    // collection_list and collection_empty need to reflect prior operations.
     virtual void flush() = 0;
 
+    // called when we are done with the impl.  the impl may have a different
+    // (longer) lifecycle than the Sequencer.
+    virtual void discard() {}
+
     /**
      * Async flush_commit
      *
@@ -165,8 +172,11 @@ public:
     Sequencer_implRef p;
 
     explicit Sequencer(string n)
-      : name(n), shard_hint(spg_t()), p(NULL) {}
+      : name(n), shard_hint(spg_t()), p(NULL) {
+    }
     ~Sequencer() {
+      if (p)
+       p->discard();  // tell impl we are done with it
     }
 
     /// return a unique string identifier for this sequencer
index ea734b5521dca8b044b694c8f33a5c58fd7d698d..9d466954a24a40079b93668cebf98da3a08bc5eb 100644 (file)
@@ -4815,6 +4815,8 @@ int BlueStore::umount()
   dout(1) << __func__ << dendl;
 
   _osr_drain_all();
+  _osr_discard_all();
+  assert(osr_set.empty());  // nobody should be creating sequencers!
 
   mempool_thread.shutdown();
 
@@ -7499,22 +7501,32 @@ void BlueStore::_osr_drain_all()
 {
   dout(10) << __func__ << dendl;
 
-  // WARNING: we make a (somewhat sloppy) assumption here that
-  // no OpSequencers will be created or destroyed for the duration
-  // of this method.
-  set<OpSequencer*> s;
+  set<OpSequencerRef> s;
   {
     std::lock_guard<std::mutex> l(osr_lock);
     s = osr_set;
   }
   for (auto osr : s) {
-    dout(20) << __func__ << " flush " << osr << dendl;
+    dout(20) << __func__ << " drain " << osr << dendl;
     osr->drain();
   }
 
   dout(10) << __func__ << " done" << dendl;
 }
 
+void BlueStore::_osr_discard_all()
+{
+  dout(10) << __func__ << " " << osr_set << dendl;
+  set<OpSequencerRef> s;
+  {
+    std::lock_guard<std::mutex> l(osr_lock);
+    s = osr_set;
+  }
+  for (auto osr : s) {
+    osr->discard();
+  }
+}
+
 void BlueStore::_kv_sync_thread()
 {
   dout(10) << __func__ << " start" << dendl;
@@ -7785,7 +7797,8 @@ int BlueStore::_deferred_replay()
     _txc_state_proc(txc);
   }
   dout(20) << __func__ << " draining osr" << dendl;
-  osr->drain();
+  _osr_drain_all();
+  osr->discard();
   dout(10) << __func__ << " completed " << count << " events" << dendl;
   return 0;
 }
index 6bd3b20c25be9b37b3ed1dec39250b57273653e3..35d889dcb4d1fc86ebd341886f8c5a6e9b06d0a1 100644 (file)
@@ -1547,15 +1547,27 @@ public:
 
     std::atomic_int kv_submitted_waiters = {0};
 
+    std::mutex register_lock;
+    bool registered = true;
+
     OpSequencer(CephContext* cct, BlueStore *store)
        //set the qlock to PTHREAD_MUTEX_RECURSIVE mode
       : Sequencer_impl(cct),
        parent(NULL), store(store) {
+      std::lock_guard<std::mutex> l(register_lock);
       store->register_osr(this);
     }
     ~OpSequencer() {
       assert(q.empty());
-      store->unregister_osr(this);
+      discard();
+    }
+
+    void discard() override {
+      std::lock_guard<std::mutex> l(register_lock);
+      if (registered) {
+       registered = false;
+       store->unregister_osr(this);
+      }
     }
 
     void queue_new(TransContext *txc) {
@@ -1720,8 +1732,8 @@ private:
 
   vector<Cache*> cache_shards;
 
-  std::mutex osr_lock;            ///< protect osd_set
-  std::set<OpSequencer*> osr_set; ///< set of all OpSequencers
+  std::mutex osr_lock;              ///< protect osd_set
+  std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
 
   std::atomic<uint64_t> nid_last = {0};
   std::atomic<uint64_t> nid_max = {0};
@@ -1890,6 +1902,7 @@ private:
 
   void _osr_reap_done(OpSequencer *osr);
   void _osr_drain_all();
+  void _osr_discard_all();
 
   void _kv_sync_thread();
   void _kv_stop() {