git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: implement PG scanning.
author Radoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 10 Mar 2020 22:49:27 +0000 (23:49 +0100)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 13 Jul 2020 14:23:53 +0000 (16:23 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/osd.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_recovery.h
src/crimson/osd/recovery_backend.cc
src/crimson/osd/recovery_backend.h

index 694317f94fe84d2f47c405c0459c3417236f231a..3d60c2a8c61248f59fce23593d8ebf78144c28dd 100644 (file)
@@ -640,6 +640,8 @@ seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
     case MSG_OSD_PG_RECOVERY_DELETE:
       [[fallthrough]];
     case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+      [[fallthrough]];
+    case MSG_OSD_PG_SCAN:
       return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
     case MSG_OSD_PG_LEASE:
       [[fallthrough]];
index 201dea5139fe2523e52c120647ce6c7640fceb25..9dc10cf3d6c40127eb52f86aadddf5cb69f04d87 100644 (file)
@@ -453,6 +453,9 @@ public:
   const auto& get_pool() const {
     return peering_state.get_pool();
   }
+  /// Thin accessor: forwards to peering_state.get_primary(), the same way
+  /// get_pool() forwards to peering_state.get_pool().
+  pg_shard_t get_primary() const { return peering_state.get_primary(); }
 
   /// initialize created PG
   void init(
index e264b76614af8a7939e9fdc22078916a4ff517df..20d462789a85437ccbe566a7b7fd7d1254bacd76 100644 (file)
@@ -77,4 +77,5 @@ private:
   seastar::future<> handle_recovery_delete_reply(
       Ref<MOSDPGRecoveryDeleteReply> m);
   seastar::future<> handle_pull_response(Ref<MOSDPGPush> m);
+  seastar::future<> handle_scan(MOSDPGScan& m);
 };
index fabbbffdf37b54e66401c02bc5df501a356f88d0..64f0c735c34960ff73441e7a15572a1f7f13a104 100644 (file)
@@ -6,6 +6,7 @@
 #include "crimson/common/exception.h"
 #include "crimson/osd/recovery_backend.h"
 #include "crimson/osd/pg.h"
+#include "crimson/osd/pg_backend.h"
 
 #include "messages/MOSDFastDispatchOp.h"
 #include "osd/osd_types.h"
@@ -59,10 +60,128 @@ void RecoveryBackend::WaitForObjectRecovery::stop() {
   }
 }
 
