From c25a9d7233e8c67e0934f872139f96592d80e8ff Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Wed, 18 Sep 2019 00:13:27 +0800 Subject: [PATCH] crimson/osd: add pgls_filter support it's modeled after pgnls_filter. and use seastar::map_reduce() for filtering the objects into the response. Signed-off-by: Kefu Chai --- src/crimson/osd/ops_executer.cc | 91 ++++++++++++++++++++++++++++----- 1 file changed, 78 insertions(+), 13 deletions(-) diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index d3d009c4b1dd2..13f6f086c4eac 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -442,7 +442,8 @@ static seastar::future do_pgls_common( const PGBackend& backend, const hobject_t& lower_bound, const std::string& nspace, - const uint64_t limit) + const uint64_t limit, + const PGLSFilter* const filter) { if (!(lower_bound.is_min() || lower_bound.is_max() || @@ -451,20 +452,34 @@ static seastar::future do_pgls_common( throw std::invalid_argument("outside of PG bounds"); } + using entries_t = decltype(pg_ls_response_t::entries); return backend.list_objects(lower_bound, limit).then( - [pg_end, nspace, &backend](auto objects, auto next) { + [&backend, filter, nspace](auto objects, auto next) { + return seastar::when_all_succeed( + seastar::map_reduce(std::move(objects), + [&backend, filter, nspace](const hobject_t& obj) { + if (obj.get_namespace() == nspace) { + if (filter) { + return pgls_filter(*filter, backend, obj); + } else { + return seastar::make_ready_future(obj); + } + } else { + return seastar::make_ready_future(hobject_t{}); + } + }, + entries_t{}, + [](entries_t&& entries, hobject_t obj) { + if (!obj.is_min()) { + entries.emplace_back(obj.oid, obj.get_key()); + } + return entries; + }), + seastar::make_ready_future(next)); + }).then([pg_end](entries_t entries, hobject_t next) { pg_ls_response_t response; response.handle = next.is_max() ? pg_end : next; - auto in_my_namespace = [&nspace](const hobject_t& obj) { - return obj.get_namespace() == nspace; - }; - auto to_entry = [] (const hobject_t& hobj) { - return make_pair(hobj.oid, hobj.get_key()); - }; - boost::push_back(response.entries, - objects | - boost::adaptors::filtered(in_my_namespace) | - boost::adaptors::transformed(to_entry)); + response.entries = std::move(entries); ceph::bufferlist out; encode(response, out); logger().debug("{}: response.entries.size()=", @@ -493,13 +508,59 @@ static seastar::future<> do_pgls( pg.get_backend(), lower_bound, nspace, - osd_op.op.pgls.count) + osd_op.op.pgls.count, + nullptr /* no filter */) .then([&osd_op](bufferlist bl) { osd_op.outdata = std::move(bl); return seastar::now(); }); } +static seastar::future<> do_pgls_filtered( + const PG& pg, + const std::string& nspace, + OSDOp& osd_op) +{ + std::string cname, mname, type; + auto bp = osd_op.indata.cbegin(); + try { + ceph::decode(cname, bp); + ceph::decode(mname, bp); + ceph::decode(type, bp); + } catch (const buffer::error&) { + throw ceph::osd::invalid_argument{}; + } + + auto filter = get_pgls_filter(type, bp); + + hobject_t lower_bound; + try { + lower_bound.decode(bp); + } catch (const buffer::error&) { + throw std::invalid_argument("unable to decode PGLS_FILTER description"); + } + + logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}", + __func__, cname, mname, type, lower_bound, + static_cast(filter.get())); + return seastar::do_with(std::move(filter), + [&, lower_bound=std::move(lower_bound)](auto&& filter) { + const auto pg_start = pg.get_pgid().pgid.get_hobj_start(); + const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num()); + return do_pgls_common(pg_start, + pg_end, + pg.get_backend(), + lower_bound, + nspace, + osd_op.op.pgls.count, + filter.get()) + .then([&osd_op](bufferlist bl) { + osd_op.outdata = std::move(bl); + return seastar::now(); + }); + }); +} + seastar::future<> OpsExecuter::execute_pg_op(OSDOp& osd_op) { @@ -509,6 +570,10 @@ OpsExecuter::execute_pg_op(OSDOp& osd_op) return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) { return do_pgls(pg, nspace, osd_op); }); + case CEPH_OSD_OP_PGLS_FILTER: + return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) { + return do_pgls_filtered(pg, nspace, osd_op); + }); case CEPH_OSD_OP_PGNLS: return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) { return do_pgnls(pg, nspace, osd_op); -- 2.39.5