]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: add pg backends
authorKefu Chai <kchai@redhat.com>
Mon, 11 Mar 2019 13:42:55 +0000 (21:42 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 26 Mar 2019 01:34:04 +0000 (09:34 +0800)
* add exceptions.h for EIO, ENOENT error handling
* add PGBackend for serving i/o requests

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/CMakeLists.txt
src/crimson/osd/ec_backend.cc [new file with mode: 0644]
src/crimson/osd/ec_backend.h [new file with mode: 0644]
src/crimson/osd/exceptions.h [new file with mode: 0644]
src/crimson/osd/osd.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc [new file with mode: 0644]
src/crimson/osd/pg_backend.h [new file with mode: 0644]
src/crimson/osd/replicated_backend.cc [new file with mode: 0644]
src/crimson/osd/replicated_backend.h [new file with mode: 0644]

index 9045b61112db6802dc3c8a45defcddea5b584bdc..e0f1c580417e4ab81f116e1e020bb9b676b1be90 100644 (file)
@@ -1,13 +1,16 @@
 add_executable(crimson-osd
   chained_dispatchers.cc
+  ec_backend.cc
   heartbeat.cc
   main.cc
   osd.cc
   osd_meta.cc
   pg.cc
+  pg_backend.cc
   pg_meta.cc
   recovery_machine.cc
   recovery_state.cc
-  recovery_states.cc)
+  recovery_states.cc
+  replicated_backend.cc)
 target_link_libraries(crimson-osd
   crimson-common crimson-os crimson fmt::fmt)
diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc
new file mode 100644 (file)
index 0000000..510679e
--- /dev/null
@@ -0,0 +1,21 @@
+#include "ec_backend.h"
+#include "crimson/os/cyan_collection.h"
+
+ECBackend::ECBackend(shard_id_t shard,
+                     ECBackend::CollectionRef coll,
+                     ceph::os::CyanStore* store,
+                     const ec_profile_t&,
+                     uint64_t)
+  : PGBackend{shard, coll, store}
+{
+  // todo
+}
+
+seastar::future<bufferlist> ECBackend::_read(const hobject_t& hoid,
+                                             uint64_t off,
+                                             uint64_t len,
+                                             uint32_t flags)
+{
+  // todo
+  return seastar::make_ready_future<bufferlist>();
+}
diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h
new file mode 100644 (file)
index 0000000..6849c2a
--- /dev/null
@@ -0,0 +1,26 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <seastar/core/future.hh>
+#include "include/buffer_fwd.h"
+#include "osd/osd_types.h"
+#include "pg_backend.h"
+
+class ECBackend : public PGBackend
+{
+public:
+  ECBackend(shard_id_t shard,
+           CollectionRef, ceph::os::CyanStore*,
+           const ec_profile_t& ec_profile,
+           uint64_t stripe_width);
+private:
+  seastar::future<ceph::bufferlist> _read(const hobject_t& hoid,
+                                         uint64_t off,
+                                         uint64_t len,
+                                         uint32_t flags) override;
+  CollectionRef coll;
+  ceph::os::CyanStore* store;
+};
diff --git a/src/crimson/osd/exceptions.h b/src/crimson/osd/exceptions.h
new file mode 100644 (file)
index 0000000..8462d73
--- /dev/null
@@ -0,0 +1,13 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <exception>
+
+class object_not_found : public std::exception {
+};
+
+class object_corrupted : public std::exception {
+};
+
index 99e6684d19e49dc6d9fdca2507d42d456d00de1e..7eb59dc88a539d99199c21ac2fca5a70c6ce8fac 100644 (file)
@@ -28,6 +28,7 @@
 #include "crimson/osd/heartbeat.h"
 #include "crimson/osd/osd_meta.h"
 #include "crimson/osd/pg.h"
+#include "crimson/osd/pg_backend.h"
 #include "crimson/osd/pg_meta.h"
 
 #include "osd/PGPeeringEvent.h"
