]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: implement PGLSFilter.
author Radoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 7 Aug 2019 17:59:27 +0000 (19:59 +0200)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
Sat, 24 Aug 2019 01:33:59 +0000 (03:33 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h

index d668dbd12b6a65cd6e07ed30efc1c0067d1ce16c..2d34e0dcf8deaeca40cc74ee8702ab22d11ee090 100644 (file)
@@ -6,7 +6,7 @@
 #include <boost/range/adaptor/filtered.hpp>
 #include <boost/range/adaptor/map.hpp>
 #include <boost/range/adaptor/transformed.hpp>
-#include <boost/range/algorithm/copy.hpp>
+#include <boost/range/algorithm_ext/push_back.hpp>
 #include <boost/range/algorithm/max_element.hpp>
 #include <boost/range/numeric.hpp>
 
@@ -100,51 +100,222 @@ seastar::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
       throw ceph::osd::make_error(ret);
     }
   });
+}
+
+static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
+  const std::string& type,
+  bufferlist::const_iterator& iter)
+{
+  // storing non-const PGLSFilter for the sake of ::init()
+  std::unique_ptr<PGLSFilter> filter;
+  if (type.compare("plain") == 0) {
+    //filter = std::make_unique<PGLSPlainFilter>();
+    ::operation_not_supported();
+  } else {
+    std::size_t dot = type.find(".");
+    if (dot == type.npos || dot == 0 || dot == type.size() - 1) {
+      throw ceph::osd::invalid_argument{};
+    }
+
+    const std::string class_name = type.substr(0, dot);
+    const std::string filter_name = type.substr(dot + 1);
+    ClassHandler::ClassData *cls = nullptr;
+    int r = ClassHandler::get_instance().open_class(class_name, &cls);
+    if (r != 0) {
+      logger().warn("can't open class {}: {}", class_name, cpp_strerror(r));
+      if (r == -EPERM) {
+        // propagate permission error
+        throw ceph::osd::permission_denied{};
+      } else {
+        throw ceph::osd::invalid_argument{};
+      }
+    } else {
+      ceph_assert(cls);
+    }
+
+    ClassHandler::ClassFilter * const class_filter = cls->get_filter(filter_name);
+    if (class_filter == nullptr) {
+      logger().warn("can't find filter {} in class {}", filter_name, class_name);
+      throw ceph::osd::invalid_argument{};
+    }
 
+    filter.reset(class_filter->fn());
+    if (!filter) {
+      // Object classes are obliged to return us something, but let's
+      // give an error rather than asserting out.
+      logger().warn("buggy class {} failed to construct filter {}",
+                    class_name, filter_name);
+      throw ceph::osd::invalid_argument{};
+    }
+  }
+
+  ceph_assert(filter);
+  int r = filter->init(iter);
+  if (r < 0) {
+    logger().warn("error initializing filter {}: {}", type, cpp_strerror(r));
+    throw ceph::osd::invalid_argument{};
+  }
+
+  // successfully constructed and initialized, return it.
+  return filter;
 }
