const PGBackend& backend,
const hobject_t& lower_bound,
const std::string& nspace,
- const uint64_t limit)
+ const uint64_t limit,
+ const PGLSFilter* const filter)
{
if (!(lower_bound.is_min() ||
lower_bound.is_max() ||
throw std::invalid_argument("outside of PG bounds");
}
+ using entries_t = decltype(pg_ls_response_t::entries);
return backend.list_objects(lower_bound, limit).then(
- [pg_end, nspace, &backend](auto objects, auto next) {
+ [&backend, filter, nspace](auto objects, auto next) {
+ return seastar::when_all_succeed(
+ seastar::map_reduce(std::move(objects),
+ [&backend, filter, nspace](const hobject_t& obj) {
+ if (obj.get_namespace() == nspace) {
+ if (filter) {
+ return pgls_filter(*filter, backend, obj);
+ } else {
+ return seastar::make_ready_future<hobject_t>(obj);
+ }
+ } else {
+ return seastar::make_ready_future<hobject_t>(hobject_t{});
+ }
+ },
+ entries_t{},
+ [](entries_t&& entries, hobject_t obj) {
+ if (!obj.is_min()) {
+ entries.emplace_back(obj.oid, obj.get_key());
+ }
+ return entries;
+ }),
+ seastar::make_ready_future<hobject_t>(next));
+ }).then([pg_end](entries_t entries, hobject_t next) {
pg_ls_response_t response;
response.handle = next.is_max() ? pg_end : next;
- auto in_my_namespace = [&nspace](const hobject_t& obj) {
- return obj.get_namespace() == nspace;
- };
- auto to_entry = [] (const hobject_t& hobj) {
- return make_pair(hobj.oid, hobj.get_key());
- };
- boost::push_back(response.entries,
- objects |
- boost::adaptors::filtered(in_my_namespace) |
- boost::adaptors::transformed(to_entry));
+ response.entries = std::move(entries);
ceph::bufferlist out;
encode(response, out);
logger().debug("{}: response.entries.size()=",
pg.get_backend(),
lower_bound,
nspace,
- osd_op.op.pgls.count)
+ osd_op.op.pgls.count,
+ nullptr /* no filter */)
.then([&osd_op](bufferlist bl) {
osd_op.outdata = std::move(bl);
return seastar::now();
});
}
+static seastar::future<> do_pgls_filtered(
+ const PG& pg,
+ const std::string& nspace,
+ OSDOp& osd_op)
+{
+ std::string cname, mname, type;
+ auto bp = osd_op.indata.cbegin();
+ try {
+ ceph::decode(cname, bp);
+ ceph::decode(mname, bp);
+ ceph::decode(type, bp);
+ } catch (const buffer::error&) {
+ throw ceph::osd::invalid_argument{};
+ }
+
+ auto filter = get_pgls_filter(type, bp);
+
+ hobject_t lower_bound;
+ try {
+ lower_bound.decode(bp);
+ } catch (const buffer::error&) {
+ throw std::invalid_argument("unable to decode PGLS_FILTER description");
+ }
+
+ logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
+ __func__, cname, mname, type, lower_bound,
+ static_cast<const void*>(filter.get()));
+ return seastar::do_with(std::move(filter),
+ [&, lower_bound=std::move(lower_bound)](auto&& filter) {
+ const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
+ const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+ return do_pgls_common(pg_start,
+ pg_end,
+ pg.get_backend(),
+ lower_bound,
+ nspace,
+ osd_op.op.pgls.count,
+ filter.get())
+ .then([&osd_op](bufferlist bl) {
+ osd_op.outdata = std::move(bl);
+ return seastar::now();
+ });
+ });
+}
+
seastar::future<>
OpsExecuter::execute_pg_op(OSDOp& osd_op)
{
return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
return do_pgls(pg, nspace, osd_op);
});
+ case CEPH_OSD_OP_PGLS_FILTER:
+ return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
+ return do_pgls_filtered(pg, nspace, osd_op);
+ });
case CEPH_OSD_OP_PGNLS:
return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
return do_pgnls(pg, nspace, osd_op);