]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: fix remove_collection to properly detect collection's empty 11398/head
authorIgor Fedotov <ifedotov@mirantis.com>
Fri, 7 Oct 2016 15:22:45 +0000 (15:22 +0000)
committerIgor Fedotov <ifedotov@mirantis.com>
Fri, 14 Oct 2016 15:01:29 +0000 (15:01 +0000)
Signed-off-by: Igor Fedotov <ifedotov@mirantis.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h
src/test/objectstore/store_test.cc

index 265df0f1e85bb18e78bb92e6da36814b67c4444f..aefa266ed21999749093fba4d14438920c26ad20 100644 (file)
@@ -5419,7 +5419,27 @@ int BlueStore::collection_list(
 {
   Collection *c = static_cast<Collection*>(c_.get());
   dout(15) << __func__ << " " << c->cid
-          << " start " << start << " end " << end << " max " << max << dendl;
+           << " start " << start << " end " << end << " max " << max << dendl;
+  int r;
+  {
+    RWLock::RLocker l(c->lock);
+    r = _collection_list(c, start, end, sort_bitwise, max, ls, pnext);
+  }
+
+  c->trim_cache();
+  dout(10) << __func__ << " " << c->cid
+    << " start " << start << " end " << end << " max " << max
+    << " = " << r << ", ls.size() = " << ls->size()
+    << ", next = " << (pnext ? *pnext : ghobject_t())  << dendl;
+  return r;
+}
+
+int BlueStore::_collection_list(
+  Collection* c, ghobject_t start, ghobject_t end,
+  bool sort_bitwise, int max,
+  vector<ghobject_t> *ls, ghobject_t *pnext)
+{
+
   if (!c->exists)
     return -ENOENT;
   if (!sort_bitwise)
@@ -5427,113 +5447,105 @@ int BlueStore::collection_list(
 
   int r = 0;
   ghobject_t static_next;
-  {
-    RWLock::RLocker l(c->lock);
-    KeyValueDB::Iterator it;
-    string temp_start_key, temp_end_key;
-    string start_key, end_key;
-    bool set_next = false;
-    string pend;
-    bool temp;
-
-    if (!pnext)
-      pnext = &static_next;
-
-    if (start == ghobject_t::get_max() ||
-        start.hobj.is_max()) {
-      *pnext = ghobject_t::get_max();
-      goto out;
-    }
-    get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
-                    &start_key, &end_key);
-    dout(20) << __func__
-            << " range " << pretty_binary_string(temp_start_key)
-            << " to " << pretty_binary_string(temp_end_key)
-            << " and " << pretty_binary_string(start_key)
-            << " to " << pretty_binary_string(end_key)
-            << " start " << start << dendl;
-    it = db->get_iterator(PREFIX_OBJ);
-    if (start == ghobject_t() ||
-        start.hobj == hobject_t() ||
-        start == c->cid.get_min_hobj()) {
-      it->upper_bound(temp_start_key);
+  KeyValueDB::Iterator it;
+  string temp_start_key, temp_end_key;
+  string start_key, end_key;
+  bool set_next = false;
+  string pend;
+  bool temp;
+
+  if (!pnext)
+    pnext = &static_next;
+
+  if (start == ghobject_t::get_max() ||
+    start.hobj.is_max()) {
+    *pnext = ghobject_t::get_max();
+    goto out;
+  }
+  get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
+    &start_key, &end_key);
+  dout(20) << __func__
+    << " range " << pretty_binary_string(temp_start_key)
+    << " to " << pretty_binary_string(temp_end_key)
+    << " and " << pretty_binary_string(start_key)
+    << " to " << pretty_binary_string(end_key)
+    << " start " << start << dendl;
+  it = db->get_iterator(PREFIX_OBJ);
+  if (start == ghobject_t() ||
+    start.hobj == hobject_t() ||
+    start == c->cid.get_min_hobj()) {
+    it->upper_bound(temp_start_key);
+    temp = true;
+  } else {
+    string k;
+    get_object_key(start, &k);
+    if (start.hobj.is_temp()) {
       temp = true;
+      assert(k >= temp_start_key && k < temp_end_key);
     } else {
-      string k;
-      get_object_key(start, &k);
-      if (start.hobj.is_temp()) {
-        temp = true;
-        assert(k >= temp_start_key && k < temp_end_key);
-      } else {
-        temp = false;
-        assert(k >= start_key && k < end_key);
-      }
-      dout(20) << " start from " << pretty_binary_string(k)
-              << " temp=" << (int)temp << dendl;
-      it->lower_bound(k);
+      temp = false;
+      assert(k >= start_key && k < end_key);
     }
-    if (end.hobj.is_max()) {
-      pend = temp ? temp_end_key : end_key;
+    dout(20) << " start from " << pretty_binary_string(k)
+      << " temp=" << (int)temp << dendl;
+    it->lower_bound(k);
+  }
+  if (end.hobj.is_max()) {
+    pend = temp ? temp_end_key : end_key;
+  } else {
+    get_object_key(end, &end_key);
+    if (end.hobj.is_temp()) {
+      if (temp)
+       pend = end_key;
+      else
+       goto out;
     } else {
-      get_object_key(end, &end_key);
-      if (end.hobj.is_temp()) {
-        if (temp)
-         pend = end_key;
-        else
-         goto out;
-      } else {
-        pend = temp ? temp_end_key : end_key;
-      }
-    }
-    dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
-    while (true) {
-      if (!it->valid() || it->key() > pend) {
-        if (!it->valid())
-         dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
-        else
-         dout(20) << __func__ << " key " << pretty_binary_string(it->key())
-                  << " > " << end << dendl;
-        if (temp) {
-         if (end.hobj.is_temp()) {
-           break;
-         }
-         dout(30) << __func__ << " switch to non-temp namespace" << dendl;
-         temp = false;
-         it->upper_bound(start_key);
-         pend = end_key;
-         dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
-         continue;
-        }
-        break;
-      }
-      dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
-      if (is_extent_shard_key(it->key())) {
-       it->next();
+      pend = temp ? temp_end_key : end_key;
+    }
+  }
+  dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
+  while (true) {
+    if (!it->valid() || it->key() > pend) {
+      if (!it->valid())
+       dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
+      else
+       dout(20) << __func__ << " key " << pretty_binary_string(it->key())
+       << " > " << end << dendl;
+      if (temp) {
+       if (end.hobj.is_temp()) {
+         break;
+       }
+       dout(30) << __func__ << " switch to non-temp namespace" << dendl;
+       temp = false;
+       it->upper_bound(start_key);
+       pend = end_key;
+       dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
        continue;
       }
-      ghobject_t oid;
-      int r = get_key_object(it->key(), &oid);
-      assert(r == 0);
-      if (ls->size() >= (unsigned)max) {
-        dout(20) << __func__ << " reached max " << max << dendl;
-        *pnext = oid;
-        set_next = true;
-        break;
-      }
-      ls->push_back(oid);
+      break;
+    }
+    dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
+    if (is_extent_shard_key(it->key())) {
       it->next();
+      continue;
     }
-    if (!set_next) {
-      *pnext = ghobject_t::get_max();
+    ghobject_t oid;
+    int r = get_key_object(it->key(), &oid);
+    assert(r == 0);
+    if (ls->size() >= (unsigned)max) {
+      dout(20) << __func__ << " reached max " << max << dendl;
+      *pnext = oid;
+      set_next = true;
+      break;
     }
+    ls->push_back(oid);
+    it->next();
+  }
+  if (!set_next) {
+    *pnext = ghobject_t::get_max();
   }
 
- out:
-  c->trim_cache();
-  dout(10) << __func__ << " " << c->cid
-          << " start " << start << " end " << end << " max " << max
-          << " = " << r << ", ls.size() = " << ls->size()
-          << ", next = " << *pnext << dendl;
+out:
   return r;
 }
 
@@ -8680,25 +8692,54 @@ int BlueStore::_remove_collection(TransContext *txc, coll_t cid,
       r = -ENOENT;
       goto out;
     }
+    size_t nonexistent_count = 0;
     assert((*c)->exists);
     if ((*c)->onode_map.map_any([&](OnodeRef o) {
-         if (o->exists) {
-           dout(10) << __func__ << " " << o->oid << " " << o
-                    << " exists in onode_map" << dendl;
-           return true;
-         }
-         return false;
-       })) {
+        if (o->exists) {
+          dout(10) << __func__ << " " << o->oid << " " << o
+                   << " exists in onode_map" << dendl;
+          return true;
+        }
+        ++nonexistent_count;
+        return false;
+        })) {
       r = -ENOTEMPTY;
       goto out;
     }
-    coll_map.erase(cid);
-    txc->removed_collections.push_back(*c);
-    (*c)->exists = false;
-    c->reset();
+
+    vector<ghobject_t> ls;
+    ghobject_t next;
+    //Enumerate onodes in db, up to nonexistent_count + 1
+    // then check if all of them are marked as non-existent.
+    // Bypass the check if returned number is greater than nonexistent_count
+    r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(), true, nonexistent_count + 1,
+                          &ls, &next);
+
+    if (r >= 0) {
+      bool exists = false; //ls.size() > nonexistent_count;
+      for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
+        dout(10) << __func__ << " oid " << *it << dendl;
+        auto onode = (*c)->onode_map.lookup(*it);
+        exists = !onode || onode->exists;
+        if (exists) {
+          dout(10) << __func__ << " " << *it
+                   << " exists in db" << dendl;
+        }
+      }
+      if (!exists) {
+        coll_map.erase(cid);
+        txc->removed_collections.push_back(*c);
+        (*c)->exists = false;
+        c->reset();
+        txc->t->rmkey(PREFIX_COLL, stringify(cid));
+        r = 0;
+      } else {
+        dout(10) << __func__ << " " << cid
+                 << " is non-empty" << dendl;
+        r = -ENOTEMPTY;
+      }
+    }
   }
