#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/transformed.hpp>
-#include <boost/range/algorithm/copy.hpp>
+#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/algorithm/max_element.hpp>
#include <boost/range/numeric.hpp>
throw ceph::osd::make_error(ret);
}
});
+}
+
+// Build and initialize the PGLS filter named by `type`.
+//
+// `type` is either "plain" (for which ::operation_not_supported() is
+// called) or "<class>.<filter>"; in the latter case the class is opened
+// through ClassHandler and asked for the named filter. `iter` is passed to
+// the filter's init(). Throws ceph::osd::permission_denied when opening the
+// class fails with -EPERM and ceph::osd::invalid_argument for every other
+// failure detected here.
+static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
+  const std::string& type,
+  bufferlist::const_iterator& iter)
+{
+  // held as non-const locally because init() is called on it below
+  std::unique_ptr<PGLSFilter> filter;
+  if (type == "plain") {
+    //filter = std::make_unique<PGLSPlainFilter>();
+    ::operation_not_supported();
+  } else {
+    // the separator must exist and must have text on both sides of it
+    const auto dot = type.find('.');
+    const bool malformed =
+      dot == std::string::npos || dot == 0 || dot + 1 == type.size();
+    if (malformed) {
+      throw ceph::osd::invalid_argument{};
+    }
+
+    const std::string class_name = type.substr(0, dot);
+    const std::string filter_name = type.substr(dot + 1);
+    ClassHandler::ClassData *cls = nullptr;
+    if (const int r = ClassHandler::get_instance().open_class(class_name, &cls);
+        r != 0) {
+      logger().warn("can't open class {}: {}", class_name, cpp_strerror(r));
+      if (r == -EPERM) {
+        // propagate the permission error as such
+        throw ceph::osd::permission_denied{};
+      }
+      throw ceph::osd::invalid_argument{};
+    }
+    ceph_assert(cls);
+
+    auto* const class_filter = cls->get_filter(filter_name);
+    if (!class_filter) {
+      logger().warn("can't find filter {} in class {}", filter_name, class_name);
+      throw ceph::osd::invalid_argument{};
+    }
+    filter.reset(class_filter->fn());
+    if (!filter) {
+      // An object class is expected to hand back a filter instance;
+      // report an error to the caller instead of asserting.
+      logger().warn("buggy class {} failed to construct filter {}",
+                    class_name, filter_name);
+      throw ceph::osd::invalid_argument{};
+    }
+  }
+
+  ceph_assert(filter);
+  if (const int r = filter->init(iter); r < 0) {
+    logger().warn("error initializing filter {}: {}", type, cpp_strerror(r));
+    throw ceph::osd::invalid_argument{};
+  }
+
+  // constructed and initialized; ownership goes to the caller
+  return filter;
}
-seastar::future<ceph::bufferlist>
-OpsExecuter::do_pgnls(ceph::bufferlist& indata,
- const std::string& nspace,
- uint64_t limit)
+
+// Run `filter` against object `sobj`.
+//
+// Resolves to (filter.filter() verdict, sobj). If the filter names an
+// xattr, that xattr is fetched through `backend` first and its value is
+// handed to filter.filter(); otherwise an empty bufferlist is used. When
+// fetching fails, the object is rejected if filter.reject_empty_xattr()
+// says so, else it is filtered against an empty value.
+//
+// `filter` is captured by reference in the continuation, so the caller
+// must keep it alive until the returned future resolves.
+seastar::future<bool, hobject_t> OpsExecuter::pgls_filter(
+  const PGLSFilter& filter,
+  const PGBackend& backend,
+  const hobject_t& sobj)
{
- hobject_t lower_bound;
- try {
- ceph::decode(lower_bound, indata);
- } catch (const buffer::error& e) {
- throw std::invalid_argument("unable to decode PGNLS handle");
+  if (const auto xattr = filter.get_xattr(); !xattr.empty()) {
+    logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
+                   xattr, sobj);
+    return backend.getxattr(sobj, xattr).then_wrapped(
+      [&filter, sobj] (auto futval) {
+        logger().debug("pgls_filter: got xvalue for obj={}", sobj);
+
+        ceph::bufferlist val;
+        if (!futval.failed()) {
+          val.push_back(std::move(futval).get0());
+        } else {
+          // The failure is deliberately swallowed here. Mark it as handled;
+          // otherwise seastar reports an ignored exceptional future when
+          // `futval` is destroyed.
+          futval.ignore_ready_future();
+          if (filter.reject_empty_xattr()) {
+            return seastar::make_ready_future<bool, hobject_t>(false, sobj);
+          }
+        }
+        const bool filtered = filter.filter(sobj, val);
+        return seastar::make_ready_future<bool, hobject_t>(filtered, sobj);
+      });
+  } else {
+    ceph::bufferlist empty_lvalue_bl;
+    const bool filtered = filter.filter(sobj, empty_lvalue_bl);
+    return seastar::make_ready_future<bool, hobject_t>(filtered, sobj);
}
+}
+
+// Shared implementation of PGNLS and PGNLS_FILTER.
+//
+// Lists up to `limit` objects starting at `lower_bound`, drops the ones
+// outside `nspace` (and the ones in the hit-set namespace), optionally runs
+// `filter` on the rest, and resolves to an encoded pg_nls_response_t.
+// `filter` may be nullptr, meaning every listed object matches. Throws
+// std::invalid_argument when `lower_bound` is outside of this PG.
+seastar::future<ceph::bufferlist>
+OpsExecuter::do_pgnls_common(
+ const hobject_t& lower_bound,
+ const std::string& nspace,
+ uint64_t limit,
+ const PGLSFilter* const filter)
+{
const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
- const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+ auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
if (!(lower_bound.is_min() ||
lower_bound.is_max() ||
(lower_bound >= pg_start && lower_bound < pg_end))) {
// this should only happen with a buggy client.
throw std::invalid_argument("outside of PG bounds");
}
+
return backend.list_objects(lower_bound, limit).then(
- [lower_bound, pg_end, nspace](auto objects, auto next) {
- auto in_my_namespace = [&nspace](const hobject_t& o) {
+ [this, filter, nspace](auto objects, auto next) {
+ auto in_my_namespace = [&nspace](const hobject_t& obj) {
using ceph::common::local_conf;
- if (o.get_namespace() == local_conf()->osd_hit_set_namespace) {
+ if (obj.get_namespace() == local_conf()->osd_hit_set_namespace) {
return false;
} else if (nspace == librados::all_nspaces) {
return true;
} else {
- return o.get_namespace() == nspace;
+ return obj.get_namespace() == nspace;
+ }
+ };
+ auto to_pglsed = [this, filter] (const hobject_t& obj) {
+ // this transformation looks costly. However, I don't have any
+ // reason to think PGLS* operations are critical for, let's say,
+ // general performance.
+ //
+ // from tchaikov: "another way is to use seastar::map_reduce(),
+ // to 1) save the effort to filter the already filtered objects
+ // 2) avoid the space to keep the tuple<bool, object> even if
+ // the object is filtered out".
+ if (filter) {
+ // pgls_filter() takes the backend used to fetch the xattr
+ return pgls_filter(*filter, backend, obj);
+ } else {
+ return seastar::make_ready_future<bool, hobject_t>(true, obj);
}
};
+
+ auto range = objects | boost::adaptors::filtered(in_my_namespace)
+ | boost::adaptors::transformed(to_pglsed);
+ logger().debug("do_pgnls_common: finishing the 1st stage of pgls");
+ return seastar::when_all_succeed(std::begin(range),
+ std::end(range)).then(
+ [next=std::move(next)] (auto items) mutable {
+ // the sole purpose of this chaining is to pass `next` to 2nd
+ // stage altogether with items
+ logger().debug("do_pgnls_common: 1st done");
+ return seastar::make_ready_future<decltype(items), decltype(next)>(
+ std::move(items), std::move(next));
+ });
+ }).then(
+ [pg_end] (const std::vector<std::tuple<bool,
+ hobject_t>>& items, auto next) {
+ auto is_matched = [] (const auto& item) {
+ return std::get<bool>(item);
+ };
+ auto to_entry = [] (const auto& item) {
+ const auto& obj = std::get<hobject_t>(item);
+ return librados::ListObjectImpl{
+ obj.get_namespace(), obj.oid.name, obj.get_key()
+ };
+ };
+
pg_nls_response_t response;
- boost::copy(objects |
- boost::adaptors::filtered(in_my_namespace) |
- boost::adaptors::transformed([](const hobject_t& o) {
- return librados::ListObjectImpl{o.get_namespace(),
- o.oid.name,
- o.get_key()}; }),
- std::back_inserter(response.entries));
+ boost::push_back(response.entries, items | boost::adaptors::filtered(is_matched)
+ | boost::adaptors::transformed(to_entry));
response.handle = next.is_max() ? pg_end : next;
- bufferlist bl;
- encode(response, bl);
- return seastar::make_ready_future<bufferlist>(std::move(bl));
+ ceph::bufferlist out;
+ encode(response, out);
+ // spelled out instead of __func__, which names the lambda's operator()
+ logger().debug("do_pgnls_common: response.entries.size()={}",
+ response.entries.size());
+ return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
+ });
+}
+
+// Handle an unfiltered PGNLS: decode the listing handle from the op's
+// input, run the common listing path and store the encoded response in
+// the op's output. Throws std::invalid_argument if the handle cannot be
+// decoded.
+seastar::future<> OpsExecuter::do_pgnls(OSDOp& osd_op)
+{
+  hobject_t lower_bound;
+  try {
+    ceph::decode(lower_bound, osd_op.indata);
+  } catch (const buffer::error&) {
+    throw std::invalid_argument("unable to decode PGNLS handle");
+  }
+
+  const auto& nspace = os->oi.soid.get_namespace();
+  return do_pgnls_common(lower_bound, nspace, osd_op.op.pgls.count).then(
+    [&osd_op](bufferlist listing) {
+      osd_op.outdata = std::move(listing);
+      return seastar::now();
+    });
+}
+
+// Handle PGNLS_FILTER. The op's input is decoded in order as: class name,
+// method name, filter type, the filter's own init data (read by
+// get_pgls_filter()), then the listing handle. The filter is kept alive
+// with seastar::do_with() for the duration of the listing, and the encoded
+// response is stored in the op's output.
+seastar::future<> OpsExecuter::do_pgnls_filtered(OSDOp& osd_op)
+{
+  std::string cname;
+  std::string mname;
+  std::string type;
+  auto cursor = osd_op.indata.cbegin();
+  try {
+    ceph::decode(cname, cursor);
+    ceph::decode(mname, cursor);
+    ceph::decode(type, cursor);
+  } catch (const buffer::error&) {
+    throw ceph::osd::invalid_argument{};
+  }
+
+  // must run before the handle is decoded: it advances `cursor`
+  auto filter = get_pgls_filter(type, cursor);
+
+  hobject_t lower_bound;
+  try {
+    lower_bound.decode(cursor);
+  } catch (const buffer::error&) {
+    throw std::invalid_argument("unable to decode PGNLS_FILTER description");
+  }
+
+  logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
+                 __func__, cname, mname, type, lower_bound,
+                 static_cast<const void*>(filter.get()));
+  return seastar::do_with(std::move(filter),
+    [this, &osd_op, lower_bound=std::move(lower_bound)](auto&& held_filter) {
+      auto store_listing = [&osd_op](bufferlist listing) {
+        osd_op.outdata = std::move(listing);
+        return seastar::now();
+      };
+      return do_pgnls_common(lower_bound, os->oi.soid.get_namespace(),
+                             osd_op.op.pgls.count, held_filter.get())
+        .then(std::move(store_listing));
});
}
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.setxattr(os, osd_op, txn);
});
+ case CEPH_OSD_OP_PGNLS_FILTER:
+ return do_pgnls_filtered(osd_op);
case CEPH_OSD_OP_PGNLS:
- return do_pgnls(osd_op.indata, os->oi.soid.get_namespace(), op.pgls.count)
- .then([&osd_op](bufferlist bl) {
- osd_op.outdata = std::move(bl);
- return seastar::now();
- });
+ return do_pgnls(osd_op);
case CEPH_OSD_OP_DELETE:
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.remove(os, txn);