@@ -357,11 +358,12 @@ seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
   }).then([pgid, this](pg_pool_t&& pool,
                        string&& name,
                        ec_profile_t&& ec_profile) {
+    auto backend = PGBackend::create(pgid, pool, store.get(), ec_profile);
     Ref<PG> pg{new PG{pgid,
                       pg_shard_t{whoami, pgid.shard},
                       std::move(pool),
                       std::move(name),
-                      std::move(ec_profile),
+                      std::move(backend),
                       osdmap,
                       cluster_msgr}};
     return pg->read_state(store.get()).then([pg] {
index 2997305acbcffbf485af0c764e935d660ba3390d..d21ba3ba613b9a0e7ef5294e6fec0de73b145736 100644 (file)
@@ -9,6 +9,8 @@
 #include <boost/range/algorithm/max_element.hpp>
 #include <boost/range/numeric.hpp>
 
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
 #include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGNotify.h"
 
 #include "crimson/net/Connection.h"
 #include "crimson/net/Messenger.h"
+#include "crimson/os/cyan_collection.h"
 #include "crimson/os/cyan_store.h"
+#include "crimson/osd/exceptions.h"
 #include "crimson/osd/pg_meta.h"
 
+#include "pg_backend.h"
 #include "recovery_events.h"
 #include "recovery_state.h"
 
@@ -43,7 +48,7 @@ PG::PG(spg_t pgid,
        pg_shard_t pg_shard,
        pg_pool_t&& pool,
        std::string&& name,
-       ec_profile_t&& ec_profile,
+       std::unique_ptr<PGBackend> backend,
        cached_map_t osdmap,
        ceph::net::Messenger& msgr)
   : pgid{pgid},
@@ -51,6 +56,7 @@ PG::PG(spg_t pgid,
     pool{std::move(pool)},
     recovery_state{*this},
     info{pgid},
+    backend{std::move(backend)},
     osdmap{osdmap},
     msgr{msgr}
 {
@@ -166,6 +172,10 @@ void PG::set_state(uint64_t mask)
   info.stats.last_change = now;
   if (mask & PG_STATE_ACTIVE) {
     info.stats.last_became_active = now;
+    if (active_promise) {
+      std::move(active_promise)->set_value();
+      active_promise.reset();
+    }
   }
   if (mask & (PG_STATE_ACTIVE | PG_STATE_PEERED) &&
       test_state(PG_STATE_ACTIVE | PG_STATE_PEERED)) {
@@ -957,11 +967,68 @@ seastar::future<> PG::wait_for_active()
   }
 }
 
+seastar::future<>
+PG::do_osd_op(const object_info_t& oi, OSDOp* osd_op)
+{
+  switch (const auto& op = osd_op->op; op.op) {
+  case CEPH_OSD_OP_SYNC_READ:
+    [[fallthrough]];
+  case CEPH_OSD_OP_READ:
+    return backend->read(oi,
+                         op.extent.offset,
+                         op.extent.length,
+                         op.extent.truncate_size,
+                         op.extent.truncate_seq,
+                         op.flags).then([osd_op](bufferlist bl) {
+      osd_op->rval = bl.length();
+      osd_op->outdata = std::move(bl);
+      return seastar::now();
+    });
+  default:
+    return seastar::now();
+  }
+}
+
+seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
+{
+  // todo: issue requests in parallel if they don't write,
+  // with writes being basically a synchronization barrier
+  return seastar::do_with(std::move(m), [this](auto& m) {
+    return seastar::do_for_each(begin(m->ops), end(m->ops),
+                                [m,this](OSDOp& osd_op) {
+      const auto oid = (m->get_snapid() == CEPH_SNAPDIR ?
+                        m->get_hobj().get_head() :
+                        m->get_hobj());
+      return backend->get_object(oid).then([&osd_op,this](auto oi) {
+        return do_osd_op(oi, &osd_op);
+      }).handle_exception_type([&osd_op](const object_not_found&) {
+        osd_op.rval = -ENOENT;
+        throw;
+      });
+    }).then([=] {
+      auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+                                             0, false);
+      reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+      return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+    }).handle_exception_type([=](const object_not_found& dne) {
+      auto reply = make_message<MOSDOpReply>(m.get(), -ENOENT, get_osdmap_epoch(),
+                                             0, false);
+      reply->set_enoent_reply_versions(info.last_update,
+                                       info.last_user_version);
+      return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+    });
+  });
+}
+
 seastar::future<> PG::handle_op(ceph::net::ConnectionRef conn,
                                 Ref<MOSDOp> m)
 {
   return wait_for_active().then([conn, m, this] {
-    // todo
-    return seastar::now();
+    if (m->finish_decode()) {
+      m->clear_payload();
+    }
+    return do_osd_ops(m);
+  }).then([conn](Ref<MOSDOpReply> reply) {
+    return conn->send(reply);
   });
 }
index 06a186a865c16f790d1e783d7dca8b7e2e4b3f8c..47340f034f464a9b9224fe293c7b201c9a7b3ed0 100644 (file)
@@ -3,10 +3,13 @@
 
 #pragma once
 
+#include <memory>
+#include <optional>
 #include <boost/intrusive_ptr.hpp>
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
 #include <boost/smart_ptr/local_shared_ptr.hpp>
 #include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
 
 #include "crimson/net/Fwd.h"
 #include "osd/osd_types.h"
@@ -15,6 +18,7 @@
 template<typename T> using Ref = boost::intrusive_ptr<T>;
 class OSDMap;
 class MQuery;
+class PGBackend;
 class PGPeeringEvent;
 namespace recovery {
   class Context;
@@ -40,7 +44,7 @@ public:
      pg_shard_t pg_shard,
      pg_pool_t&& pool,
      std::string&& name,
-     ec_profile_t&& ec_profile,
+     std::unique_ptr<PGBackend> backend,
      cached_map_t osdmap,
      ceph::net::Messenger& msgr);
 
@@ -123,6 +127,9 @@ private:
                            int new_up_primary,
                            const std::vector<int>& new_acting,
                            int new_acting_primary);
+  seastar::future<Ref<MOSDOpReply>> do_osd_ops(Ref<MOSDOp> m);
+  seastar::future<> do_osd_op(const object_info_t& oi, OSDOp* op);
+
 private:
   const spg_t pgid;
   pg_shard_t whoami;
@@ -153,6 +160,7 @@ private:
 
   seastar::future<> wait_for_active();
   std::optional<seastar::shared_promise<>> active_promise;
+  std::unique_ptr<PGBackend> backend;
 
   cached_map_t osdmap;
   ceph::net::Messenger& msgr;
diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc
new file mode 100644 (file)
index 0000000..8ceaddc
--- /dev/null
@@ -0,0 +1,157 @@
+#include "pg_backend.h"
+
+#include <optional>
+#include <fmt/ostream.h>
+#include <seastar/core/print.hh>
+
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_object.h"
+#include "crimson/os/cyan_store.h"
+#include "replicated_backend.h"
+#include "ec_backend.h"
+#include "exceptions.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
+
+std::unique_ptr<PGBackend> PGBackend::create(const spg_t pgid,
+                                             const pg_pool_t& pool,
+                                             ceph::os::CyanStore* store,
+                                             const ec_profile_t& ec_profile)
+{
+  auto coll = store->open_collection(coll_t{pgid});
+  switch (pool.type) {
+  case pg_pool_t::TYPE_REPLICATED:
+    return std::make_unique<ReplicatedBackend>(pgid.shard, coll, store);
+  case pg_pool_t::TYPE_ERASURE:
+    return std::make_unique<ECBackend>(pgid.shard, coll, store,
+                                       std::move(ec_profile),
+                                       pool.stripe_width);
+  default:
+    throw runtime_error(seastar::format("unsupported pool type '{}'",
+                                        pool.type));
+  }
+}
+
+PGBackend::PGBackend(shard_id_t shard,
+                     CollectionRef coll,
+                     ceph::os::CyanStore* store)
+  : shard{shard},
+    coll{coll},
+    store{store}
+{}
+
+seastar::future<object_info_t> PGBackend::get_object(const hobject_t& oid)
+{
+  // want the head?
+  if (oid.snap == CEPH_NOSNAP) {
+    logger().trace("find_object: {}@HEAD", oid);
+    return _load_oi(oid);
+  } else {
+    // we want a snap
+    return _load_ss(oid).then([oid,this](SnapSet ss) {
+      // head?
+      if (oid.snap > ss.seq) {
+        return _load_oi(oid.get_head());
+      } else {
+        // which clone would it be?
+        auto clone = std::upper_bound(begin(ss.clones), end(ss.clones),
+                                      oid.snap);
+        if (clone == end(ss.clones)) {
+          throw object_not_found{};
+        }
+        // clone
+        auto soid = oid;
+        soid.snap = *clone;
+        return _load_ss(soid).then([soid,this](SnapSet ss) {
+          auto clone_snap = ss.clone_snaps.find(soid.snap);
+          assert(clone_snap != end(ss.clone_snaps));
+          if (clone_snap->second.empty()) {
+            logger().trace("find_object: {}@[] -- DNE", soid);
+            throw object_not_found{};
+          }
+          auto first = clone_snap->second.back();
+          auto last = clone_snap->second.front();
+          if (first > soid.snap) {
+            logger().trace("find_object: {}@[{},{}] -- DNE",
+                           soid, first, last);
+            throw object_not_found{};
+          }
+          logger().trace("find_object: {}@[{},{}] -- HIT",
+                         soid, first, last);
+          return _load_oi(soid);
+        });
+      }
+    });
+  }
+}
+
+seastar::future<object_info_t> PGBackend::_load_oi(const hobject_t& oid)
+{
+  return store->get_attr(coll,
+                         ghobject_t{oid, ghobject_t::NO_GEN, shard},
+                         OI_ATTR).then([this](auto bp) {
+    object_info_t oi;
+    bufferlist bl;
+    bl.push_back(std::move(bp));
+    oi.decode(bl);
+    return seastar::make_ready_future<object_info_t>(std::move(oi));
+  });
+}
+
+seastar::future<SnapSet> PGBackend::_load_ss(const hobject_t& oid)
+{
+  return store->get_attr(coll,
+                         ghobject_t{oid, ghobject_t::NO_GEN, shard},
+                         SS_ATTR).then([this](auto bp) {
+    bufferlist bl;
+    bl.push_back(std::move(bp));
+    SnapSet snapset{bl};
+    return seastar::make_ready_future<SnapSet>(std::move(snapset));
+  });
+}
+
+seastar::future<bufferlist> PGBackend::read(const object_info_t& oi,
+                                            size_t offset,
+                                            size_t length,
+                                            size_t truncate_size,
+                                            uint32_t truncate_seq,
+                                            uint32_t flags)
+{
+  logger().trace("read: {} {}~{}", oi.soid, offset, length);
+  // are we beyond truncate_size?
+  size_t size = oi.size;
+  if ((truncate_seq > oi.truncate_seq) &&
+      (truncate_size < offset + length) &&
+      (truncate_size < size)) {
+    size = truncate_size;
+  }
+  if (!length) {
+    // read the whole object if length is 0
+    length = size;
+  }
+  if (offset >= size) {
+    // read size was trimmed to zero and it is expected to do nothing,
+    return seastar::make_ready_future<bufferlist>();
+  }
+  std::optional<uint32_t> maybe_crc;
+  if (oi.is_data_digest() && offset == 0 && length >= oi.size) {
+    maybe_crc = oi.data_digest;
+  }
+  return _read(oi.soid, offset, length, flags).then(
+    [maybe_crc, soid=oi.soid, size=oi.size](auto bl) {
+      // whole object?  can we verify the checksum?
+      if (maybe_crc && bl.length() == size) {
+        if (auto crc = bl.crc32c(-1); crc != *maybe_crc) {
+          logger().error("full-object read crc {} != expected {} on {}",
+            crc, *maybe_crc, soid);
+          // todo: mark soid missing, perform recovery, and retry
+          throw object_corrupted{};
+        }
+      }
+      return seastar::make_ready_future<bufferlist>(std::move(bl));
+    });
+}
diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h
new file mode 100644 (file)
index 0000000..0ac8ff3
--- /dev/null
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "osd/osd_types.h"
+
+struct hobject_t;
+namespace ceph::os {
+  class Collection;
+  class CyanStore;
+}
+
+class PGBackend
+{
+protected:
+  using CollectionRef = boost::intrusive_ptr<ceph::os::Collection>;
+  using ec_profile_t = std::map<std::string, std::string>;
+
+public:
+  PGBackend(shard_id_t shard, CollectionRef coll, ceph::os::CyanStore* store);
+  virtual ~PGBackend() = default;
+  static std::unique_ptr<PGBackend> create(const spg_t pgid,
+                                          const pg_pool_t& pool,
+                                          ceph::os::CyanStore* store,
+                                          const ec_profile_t& ec_profile);
+  seastar::future<object_info_t> get_object(const hobject_t& oid);
+  seastar::future<bufferlist> read(const object_info_t& oi,
+                                  uint64_t off,
+                                  uint64_t len,
+                                  size_t truncate_size,
+                                  uint32_t truncate_seq,
+                                  uint32_t flags);
+protected:
+  const shard_id_t shard;
+  CollectionRef coll;
+  ceph::os::CyanStore* store;
+
+private:
+  seastar::future<SnapSet> _load_ss(const hobject_t& oid);
+  seastar::future<object_info_t> _load_oi(const hobject_t& oid);
+  virtual seastar::future<bufferlist> _read(const hobject_t& hoid,
+                                           size_t offset,
+                                           size_t length,
+                                           uint32_t flags) = 0;
+};
diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc
new file mode 100644 (file)
index 0000000..81c09c4
--- /dev/null
@@ -0,0 +1,19 @@
+#include "replicated_backend.h"
+
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_object.h"
+#include "crimson/os/cyan_store.h"
+
+ReplicatedBackend::ReplicatedBackend(shard_id_t shard,
+                                     ReplicatedBackend::CollectionRef coll,
+                                     ceph::os::CyanStore* store)
+  : PGBackend{shard, coll, store}
+{}
+
+seastar::future<bufferlist> ReplicatedBackend::_read(const hobject_t& hoid,
+                                                     uint64_t off,
+                                                     uint64_t len,
+                                                     uint32_t flags)
+{
+  return store->read(coll, ghobject_t{hoid}, off, len, flags);
+}
diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h
new file mode 100644 (file)
index 0000000..0a91e03
--- /dev/null
@@ -0,0 +1,23 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <seastar/core/future.hh>
+#include "include/buffer_fwd.h"
+#include "osd/osd_types.h"
+#include "pg_backend.h"
+
+class ReplicatedBackend : public PGBackend
+{
+public:
+  ReplicatedBackend(shard_id_t shard,
+                   CollectionRef coll,
+                   ceph::os::CyanStore* store);
+private:
+  seastar::future<ceph::bufferlist> _read(const hobject_t& hoid,
+                                         uint64_t off,
+                                         uint64_t len,
+                                         uint32_t flags) override;
+};