]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: allow reuse of osr from existing collection
authorSage Weil <sage@redhat.com>
Sun, 8 Jul 2018 19:24:49 +0000 (14:24 -0500)
committerSage Weil <sage@redhat.com>
Fri, 7 Sep 2018 17:07:56 +0000 (12:07 -0500)
We try to attach an old osr at prepare_new_collection time, but that
happens before a transaction is submitted, and we might have a
transaction that removes and then recreates a collection.

Move the logic to _osr_attach and extend it to include reusing an osr
in use by a collection already in coll_map.  Also adjust the
_osr_register_zombie method to behave if the osr is already there, which
can happen with a remove, create, remove+create transaction sequence.

Fixes: https://tracker.ceph.com/issues/25180
Signed-off-by: Sage Weil <sage@redhat.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index 5e618339b291bf645ff0c33785b6069d33554a43..a96d163202d8ab00920624d4fbad6e6309b565c7 100644 (file)
@@ -3193,20 +3193,6 @@ BlueStore::Collection::Collection(BlueStore *store_, Cache *c, coll_t cid)
     exists(true),
     onode_map(c)
 {
-  {
-    std::lock_guard<std::mutex> l(store->zombie_osr_lock);
-    auto p = store->zombie_osr_set.find(cid);
-    if (p == store->zombie_osr_set.end()) {
-      osr = new OpSequencer(store, cid);
-      osr->shard = cid.hash_to_shard(store->m_finisher_num);
-    } else {
-      osr = p->second;
-      store->zombie_osr_set.erase(p);
-      ldout(store->cct, 10) << "resurrecting zombie osr " << osr << dendl;
-      osr->zombie = false;
-      ceph_assert(osr->shard == cid.hash_to_shard(store->m_finisher_num));
-    }
-  }
 }
 
 bool BlueStore::Collection::flush_commit(Context *c)
@@ -5468,6 +5454,7 @@ int BlueStore::_open_collections(int *errors)
       }   
       dout(20) << __func__ << " opened " << cid << " " << c
               << " " << c->cnode << dendl;
+      _osr_attach(c.get());
       coll_map[cid] = c;
     } else {
       derr << __func__ << " unrecognized collection " << it->key() << dendl;
@@ -7257,6 +7244,7 @@ ObjectStore::CollectionHandle BlueStore::create_new_collection(
     cache_shards[cid.hash_to_shard(cache_shards.size())],
     cid);
   new_coll_map[cid] = c;
+  _osr_attach(c);
   return c;
 }
 
@@ -9030,13 +9018,43 @@ out:
   txc->released.clear();
 }
 
+void BlueStore::_osr_attach(Collection *c)
+{
+  // note: caller has RWLock on coll_map
+  auto q = coll_map.find(c->cid);
+  if (q != coll_map.end()) {
+    c->osr = q->second->osr;
+    ceph_assert(c->osr->shard == c->cid.hash_to_shard(m_finisher_num));
+    ldout(cct, 10) << __func__ << " " << c->cid
+                  << " reusing osr " << c->osr << " from existing coll "
+                  << q->second << dendl;
+  } else {
+    std::lock_guard<std::mutex> l(zombie_osr_lock);
+    auto p = zombie_osr_set.find(c->cid);
+    if (p == zombie_osr_set.end()) {
+      c->osr = new OpSequencer(this, c->cid);
+      c->osr->shard = c->cid.hash_to_shard(m_finisher_num);
+      ldout(cct, 10) << __func__ << " " << c->cid
+                    << " fresh osr " << c->osr << dendl;
+    } else {
+      c->osr = p->second;
+      zombie_osr_set.erase(p);
+      ldout(cct, 10) << __func__ << " " << c->cid
+                    << " resurrecting zombie osr " << c->osr << dendl;
+      c->osr->zombie = false;
+      ceph_assert(c->osr->shard == c->cid.hash_to_shard(m_finisher_num));
+    }
+  }
+}
+
 void BlueStore::_osr_register_zombie(OpSequencer *osr)
 {
   std::lock_guard<std::mutex> l(zombie_osr_lock);
   dout(10) << __func__ << " " << osr << " " << osr->cid << dendl;
   osr->zombie = true;
   auto i = zombie_osr_set.emplace(osr->cid, osr);
-  ceph_assert(i.second); // this should be a new insertion
+  // this is either a new insertion or the same osr is already there
+  ceph_assert(i.second || i.first->second == osr);
 }
 
 void BlueStore::_osr_drain_preceding(TransContext *txc)
index 63bcb75b0d389857aafe857717b31c72033bc78b..ff7dcff90409a8f71f096db5fa504f4e204b7afc 100644 (file)
@@ -2192,6 +2192,7 @@ private:
   void _txc_finish(TransContext *txc);
   void _txc_release_alloc(TransContext *txc);
 
+  void _osr_attach(Collection *c);
   void _osr_register_zombie(OpSequencer *osr);
   void _osr_drain_preceding(TransContext *txc);
   void _osr_drain_all();