-seastar::future<ceph::bufferlist>
-OpsExecuter::do_pgnls(ceph::bufferlist& indata,
-                      const std::string& nspace,
-                      uint64_t limit)
+
+seastar::future<bool, hobject_t> OpsExecuter::pgls_filter(
+  const PGLSFilter& filter,
+  const PGBackend& backend,
+  const hobject_t& sobj)
 {
-  hobject_t lower_bound;
-  try {
-    ceph::decode(lower_bound, indata);
-  } catch (const buffer::error& e) {
-    throw std::invalid_argument("unable to decode PGNLS handle");
+  if (const auto xattr = filter.get_xattr(); !xattr.empty()) {
+    logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
+                   xattr, sobj);
+    return backend.getxattr(sobj, xattr).then_wrapped(
+      [&filter, sobj] (auto futval) {
+        logger().debug("pgls_filter: got xvalue for obj={}", sobj);
+
+        ceph::bufferlist val;
+        if (!futval.failed()) {
+          val.push_back(std::move(futval).get0());
+        } else if (filter.reject_empty_xattr()) {
+          return seastar::make_ready_future<bool, hobject_t>(false, sobj);
+        }
+        const bool filtered = filter.filter(sobj, val);
+        return seastar::make_ready_future<bool, hobject_t>(filtered, sobj);
+    });
+  } else {
+    ceph::bufferlist empty_lvalue_bl;
+    const bool filtered = filter.filter(sobj, empty_lvalue_bl);
+    return seastar::make_ready_future<bool, hobject_t>(filtered, sobj);
   }
+}
+
+seastar::future<ceph::bufferlist>
+OpsExecuter::do_pgnls_common(
+  const hobject_t& lower_bound,
+  const std::string& nspace,
+  uint64_t limit,
+  const PGLSFilter* const filter)
+{
   const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
-  const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+  auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
   if (!(lower_bound.is_min() ||
         lower_bound.is_max() ||
         (lower_bound >= pg_start && lower_bound < pg_end))) {
     // this should only happen with a buggy client.
     throw std::invalid_argument("outside of PG bounds");
   }
+
   return backend.list_objects(lower_bound, limit).then(
-    [lower_bound, pg_end, nspace](auto objects, auto next) {
-      auto in_my_namespace = [&nspace](const hobject_t& o) {
+    [this, filter, nspace](auto objects, auto next) {
+      auto in_my_namespace = [&nspace](const hobject_t& obj) {
         using ceph::common::local_conf;
-        if (o.get_namespace() == local_conf()->osd_hit_set_namespace) {
+        if (obj.get_namespace() == local_conf()->osd_hit_set_namespace) {
           return false;
         } else if (nspace == librados::all_nspaces) {
           return true;
         } else {
-          return o.get_namespace() == nspace;
+          return obj.get_namespace() == nspace;
+        }
+      };
+      auto to_pglsed = [this, filter] (const hobject_t& obj) {
+        // this transformation looks costly. However, I don't have any
+        // reason to think PGLS* operations are critical for, let's say,
+        // general performance.
+        //
+        // from tchaikov: "another way is to use seastar::map_reduce(),
+        // to 1) save the effort to filter the already filtered objects
+        // 2) avoid the space to keep the tuple<bool, object> even if
+        // the object is filtered out".
+        if (filter) {
+          return pgls_filter(*filter, backend, obj);
+        } else {
+          return seastar::make_ready_future<bool, hobject_t>(true, obj);
         }
       };
+
+      auto range = objects | boost::adaptors::filtered(in_my_namespace)
+                           | boost::adaptors::transformed(to_pglsed);
+      logger().debug("do_pgnls_common: finishing the 1st stage of pgls");
+      return seastar::when_all_succeed(std::begin(range),
+                                       std::end(range)).then(
+        [next=std::move(next)] (auto items) mutable {
+          // the sole purpose of this chaining is to pass `next` to 2nd
+          // stage altogether with items
+          logger().debug("do_pgnls_common: 1st done");
+          return seastar::make_ready_future<decltype(items), decltype(next)>(
+            std::move(items), std::move(next));
+      });
+  }).then(
+    [pg_end] (const std::vector<std::tuple<bool,
+                                hobject_t>>& items, auto next) {
+      auto is_matched = [] (const auto& item) {
+        return std::get<bool>(item);
+      };
+      auto to_entry = [] (const auto& item) {
+        const auto& obj = std::get<hobject_t>(item);
+        return librados::ListObjectImpl{
+          obj.get_namespace(), obj.oid.name, obj.get_key()
+        };
+      };
+
       pg_nls_response_t response;
-      boost::copy(objects |
-        boost::adaptors::filtered(in_my_namespace) |
-        boost::adaptors::transformed([](const hobject_t& o) {
-          return librados::ListObjectImpl{o.get_namespace(),
-                                          o.oid.name,
-                                          o.get_key()}; }),
-        std::back_inserter(response.entries));
+      boost::push_back(response.entries, items | boost::adaptors::filtered(is_matched)
+                                               | boost::adaptors::transformed(to_entry));
       response.handle = next.is_max() ? pg_end : next;
-      bufferlist bl;
-      encode(response, bl);
-      return seastar::make_ready_future<bufferlist>(std::move(bl));
+      ceph::bufferlist out;
+      encode(response, out);
+      logger().debug("{}: response.entries.size()={}",
+                     __func__, response.entries.size());
+      return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
+  });
+}
+
+seastar::future<> OpsExecuter::do_pgnls(OSDOp& osd_op)
+{
+  hobject_t lower_bound;
+  try {
+    ceph::decode(lower_bound, osd_op.indata);
+  } catch (const buffer::error&) {
+    throw std::invalid_argument("unable to decode PGNLS handle");
+  }
+  return do_pgnls_common(lower_bound, os->oi.soid.get_namespace(), osd_op.op.pgls.count)
+    .then([&osd_op](bufferlist bl) {
+      osd_op.outdata = std::move(bl);
+      return seastar::now();
+  });
+}
+
+seastar::future<> OpsExecuter::do_pgnls_filtered(OSDOp& osd_op)
+{
+  std::string cname, mname, type;
+  auto bp = osd_op.indata.cbegin();
+  try {
+    ceph::decode(cname, bp);
+    ceph::decode(mname, bp);
+    ceph::decode(type, bp);
+  } catch (const buffer::error&) {
+    throw ceph::osd::invalid_argument{};
+  }
+
+  auto filter = get_pgls_filter(type, bp);
+
+  hobject_t lower_bound;
+  try {
+    lower_bound.decode(bp);
+  } catch (const buffer::error&) {
+    throw std::invalid_argument("unable to decode PGNLS_FILTER description");
+  }
+
+  logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
+                 __func__, cname, mname, type, lower_bound,
+                 static_cast<const void*>(filter.get()));
+  return seastar::do_with(std::move(filter),
+    [this, &osd_op, lower_bound=std::move(lower_bound)](auto&& filter) {
+      return do_pgnls_common(lower_bound, os->oi.soid.get_namespace(),
+                             osd_op.op.pgls.count, filter.get())
+        .then([&osd_op](bufferlist bl) {
+          osd_op.outdata = std::move(bl);
+          return seastar::now();
+      });
   });
 }
 
@@ -190,12 +361,10 @@ OpsExecuter::do_osd_op(OSDOp& osd_op)
     return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
       return backend.setxattr(os, osd_op, txn);
     });