-  txc->t->rmkey(PREFIX_COLL, stringify(cid));
-  r = 0;
 
  out:
   dout(10) << __func__ << " " << cid << " = " << r << dendl;
index e63c1710c1b4e8547a30f549594dae3729420418..e93a5088a19d2b093d3f0f78a0f6cac189362b68 100644 (file)
@@ -1572,6 +1572,11 @@ private:
     b->shared_blob->bc.write(txc->seq, offset, bl, flags);
     txc->shared_blobs_written.insert(b->shared_blob);
   }
+
+  int _collection_list(Collection *c, ghobject_t start, ghobject_t end,
+    bool sort_bitwise, int max, vector<ghobject_t> *ls, ghobject_t *next);
+
+
 public:
   BlueStore(CephContext *cct, const string& path);
   ~BlueStore();
index 6372742f3c2c69656159c789198ce4a97c1e7568..f519135e46e897d26d45cf46f37bddc07f0c5f9e 100644 (file)
@@ -2516,6 +2516,7 @@ TEST_P(StoreTest, SimpleCloneTest) {
 
   ghobject_t hoid2(hobject_t(sobject_t("Object 2", CEPH_NOSNAP),
                             "key", 123, -1, ""));
+  ghobject_t hoid3(hobject_t(sobject_t("Object 3", CEPH_NOSNAP)));
   {
     ObjectStore::Transaction t;
     t.clone(cid, hoid, hoid2);
@@ -2758,9 +2759,40 @@ TEST_P(StoreTest, SimpleCloneTest) {
     ASSERT_TRUE(bl_eq(rl, final));
   }
   {
+    //verify if non-empty collection is properly handled after store reload
+    r = store->umount();
+    ASSERT_EQ(r, 0);
+    r = store->mount();
+    ASSERT_EQ(r, 0);
+
     ObjectStore::Transaction t;
+    t.remove_collection(cid);
+    cerr << "Invalid rm coll" << std::endl;
+    EXPECT_DEATH(apply_transaction(store, &osr, std::move(t)), ".*Directory not empty.*");
+
+  }
+  {
+    //verify if non-empty collection is properly handled when there are some pending removes and live records in db
+    cerr << "Invalid rm coll again" << std::endl;
+
+    ObjectStore::Transaction t;
+    t.touch(cid, hoid3); //new record in db
     t.remove(cid, hoid);
     t.remove(cid, hoid2);
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    r = store->umount();
+    ASSERT_EQ(r, 0);
+    r = store->mount();
+    ASSERT_EQ(r, 0);
+
+    t.remove_collection(cid);
+    EXPECT_DEATH(apply_transaction(store, &osr, std::move(t)), ".*Directory not empty.*");
+  }
+  {
+    ObjectStore::Transaction t;
+    t.remove(cid, hoid3);
     t.remove_collection(cid);
     cerr << "Cleaning" << std::endl;
     r = apply_transaction(store, &osr, std::move(t));