}
}
+using get_omap_iterate_ertr =
+ crimson::os::FuturizedStore::Shard::read_errorator::extend<
+ crimson::ct_error::enodata>;
+using omap_iterate_cb_t = crimson::os::FuturizedStore::Shard::omap_iterate_cb_t;
static
-get_omap_iertr::future<
- std::tuple<bool, crimson::os::FuturizedStore::Shard::omap_values_t>>
-maybe_get_omap_vals(
+get_omap_iterate_ertr::future<ObjectStore::omap_iter_ret_t>
+maybe_do_omap_iterate(
crimson::os::FuturizedStore::Shard* store,
const crimson::os::CollectionRef& coll,
const object_info_t& oi,
- const std::string& start_after)
+ ObjectStore::omap_iter_seek_t start_from,
+ omap_iterate_cb_t callback)
{
if (oi.is_omap()) {
- return store->omap_get_values(coll, ghobject_t{oi.soid}, start_after);
+ return store->omap_iterate(coll, ghobject_t{oi.soid}, start_from, callback);
} else {
return crimson::ct_error::enodata::make();
}
{
if (!os.exists || os.oi.is_whiteout()) {
logger().debug("{}: object does not exist: {}", os.oi.soid);
- return crimson::ct_error::enoent::make();
+ co_await ll_read_ierrorator::future<>(crimson::ct_error::enoent::make());
}
std::string start_after;
uint64_t max_return;
} catch (buffer::error&) {
throw crimson::osd::invalid_argument{};
}
+ uint64_t max_omap_entries = local_conf()->osd_max_omap_entries_per_request;
max_return =
- std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
-
-
- // TODO: truly chunk the reading
- return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then_interruptible(
- [=,&delta_stats, &osd_op](auto ret) {
- ceph::bufferlist result;
- bool truncated = false;
+ std::min(max_return, max_omap_entries);
+
+ ceph::bufferlist result;
+ uint32_t num = 0;
+ bool truncated = false;
+ ObjectStore::omap_iter_seek_t start_from{start_after, ObjectStore::omap_iter_seek_t::UPPER_BOUND};
+ omap_iterate_cb_t callback = [&result, &num, &truncated, max_return]
+ (std::string_view key, std::string_view value)
+ {
+ if (num >= max_return) {
+ truncated = true;
+ return ObjectStore::omap_iter_ret_t::STOP;
+ }
+ encode(key, result);
+ ++num;
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ };
+
+ co_await maybe_do_omap_iterate(store, coll, os.oi, start_from, callback
+ ).safe_then([&delta_stats, &osd_op, &result, &num, &truncated](auto ret){
+ if (ret != ObjectStore::omap_iter_ret_t::STOP) {
+ logger().warn("omap_iterate not meet a stop condition");
+ }
+ encode(num, osd_op.outdata);
+ osd_op.outdata.claim_append(result);
+ encode(truncated, osd_op.outdata);
+ delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
+ delta_stats.num_rd++;
+ }).handle_error(
+ crimson::ct_error::enodata::handle([&osd_op] {
uint32_t num = 0;
- for (auto &[key, val] : std::get<1>(ret)) {
- if (num >= max_return ||
- result.length() >= local_conf()->osd_max_omap_bytes_per_request) {
- truncated = true;
- break;
- }
- encode(key, result);
- ++num;
- }
+ bool truncated = false;
encode(num, osd_op.outdata);
- osd_op.outdata.claim_append(result);
encode(truncated, osd_op.outdata);
- delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
- delta_stats.num_rd++;
+ osd_op.rval = 0;
return seastar::now();
- }).handle_error_interruptible(
- crimson::ct_error::enodata::handle([&osd_op] {
- uint32_t num = 0;
- bool truncated = false;
- encode(num, osd_op.outdata);
- encode(truncated, osd_op.outdata);
- osd_op.rval = 0;
- return seastar::now();
- }),
- ll_read_errorator::pass_further{}
- );
+ }),
+ ll_read_errorator::pass_further{}
+ );
}
+
static
PGBackend::omap_cmp_ertr::future<> do_omap_val_cmp(
std::map<std::string, bufferlist, std::less<>> out,
{
if (!os.exists || os.oi.is_whiteout()) {
logger().debug("{}: object does not exist: {}", os.oi.soid);
- return crimson::ct_error::enoent::make();
+ co_await ll_read_ierrorator::future<>(crimson::ct_error::enoent::make());
}
std::string start_after;
uint64_t max_return;
throw crimson::osd::invalid_argument{};
}
+ uint64_t max_omap_entries = local_conf()->osd_max_omap_entries_per_request;
max_return = \
- std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
- delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
- delta_stats.num_rd++;
+ std::min(max_return, max_omap_entries);
+
+ ceph::bufferlist result;
+ uint32_t num = 0;
+ bool truncated = false;
+ ObjectStore::omap_iter_seek_t start_from = ObjectStore::omap_iter_seek_t::min_lower_bound();
+ start_from.seek_position = filter_prefix > start_after ? filter_prefix : start_after;
+ start_from.seek_type = filter_prefix > start_after ?
+ ObjectStore::omap_iter_seek_t::LOWER_BOUND :
+ ObjectStore::omap_iter_seek_t::UPPER_BOUND;
+ omap_iterate_cb_t callback = [filter_prefix, max_return, &result, &num, &truncated]
+ (std::string_view key, std::string_view value)
+ {
+ if (num >= max_return) {
+ truncated = true;
+ }
+ if (key.substr(0, filter_prefix.size()) != filter_prefix || truncated == true) {
+ return ObjectStore::omap_iter_ret_t::STOP;
+ }
- // TODO: truly chunk the reading
- return maybe_get_omap_vals(store, coll, os.oi, start_after)
- .safe_then_interruptible(
- [=, &osd_op] (auto&& ret) {
- auto [done, vals] = std::move(ret);
- assert(done);
- ceph::bufferlist result;
- bool truncated = false;
- uint32_t num = 0;
- auto iter = filter_prefix > start_after ? vals.lower_bound(filter_prefix)
- : std::begin(vals);
- for (; iter != std::end(vals); ++iter) {
- const auto& [key, value] = *iter;
- if (key.substr(0, filter_prefix.size()) != filter_prefix) {
- break;
- } else if (num >= max_return ||
- result.length() >= local_conf()->osd_max_omap_bytes_per_request) {
- truncated = true;
- break;
- }
- encode(key, result);
- encode(value, result);
- ++num;
- }
- encode(num, osd_op.outdata);
- osd_op.outdata.claim_append(result);
- encode(truncated, osd_op.outdata);
+ encode(key, result);
+ encode(value, result);
+ ++num;
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ };
+
+ co_await maybe_do_omap_iterate(store, coll, os.oi, start_from, callback
+ ).safe_then([&osd_op, &delta_stats, &result, &num, &truncated](auto ret) {
+ if (ret != ObjectStore::omap_iter_ret_t::STOP) {
+ logger().warn("omap_iterate not meet a stop condition");
+ }
+ encode(num, osd_op.outdata);
+ osd_op.outdata.claim_append(result);
+ encode(truncated, osd_op.outdata);
+ delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
+ delta_stats.num_rd++;
+ }).handle_error(
+ crimson::ct_error::enodata::handle([&osd_op] {
+ encode(uint32_t{0} /* num */, osd_op.outdata);
+ encode(bool{false} /* truncated */, osd_op.outdata);
+ osd_op.rval = 0;
return ll_read_errorator::now();
- }).handle_error_interruptible(
- crimson::ct_error::enodata::handle([&osd_op] {
- encode(uint32_t{0} /* num */, osd_op.outdata);
- encode(bool{false} /* truncated */, osd_op.outdata);
- osd_op.rval = 0;
- return ll_read_errorator::now();
- }),
- ll_read_errorator::pass_further{}
- );
+ }),
+ ll_read_errorator::pass_further{}
+ );
}
PGBackend::ll_read_ierrorator::future<>