]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: add pgls_filter support 30433/head
authorKefu Chai <kchai@redhat.com>
Tue, 17 Sep 2019 16:13:27 +0000 (00:13 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 18 Sep 2019 14:59:18 +0000 (22:59 +0800)
it's modeled after pgnls_filter. and use seastar::map_reduce() for
filtering the objects into the response.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/ops_executer.cc

index d3d009c4b1dd23d88ca6c226f8d84699fbed23b4..13f6f086c4eac189bf52e0c830c56c5cf220c78a 100644 (file)
@@ -442,7 +442,8 @@ static seastar::future<ceph::bufferlist> do_pgls_common(
   const PGBackend& backend,
   const hobject_t& lower_bound,
   const std::string& nspace,
-  const uint64_t limit)
+  const uint64_t limit,
+  const PGLSFilter* const filter)
 {
   if (!(lower_bound.is_min() ||
         lower_bound.is_max() ||
@@ -451,20 +452,34 @@ static seastar::future<ceph::bufferlist> do_pgls_common(
     throw std::invalid_argument("outside of PG bounds");
   }
 
+  using entries_t = decltype(pg_ls_response_t::entries);
   return backend.list_objects(lower_bound, limit).then(
-    [pg_end, nspace, &backend](auto objects, auto next) {
+    [&backend, filter, nspace](auto objects, auto next) {
+      return seastar::when_all_succeed(
+        seastar::map_reduce(std::move(objects),
+          [&backend, filter, nspace](const hobject_t& obj) {
+            if (obj.get_namespace() == nspace) {
+              if (filter) {
+                return pgls_filter(*filter, backend, obj);
+              } else {
+                return seastar::make_ready_future<hobject_t>(obj);
+              }
+            } else {
+              return seastar::make_ready_future<hobject_t>(hobject_t{});
+            }
+          },
+          entries_t{},
+          [](entries_t&& entries, hobject_t obj) {
+            if (!obj.is_min()) {
+              entries.emplace_back(obj.oid, obj.get_key());
+            }
+            return entries;
+          }),
+        seastar::make_ready_future<hobject_t>(next));
+    }).then([pg_end](entries_t entries, hobject_t next) {
       pg_ls_response_t response;
       response.handle = next.is_max() ? pg_end : next;
-      auto in_my_namespace = [&nspace](const hobject_t& obj) {
-        return obj.get_namespace() == nspace;
-      };
-      auto to_entry = [] (const hobject_t& hobj) {
-       return make_pair(hobj.oid, hobj.get_key());
-      };
-      boost::push_back(response.entries,
-                      objects |
-                      boost::adaptors::filtered(in_my_namespace) |
-                      boost::adaptors::transformed(to_entry));
+      response.entries = std::move(entries);
       ceph::bufferlist out;
       encode(response, out);
       logger().debug("{}: response.entries.size()=",
@@ -493,13 +508,59 @@ static seastar::future<> do_pgls(
                        pg.get_backend(),
                        lower_bound,
                        nspace,
-                       osd_op.op.pgls.count)
+                       osd_op.op.pgls.count,
+                       nullptr /* no filter */)
     .then([&osd_op](bufferlist bl) {
       osd_op.outdata = std::move(bl);
       return seastar::now();
     });
 }
 
+static seastar::future<> do_pgls_filtered(
+  const PG& pg,
+  const std::string& nspace,
+  OSDOp& osd_op)
+{
+  std::string cname, mname, type;
+  auto bp = osd_op.indata.cbegin();
+  try {
+    ceph::decode(cname, bp);
+    ceph::decode(mname, bp);
+    ceph::decode(type, bp);
+  } catch (const buffer::error&) {
+    throw ceph::osd::invalid_argument{};
+  }
+
+  auto filter = get_pgls_filter(type, bp);
+
+  hobject_t lower_bound;
+  try {
+    lower_bound.decode(bp);
+  } catch (const buffer::error&) {
+    throw std::invalid_argument("unable to decode PGLS_FILTER description");
+  }
+
+  logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
+                 __func__, cname, mname, type, lower_bound,
+                 static_cast<const void*>(filter.get()));
+  return seastar::do_with(std::move(filter),
+    [&, lower_bound=std::move(lower_bound)](auto&& filter) {
+      const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
+      const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+      return do_pgls_common(pg_start,
+                            pg_end,
+                            pg.get_backend(),
+                            lower_bound,
+                            nspace,
+                            osd_op.op.pgls.count,
+                            filter.get())
+        .then([&osd_op](bufferlist bl) {
+          osd_op.outdata = std::move(bl);
+          return seastar::now();
+      });
+  });
+}
+
 seastar::future<>
 OpsExecuter::execute_pg_op(OSDOp& osd_op)
 {
@@ -509,6 +570,10 @@ OpsExecuter::execute_pg_op(OSDOp& osd_op)
     return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
       return do_pgls(pg, nspace, osd_op);
     });
+  case CEPH_OSD_OP_PGLS_FILTER:
+    return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
+      return do_pgls_filtered(pg, nspace, osd_op);
+    });
   case CEPH_OSD_OP_PGNLS:
     return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
       return do_pgnls(pg, nspace, osd_op);