]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
os/bluestore: keep all OpSequencers registered
authorSage Weil <sage@redhat.com>
Tue, 14 Mar 2017 02:49:41 +0000 (22:49 -0400)
committerSage Weil <sage@redhat.com>
Tue, 21 Mar 2017 18:56:28 +0000 (13:56 -0500)
Maintain the set of all live OpSequencers.

Signed-off-by: Sage Weil <sage@redhat.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index 9357510557e5e5aae0d801c1cc41a3bdbba253b8..d726469ad649fad29c25461e87deba8e282f2227 100644 (file)
@@ -7759,7 +7759,7 @@ int BlueStore::_do_deferred_op(TransContext *txc, bluestore_deferred_op_t& wo)
 int BlueStore::_deferred_replay()
 {
   dout(10) << __func__ << " start" << dendl;
-  OpSequencerRef osr = new OpSequencer(cct);
+  OpSequencerRef osr = new OpSequencer(cct, this);
   int count = 0;
   KeyValueDB::Iterator it = db->get_iterator(PREFIX_DEFERRED);
   for (it->lower_bound(string()); it->valid(); it->next(), ++count) {
@@ -7819,7 +7819,7 @@ int BlueStore::queue_transactions(
     osr = static_cast<OpSequencer *>(posr->p.get());
     dout(10) << __func__ << " existing " << osr << " " << *osr << dendl;
   } else {
-    osr = new OpSequencer(cct);
+    osr = new OpSequencer(cct, this);
     osr->parent = posr;
     posr->p = osr;
     dout(10) << __func__ << " new " << osr << " " << *osr << dendl;
index eb2dd1dd56511d1ee23690e21560e0f5e28d07f9..d419e4b14d6dbf0838ef53d17ea37c5fa2fa3de5 100644 (file)
@@ -1535,6 +1535,7 @@ public:
     boost::intrusive::list_member_hook<> deferred_osr_queue_item;
 
     Sequencer *parent;
+    BlueStore *store;
 
     std::mutex deferred_apply_mutex;
 
@@ -1546,13 +1547,15 @@ public:
 
     std::atomic_int kv_submitted_waiters = {0};
 
-    OpSequencer(CephContext* cct)
+    OpSequencer(CephContext* cct, BlueStore *store)
        //set the qlock to PTHREAD_MUTEX_RECURSIVE mode
       : Sequencer_impl(cct),
-       parent(NULL) {
+       parent(NULL), store(store) {
+      store->register_osr(this);
     }
     ~OpSequencer() {
       assert(q.empty());
+      store->unregister_osr(this);
     }
 
     void queue_new(TransContext *txc) {
@@ -1717,6 +1720,9 @@ private:
 
   vector<Cache*> cache_shards;
 
+  std::mutex osr_lock;            ///< protect osd_set
+  std::set<OpSequencer*> osr_set; ///< set of all OpSequencers
+
   std::atomic<uint64_t> nid_last = {0};
   std::atomic<uint64_t> nid_max = {0};
   std::atomic<uint64_t> blobid_last = {0};
@@ -1995,6 +2001,15 @@ public:
     f->close_section();
   }
 
+  void register_osr(OpSequencer *osr) {
+    std::lock_guard<std::mutex> l(osr_lock);
+    osr_set.insert(osr);
+  }
+  void unregister_osr(OpSequencer *osr) {
+    std::lock_guard<std::mutex> l(osr_lock);
+    osr_set.erase(osr);
+  }
+
 public:
   int statfs(struct store_statfs_t *buf) override;