+/// Lists up to `max` objects from `start` and maps each existing one to its
+/// version. `start` is copied: continuations may outlive the caller's object.
+seastar::future<BackfillInterval> RecoveryBackend::scan_for_backfill(
+  const hobject_t& start,
+  [[maybe_unused]] const std::int64_t min,
+  const std::int64_t max)
+{
+  logger().debug("{} starting from {}", __func__, start);
+  return seastar::do_with(std::map<hobject_t, eversion_t>{}, hobject_t(start),
+    [this, max] (auto& version_map, auto& begin) {
+      return backend->list_objects(begin, max).then(
+        [this, &begin, &version_map] (auto&& ret) {
+          auto& [objects, next] = ret;
+          // ret dies when this lambda returns; do_with keeps the listing alive
+          return seastar::do_with(std::move(objects),
+            [this, &version_map] (auto& listed) {
+              return seastar::do_for_each(listed,
+                [this, &version_map] (const hobject_t& object) {
+                  crimson::osd::ObjectContextRef obc;
+                  if (pg.is_primary()) {
+                    obc = shard_services.obc_registry.maybe_get_cached_obc(object);
+                  }
+                  if (obc) {
+                    // A missing object was removed between listing and now;
+                    // typical for the first item, usually last_backfill.
+                    if (obc->obs.exists) {
+                      logger().debug("scan_for_backfill found (primary): {}  {}",
+                                     object, obc->obs.oi.version);
+                      version_map[object] = obc->obs.oi.version;
+                    }
+                    return seastar::now();
+                  }
+                  return backend->load_metadata(object).safe_then(
+                    [&version_map, object] (auto md) {
+                      if (md->os.exists) {
+                        logger().debug("scan_for_backfill found: {}  {}",
+                                       object, md->os.oi.version);
+                        version_map[object] = md->os.oi.version;
+                      }
+                      return seastar::now();
+                    }, PGBackend::load_metadata_ertr::assert_all{});
+                });
+            }).then([&version_map, &begin, next=std::move(next), this] {
+              BackfillInterval bi;
+              bi.begin = begin;
+              bi.end = std::move(next);
+              bi.version = pg.get_info().last_update;
+              bi.objects = std::move(version_map);
+              logger().debug("{} BackfillInterval filled, leaving",
+                             "scan_for_backfill");
+              return seastar::make_ready_future<BackfillInterval>(std::move(bi));
+            });
+        });
+    });
+}
+
+/// Answers OP_SCAN_GET_DIGEST: scans from m.begin and replies on m's
+/// connection with an OP_SCAN_DIGEST carrying the encoded object map.
+seastar::future<> RecoveryBackend::handle_scan_get_digest(MOSDPGScan& m)
+{
+  logger().debug("{}", __func__);
+  if (false /* FIXME: check for backfill too full */) {
+    std::ignore = shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
+      // TODO: abstract start_background_recovery
+      static_cast<crimson::osd::PG*>(&pg),
+      shard_services,
+      pg.get_pg_whoami(),
+      pg.get_pgid(),
+      pg.get_osdmap_epoch(),
+      pg.get_osdmap_epoch(),
+      PeeringState::BackfillTooFull());
+    return seastar::now();
+  }
+  return scan_for_backfill(
+    m.begin,  // bound to a const reference, so std::move would be a no-op
+    crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_min"),
+    crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_max")
+  ).then([this, query_epoch=m.query_epoch, conn=m.get_connection()]
+         (auto backfill_interval) {
+    auto reply = make_message<MOSDPGScan>(
+      MOSDPGScan::OP_SCAN_DIGEST,
+      pg.get_pg_whoami(),
+      pg.get_osdmap_epoch(),
+      query_epoch,
+      spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard),
+      backfill_interval.begin,
+      backfill_interval.end);
+    encode(backfill_interval.objects, reply->get_data());
+    return conn->send(std::move(reply));
+  });
+}
+
+// Handles OP_SCAN_DIGEST. Still a stub: the always-false assertion trips.
+seastar::future<> RecoveryBackend::handle_scan_digest(MOSDPGScan& m)
+{
+  logger().debug("{}", __func__);
+  ceph_assert("Not implemented" == nullptr);
+  return seastar::now();
+}
+
+/// Routes a PG scan message to the handler matching its op code.
+seastar::future<> RecoveryBackend::handle_scan(MOSDPGScan& m)
+{
+  logger().debug("{}", __func__);
+  switch (m.op) {
+    case MOSDPGScan::OP_SCAN_GET_DIGEST:
+      return handle_scan_get_digest(m);
+    case MOSDPGScan::OP_SCAN_DIGEST:
+      return handle_scan_digest(m);
+    default:
+      // FIXME: move to errorator. A bare literal is always true: use == nullptr
+      ceph_assert("unknown op type for pg scan" == nullptr);
+      return seastar::now();
+  }
+}
+
 seastar::future<> RecoveryBackend::handle_recovery_op(
   Ref<MOSDFastDispatchOp> m)
 {
   switch (m->get_header().type) {
+  case MSG_OSD_PG_SCAN:
+    return handle_scan(*boost::static_pointer_cast<MOSDPGScan>(m));
   default:
     return seastar::make_exception_future<>(
        std::invalid_argument(fmt::format("invalid request type: {}",
index 2465ec46280f1b7ec8ad17e369acaddcbf36d7a0..771924d2d0119947501bfddddd475aea2292e76d 100644 (file)
@@ -11,6 +11,8 @@
 #include "crimson/osd/object_context.h"
 #include "crimson/osd/shard_services.h"
 
+#include "messages/MOSDPGScan.h"
+#include "osd/recovery_types.h"
 #include "osd/osd_types.h"
 
 namespace crimson::osd{
@@ -20,6 +22,12 @@ namespace crimson::osd{
 class PGBackend;
 
 class RecoveryBackend {
+  seastar::future<> handle_scan_get_digest(
+    MOSDPGScan& m);
+  seastar::future<> handle_scan_digest(
+    MOSDPGScan& m);
+  seastar::future<> handle_scan(
+    MOSDPGScan& m);
 protected:
   class WaitForObjectRecovery;
 public:
@@ -59,6 +67,11 @@ public:
     const hobject_t& soid,
     eversion_t need) = 0;
 
+  seastar::future<BackfillInterval> scan_for_backfill(
+    const hobject_t& from,
+    std::int64_t min,
+    std::int64_t max);
+
   void on_peering_interval_change(ceph::os::Transaction& t) {
     clean_up(t, "new peering interval");
   }