{
Collection *c = static_cast<Collection*>(c_.get());
dout(15) << __func__ << " " << c->cid
- << " start " << start << " end " << end << " max " << max << dendl;
+ << " start " << start << " end " << end << " max " << max << dendl;
+ int r;
+ {
+ RWLock::RLocker l(c->lock);
+ r = _collection_list(c, start, end, sort_bitwise, max, ls, pnext);
+ }
+
+ c->trim_cache();
+ dout(10) << __func__ << " " << c->cid
+ << " start " << start << " end " << end << " max " << max
+ << " = " << r << ", ls.size() = " << ls->size()
+ << ", next = " << (pnext ? *pnext : ghobject_t()) << dendl;
+ return r;
+}
+
+int BlueStore::_collection_list(
+ Collection* c, ghobject_t start, ghobject_t end,
+ bool sort_bitwise, int max,
+ vector<ghobject_t> *ls, ghobject_t *pnext)
+{
+
if (!c->exists)
return -ENOENT;
if (!sort_bitwise)
int r = 0;
ghobject_t static_next;
- {
- RWLock::RLocker l(c->lock);
- KeyValueDB::Iterator it;
- string temp_start_key, temp_end_key;
- string start_key, end_key;
- bool set_next = false;
- string pend;
- bool temp;
-
- if (!pnext)
- pnext = &static_next;
-
- if (start == ghobject_t::get_max() ||
- start.hobj.is_max()) {
- *pnext = ghobject_t::get_max();
- goto out;
- }
- get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
- &start_key, &end_key);
- dout(20) << __func__
- << " range " << pretty_binary_string(temp_start_key)
- << " to " << pretty_binary_string(temp_end_key)
- << " and " << pretty_binary_string(start_key)
- << " to " << pretty_binary_string(end_key)
- << " start " << start << dendl;
- it = db->get_iterator(PREFIX_OBJ);
- if (start == ghobject_t() ||
- start.hobj == hobject_t() ||
- start == c->cid.get_min_hobj()) {
- it->upper_bound(temp_start_key);
+ KeyValueDB::Iterator it;
+ string temp_start_key, temp_end_key;
+ string start_key, end_key;
+ bool set_next = false;
+ string pend;
+ bool temp;
+
+ if (!pnext)
+ pnext = &static_next;
+
+ if (start == ghobject_t::get_max() ||
+ start.hobj.is_max()) {
+ *pnext = ghobject_t::get_max();
+ goto out;
+ }
+ get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
+ &start_key, &end_key);
+ dout(20) << __func__
+ << " range " << pretty_binary_string(temp_start_key)
+ << " to " << pretty_binary_string(temp_end_key)
+ << " and " << pretty_binary_string(start_key)
+ << " to " << pretty_binary_string(end_key)
+ << " start " << start << dendl;
+ it = db->get_iterator(PREFIX_OBJ);
+ if (start == ghobject_t() ||
+ start.hobj == hobject_t() ||
+ start == c->cid.get_min_hobj()) {
+ it->upper_bound(temp_start_key);
+ temp = true;
+ } else {
+ string k;
+ get_object_key(start, &k);
+ if (start.hobj.is_temp()) {
temp = true;
+ assert(k >= temp_start_key && k < temp_end_key);
} else {
- string k;
- get_object_key(start, &k);
- if (start.hobj.is_temp()) {
- temp = true;
- assert(k >= temp_start_key && k < temp_end_key);
- } else {
- temp = false;
- assert(k >= start_key && k < end_key);
- }
- dout(20) << " start from " << pretty_binary_string(k)
- << " temp=" << (int)temp << dendl;
- it->lower_bound(k);
+ temp = false;
+ assert(k >= start_key && k < end_key);
}
- if (end.hobj.is_max()) {
- pend = temp ? temp_end_key : end_key;
+ dout(20) << " start from " << pretty_binary_string(k)
+ << " temp=" << (int)temp << dendl;
+ it->lower_bound(k);
+ }
+ if (end.hobj.is_max()) {
+ pend = temp ? temp_end_key : end_key;
+ } else {
+ get_object_key(end, &end_key);
+ if (end.hobj.is_temp()) {
+ if (temp)
+ pend = end_key;
+ else
+ goto out;
} else {
- get_object_key(end, &end_key);
- if (end.hobj.is_temp()) {
- if (temp)
- pend = end_key;
- else
- goto out;
- } else {
- pend = temp ? temp_end_key : end_key;
- }
- }
- dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
- while (true) {
- if (!it->valid() || it->key() > pend) {
- if (!it->valid())
- dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
- else
- dout(20) << __func__ << " key " << pretty_binary_string(it->key())
- << " > " << end << dendl;
- if (temp) {
- if (end.hobj.is_temp()) {
- break;
- }
- dout(30) << __func__ << " switch to non-temp namespace" << dendl;
- temp = false;
- it->upper_bound(start_key);
- pend = end_key;
- dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
- continue;
- }
- break;
- }
- dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
- if (is_extent_shard_key(it->key())) {
- it->next();
+ pend = temp ? temp_end_key : end_key;
+ }
+ }
+ dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
+ while (true) {
+ if (!it->valid() || it->key() > pend) {
+ if (!it->valid())
+ dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
+ else
+ dout(20) << __func__ << " key " << pretty_binary_string(it->key())
+ << " > " << end << dendl;
+ if (temp) {
+ if (end.hobj.is_temp()) {
+ break;
+ }
+ dout(30) << __func__ << " switch to non-temp namespace" << dendl;
+ temp = false;
+ it->upper_bound(start_key);
+ pend = end_key;
+ dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
continue;
}
- ghobject_t oid;
- int r = get_key_object(it->key(), &oid);
- assert(r == 0);
- if (ls->size() >= (unsigned)max) {
- dout(20) << __func__ << " reached max " << max << dendl;
- *pnext = oid;
- set_next = true;
- break;
- }
- ls->push_back(oid);
+ break;
+ }
+ dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
+ if (is_extent_shard_key(it->key())) {
it->next();
+ continue;
}
- if (!set_next) {
- *pnext = ghobject_t::get_max();
+ ghobject_t oid;
+ int r = get_key_object(it->key(), &oid);
+ assert(r == 0);
+ if (ls->size() >= (unsigned)max) {
+ dout(20) << __func__ << " reached max " << max << dendl;
+ *pnext = oid;
+ set_next = true;
+ break;
}
+ ls->push_back(oid);
+ it->next();
+ }
+ if (!set_next) {
+ *pnext = ghobject_t::get_max();
}
- out:
- c->trim_cache();
- dout(10) << __func__ << " " << c->cid
- << " start " << start << " end " << end << " max " << max
- << " = " << r << ", ls.size() = " << ls->size()
- << ", next = " << *pnext << dendl;
+out:
return r;
}
r = -ENOENT;
goto out;
}
+ size_t nonexistent_count = 0;
assert((*c)->exists);
if ((*c)->onode_map.map_any([&](OnodeRef o) {
- if (o->exists) {
- dout(10) << __func__ << " " << o->oid << " " << o
- << " exists in onode_map" << dendl;
- return true;
- }
- return false;
- })) {
+ if (o->exists) {
+ dout(10) << __func__ << " " << o->oid << " " << o
+ << " exists in onode_map" << dendl;
+ return true;
+ }
+ ++nonexistent_count;
+ return false;
+ })) {
r = -ENOTEMPTY;
goto out;
}
- coll_map.erase(cid);
- txc->removed_collections.push_back(*c);
- (*c)->exists = false;
- c->reset();
+
+ vector<ghobject_t> ls;
+ ghobject_t next;
+ //Enumerate onodes in db, up to nonexistent_count + 1
+ // then check if all of them are marked as non-existent.
+ // Bypass the check if returned number is greater than nonexistent_count
+ r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(), true, nonexistent_count + 1,
+ &ls, &next);
+
+ if (r >= 0) {
+ bool exists = false; //ls.size() > nonexistent_count;
+ for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
+ dout(10) << __func__ << " oid " << *it << dendl;
+ auto onode = (*c)->onode_map.lookup(*it);
+ exists = !onode || onode->exists;
+ if (exists) {
+ dout(10) << __func__ << " " << *it
+ << " exists in db" << dendl;
+ }
+ }
+ if (!exists) {
+ coll_map.erase(cid);
+ txc->removed_collections.push_back(*c);
+ (*c)->exists = false;
+ c->reset();
+ txc->t->rmkey(PREFIX_COLL, stringify(cid));
+ r = 0;
+ } else {
+ dout(10) << __func__ << " " << cid
+ << " is non-empty" << dendl;
+ r = -ENOTEMPTY;
+ }
+ }
}
- txc->t->rmkey(PREFIX_COLL, stringify(cid));
- r = 0;
out:
dout(10) << __func__ << " " << cid << " = " << r << dendl;