From 10fca231e29f4e63bd6386176e92ae98621e3ff9 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Wed, 7 Aug 2019 19:59:27 +0200 Subject: [PATCH] crimson/osd: implement PGLSFilter. Signed-off-by: Radoslaw Zarzynski --- src/crimson/osd/ops_executer.cc | 229 +++++++++++++++++++++++++++----- src/crimson/osd/ops_executer.h | 16 ++- src/crimson/osd/pg_backend.cc | 18 ++- src/crimson/osd/pg_backend.h | 3 + 4 files changed, 227 insertions(+), 39 deletions(-) diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index d668dbd12b6..2d34e0dcf8d 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include @@ -100,51 +100,222 @@ seastar::future<> OpsExecuter::do_op_call(OSDOp& osd_op) throw ceph::osd::make_error(ret); } }); +} + +static inline std::unique_ptr get_pgls_filter( + const std::string& type, + bufferlist::const_iterator& iter) +{ + // storing non-const PGLSFilter for the sake of ::init() + std::unique_ptr filter; + if (type.compare("plain") == 0) { + //filter = std::make_unique(); + ::operation_not_supported(); + } else { + std::size_t dot = type.find("."); + if (dot == type.npos || dot == 0 || dot == type.size() - 1) { + throw ceph::osd::invalid_argument{}; + } + + const std::string class_name = type.substr(0, dot); + const std::string filter_name = type.substr(dot + 1); + ClassHandler::ClassData *cls = nullptr; + int r = ClassHandler::get_instance().open_class(class_name, &cls); + if (r != 0) { + logger().warn("can't open class {}: {}", class_name, cpp_strerror(r)); + if (r == -EPERM) { + // propagate permission error + throw ceph::osd::permission_denied{}; + } else { + throw ceph::osd::invalid_argument{}; + } + } else { + ceph_assert(cls); + } + + ClassHandler::ClassFilter * const class_filter = cls->get_filter(filter_name); + if (class_filter == nullptr) { + logger().warn("can't find filter {} in class {}", filter_name, class_name); 
+ throw ceph::osd::invalid_argument{}; + } + filter.reset(class_filter->fn()); + if (!filter) { + // Object classes are obliged to return us something, but let's + // give an error rather than asserting out. + logger().warn("buggy class {} failed to construct filter {}", + class_name, filter_name); + throw ceph::osd::invalid_argument{}; + } + } + + ceph_assert(filter); + int r = filter->init(iter); + if (r < 0) { + logger().warn("error initializing filter {}: {}", type, cpp_strerror(r)); + throw ceph::osd::invalid_argument{}; + } + + // successfully constructed and initialized, return it. + return filter; } -seastar::future -OpsExecuter::do_pgnls(ceph::bufferlist& indata, - const std::string& nspace, - uint64_t limit) + +seastar::future OpsExecuter::pgls_filter( + const PGLSFilter& filter, + const PGBackend& backend, + const hobject_t& sobj) { - hobject_t lower_bound; - try { - ceph::decode(lower_bound, indata); - } catch (const buffer::error& e) { - throw std::invalid_argument("unable to decode PGNLS handle"); + if (const auto xattr = filter.get_xattr(); !xattr.empty()) { + logger().debug("pgls_filter: filter is interested in xattr={} for obj={}", + xattr, sobj); + return backend.getxattr(sobj, xattr).then_wrapped( + [&filter, sobj] (auto futval) { + logger().debug("pgls_filter: got xvalue for obj={}", sobj); + + ceph::bufferlist val; + if (!futval.failed()) { + val.push_back(std::move(futval).get0()); + } else if (filter.reject_empty_xattr()) { + return seastar::make_ready_future(false, sobj); + } + const bool filtered = filter.filter(sobj, val); + return seastar::make_ready_future(filtered, sobj); + }); + } else { + ceph::bufferlist empty_lvalue_bl; + const bool filtered = filter.filter(sobj, empty_lvalue_bl); + return seastar::make_ready_future(filtered, sobj); } +} + +seastar::future +OpsExecuter::do_pgnls_common( + const hobject_t& lower_bound, + const std::string& nspace, + uint64_t limit, + const PGLSFilter* const filter) +{ const auto pg_start = 
pg.get_pgid().pgid.get_hobj_start(); - const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num()); + auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num()); if (!(lower_bound.is_min() || lower_bound.is_max() || (lower_bound >= pg_start && lower_bound < pg_end))) { // this should only happen with a buggy client. throw std::invalid_argument("outside of PG bounds"); } + return backend.list_objects(lower_bound, limit).then( - [lower_bound, pg_end, nspace](auto objects, auto next) { - auto in_my_namespace = [&nspace](const hobject_t& o) { + [this, filter, nspace](auto objects, auto next) { + auto in_my_namespace = [&nspace](const hobject_t& obj) { using ceph::common::local_conf; - if (o.get_namespace() == local_conf()->osd_hit_set_namespace) { + if (obj.get_namespace() == local_conf()->osd_hit_set_namespace) { return false; } else if (nspace == librados::all_nspaces) { return true; } else { - return o.get_namespace() == nspace; + return obj.get_namespace() == nspace; + } + }; + auto to_pglsed = [this, filter] (const hobject_t& obj) { + // this transformation looks costly. However, I don't have any + // reason to think PGLS* operations are critical for, let's say, + // general performance. + // + // from tchaikov: "another way is to use seastar::map_reduce(), + // to 1) save the effort to filter the already filtered objects + // 2) avoid the space to keep the tuple even if + // the object is filtered out". 
+ if (filter) { + return pgls_filter(*filter, obj); + } else { + return seastar::make_ready_future(true, obj); } }; + + auto range = objects | boost::adaptors::filtered(in_my_namespace) + | boost::adaptors::transformed(to_pglsed); + logger().debug("do_pgnls_common: finishing the 1st stage of pgls"); + return seastar::when_all_succeed(std::begin(range), + std::end(range)).then( + [next=std::move(next)] (auto items) mutable { + // the sole purpose of this chaining is to pass `next` to 2nd + // stage altogether with items + logger().debug("do_pgnls_common: 1st done"); + return seastar::make_ready_future( + std::move(items), std::move(next)); + }); + }).then( + [pg_end, filter] (const std::vector>& items, auto next) { + auto is_matched = [] (const auto& item) { + return std::get(item); + }; + auto to_entry = [] (const auto& item) { + const auto& obj = std::get(item); + return librados::ListObjectImpl{ + obj.get_namespace(), obj.oid.name, obj.get_key() + }; + }; + pg_nls_response_t response; - boost::copy(objects | - boost::adaptors::filtered(in_my_namespace) | - boost::adaptors::transformed([](const hobject_t& o) { - return librados::ListObjectImpl{o.get_namespace(), - o.oid.name, - o.get_key()}; }), - std::back_inserter(response.entries)); + boost::push_back(response.entries, items | boost::adaptors::filtered(is_matched) + | boost::adaptors::transformed(to_entry)); response.handle = next.is_max() ? 
pg_end : next; - bufferlist bl; - encode(response, bl); - return seastar::make_ready_future(std::move(bl)); + ceph::bufferlist out; + encode(response, out); + logger().debug("{}: response.entries.size()={}", + __func__, response.entries.size()); + return seastar::make_ready_future(std::move(out)); + }); +} + +seastar::future<> OpsExecuter::do_pgnls(OSDOp& osd_op) +{ + hobject_t lower_bound; + try { + ceph::decode(lower_bound, osd_op.indata); + } catch (const buffer::error&) { + throw std::invalid_argument("unable to decode PGNLS handle"); + } + return do_pgnls_common(lower_bound, os->oi.soid.get_namespace(), osd_op.op.pgls.count) + .then([&osd_op](bufferlist bl) { + osd_op.outdata = std::move(bl); + return seastar::now(); + }); +} + +seastar::future<> OpsExecuter::do_pgnls_filtered(OSDOp& osd_op) +{ + std::string cname, mname, type; + auto bp = osd_op.indata.cbegin(); + try { + ceph::decode(cname, bp); + ceph::decode(mname, bp); + ceph::decode(type, bp); + } catch (const buffer::error&) { + throw ceph::osd::invalid_argument{}; + } + + auto filter = get_pgls_filter(type, bp); + + hobject_t lower_bound; + try { + lower_bound.decode(bp); + } catch (const buffer::error&) { + throw std::invalid_argument("unable to decode PGNLS_FILTER description"); + } + + logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}", + __func__, cname, mname, type, lower_bound, + static_cast(filter.get())); + return seastar::do_with(std::move(filter), + [this, &osd_op, lower_bound=std::move(lower_bound)](auto&& filter) { + return do_pgnls_common(lower_bound, os->oi.soid.get_namespace(), + osd_op.op.pgls.count, filter.get()) + .then([&osd_op](bufferlist bl) { + osd_op.outdata = std::move(bl); + return seastar::now(); + }); }); } @@ -190,12 +361,10 @@ OpsExecuter::do_osd_op(OSDOp& osd_op) return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) { return backend.setxattr(os, osd_op, txn); }); + case CEPH_OSD_OP_PGNLS_FILTER: + return do_pgnls_filtered(osd_op); case 
CEPH_OSD_OP_PGNLS: - return do_pgnls(osd_op.indata, os->oi.soid.get_namespace(), op.pgls.count) - .then([&osd_op](bufferlist bl) { - osd_op.outdata = std::move(bl); - return seastar::now(); - }); + return do_pgnls(osd_op); case CEPH_OSD_OP_DELETE: return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) { return backend.remove(os, txn); diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index b5b1cd8b001..02e045b4325 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -26,6 +26,9 @@ #include "crimson/osd/pg.h" #include "crimson/osd/pg_backend.h" +class PGLSFilter; +class OSDOp; + namespace ceph::osd { class OpsExecuter { PGBackend::cached_os_t os; @@ -36,10 +39,17 @@ class OpsExecuter { size_t num_read = 0; ///< count read ops size_t num_write = 0; ///< count update ops - seastar::future do_pgnls( - ceph::bufferlist& indata, + seastar::future pgls_filter( + /*const*/PGLSFilter& filter, + const hobject_t& sobj); + seastar::future do_pgnls_common( + const hobject_t& lower_bound, const std::string& nspace, - uint64_t limit); + uint64_t limit, + PGLSFilter* filter = nullptr); + seastar::future<> do_pgnls(OSDOp& osd_op); + seastar::future<> do_pgnls_filtered(OSDOp& osd_op); + seastar::future<> do_op_call(class OSDOp& osd_op); template diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index e78c4ce4b93..30ec868ac38 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -438,15 +438,21 @@ seastar::future<> PGBackend::getxattr( bp.copy(osd_op.op.xattr.name_len, aname); name = "_" + aname; } - return store->get_attr(coll, ghobject_t{os.oi.soid}, name).then( - [&osd_op] (ceph::bufferptr val) { - osd_op.outdata.clear(); - osd_op.outdata.push_back(std::move(val)); - osd_op.op.xattr.value_len = osd_op.outdata.length(); - //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); + return getxattr(os.oi.soid, name).then([&osd_op] 
(ceph::bufferptr val) { + osd_op.outdata.clear(); + osd_op.outdata.push_back(std::move(val)); + osd_op.op.xattr.value_len = osd_op.outdata.length(); + //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); }).handle_exception_type( [] (ceph::os::FuturizedStore::EnoentException& e) { return seastar::make_exception_future<>(ceph::osd::object_not_found{}); }); //ctx->delta_stats.num_rd++; } + +seastar::future PGBackend::getxattr( + const hobject_t& soid, + std::string_view key) +{ + return store->get_attr(coll, ghobject_t{soid}, key); +} diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index e88057757e2..6c40c09ec7f 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -79,6 +79,9 @@ public: seastar::future<> getxattr( const ObjectState& os, OSDOp& osd_op); + seastar::future getxattr( + const hobject_t& soid, + std::string_view key); virtual void got_rep_op_reply(const MOSDRepOpReply&) {} -- 2.39.5