const ghobject_t &oid,
ObjectStore::omap_iter_seek_t start_from,
omap_iterate_cb_t callback,
- uint32_t op_flags)
+ uint32_t op_flags,
+ omap_iterate_conf_t on_conflict)
{
logger().debug("{} with_start", __func__);
assert(tp);
const ghobject_t &oid,
ObjectStore::omap_iter_seek_t start_from,
omap_iterate_cb_t callback,
- uint32_t op_flags = 0) final;
+ uint32_t op_flags = 0,
+ omap_iterate_conf_t on_conflict = nullptr) final;
seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
const ghobject_t &oid,
ObjectStore::omap_iter_seek_t start_from,
omap_iterate_cb_t callback,
- uint32_t op_flags)
+ uint32_t op_flags,
+ omap_iterate_conf_t on_conflict)
-> CyanStore::Shard::read_errorator::future<ObjectStore::omap_iter_ret_t>
{
assert(store_active);
const ghobject_t &oid,
ObjectStore::omap_iter_seek_t start_from,
omap_iterate_cb_t callback,
- uint32_t op_flags = 0
+ uint32_t op_flags = 0,
+ omap_iterate_conf_t on_conflict = nullptr
) final;
get_attr_errorator::future<ceph::bufferlist> omap_get_header(
* omap_iter_ret_t::NEXT means omap_iterate() reaches the end of omap tree
*/
using omap_iterate_cb_t = std::function<ObjectStore::omap_iter_ret_t(std::string_view, std::string_view)>;
+ /*
+  * Callback invoked when omap_iterate() restarts after a transaction
+  * conflict. Returning omap_iter_ret_t::STOP aborts the iteration;
+  * returning NEXT presumably lets it restart from the beginning (the
+  * caller should discard any partially collected results).
+  */
+ using omap_iterate_conf_t = std::function<ObjectStore::omap_iter_ret_t()>;
virtual read_errorator::future<ObjectStore::omap_iter_ret_t> omap_iterate(
CollectionRef c, ///< [in] collection
const ghobject_t &oid, ///< [in] object
omap_iterate_cb_t callback,
///< [in] the callback function for each OMAP entry after start_from till end of the OMAP or
/// till the iteration is stopped by `STOP`.
- uint32_t op_flags = 0
+ uint32_t op_flags = 0,
+ omap_iterate_conf_t on_conflict = nullptr
) = 0;
virtual get_attr_errorator::future<bufferlist> omap_get_header(
const ghobject_t &oid,
ObjectStore::omap_iter_seek_t start_from,
omap_iterate_cb_t callback,
- uint32_t op_flags)
+ uint32_t op_flags,
+ omap_iterate_conf_t on_conflict)
{
assert(store_active);
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
return seastar::do_with(
std::move(start_from),
- [this, ch, &oid, callback, op_flags] (auto &start_from)
+ uint32_t(0),
+ [this, ch, &oid, callback, op_flags, on_conflict] (
+ auto &start_from, auto &conflict_counter)
{
return repeat_with_onode<ObjectStore::omap_iter_ret_t>(
ch,
"omap_iterate",
op_type_t::OMAP_ITERATE,
op_flags,
- [this, &start_from, callback](auto &t, auto &onode)
+ [this, &start_from, callback, on_conflict, &conflict_counter](auto &t, auto &onode)
{
+ ceph_assert(conflict_counter < std::numeric_limits<uint32_t>::max());
+ conflict_counter++;
+ if (conflict_counter > 1 && on_conflict) {
+ // conflict_counter > 1: this attempt is a restart after a conflict,
+ // so give the caller a chance to react (e.g. reset accumulated state).
+ auto ret = on_conflict();
+ if (ret == ObjectStore::omap_iter_ret_t::STOP) {
+ return base_iertr::make_ready_future<ObjectStore::omap_iter_ret_t>(ret);
+ }
+ }
auto root = select_log_omap_root(onode);
return omaptree_iterate(
t, std::move(root), start_from, callback);
const ghobject_t &oid,
ObjectStore::omap_iter_seek_t start_from,
omap_iterate_cb_t callback,
- uint32_t op_flags = 0) final;
+ uint32_t op_flags = 0,
+ omap_iterate_conf_t on_conflict = nullptr) final;
get_attr_errorator::future<bufferlist> omap_get_header(
CollectionRef c,
obj,
start_from,
callback,
- 0
+ 0,
+ nullptr
).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
assert(result == ObjectStore::omap_iter_ret_t::NEXT);
DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
{
if (oi.is_omap()) {
return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
- store, coll, ghobject_t{oi.soid}, start_from, callback, 0);
+ store, coll, ghobject_t{oi.soid}, start_from, callback, 0, nullptr);
} else {
return crimson::ct_error::enodata::make();
}
co_await interruptor::make_interruptible(
crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
shard_services.get_store(pg.get_store_index()),
- coll, ghobject_t{oid}, start_from, callback, 0
+ coll, ghobject_t{oid}, start_from, callback, 0, nullptr
).safe_then([&new_progress](auto ret) {
if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
new_progress.omap_complete = true;
ObjectStore::omap_iter_seek_t start_from{"", ObjectStore::omap_iter_seek_t::UPPER_BOUND};
+ std::map<std::string, ceph::bufferlist> kvs;
std::function<ObjectStore::omap_iter_ret_t(std::string_view, std::string_view)> callback =
- [this] (std::string_view key, std::string_view value)
+ [&kvs] (std::string_view key, std::string_view value)
{
- ceph::bufferlist bl;
- bl.append(value);
- process_entry(key, bl);
+ ceph::bufferlist bl;
+ bl.append(value);
+ kvs[std::string(key)] = std::move(bl);
return ObjectStore::omap_iter_ret_t::NEXT;
};
+ std::function<ObjectStore::omap_iter_ret_t()> on_conflict =
+ [&kvs] ()
+ {
+ kvs.clear();
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ };
co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
- store,
- ch, pgmeta_oid, start_from, callback, 0
+ store,
+ ch, pgmeta_oid, start_from, callback, 0, on_conflict
).safe_then([] (auto ret) {
ceph_assert (ret == ObjectStore::omap_iter_ret_t::NEXT);
}).handle_error(
crimson::os::FuturizedStore::Shard::read_errorator::assert_all{}
);
+ for (auto &p : kvs) {
+ process_entry(p.first, p.second);
+ }
+
if (info.pgid.is_no_shard()) {
// replicated pool pg does not persist this key
ceph_assert(on_disk_rollback_info_trimmed_to == eversion_t());
};
return interruptor::green_get(
crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
- os, ch, hoid, start_from, callback, 0
+ os, ch, hoid, start_from, callback, 0, nullptr
).safe_then([FNAME, key] (auto ret) {
if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
DEBUG("key {} no more values", key);