crimson/osd: use ObjectContext and take obc locks
author Samuel Just <sjust@redhat.com>
Wed, 9 Oct 2019 22:07:55 +0000 (15:07 -0700)
committer Samuel Just <sjust@redhat.com>
Tue, 3 Dec 2019 05:35:36 +0000 (21:35 -0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/CMakeLists.txt
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h

diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt
index e976b9a7d76ec54731d0cbbeeb6e5088a071b4df..ef4fc716743c4b182b3b07cfa84fb2ba873be7fd 100644
--- a/src/crimson/osd/CMakeLists.txt
+++ b/src/crimson/osd/CMakeLists.txt
@@ -23,6 +23,7 @@ add_executable(crimson-osd
   objclass.cc
   ${PROJECT_SOURCE_DIR}/src/objclass/class_api.cc
   ${PROJECT_SOURCE_DIR}/src/osd/ClassHandler.cc
+  ${PROJECT_SOURCE_DIR}/src/osd/osd_op_util.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc
diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc
index 7124f49d5494de25d8ae73364a73e390adb87563..8d9db0d58e7b16d3d0d372f5d750d4a4bf539b1e 100644
--- a/src/crimson/osd/ops_executer.cc
+++ b/src/crimson/osd/ops_executer.cc
@@ -63,7 +63,7 @@ OpsExecuter::call_errorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
   }
 
   const auto flags = method->get_flags();
-  if (!os->exists && (flags & CLS_METHOD_WR) == 0) {
+  if (!obc->obs.exists && (flags & CLS_METHOD_WR) == 0) {
     return crimson::ct_error::enoent::make();
   }
 
@@ -357,7 +357,10 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op)
   // TODO: dispatch via call table?
   // TODO: we might want to find a way to unify both input and output
   // of each op.
-  logger().debug("handling op {}", ceph_osd_op_name(osd_op.op.op));
+  logger().debug(
+    "handling op {} on object {}",
+    ceph_osd_op_name(osd_op.op.op),
+    get_target());
   switch (const ceph_osd_op& op = osd_op.op; op.op) {
   case CEPH_OSD_OP_SYNC_READ:
     [[fallthrough]];
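
Both hunks above swap the old cached ObjectState (os) for an ObjectContext
(obc). A minimal sketch of the shape those accesses assume (the real class
lives in crimson/osd/object_context.h and additionally carries the RW lock
state; the intrusive_ptr alias here is an assumption):

  // Sketch only: fields implied by obc->obs.exists / obc->obs.oi.soid above.
  struct ObjectContext {
    ObjectState obs;      // current state: obs.exists, obs.oi (object_info_t)
    SnapSet ss;           // head objects also carry the SnapSet (get_ro_ss())
    bool loaded = false;  // set once metadata has been read from the store
  };
  using ObjectContextRef = boost::intrusive_ptr<ObjectContext>;
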
diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h
index 66ab7ad8b663b0d194239697570126cc05aae863..5641a211c89b63b7e077e0d6100e1faeea0353b7 100644
--- a/src/crimson/osd/ops_executer.h
+++ b/src/crimson/osd/ops_executer.h
@@ -72,7 +72,7 @@ private:
     virtual ~effect_t() = default;
   };
 
-  PGBackend::cached_os_t os;
+  ObjectContextRef obc;
   PG& pg;
   PGBackend& backend;
   Ref<MOSDOp> msg;
@@ -91,10 +91,14 @@ private:
 
   call_errorator::future<> do_op_call(class OSDOp& osd_op);
 
+  const hobject_t &get_target() const {
+    return obc->obs.oi.soid;
+  }
+
   template <class Func>
   auto do_const_op(Func&& f) {
     // TODO: pass backend as read-only
-    return std::forward<Func>(f)(backend, std::as_const(*os));
+    return std::forward<Func>(f)(backend, std::as_const(obc->obs));
   }
 
   template <class Func>
@@ -107,7 +111,7 @@ private:
   template <class Func>
   auto do_write_op(Func&& f) {
     ++num_write;
-    return std::forward<Func>(f)(backend, *os, txn);
+    return std::forward<Func>(f)(backend, obc->obs, txn);
   }
 
   // PG operations are being provided with pg instead of os.
@@ -122,14 +126,14 @@ private:
   }
 
 public:
-  OpsExecuter(PGBackend::cached_os_t os, PG& pg, Ref<MOSDOp> msg)
-    : os(std::move(os)),
+  OpsExecuter(ObjectContextRef obc, PG& pg, Ref<MOSDOp> msg)
+    : obc(std::move(obc)),
       pg(pg),
       backend(pg.get_backend()),
       msg(std::move(msg)) {
   }
   OpsExecuter(PG& pg, Ref<MOSDOp> msg)
-    : OpsExecuter{PGBackend::cached_os_t{}, pg, std::move(msg)}
+    : OpsExecuter{ObjectContextRef(), pg, std::move(msg)}
   {}
 
   osd_op_errorator::future<> execute_osd_op(class OSDOp& osd_op);
@@ -175,9 +179,9 @@ auto OpsExecuter::with_effect(
 template <typename Func>
 OpsExecuter::osd_op_errorator::future<> OpsExecuter::submit_changes(Func&& f) && {
   if (__builtin_expect(op_effects.empty(), true)) {
-    return std::forward<Func>(f)(std::move(txn), std::move(os));
+    return std::forward<Func>(f)(std::move(txn), std::move(obc));
   }
-  return std::forward<Func>(f)(std::move(txn), std::move(os)).safe_then([this] {
+  return std::forward<Func>(f)(std::move(txn), std::move(obc)).safe_then([this] {
    // cleanup of `op_effects` happens in the destructor
     return crimson::do_for_each(op_effects, [] (auto& op_effect) {
       return op_effect->execute();
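
do_const_op() and do_write_op() above forward a callable together with the
obc-held ObjectState (plus the transaction for writes). A hypothetical
dispatch entry following that pattern; backend.stat(const ObjectState&,
OSDOp&) is declared in pg_backend.h further down:

  // Hypothetical entry in execute_osd_op()'s switch, shaped like do_const_op.
  case CEPH_OSD_OP_STAT:
    return do_const_op([&osd_op](auto& backend, const auto& obs) {
      return backend.stat(obs, osd_op);  // read-only: const ObjectState&
    });
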
diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc
index 7a84afbb0ded4b5e77ceb9269a6f00e1821ffec5..320de1469478d158feeb5bf8c7fa1c596fbb689c 100644
--- a/src/crimson/osd/osd_operations/client_request.cc
+++ b/src/crimson/osd/osd_operations/client_request.cc
@@ -4,6 +4,7 @@
 #include <seastar/core/future.hh>
 
 #include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
 
 #include "crimson/osd/pg.h"
 #include "crimson/osd/osd.h"
@@ -43,11 +44,18 @@ ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
   return pg.client_request_pg_pipeline;
 }
 
+bool ClientRequest::is_pg_op() const
+{
+  return std::any_of(
+    begin(m->ops), end(m->ops),
+    [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
+}
+
 seastar::future<> ClientRequest::start()
 {
   logger().debug("{}: start", *this);
 
-  IRef ref = this;
+  IRef opref = this;
   return with_blocking_future(handle.enter(cp().await_map))
     .then([this]() {
       return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
@@ -55,21 +63,67 @@ seastar::future<> ClientRequest::start()
       return with_blocking_future(handle.enter(cp().get_pg));
     }).then([this] {
       return with_blocking_future(osd.wait_for_pg(m->get_spg()));
-    }).then([this, ref=std::move(ref)](Ref<PG> pg) {
+    }).then([this, opref=std::move(opref)](Ref<PG> pgref) {
       return seastar::do_with(
-       std::move(pg), std::move(ref), [this](auto pg, auto op) {
-       return with_blocking_future(
-         handle.enter(pp(*pg).await_map)
-       ).then([this, pg] {
+       std::move(pgref), std::move(opref), [this](auto pgref, auto opref) {
+         PG &pg = *pgref;
          return with_blocking_future(
-           pg->osdmap_gate.wait_for_map(m->get_map_epoch()));
-       }).then([this, pg] (auto) {
-         return with_blocking_future(handle.enter(pp(*pg).process));
-       }).then([this, pg] {
-         return pg->handle_op(conn.get(), std::move(m));
+           handle.enter(pp(pg).await_map)
+         ).then([this, &pg]() mutable {
+           return with_blocking_future(
+             pg.osdmap_gate.wait_for_map(m->get_map_epoch()));
+         }).then([this, &pg](auto map) mutable {
+           return with_blocking_future(
+             handle.enter(pp(pg).wait_for_active));
+         }).then([this, &pg]() mutable {
+           return pg.wait_for_active();
+         }).then([this, &pg]() mutable {
+           if (m->finish_decode()) {
+             m->clear_payload();
+           }
+           if (is_pg_op()) {
+             return process_pg_op(pg);
+           } else {
+             return process_op(pg);
+           }
+         });
        });
-      });
     });
 }
 
+seastar::future<> ClientRequest::process_pg_op(
+  PG &pg)
+{
+  return pg.do_pg_ops(m)
+    .then([this](Ref<MOSDOpReply> reply) {
+      return conn->send(reply);
+    });
+}
+
+seastar::future<> ClientRequest::process_op(
+  PG &pg)
+{
+  return with_blocking_future(
+    handle.enter(pp(pg).get_obc)
+  ).then([this, &pg]() {
+    op_info.set_from_op(&*m, *pg.get_osdmap());
+    return pg.with_locked_obc(
+      m,
+      op_info,
+      this,
+      [this, &pg](auto obc) {
+       return with_blocking_future(handle.enter(pp(pg).process)
+       ).then([this, &pg, obc]() {
+         return pg.do_osd_ops(m, obc);
+       }).then([this](Ref<MOSDOpReply> reply) {
+         return conn->send(reply);
+       });
+      });
+  }).safe_then([] {
+    return seastar::now();
+  }, PG::load_obc_ertr::all_same_way([](auto &code) {
+    logger().error("ClientRequest saw error code {}", code);
+    return seastar::now();
+  }));
+}
 }
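
process_op() ends with the errorator funnel idiom: safe_then() takes a
success continuation plus an error handler, and load_obc_ertr::all_same_way()
routes every error in the errorator's statically-known set through a single
lambda. The pattern in isolation (a sketch, not the exact crimson
signatures; do_work() is a hypothetical load_obc_ertr-returning step):

  // One handler covers every error in the errorator's set.
  return do_work().safe_then(
    [] { return seastar::now(); },                  // success path
    PG::load_obc_ertr::all_same_way([](auto& code) {
      logger().error("saw error code {}", code);    // log, swallow, finish
      return seastar::now();
    }));
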
diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h
index 8940194f62f555df4644d4557acdf027c0eee6e8..eb81e0c3b949c8057170ff648bdd6b12976fdd90 100644
--- a/src/crimson/osd/osd_operations/client_request.h
+++ b/src/crimson/osd/osd_operations/client_request.h
@@ -3,6 +3,7 @@
 
 #pragma once
 
+#include "osd/osd_op_util.h"
 #include "crimson/net/Connection.h"
 #include "crimson/osd/osd_operation.h"
 #include "crimson/common/type_helpers.h"
@@ -17,6 +18,7 @@ class ClientRequest final : public OperationT<ClientRequest> {
   OSD &osd;
   crimson::net::ConnectionRef conn;
   Ref<MOSDOp> m;
+  OpInfo op_info;
   OrderedPipelinePhase::Handle handle;
 
 public:
@@ -33,6 +35,12 @@ public:
     OrderedPipelinePhase await_map = {
       "ClientRequest::PGPipeline::await_map"
     };
+    OrderedPipelinePhase wait_for_active = {
+      "ClientRequest::PGPipeline::wait_for_active"
+    };
+    OrderedPipelinePhase get_obc = {
+      "ClientRequest::PGPipeline::get_obc"
+    };
     OrderedPipelinePhase process = {
       "ClientRequest::PGPipeline::process"
     };
@@ -45,9 +53,17 @@ public:
 
   void print(std::ostream &) const final;
   void dump_detail(Formatter *f) const final;
+
+public:
   seastar::future<> start();
 
 private:
+  seastar::future<> process_pg_op(
+    PG &pg);
+  seastar::future<> process_op(
+    PG &pg);
+  bool is_pg_op() const;
+
   ConnectionPipeline &cp();
   PGPipeline &pp(PG &pg);
 };
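
The two new phases slot into the ordered pipeline: an operation enters
phases strictly in sequence via handle.enter(), and each phase admits
operations in arrival order. A sketch of the traversal this commit
establishes:

  // Phase order a ClientRequest now traverses:
  //   ConnectionPipeline: await_map -> get_pg
  //   PGPipeline:         await_map -> wait_for_active -> get_obc -> process
  return with_blocking_future(handle.enter(pp(pg).wait_for_active))
    .then([&pg] { return pg.wait_for_active(); });
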
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index ed0f40b35058afd1cdb1fecc5e418f938e495a78..9db75b01053636a7926c399ecaf4c3126779803b 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -412,14 +412,14 @@ seastar::future<> PG::wait_for_active()
   }
 }
 
-seastar::future<> PG::submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
+seastar::future<> PG::submit_transaction(ObjectContextRef&& obc,
                                         ceph::os::Transaction&& txn,
                                         const MOSDOp& req)
 {
   epoch_t map_epoch = get_osdmap_epoch();
   eversion_t at_version{map_epoch, projected_last_update.version + 1};
   return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
-                               std::move(os),
+                               std::move(obc),
                                std::move(txn),
                                req,
                                peering_state.get_last_peering_reset(),
@@ -433,56 +433,80 @@ seastar::future<> PG::submit_transaction(boost::local_shared_ptr<ObjectState>&&
   });
 }
 
-seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
+seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
+  Ref<MOSDOp> m,
+  ObjectContextRef obc)
 {
   using osd_op_errorator = OpsExecuter::osd_op_errorator;
   const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
                                                    : m->get_hobj();
-  return backend->get_object_state(oid).safe_then([this, m](auto os) mutable {
-    auto ox =
-      std::make_unique<OpsExecuter>(std::move(os), *this/* as const& */, m);
-    return crimson::do_for_each(m->ops, [this, ox = ox.get()](OSDOp& osd_op) {
-      logger().debug("will be handling op {}", ceph_osd_op_name(osd_op.op.op));
-      return ox->execute_osd_op(osd_op);
-    }).safe_then([this, m, ox = std::move(ox)] {
-      logger().debug("all operations have been executed successfully");
-      return std::move(*ox).submit_changes([this, m] (auto&& txn, auto&& os) -> osd_op_errorator::future<> {
+  auto ox =
+    std::make_unique<OpsExecuter>(obc, *this/* as const& */, m);
+  return crimson::do_for_each(
+    m->ops, [this, obc, m, ox = ox.get()](OSDOp& osd_op) {
+    logger().debug(
+      "do_osd_ops: {} - object {} - handling op {}",
+      *m,
+      obc->obs.oi.soid,
+      ceph_osd_op_name(osd_op.op.op));
+    return ox->execute_osd_op(osd_op);
+  }).safe_then([this, obc, m, ox = std::move(ox)] {
+    logger().debug(
+      "do_osd_ops: {} - object {} all operations successful",
+      *m,
+      obc->obs.oi.soid);
+    return std::move(*ox).submit_changes(
+      [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> {
         // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
         if (txn.empty()) {
-          logger().debug("txn is empty, bypassing mutate");
+         logger().debug(
+           "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
+           *m,
+           obc->obs.oi.soid);
           return osd_op_errorator::now();
         } else {
-          return submit_transaction(std::move(os), std::move(txn), *m);
+         logger().debug(
+           "do_osd_ops: {} - object {} submitting txn",
+           *m,
+           obc->obs.oi.soid);
+          return submit_transaction(std::move(obc), std::move(txn), *m);
         }
       });
-    });
-  }).safe_then([m,this] {
+  }).safe_then([m, obc, this] {
     auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                            0, false);
     reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+    logger().debug(
+      "do_osd_ops: {} - object {} sending reply",
+      *m,
+      obc->obs.oi.soid);
     return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
   }, OpsExecuter::osd_op_errorator::all_same_way([=,&oid] (const std::error_code& e) {
     assert(e.value() > 0);
-    logger().debug("got statical error code while handling object {}: {} ({})",
-                   oid, e.value(), e.message());
-    return backend->evict_object_state(oid).then([=] {
-      auto reply = make_message<MOSDOpReply>(
-        m.get(), -e.value(), get_osdmap_epoch(), 0, false);
-      reply->set_enoent_reply_versions(peering_state.get_info().last_update,
-                                         peering_state.get_info().last_user_version);
-      return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
-    });
+    logger().debug(
+      "do_osd_ops: {} - object {} got error code {}, {}",
+      *m,
+      obc->obs.oi.soid,
+      e.value(),
+      e.message());
+    auto reply = make_message<MOSDOpReply>(
+      m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
+                                    peering_state.get_info().last_user_version);
+    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
   })).handle_exception_type([=,&oid](const crimson::osd::error& e) {
    // we need this handler because some throwing paths aren't errorated yet.
-    logger().debug("got ceph::osd::error while handling object {}: {} ({})",
-                   oid, e.code(), e.what());
-    return backend->evict_object_state(oid).then([=] {
-      auto reply = make_message<MOSDOpReply>(
-        m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
-      reply->set_enoent_reply_versions(peering_state.get_info().last_update,
-                                         peering_state.get_info().last_user_version);
-      return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
-    });
+    logger().debug(
+      "do_osd_ops: {} - object {} got unhandled exception {} ({})",
+      *m,
+      obc->obs.oi.soid,
+      e.code(),
+      e.what());
+    auto reply = make_message<MOSDOpReply>(
+      m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
+    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
+                                    peering_state.get_info().last_user_version);
+    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
   });
 }
 
@@ -506,22 +530,183 @@ seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
   });
 }
 
-seastar::future<> PG::handle_op(crimson::net::Connection* conn,
-                                Ref<MOSDOp> m)
+std::pair<hobject_t, RWState::State> PG::get_oid_and_lock(
+  const MOSDOp &m,
+  const OpInfo &op_info)
 {
-  return wait_for_active().then([conn, m, this] {
-    if (m->finish_decode()) {
-      m->clear_payload();
+  auto oid = m.get_snapid() == CEPH_SNAPDIR ?
+    m.get_hobj().get_head() : m.get_hobj();
+
+  RWState::State lock_type = RWState::RWNONE;
+  if (op_info.rwordered() && op_info.may_read()) {
+    lock_type = RWState::RWEXCL;
+  } else if (op_info.rwordered()) {
+    lock_type = RWState::RWWRITE;
+  } else {
+    ceph_assert(op_info.may_read());
+    lock_type = RWState::RWREAD;
+  }
+  return std::make_pair(oid, lock_type);
+}
+
+std::optional<hobject_t> PG::resolve_oid(
+  const SnapSet &ss,
+  const hobject_t &oid)
+{
+  if (oid.snap > ss.seq) {
+    return oid.get_head();
+  } else {
+    // which clone would it be?
+    auto clone = std::upper_bound(
+      begin(ss.clones), end(ss.clones),
+      oid.snap);
+    if (clone == end(ss.clones)) {
+      // Doesn't exist, > last clone, < ss.seq
+      return std::nullopt;
     }
-    if (std::any_of(begin(m->ops), end(m->ops),
-                   [](auto& op) { return ceph_osd_op_type_pg(op.op.op); })) {
-      return do_pg_ops(m);
+    auto citer = ss.clone_snaps.find(*clone);
+    // TODO: how do we want to handle this kind of logic error?
+    ceph_assert(citer != ss.clone_snaps.end());
+
+    if (std::find(
+          citer->second.begin(),
+          citer->second.end(),
+          oid.snap) == citer->second.end()) {
+      return std::nullopt;
     } else {
-      return do_osd_ops(m);
+      auto soid = oid;
+      soid.snap = *clone;
+      return std::optional<hobject_t>(soid);
     }
-  }).then([conn](Ref<MOSDOpReply> reply) {
-    return conn->send(reply);
-  });
+  }
+}
+
+PG::load_obc_ertr::future<
+  std::pair<crimson::osd::ObjectContextRef, bool>>
+PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head)
+{
+  ceph_assert(!oid.is_head());
+  using ObjectContextRef = crimson::osd::ObjectContextRef;
+  auto coid = resolve_oid(head->get_ro_ss(), oid);
+  if (!coid) {
+    return load_obc_ertr::make_ready_future<
+      std::pair<crimson::osd::ObjectContextRef, bool>>(
+       std::make_pair(ObjectContextRef(), true)
+      );
+  }
+  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(*coid);
+  if (existed) {
+    return load_obc_ertr::make_ready_future<
+      std::pair<crimson::osd::ObjectContextRef, bool>>(
+       std::make_pair(obc, true)
+      );
+  } else {
+    bool got = obc->maybe_get_excl();
+    ceph_assert(got);
+    return backend->load_metadata(*coid).safe_then(
+      [oid, obc=std::move(obc), head, this](auto &&md) mutable {
+       obc->set_clone_state(std::move(md->os), std::move(head));
+       return load_obc_ertr::make_ready_future<
+         std::pair<crimson::osd::ObjectContextRef, bool>>(
+           std::make_pair(obc, false)
+         );
+      });
+  }
+}
+
+PG::load_obc_ertr::future<
+  std::pair<crimson::osd::ObjectContextRef, bool>>
+PG::get_or_load_head_obc(hobject_t oid)
+{
+  ceph_assert(oid.is_head());
+  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
+  if (existed) {
+    logger().debug(
+      "{}: found {} in cache",
+      __func__,
+      oid);
+    return load_obc_ertr::make_ready_future<
+      std::pair<crimson::osd::ObjectContextRef, bool>>(
+       std::make_pair(std::move(obc), true)
+      );
+  } else {
+    logger().debug(
+      "{}: cache miss on {}",
+      __func__,
+      oid);
+    bool got = obc->maybe_get_excl();
+    ceph_assert(got);
+    return backend->load_metadata(oid).safe_then(
+      [oid, obc=std::move(obc), this](auto md) ->
+        load_obc_ertr::future<
+          std::pair<crimson::osd::ObjectContextRef, bool>>
+      {
+       logger().debug(
+         "{}: loaded obs {} for {}",
+         __func__,
+         md->os.oi,
+         oid);
+       if (!md->ss) {
+         logger().error(
+           "{}: oid {} missing snapset",
+           __func__,
+           oid);
+         return crimson::ct_error::object_corrupted::make();
+       }
+       obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
+       logger().debug(
+         "{}: returning obc {} for {}",
+         __func__,
+         obc->obs.oi,
+         obc->obs.oi.soid);
+       return load_obc_ertr::make_ready_future<
+         std::pair<crimson::osd::ObjectContextRef, bool>>(
+           std::make_pair(obc, false)
+         );
+      });
+  }
+}
+
+PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
+PG::get_locked_obc(
+  Operation *op, const hobject_t &oid, RWState::State type)
+{
+  return get_or_load_head_obc(oid.get_head()).safe_then(
+    [this, op, oid, type](auto p) -> load_obc_ertr::future<ObjectContextRef>{
+      auto &[head_obc, head_existed] = p;
+      if (oid.is_head()) {
+       if (head_existed) {
+         return head_obc->get_lock_type(op, type).then([head_obc] {
+           ceph_assert(head_obc->loaded);
+           return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
+         });
+       } else {
+         head_obc->degrade_excl_to(type);
+         return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
+       }
+      } else {
+       return head_obc->get_lock_type(op, RWState::RWREAD).then(
+         [this, head_obc, op, oid, type] {
+           ceph_assert(head_obc->loaded);
+           return get_or_load_clone_obc(oid, head_obc);
+         }).safe_then([this, head_obc, op, oid, type](auto p) {
+             auto &[obc, existed] = p;
+             if (existed) {
+               return load_obc_ertr::future<>(
+                 obc->get_lock_type(op, type)).safe_then([obc] {
+                 ceph_assert(obc->loaded);
+                 return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+               });
+             } else {
+               obc->degrade_excl_to(type);
+               return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+             }
+         }).safe_then([head_obc](auto obc) {
+           head_obc->put_lock_type(RWState::RWREAD);
+           return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+         });
+      }
+    });
 }
 
 seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
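
resolve_oid() above implements the usual snapid-to-clone mapping: reads past
ss.seq go to the head object; otherwise upper_bound over the ascending
clones vector picks the first clone newer than the requested snap, and
clone_snaps must confirm the clone actually covers that snap. A
self-contained model of the same rule, with unsigned ints standing in for
snapid_t:

  #include <algorithm>
  #include <cassert>
  #include <map>
  #include <optional>
  #include <vector>

  using snapid = unsigned;

  std::optional<snapid> resolve_clone(
      const std::vector<snapid>& clones,  // ascending, as in SnapSet::clones
      const std::map<snapid, std::vector<snapid>>& clone_snaps,
      snapid seq, snapid want) {
    assert(want <= seq);  // want > seq resolves to the head object instead
    auto clone = std::upper_bound(clones.begin(), clones.end(), want);
    if (clone == clones.end())
      return std::nullopt;               // newer than every clone: deleted
    auto citer = clone_snaps.find(*clone);
    assert(citer != clone_snaps.end());  // logic error otherwise, as above
    const auto& snaps = citer->second;
    if (std::find(snaps.begin(), snaps.end(), want) == snaps.end())
      return std::nullopt;               // clone exists but skipped this snap
    return *clone;
  }

  int main() {
    // clone 4 covers snaps {3,4}; clone 7 covers {6,7}; ss.seq == 8
    std::vector<snapid> clones{4, 7};
    std::map<snapid, std::vector<snapid>> cs{{4, {3, 4}}, {7, {6, 7}}};
    assert(resolve_clone(clones, cs, 8, 3) == 4u);  // snap 3 lives in clone 4
    assert(!resolve_clone(clones, cs, 8, 5));       // snap 5 was trimmed
    assert(!resolve_clone(clones, cs, 8, 8));       // past the last clone
  }
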
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 4c5cd11b542d1183399c98d0d10a7f683ec9c5db..fd6c90dbadf93d2ca6abe85f105bee8c34dfe028 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -15,7 +15,7 @@
 #include "crimson/net/Fwd.h"
 #include "os/Transaction.h"
 #include "osd/osd_types.h"
-#include "osd/osd_internal_types.h"
+#include "crimson/osd/object_context.h"
 #include "osd/PeeringState.h"
 
 #include "crimson/common/type_helpers.h"
@@ -435,8 +435,46 @@ public:
   void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
   void handle_activate_map(PeeringCtx &rctx);
   void handle_initialize(PeeringCtx &rctx);
-  seastar::future<> handle_op(crimson::net::Connection* conn,
-                             Ref<MOSDOp> m);
+
+  static std::pair<hobject_t, RWState::State> get_oid_and_lock(
+    const MOSDOp &m,
+    const OpInfo &op_info);
+  static std::optional<hobject_t> resolve_oid(
+    const SnapSet &snapset,
+    const hobject_t &oid);
+
+  using load_obc_ertr = crimson::errorator<
+    crimson::ct_error::object_corrupted>;
+  load_obc_ertr::future<
+    std::pair<crimson::osd::ObjectContextRef, bool>>
+  get_or_load_clone_obc(
+    hobject_t oid, crimson::osd::ObjectContextRef head_obc);
+
+  load_obc_ertr::future<
+    std::pair<crimson::osd::ObjectContextRef, bool>>
+  get_or_load_head_obc(hobject_t oid);
+
+  load_obc_ertr::future<ObjectContextRef> get_locked_obc(
+    Operation *op,
+    const hobject_t &oid,
+    RWState::State type);
+public:
+  template <typename F>
+  auto with_locked_obc(
+    Ref<MOSDOp> &m,
+    const OpInfo &op_info,
+    Operation *op,
+    F &&f) {
+    auto [oid, type] = get_oid_and_lock(*m, op_info);
+    return get_locked_obc(op, oid, type)
+      .safe_then([this, f=std::forward<F>(f), type](auto obc) {
+       return f(obc).finally([this, obc, type] {
+         obc->put_lock_type(type);
+         return load_obc_ertr::now();
+       });
+      });
+  }
+
   seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
   void handle_rep_op_reply(crimson::net::Connection* conn,
                           const MOSDRepOpReply& m);
@@ -447,7 +485,9 @@ private:
   void do_peering_event(
     const boost::statechart::event_base &evt,
     PeeringCtx &rctx);
-  seastar::future<Ref<MOSDOpReply>> do_osd_ops(Ref<MOSDOp> m);
+  seastar::future<Ref<MOSDOpReply>> do_osd_ops(
+    Ref<MOSDOp> m,
+    ObjectContextRef obc);
   seastar::future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
   seastar::future<> do_osd_op(
     ObjectState& os,
@@ -456,7 +496,7 @@ private:
   seastar::future<ceph::bufferlist> do_pgnls(ceph::bufferlist& indata,
                                             const std::string& nspace,
                                             uint64_t limit);
-  seastar::future<> submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
+  seastar::future<> submit_transaction(ObjectContextRef&& obc,
                                       ceph::os::Transaction&& txn,
                                       const MOSDOp& req);
 
@@ -465,6 +505,11 @@ private:
   ShardServices &shard_services;
 
   cached_map_t osdmap;
+
+public:
+  cached_map_t get_osdmap() { return osdmap; }
+
+private:
   std::unique_ptr<PGBackend> backend;
 
   PeeringState peering_state;
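
get_oid_and_lock() (declared above, defined in pg.cc) derives the obc lock
type from the OpInfo flags: write-ordered ops that also read take the
exclusive lock, pure write-ordered ops the write lock, and everything else
the read lock. The rule in isolation, as a self-contained model:

  #include <cassert>

  enum class Lock { Read, Write, Excl };

  Lock choose_lock(bool rwordered, bool may_read) {
    if (rwordered && may_read) return Lock::Excl;   // read-modify-write
    if (rwordered)             return Lock::Write;  // write-only
    assert(may_read);          // an op must read or write something
    return Lock::Read;
  }
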
diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc
index a0a59b466dd81dcb4228f9b25e7ee84ebc755ad4..415c2e7d0f9838c5fd5a94442645d47c67c08d00 100644
--- a/src/crimson/osd/pg_backend.cc
+++ b/src/crimson/osd/pg_backend.cc
@@ -13,6 +13,7 @@
 
 #include "messages/MOSDOp.h"
 #include "os/Transaction.h"
+#include "common/Clock.h"
 
 #include "crimson/os/cyanstore/cyan_object.h"
 #include "crimson/os/futurized_collection.h"
@@ -60,128 +61,63 @@ PGBackend::PGBackend(shard_id_t shard,
     store{store}
 {}
 
-PGBackend::get_os_errorator::future<PGBackend::cached_os_t>
-PGBackend::get_object_state(const hobject_t& oid)
-{
-  // want the head?
-  if (oid.snap == CEPH_NOSNAP) {
-    logger().trace("find_object: {}@HEAD", oid);
-    return _load_os(oid);
-  } else {
-    // we want a snap
-    return _load_ss(oid).safe_then(
-      [oid,this](cached_ss_t ss) -> get_os_errorator::future<cached_os_t> {
-        // head?
-        if (oid.snap > ss->seq) {
-          return _load_os(oid.get_head());
-        } else {
-          // which clone would it be?
-          auto clone = std::upper_bound(begin(ss->clones), end(ss->clones),
-                                        oid.snap);
-          if (clone == end(ss->clones)) {
-            return crimson::ct_error::enoent::make();
-          }
-          // clone
-          auto soid = oid;
-          soid.snap = *clone;
-          return _load_ss(soid).safe_then(
-            [soid,this](cached_ss_t ss) -> get_os_errorator::future<cached_os_t> {
-              auto clone_snap = ss->clone_snaps.find(soid.snap);
-              assert(clone_snap != end(ss->clone_snaps));
-              if (clone_snap->second.empty()) {
-                logger().trace("find_object: {}@[] -- DNE", soid);
-                return crimson::ct_error::enoent::make();
-              }
-              auto first = clone_snap->second.back();
-              auto last = clone_snap->second.front();
-              if (first > soid.snap) {
-                logger().trace("find_object: {}@[{},{}] -- DNE",
-                               soid, first, last);
-                return crimson::ct_error::enoent::make();
-              }
-              logger().trace("find_object: {}@[{},{}] -- HIT",
-                             soid, first, last);
-              return _load_os(soid);
-          });
-        }
-    });
-  }
-}
-
-PGBackend::load_metadata_ertr::future<PGBackend::loaded_object_md_t>
+PGBackend::load_metadata_ertr::future<PGBackend::loaded_object_md_t::ref>
 PGBackend::load_metadata(const hobject_t& oid)
 {
   return store->get_attrs(
     coll,
     ghobject_t{oid, ghobject_t::NO_GEN, shard}).safe_then(
-      [oid, this](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t>{
-       loaded_object_md_t ret;
+      [oid, this](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
+       loaded_object_md_t::ref ret(new loaded_object_md_t());
        if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
          bufferlist bl;
-         bl.push_back(oiiter->second);
-         ret.os = ObjectState(
+         bl.push_back(std::move(oiiter->second));
+         ret->os = ObjectState(
            object_info_t(bl),
            true);
        } else {
+         logger().error(
+           "load_metadata: object {} present but missing object info",
+           oid);
          return crimson::ct_error::object_corrupted::make();
        }
        
        if (oid.is_head()) {
          if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) {
            bufferlist bl;
-           bl.push_back(ssiter->second);
-           ret.ss = SnapSet(bl);
+           bl.push_back(std::move(ssiter->second));
+           ret->ss = SnapSet(bl);
          } else {
-           return crimson::ct_error::object_corrupted::make();
+           /* TODO: add support for writing out snapsets
+           logger().error(
+             "load_metadata: object {} present but missing snapset",
+             oid);
+           //return crimson::ct_error::object_corrupted::make();
+           */
+           ret->ss = SnapSet();
          }
        }
 
-       return load_metadata_ertr::make_ready_future<loaded_object_md_t>(
+       return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
          std::move(ret));
       }, crimson::ct_error::enoent::handle([oid, this] {
-       return load_metadata_ertr::make_ready_future<loaded_object_md_t>(
-         loaded_object_md_t{
-           ObjectState(),
-           std::nullopt
+       logger().debug(
+         "load_metadata: object {} doesn't exist, returning empty metadata",
+         oid);
+       return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
+         new loaded_object_md_t{
+           ObjectState(
+             object_info_t(oid),
+             false),
+           oid.is_head() ? std::optional<SnapSet>(SnapSet()) : std::nullopt
          });
       }));
 }
 
-PGBackend::get_os_errorator::future<PGBackend::cached_os_t>
-PGBackend::_load_os(const hobject_t& oid)
-{
-  if (auto found = os_cache.find(oid); found) {
-    return get_os_errorator::make_ready_future<cached_os_t>(std::move(found));
-  }
-  return load_metadata(oid).safe_then([oid, this](auto &&md) {
-    return get_os_errorator::make_ready_future<cached_os_t>(
-      os_cache.insert(
-       oid,
-       std::make_unique<ObjectState>(std::move(md.os))));
-  });
-}
-
-PGBackend::get_os_errorator::future<PGBackend::cached_ss_t>
-PGBackend::_load_ss(const hobject_t& oid)
-{
-  if (auto found = ss_cache.find(oid); found) {
-    return get_os_errorator::make_ready_future<cached_ss_t>(std::move(found));
-  }
-  return load_metadata(oid).safe_then([oid, this](auto &&md) {
-    if (!md.ss) {
-      return get_os_errorator::make_ready_future<cached_ss_t>(
-       std::make_unique<SnapSet>());
-    } else {
-      return get_os_errorator::make_ready_future<cached_ss_t>(
-       ss_cache.insert(oid, std::make_unique<SnapSet>(std::move(*(md.ss)))));
-    }
-  });
-}
-
 seastar::future<crimson::osd::acked_peers_t>
 PGBackend::mutate_object(
   std::set<pg_shard_t> pg_shards,
-  cached_os_t&& os,
+  crimson::osd::ObjectContextRef &&obc,
   ceph::os::Transaction&& txn,
   const MOSDOp& m,
   epoch_t min_epoch,
@@ -189,36 +125,30 @@ PGBackend::mutate_object(
   eversion_t ver)
 {
   logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
-  if (os->exists) {
+  if (obc->obs.exists) {
 #if 0
-    os.oi.version = ctx->at_version;
-    os.oi.prior_version = ctx->obs->oi.version;
+    obc->obs.oi.version = ctx->at_version;
+    obc->obs.oi.prior_version = ctx->obs->oi.version;
 #endif
 
-    os->oi.last_reqid = m.get_reqid();
-    os->oi.mtime = m.get_mtime();
-    os->oi.local_mtime = ceph_clock_now();
+    obc->obs.oi.last_reqid = m.get_reqid();
+    obc->obs.oi.mtime = m.get_mtime();
+    obc->obs.oi.local_mtime = ceph_clock_now();
 
     // object_info_t
     {
       ceph::bufferlist osv;
-      encode(os->oi, osv, 0);
+      encode(obc->obs.oi, osv, 0);
       // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
-      txn.setattr(coll->get_cid(), ghobject_t{os->oi.soid}, OI_ATTR, osv);
+      txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
     }
   } else {
     // reset cached ObjectState without enforcing eviction
-    os->oi = object_info_t(os->oi.soid);
+    obc->obs.oi = object_info_t(obc->obs.oi.soid);
   }
-  return _submit_transaction(std::move(pg_shards), os->oi.soid, std::move(txn),
-                            m.get_reqid(), min_epoch, map_epoch, ver);
-}
-
-seastar::future<>
-PGBackend::evict_object_state(const hobject_t& oid)
-{
-  os_cache.erase(oid);
-  return seastar::now();
+  return _submit_transaction(
+    std::move(pg_shards), obc->obs.oi.soid, std::move(txn),
+    m.get_reqid(), min_epoch, map_epoch, ver);
 }
 
 static inline bool _read_verify_data(
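
load_metadata() above now maps a missing object (enoent out of get_attrs)
to a successful result whose ObjectState has exists == false, so callers see
uniform metadata whether or not the object is on disk. The errorator shape
in isolation (make_empty_metadata is a hypothetical stand-in for the inline
construction above):

  // Convert one statically-known error into a value; all other errors keep
  // propagating through the errorator unchanged.
  return store->get_attrs(coll, ghobj).safe_then(
    [](auto&& attrs) { /* decode OI_ATTR / SS_ATTR as above */ },
    crimson::ct_error::enoent::handle([oid] {
      // hypothetical: ObjectState{object_info_t(oid), false} + empty SnapSet
      return make_empty_metadata(oid);
    }));
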
diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h
index b2ba4fd2c7b4259392e52ef0262d3d2cc051f2bb..d72997eff66ab51697c1496f4a39a9665e752326 100644
--- a/src/crimson/osd/pg_backend.h
+++ b/src/crimson/osd/pg_backend.h
@@ -43,10 +43,6 @@ public:
                                           crimson::os::CollectionRef coll,
                                           crimson::osd::ShardServices& shard_services,
                                           const ec_profile_t& ec_profile);
-  using cached_os_t = boost::local_shared_ptr<ObjectState>;
-  using get_os_errorator = crimson::errorator<crimson::ct_error::enoent>;
-  get_os_errorator::future<cached_os_t> get_object_state(const hobject_t& oid);
-  seastar::future<> evict_object_state(const hobject_t& oid);
 
   using read_errorator = ll_read_errorator::extend<
     crimson::ct_error::input_output_error,
@@ -58,6 +54,7 @@ public:
     size_t truncate_size,
     uint32_t truncate_seq,
     uint32_t flags);
+
   using stat_errorator = crimson::errorator<crimson::ct_error::enoent>;
   stat_errorator::future<> stat(
     const ObjectState& os,
@@ -80,7 +77,7 @@ public:
     ceph::os::Transaction& trans);
   seastar::future<crimson::osd::acked_peers_t> mutate_object(
     std::set<pg_shard_t> pg_shards,
-    cached_os_t&& os,
+    crimson::osd::ObjectContextRef &&obc,
     ceph::os::Transaction&& txn,
     const MOSDOp& m,
     epoch_t min_epoch,
@@ -129,18 +126,12 @@ public:
   struct loaded_object_md_t {
     ObjectState os;
     std::optional<SnapSet> ss;
+    using ref = std::unique_ptr<loaded_object_md_t>;
   };
-  load_metadata_ertr::future<loaded_object_md_t> load_metadata(
+  load_metadata_ertr::future<loaded_object_md_t::ref> load_metadata(
     const hobject_t &oid);
 
 private:
-  using cached_ss_t = boost::local_shared_ptr<SnapSet>;
-  SharedLRU<hobject_t, SnapSet> ss_cache;
-  get_os_errorator::future<cached_ss_t> _load_ss(const hobject_t& oid);
-
-  SharedLRU<hobject_t, ObjectState> os_cache;
-  get_os_errorator::future<cached_os_t> _load_os(const hobject_t& oid);
-
   virtual ll_read_errorator::future<ceph::bufferlist> _read(
     const hobject_t& hoid,
     size_t offset,