+  case CEPH_OSD_OP_PGNLS_FILTER:
+     return do_pgnls_filtered(osd_op);
   case CEPH_OSD_OP_PGNLS:
-    return do_pgnls(osd_op.indata, os->oi.soid.get_namespace(), op.pgls.count)
-      .then([&osd_op](bufferlist bl) {
-        osd_op.outdata = std::move(bl);
-       return seastar::now();
-    });
+    return do_pgnls(osd_op);
   case CEPH_OSD_OP_DELETE:
     return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
       return backend.remove(os, txn);
index b5b1cd8b00185f41235e8fb85a3bdf57583785f9..02e045b4325ad222dc3cf57c79498b23e7d2376f 100644 (file)
@@ -26,6 +26,9 @@
 #include "crimson/osd/pg.h"
 #include "crimson/osd/pg_backend.h"
 
+class PGLSFilter;
+class OSDOp;
+
 namespace ceph::osd {
 class OpsExecuter {
   PGBackend::cached_os_t os;
@@ -36,10 +39,17 @@ class OpsExecuter {
   size_t num_read = 0;    ///< count read ops
   size_t num_write = 0;   ///< count update ops
 
-  seastar::future<ceph::bufferlist> do_pgnls(
-    ceph::bufferlist& indata,
+  seastar::future<bool, hobject_t> pgls_filter(
+    /*const*/PGLSFilter& filter,
+    const hobject_t& sobj);
+  seastar::future<ceph::bufferlist> do_pgnls_common(
+    const hobject_t& lower_bound,
     const std::string& nspace,
-    uint64_t limit);
+    uint64_t limit,
+    PGLSFilter* filter = nullptr);
+  seastar::future<> do_pgnls(OSDOp& osd_op);
+  seastar::future<> do_pgnls_filtered(OSDOp& osd_op);
+
   seastar::future<> do_op_call(class OSDOp& osd_op);
 
   template <class Func>
index e78c4ce4b93e6b623fdd6f8bc5936a0ae08291df..30ec868ac38c4ea9f7477ef3f009d764449cdfc4 100644 (file)
@@ -438,15 +438,21 @@ seastar::future<> PGBackend::getxattr(
     bp.copy(osd_op.op.xattr.name_len, aname);
     name = "_" + aname;
   }
-  return store->get_attr(coll, ghobject_t{os.oi.soid}, name).then(
-    [&osd_op] (ceph::bufferptr val) {
-      osd_op.outdata.clear();
-      osd_op.outdata.push_back(std::move(val));
-      osd_op.op.xattr.value_len = osd_op.outdata.length();
-      //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
+  return getxattr(os.oi.soid, name).then([&osd_op] (ceph::bufferptr val) {
+    osd_op.outdata.clear();
+    osd_op.outdata.push_back(std::move(val));
+    osd_op.op.xattr.value_len = osd_op.outdata.length();
+    //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
   }).handle_exception_type(
     [] (ceph::os::FuturizedStore::EnoentException& e) {
       return seastar::make_exception_future<>(ceph::osd::object_not_found{});
   });
   //ctx->delta_stats.num_rd++;
 }
+
+seastar::future<ceph::bufferptr> PGBackend::getxattr(
+  const hobject_t& soid,
+  std::string_view key)
+{
+  return store->get_attr(coll, ghobject_t{soid}, key);
+}
index e88057757e2b2842da145a289f208584b7e0368d..6c40c09ec7fe350ec8c9063127c36555b14eaf61 100644 (file)
@@ -79,6 +79,9 @@ public:
   seastar::future<> getxattr(
     const ObjectState& os,
     OSDOp& osd_op);
+  seastar::future<ceph::bufferptr> getxattr(
+    const hobject_t& soid,
+    std::string_view key);
 
   virtual void got_rep_op_reply(const MOSDRepOpReply&) {}