]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/kstore: fix KStore rm_coll handler to behave similar to bluestore one
authorIgor Fedotov <ifedotov@mirantis.com>
Tue, 18 Oct 2016 13:42:12 +0000 (13:42 +0000)
committerIgor Fedotov <ifedotov@mirantis.com>
Tue, 18 Oct 2016 13:47:00 +0000 (13:47 +0000)
Signed-off-by: Igor Fedotov <ifedotov@mirantis.com>
src/os/kstore/KStore.cc
src/os/kstore/KStore.h

index 31af5c2755173e3cddcbedc5fa6d6a9b0f7c00aa..a04b6621dc189b84c80a56bc4f694e54d5e448bf 100755 (executable)
@@ -1397,14 +1397,42 @@ int KStore::collection_list(
   bool sort_bitwise, int max,
   vector<ghobject_t> *ls, ghobject_t *pnext)
 {
-  dout(15) << __func__ << " " << cid
-          << " start " << start << " end " << end << " max " << max << dendl;
-  if (!sort_bitwise)
-    return -EOPNOTSUPP;
-  CollectionRef c = _get_collection(cid);
+  CollectionHandle c = _get_collection(cid);
   if (!c)
     return -ENOENT;
-  RWLock::RLocker l(c->lock);
+  return collection_list(c, start, end, sort_bitwise, max, ls, pnext);
+}
+
+int KStore::collection_list(
+  CollectionHandle &c_, ghobject_t start, ghobject_t end,
+  bool sort_bitwise, int max,
+  vector<ghobject_t> *ls, ghobject_t *pnext)
+
+{
+  Collection *c = static_cast<Collection*>(c_.get());
+  dout(15) << __func__ << " " << c->cid
+           << " start " << start << " end " << end << " max " << max << dendl;
+  int r;
+  {
+    RWLock::RLocker l(c->lock);
+    r = _collection_list(c, start, end, sort_bitwise, max, ls, pnext);
+  }
+
+  dout(10) << __func__ << " " << c->cid
+    << " start " << start << " end " << end << " max " << max
+    << " = " << r << ", ls.size() = " << ls->size()
+    << ", next = " << (pnext ? *pnext : ghobject_t())  << dendl;
+  return r;
+}
+
+int KStore::_collection_list(
+  Collection* c, ghobject_t start, ghobject_t end,
+  bool sort_bitwise, int max,
+  vector<ghobject_t> *ls, ghobject_t *pnext)
+{
+  if (!sort_bitwise)
+    return -EOPNOTSUPP;
+
   int r = 0;
   KeyValueDB::Iterator it;
   string temp_start_key, temp_end_key;
@@ -1417,9 +1445,11 @@ int KStore::collection_list(
   if (!pnext)
     pnext = &static_next;
 
-  if (start == ghobject_t::get_max())
+  if (start == ghobject_t::get_max() ||
+    start.hobj.is_max()) {
     goto out;
-  get_coll_key_range(cid, c->cnode.bits, &temp_start_key, &temp_end_key,
+  }
+  get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
                     &start_key, &end_key);
   dout(20) << __func__
           << " range " << pretty_binary_string(temp_start_key)
@@ -1428,7 +1458,7 @@ int KStore::collection_list(
           << " to " << pretty_binary_string(end_key)
           << " start " << start << dendl;
   it = db->get_iterator(PREFIX_OBJ);
-  if (start == ghobject_t() || start == cid.get_min_hobj()) {
+  if (start == ghobject_t() || start == c->cid.get_min_hobj()) {
     it->upper_bound(temp_start_key);
     temp = true;
   } else {
@@ -1492,14 +1522,10 @@ int KStore::collection_list(
     ls->push_back(oid);
     it->next();
   }
+out:
   if (!set_next) {
     *pnext = ghobject_t::get_max();
   }
- out:
-  dout(10) << __func__ << " " << cid
-          << " start " << start << " end " << end << " max " << max
-          << " = " << r << ", ls.size() = " << ls->size()
-          << ", next = " << *pnext << dendl;
   return r;
 }
 
@@ -2231,9 +2257,9 @@ void KStore::_txc_add_transaction(TransContext *txc, Transaction *t)
       break;
     }
     if (r < 0) {
-      dout(0) << " error " << cpp_strerror(r)
-             << " not handled on operation " << op->op
-             << " (op " << pos << ", counting from 0)" << dendl;
+      derr << " error " << cpp_strerror(r)
+          << " not handled on operation " << op->op
+          << " (op " << pos << ", counting from 0)" << dendl;
       dout(0) << " transaction dump:\n";
       JSONFormatter f(true);
       f.open_object_section("transaction");
@@ -3298,19 +3324,46 @@ int KStore::_remove_collection(TransContext *txc, coll_t cid,
       r = -ENOENT;
       goto out;
     }
-    pair<ghobject_t,OnodeRef> next;
-    while ((*c)->onode_map.get_next(next.first, &next)) {
-      if (next.second->exists) {
+    size_t nonexistent_count = 0;
+    pair<ghobject_t,OnodeRef> next_onode;
+    while ((*c)->onode_map.get_next(next_onode.first, &next_onode)) {
+      if (next_onode.second->exists) {
        r = -ENOTEMPTY;
        goto out;
       }
+      ++nonexistent_count;
+    }
+    vector<ghobject_t> ls;
+    ghobject_t next;
+    // Enumerate onodes in db, up to nonexistent_count + 1
+    // then check if all of them are marked as non-existent.
+    // Bypass the check if returned number is greater than nonexistent_count
+    r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(), true,
+                         nonexistent_count + 1, &ls, &next);
+    if (r >= 0) {
+      bool exists = false; //ls.size() > nonexistent_count;
+      for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
+        dout(10) << __func__ << " oid " << *it << dendl;
+        auto onode = (*c)->onode_map.lookup(*it);
+        exists = !onode || onode->exists;
+        if (exists) {
+          dout(10) << __func__ << " " << *it
+                   << " exists in db" << dendl;
+        }
+      }
+      if (!exists) {
+        coll_map.erase(cid);
+        txc->removed_collections.push_back(*c);
+        c->reset();
+        txc->t->rmkey(PREFIX_COLL, stringify(cid));
+        r = 0;
+      } else {
+        dout(10) << __func__ << " " << cid
+                 << " is non-empty" << dendl;
+        r = -ENOTEMPTY;
+      }
     }
-    coll_map.erase(cid);
-    txc->removed_collections.push_back(*c);
-    c->reset();
   }
-  txc->t->rmkey(PREFIX_COLL, stringify(cid));
-  r = 0;
 
  out:
   dout(10) << __func__ << " " << cid << " = " << r << dendl;
index f04a3f16aa08106b8ccacc2439526464ded247d6..5f5aea6e5f184d4c09f308cd2698a24b04186796 100644 (file)
@@ -393,6 +393,9 @@ private:
                        uint64_t offset, bufferlist& bl);
   void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset);
 
+  int _collection_list(Collection *c, ghobject_t start, ghobject_t end,
+    bool sort_bitwise, int max, vector<ghobject_t> *ls, ghobject_t *next);
+
 public:
   KStore(CephContext *cct, const string& path);
   ~KStore();
@@ -465,10 +468,12 @@ public:
   bool collection_exists(const coll_t& c);
   int collection_empty(const coll_t& c, bool *empty);
 
-  using ObjectStore::collection_list;
   int collection_list(const coll_t& cid, ghobject_t start, ghobject_t end,
-                     bool sort_bitwise, int max,
-                     vector<ghobject_t> *ls, ghobject_t *next);
+             bool sort_bitwise, int max,
+             vector<ghobject_t> *ls, ghobject_t *next) override;
+  int collection_list(CollectionHandle &c, ghobject_t start, ghobject_t end,
+             bool sort_bitwise, int max,
+             vector<ghobject_t> *ls, ghobject_t *next) override;
 
   using ObjectStore::omap_get;
   int omap_get(