_name(name), threshold(_threshold) { }
void call(ObjectStore *store, coll_t coll, ghobject_t &ghobj, object_info_t &oi) override {
- ObjectMap::ObjectMapIterator iter;
auto start1 = mono_clock::now();
- ceph::signedspan first_seek_time = start1 - start1;
- ceph::signedspan last_seek_time = first_seek_time;
- ceph::signedspan total_time = first_seek_time;
+ ceph::signedspan first_seek_time{ceph::signedspan::zero()};
+ ceph::signedspan last_seek_time{ceph::signedspan::zero()};
+ ceph::signedspan total_time{ceph::signedspan::zero()};
{
auto ch = store->open_collection(coll);
- iter = store->get_omap_iterator(ch, ghobj);
- if (!iter) {
+ const auto result = store->omap_iterate(
+ ch, ghobj,
+ ObjectStore::omap_iter_seek_t::min_lower_bound(),
+ [first_seek_began=mono_clock::now(),
+ &first_seek_time,
+ last_seek_began=mono_clock::now(),
+ &last_seek_time]
+ (std::string_view, std::string_view) mutable {
+ if (first_seek_time == ceph::signedspan::zero()) {
+ first_seek_time = mono_clock::now() - first_seek_began;
+ }
+ last_seek_time = mono_clock::now() - last_seek_began;
+ // carry to the next round if any
+ last_seek_began = mono_clock::now();
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ });
+ if (result < 0) {
cerr << "omap_get_iterator: " << cpp_strerror(ENOENT)
<< " obj:" << ghobj
<< std::endl;
return;
}
- auto start = mono_clock::now();
- iter->seek_to_first();
- first_seek_time = mono_clock::now() - start;
-
- while(iter->valid()) {
- start = mono_clock::now();
- iter->next();
- last_seek_time = mono_clock::now() - start;
- }
}
if (coll != last_coll) {
ceph_assert(info.last_update.version > max_entries);
version_t trim_to = info.last_update.version - max_entries;
- size_t trim_at_once = g_ceph_context->_conf->osd_pg_log_trim_max;
eversion_t new_tail;
bool done = false;
// gather keys so we can delete them in a batch without
// affecting the iterator
set<string> keys_to_trim;
- {
- ObjectMap::ObjectMapIterator p = store->get_omap_iterator(ch, oid);
- if (!p)
- break;
- for (p->seek_to_first(); p->valid(); p->next()) {
- if (p->key()[0] == '_')
- continue;
- if (p->key() == "can_rollback_to")
- continue;
- if (p->key() == "divergent_priors")
- continue;
- if (p->key() == "rollback_info_trimmed_to")
- continue;
- if (p->key() == "may_include_deletes_in_missing")
- continue;
- if (p->key().substr(0, 7) == string("missing"))
- continue;
- if (p->key().substr(0, 4) == string("dup_"))
- continue;
-
- bufferlist bl = p->value();
- auto bp = bl.cbegin();
- pg_log_entry_t e;
- try {
- e.decode_with_checksum(bp);
- } catch (const buffer::error &e) {
- cerr << "Error reading pg log entry: " << e.what() << std::endl;
- }
- if (debug) {
- cerr << "read entry " << e << std::endl;
- }
- if (e.version.version > trim_to) {
- done = true;
- break;
- }
- keys_to_trim.insert(p->key());
- new_tail = e.version;
- if (keys_to_trim.size() >= trim_at_once)
- break;
- }
+ const auto result = store->omap_iterate(
+ ch, oid,
+ ObjectStore::omap_iter_seek_t::min_lower_bound(),
+ [&keys_to_trim, &new_tail, &done, trim_to,
+ trim_at_once=g_ceph_context->_conf->osd_pg_log_trim_max]
+ (std::string_view key, std::string_view value) mutable {
+ if (key[0] == '_')
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key == "can_rollback_to")
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key == "divergent_priors")
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key == "rollback_info_trimmed_to")
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key == "may_include_deletes_in_missing")
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key.substr(0, 7) == string("missing"))
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key.substr(0, 4) == string("dup_"))
+ return ObjectStore::omap_iter_ret_t::NEXT;
- if (!p->valid())
+ bufferlist bl;
+ bl.append(value); // avoidable memcpy
+ auto bp = bl.cbegin();
+ pg_log_entry_t e;
+ try {
+ e.decode_with_checksum(bp);
+ } catch (const buffer::error &e) {
+ cerr << "Error reading pg log entry: " << e.what() << std::endl;
+ }
+ if (debug) {
+ cerr << "read entry " << e << std::endl;
+ }
+ if (e.version.version > trim_to) {
+ done = true; // terminate the main loop, not just omap_iterate
+ return ObjectStore::omap_iter_ret_t::STOP;
+ }
+ keys_to_trim.insert(std::string{key});
+ new_tail = e.version;
+ if (keys_to_trim.size() >= trim_at_once) {
+ return ObjectStore::omap_iter_ret_t::STOP;
+ }
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ });
+ if (result < 0) {
+ break;
+ } else if (const auto more = static_cast<bool>(result); !more) {
done = true;
- } // deconstruct ObjectMapIterator
+ }
// delete the keys
if (!dry_run && !keys_to_trim.empty()) {
size_t num_removed = 0;
do {
set<string> keys_to_trim;
- {
- ObjectMap::ObjectMapIterator p = store->get_omap_iterator(ch, oid);
- if (!p)
+ const auto result = store->omap_iterate(
+ ch, oid,
+ ObjectStore::omap_iter_seek_t::min_lower_bound(),
+ [&keys_to_keep, &keys_to_trim, max_dup_entries, max_chunk_size]
+ (std::string_view key, std::string_view value) mutable {
+ if (key[0] == '_')
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key == "can_rollback_to")
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key == "divergent_priors")
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key == "rollback_info_trimmed_to")
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key == "may_include_deletes_in_missing")
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key.substr(0, 7) == string("missing"))
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ if (key.substr(0, 4) != string("dup_"))
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ keys_to_keep.insert(std::string{key});
+ if (keys_to_keep.size() > max_dup_entries) {
+ auto oldest_to_keep = keys_to_keep.begin();
+ keys_to_trim.emplace(*oldest_to_keep);
+ keys_to_keep.erase(oldest_to_keep);
+ }
+ if (keys_to_trim.size() >= max_chunk_size) {
+ return ObjectStore::omap_iter_ret_t::STOP;
+ }
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ });
+ if (result < 0) {
break;
- for (p->seek_to_first(); p->valid(); p->next()) {
- if (p->key()[0] == '_')
- continue;
- if (p->key() == "can_rollback_to")
- continue;
- if (p->key() == "divergent_priors")
- continue;
- if (p->key() == "rollback_info_trimmed_to")
- continue;
- if (p->key() == "may_include_deletes_in_missing")
- continue;
- if (p->key().substr(0, 7) == string("missing"))
- continue;
- if (p->key().substr(0, 4) != string("dup_"))
- continue;
- keys_to_keep.insert(p->key());
- if (keys_to_keep.size() > max_dup_entries) {
- auto oldest_to_keep = keys_to_keep.begin();
- keys_to_trim.emplace(*oldest_to_keep);
- keys_to_keep.erase(oldest_to_keep);
- }
- if (keys_to_trim.size() >= max_chunk_size) {
- break;
- }
}
- } // deconstruct ObjectMapIterator
// delete the keys
num_removed = keys_to_trim.size();
if (!dry_run && !keys_to_trim.empty()) {
}
const int OMAP_BATCH_SIZE = 25;
-void get_omap_batch(ObjectMap::ObjectMapIterator &iter, map<string, bufferlist> &oset)
+bool fill_omap_batch(std::string_view key, std::string_view value, map<string, bufferlist> &oset)
{
+ oset[std::string{key}].append(value);
+ return oset.size() < OMAP_BATCH_SIZE;
+}
+
+template <class F>
+void flush_omap_batch(map<string, bufferlist> &oset, int& mapcount, F f)
+{
+ ceph_assert(oset.size() <= OMAP_BATCH_SIZE);
+ mapcount += oset.size();
+ f(oset);
oset.clear();
- for (int count = OMAP_BATCH_SIZE; count && iter->valid(); --count, iter->next()) {
- oset.insert(pair<string, bufferlist>(iter->key(), iter->value()));
- }
}
int ObjectStoreTool::export_file(ObjectStore *store, coll_t cid, ghobject_t &obj, bool force)
if (ret)
return ret;
- ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(ch, obj);
- if (!iter) {
+ int mapcount = 0;
+ map<string, bufferlist> out;
+ const auto result = store->omap_iterate(
+ ch, obj,
+ ObjectStore::omap_iter_seek_t::min_lower_bound(),
+ [&mapcount, &out, &ret, this]
+ (std::string_view key, std::string_view value) mutable {
+ if (fill_omap_batch(key, value, out)) {
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ }
+ flush_omap_batch(out, mapcount, [&ret, this] (auto &oset) {
+ omap_section oms(oset);
+ ret = write_section(TYPE_OMAP, oms, file_fd);
+ });
+ // stop on error. will be handled after the last flush
+ return ret ? ObjectStore::omap_iter_ret_t::STOP
+ : ObjectStore::omap_iter_ret_t::NEXT;
+ });
+ if (result < 0) {
ret = -ENOENT;
cerr << "omap_get_iterator: " << cpp_strerror(ret) << std::endl;
return ret;
+ } else if (!out.empty()) {
+ flush_omap_batch(out, mapcount, [&ret, this] (auto &oset) {
+ omap_section oms(oset);
+ ret = write_section(TYPE_OMAP, oms, file_fd);
+ });
}
- iter->seek_to_first();
- int mapcount = 0;
- map<string, bufferlist> out;
- while(iter->valid()) {
- get_omap_batch(iter, out);
-
- if (out.empty()) break;
-
- mapcount += out.size();
- omap_section oms(out);
- ret = write_section(TYPE_OMAP, oms, file_fd);
- if (ret)
- return ret;
- }
+ if (ret < 0)
+ return ret;
if (debug)
cerr << "omap map size " << mapcount << std::endl;
cerr << "Collection " << coll << " does not exist" << std::endl;
return -ENOENT;
}
- ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(ch, ghobj);
- if (!iter) {
- cerr << "omap_get_iterator: " << cpp_strerror(ENOENT) << std::endl;
+ const auto result = store->omap_iterate(
+ ch, ghobj,
+ ObjectStore::omap_iter_seek_t::min_lower_bound(),
+ [] (std::string_view key, std::string_view) {
+ if (outistty) {
+ std::string tmp{key};
+ cout << cleanbin(tmp) << std::endl;
+ } else {
+ cout << key << std::endl;
+ }
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ });
+ if (result < 0) {
+ cerr << "omap_get_iterator: " << cpp_strerror(result) << std::endl;
return -ENOENT;
}
- iter->seek_to_first();
- map<string, bufferlist> oset;
- while(iter->valid()) {
- get_omap_batch(iter, oset);
-
- for (map<string,bufferlist>::iterator i = oset.begin();i != oset.end(); ++i) {
- string key(i->first);
- if (outistty)
- key = cleanbin(key);
- cout << key << std::endl;
- }
- }
return 0;
}