From 4619a9cc4915e5056ac58962389f43cde719e0d7 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Mon, 11 Mar 2019 21:42:55 +0800 Subject: [PATCH] crimson/osd: add pg backends * add exceptions.h for EIO, ENOENT error handling * add PGBackend for serving i/o requests Signed-off-by: Kefu Chai --- src/crimson/osd/CMakeLists.txt | 5 +- src/crimson/osd/ec_backend.cc | 21 ++++ src/crimson/osd/ec_backend.h | 26 +++++ src/crimson/osd/exceptions.h | 13 +++ src/crimson/osd/osd.cc | 4 +- src/crimson/osd/pg.cc | 73 +++++++++++- src/crimson/osd/pg.h | 10 +- src/crimson/osd/pg_backend.cc | 157 ++++++++++++++++++++++++++ src/crimson/osd/pg_backend.h | 50 ++++++++ src/crimson/osd/replicated_backend.cc | 19 ++++ src/crimson/osd/replicated_backend.h | 23 ++++ 11 files changed, 395 insertions(+), 6 deletions(-) create mode 100644 src/crimson/osd/ec_backend.cc create mode 100644 src/crimson/osd/ec_backend.h create mode 100644 src/crimson/osd/exceptions.h create mode 100644 src/crimson/osd/pg_backend.cc create mode 100644 src/crimson/osd/pg_backend.h create mode 100644 src/crimson/osd/replicated_backend.cc create mode 100644 src/crimson/osd/replicated_backend.h diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index 9045b61112d..e0f1c580417 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -1,13 +1,16 @@ add_executable(crimson-osd chained_dispatchers.cc + ec_backend.cc heartbeat.cc main.cc osd.cc osd_meta.cc pg.cc + pg_backend.cc pg_meta.cc recovery_machine.cc recovery_state.cc - recovery_states.cc) + recovery_states.cc + replicated_backend.cc) target_link_libraries(crimson-osd crimson-common crimson-os crimson fmt::fmt) diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc new file mode 100644 index 00000000000..510679efa91 --- /dev/null +++ b/src/crimson/osd/ec_backend.cc @@ -0,0 +1,21 @@ +#include "ec_backend.h" +#include "crimson/os/cyan_collection.h" + +ECBackend::ECBackend(shard_id_t shard, + ECBackend::CollectionRef coll, + ceph::os::CyanStore* store, + const ec_profile_t&, + uint64_t) + : PGBackend{shard, coll, store} +{ + // todo +} + +seastar::future ECBackend::_read(const hobject_t& hoid, + uint64_t off, + uint64_t len, + uint32_t flags) +{ + // todo + return seastar::make_ready_future(); +} diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h new file mode 100644 index 00000000000..6849c2adf2d --- /dev/null +++ b/src/crimson/osd/ec_backend.h @@ -0,0 +1,26 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include "include/buffer_fwd.h" +#include "osd/osd_types.h" +#include "pg_backend.h" + +class ECBackend : public PGBackend +{ +public: + ECBackend(shard_id_t shard, + CollectionRef, ceph::os::CyanStore*, + const ec_profile_t& ec_profile, + uint64_t stripe_width); +private: + seastar::future _read(const hobject_t& hoid, + uint64_t off, + uint64_t len, + uint32_t flags) override; + CollectionRef coll; + ceph::os::CyanStore* store; +}; diff --git a/src/crimson/osd/exceptions.h b/src/crimson/osd/exceptions.h new file mode 100644 index 00000000000..8462d73188d --- /dev/null +++ b/src/crimson/osd/exceptions.h @@ -0,0 +1,13 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include + +class object_not_found : public std::exception { +}; + +class object_corrupted : public std::exception { +}; + diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 99e6684d19e..7eb59dc88a5 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -28,6 +28,7 @@ #include "crimson/osd/heartbeat.h" #include "crimson/osd/osd_meta.h" #include "crimson/osd/pg.h" +#include "crimson/osd/pg_backend.h" #include "crimson/osd/pg_meta.h" #include "osd/PGPeeringEvent.h" @@ -357,11 +358,12 @@ seastar::future> OSD::load_pg(spg_t pgid) }).then([pgid, this](pg_pool_t&& pool, string&& name, ec_profile_t&& ec_profile) { + auto backend = PGBackend::create(pgid, pool, store.get(), ec_profile); Ref pg{new PG{pgid, pg_shard_t{whoami, pgid.shard}, std::move(pool), std::move(name), - std::move(ec_profile), + std::move(backend), osdmap, cluster_msgr}}; return pg->read_state(store.get()).then([pg] { diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 2997305acbc..d21ba3ba613 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -9,6 +9,8 @@ #include #include +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" #include "messages/MOSDPGInfo.h" #include "messages/MOSDPGLog.h" #include "messages/MOSDPGNotify.h" @@ -18,9 +20,12 @@ #include "crimson/net/Connection.h" #include "crimson/net/Messenger.h" +#include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_store.h" +#include "crimson/osd/exceptions.h" #include "crimson/osd/pg_meta.h" +#include "pg_backend.h" #include "recovery_events.h" #include "recovery_state.h" @@ -43,7 +48,7 @@ PG::PG(spg_t pgid, pg_shard_t pg_shard, pg_pool_t&& pool, std::string&& name, - ec_profile_t&& ec_profile, + std::unique_ptr backend, cached_map_t osdmap, ceph::net::Messenger& msgr) : pgid{pgid}, @@ -51,6 +56,7 @@ PG::PG(spg_t pgid, pool{std::move(pool)}, recovery_state{*this}, info{pgid}, + backend{std::move(backend)}, osdmap{osdmap}, msgr{msgr} { @@ -166,6 +172,10 @@ void PG::set_state(uint64_t mask) info.stats.last_change = now; if (mask & PG_STATE_ACTIVE) { info.stats.last_became_active = now; + if (active_promise) { + std::move(active_promise)->set_value(); + active_promise.reset(); + } } if (mask & (PG_STATE_ACTIVE | PG_STATE_PEERED) && test_state(PG_STATE_ACTIVE | PG_STATE_PEERED)) { @@ -957,11 +967,68 @@ seastar::future<> PG::wait_for_active() } } +seastar::future<> +PG::do_osd_op(const object_info_t& oi, OSDOp* osd_op) +{ + switch (const auto& op = osd_op->op; op.op) { + case CEPH_OSD_OP_SYNC_READ: + [[fallthrough]]; + case CEPH_OSD_OP_READ: + return backend->read(oi, + op.extent.offset, + op.extent.length, + op.extent.truncate_size, + op.extent.truncate_seq, + op.flags).then([osd_op](bufferlist bl) { + osd_op->rval = bl.length(); + osd_op->outdata = std::move(bl); + return seastar::now(); + }); + default: + return seastar::now(); + } +} + +seastar::future> PG::do_osd_ops(Ref m) +{ + // todo: issue requests in parallel if they don't write, + // with writes being basically a synchronization barrier + return seastar::do_with(std::move(m), [this](auto& m) { + return seastar::do_for_each(begin(m->ops), end(m->ops), + [m,this](OSDOp& osd_op) { + const auto oid = (m->get_snapid() == CEPH_SNAPDIR ? + m->get_hobj().get_head() : + m->get_hobj()); + return backend->get_object(oid).then([&osd_op,this](auto oi) { + return do_osd_op(oi, &osd_op); + }).handle_exception_type([&osd_op](const object_not_found&) { + osd_op.rval = -ENOENT; + throw; + }); + }).then([=] { + auto reply = make_message(m.get(), 0, get_osdmap_epoch(), + 0, false); + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + return seastar::make_ready_future>(std::move(reply)); + }).handle_exception_type([=](const object_not_found& dne) { + auto reply = make_message(m.get(), -ENOENT, get_osdmap_epoch(), + 0, false); + reply->set_enoent_reply_versions(info.last_update, + info.last_user_version); + return seastar::make_ready_future>(std::move(reply)); + }); + }); +} + seastar::future<> PG::handle_op(ceph::net::ConnectionRef conn, Ref m) { return wait_for_active().then([conn, m, this] { - // todo - return seastar::now(); + if (m->finish_decode()) { + m->clear_payload(); + } + return do_osd_ops(m); + }).then([conn](Ref reply) { + return conn->send(reply); }); } diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 06a186a865c..47340f034f4 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -3,10 +3,13 @@ #pragma once +#include +#include #include #include #include #include +#include #include "crimson/net/Fwd.h" #include "osd/osd_types.h" @@ -15,6 +18,7 @@ template using Ref = boost::intrusive_ptr; class OSDMap; class MQuery; +class PGBackend; class PGPeeringEvent; namespace recovery { class Context; @@ -40,7 +44,7 @@ public: pg_shard_t pg_shard, pg_pool_t&& pool, std::string&& name, - ec_profile_t&& ec_profile, + std::unique_ptr backend, cached_map_t osdmap, ceph::net::Messenger& msgr); @@ -123,6 +127,9 @@ private: int new_up_primary, const std::vector& new_acting, int new_acting_primary); + seastar::future> do_osd_ops(Ref m); + seastar::future<> do_osd_op(const object_info_t& oi, OSDOp* op); + private: const spg_t pgid; pg_shard_t whoami; @@ -153,6 +160,7 @@ private: seastar::future<> wait_for_active(); std::optional> active_promise; + std::unique_ptr backend; cached_map_t osdmap; ceph::net::Messenger& msgr; diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc new file mode 100644 index 00000000000..8ceaddc9dae --- /dev/null +++ b/src/crimson/osd/pg_backend.cc @@ -0,0 +1,157 @@ +#include "pg_backend.h" + +#include +#include +#include + +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_object.h" +#include "crimson/os/cyan_store.h" +#include "replicated_backend.h" +#include "ec_backend.h" +#include "exceptions.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } +} + +std::unique_ptr PGBackend::create(const spg_t pgid, + const pg_pool_t& pool, + ceph::os::CyanStore* store, + const ec_profile_t& ec_profile) +{ + auto coll = store->open_collection(coll_t{pgid}); + switch (pool.type) { + case pg_pool_t::TYPE_REPLICATED: + return std::make_unique(pgid.shard, coll, store); + case pg_pool_t::TYPE_ERASURE: + return std::make_unique(pgid.shard, coll, store, + std::move(ec_profile), + pool.stripe_width); + default: + throw runtime_error(seastar::format("unsupported pool type '{}'", + pool.type)); + } +} + +PGBackend::PGBackend(shard_id_t shard, + CollectionRef coll, + ceph::os::CyanStore* store) + : shard{shard}, + coll{coll}, + store{store} +{} + +seastar::future PGBackend::get_object(const hobject_t& oid) +{ + // want the head? + if (oid.snap == CEPH_NOSNAP) { + logger().trace("find_object: {}@HEAD", oid); + return _load_oi(oid); + } else { + // we want a snap + return _load_ss(oid).then([oid,this](SnapSet ss) { + // head? + if (oid.snap > ss.seq) { + return _load_oi(oid.get_head()); + } else { + // which clone would it be? + auto clone = std::upper_bound(begin(ss.clones), end(ss.clones), + oid.snap); + if (clone == end(ss.clones)) { + throw object_not_found{}; + } + // clone + auto soid = oid; + soid.snap = *clone; + return _load_ss(soid).then([soid,this](SnapSet ss) { + auto clone_snap = ss.clone_snaps.find(soid.snap); + assert(clone_snap != end(ss.clone_snaps)); + if (clone_snap->second.empty()) { + logger().trace("find_object: {}@[] -- DNE", soid); + throw object_not_found{}; + } + auto first = clone_snap->second.back(); + auto last = clone_snap->second.front(); + if (first > soid.snap) { + logger().trace("find_object: {}@[{},{}] -- DNE", + soid, first, last); + throw object_not_found{}; + } + logger().trace("find_object: {}@[{},{}] -- HIT", + soid, first, last); + return _load_oi(soid); + }); + } + }); + } +} + +seastar::future PGBackend::_load_oi(const hobject_t& oid) +{ + return store->get_attr(coll, + ghobject_t{oid, ghobject_t::NO_GEN, shard}, + OI_ATTR).then([this](auto bp) { + object_info_t oi; + bufferlist bl; + bl.push_back(std::move(bp)); + oi.decode(bl); + return seastar::make_ready_future(std::move(oi)); + }); +} + +seastar::future PGBackend::_load_ss(const hobject_t& oid) +{ + return store->get_attr(coll, + ghobject_t{oid, ghobject_t::NO_GEN, shard}, + SS_ATTR).then([this](auto bp) { + bufferlist bl; + bl.push_back(std::move(bp)); + SnapSet snapset{bl}; + return seastar::make_ready_future(std::move(snapset)); + }); +} + +seastar::future PGBackend::read(const object_info_t& oi, + size_t offset, + size_t length, + size_t truncate_size, + uint32_t truncate_seq, + uint32_t flags) +{ + logger().trace("read: {} {}~{}", oi.soid, offset, length); + // are we beyond truncate_size? + size_t size = oi.size; + if ((truncate_seq > oi.truncate_seq) && + (truncate_size < offset + length) && + (truncate_size < size)) { + size = truncate_size; + } + if (!length) { + // read the whole object if length is 0 + length = size; + } + if (offset >= size) { + // read size was trimmed to zero and it is expected to do nothing, + return seastar::make_ready_future(); + } + std::optional maybe_crc; + if (oi.is_data_digest() && offset == 0 && length >= oi.size) { + maybe_crc = oi.data_digest; + } + return _read(oi.soid, offset, length, flags).then( + [maybe_crc, soid=oi.soid, size=oi.size](auto bl) { + // whole object? can we verify the checksum? + if (maybe_crc && bl.length() == size) { + if (auto crc = bl.crc32c(-1); crc != *maybe_crc) { + logger().error("full-object read crc {} != expected {} on {}", + crc, *maybe_crc, soid); + // todo: mark soid missing, perform recovery, and retry + throw object_corrupted{}; + } + } + return seastar::make_ready_future(std::move(bl)); + }); +} diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h new file mode 100644 index 00000000000..0ac8ff38ed9 --- /dev/null +++ b/src/crimson/osd/pg_backend.h @@ -0,0 +1,50 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include + +#include "osd/osd_types.h" + +struct hobject_t; +namespace ceph::os { + class Collection; + class CyanStore; +} + +class PGBackend +{ +protected: + using CollectionRef = boost::intrusive_ptr; + using ec_profile_t = std::map; + +public: + PGBackend(shard_id_t shard, CollectionRef coll, ceph::os::CyanStore* store); + virtual ~PGBackend() = default; + static std::unique_ptr create(const spg_t pgid, + const pg_pool_t& pool, + ceph::os::CyanStore* store, + const ec_profile_t& ec_profile); + seastar::future get_object(const hobject_t& oid); + seastar::future read(const object_info_t& oi, + uint64_t off, + uint64_t len, + size_t truncate_size, + uint32_t truncate_seq, + uint32_t flags); +protected: + const shard_id_t shard; + CollectionRef coll; + ceph::os::CyanStore* store; + +private: + seastar::future _load_ss(const hobject_t& oid); + seastar::future _load_oi(const hobject_t& oid); + virtual seastar::future _read(const hobject_t& hoid, + size_t offset, + size_t length, + uint32_t flags) = 0; +}; diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc new file mode 100644 index 00000000000..81c09c4b320 --- /dev/null +++ b/src/crimson/osd/replicated_backend.cc @@ -0,0 +1,19 @@ +#include "replicated_backend.h" + +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_object.h" +#include "crimson/os/cyan_store.h" + +ReplicatedBackend::ReplicatedBackend(shard_id_t shard, + ReplicatedBackend::CollectionRef coll, + ceph::os::CyanStore* store) + : PGBackend{shard, coll, store} +{} + +seastar::future ReplicatedBackend::_read(const hobject_t& hoid, + uint64_t off, + uint64_t len, + uint32_t flags) +{ + return store->read(coll, ghobject_t{hoid}, off, len, flags); +} diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h new file mode 100644 index 00000000000..0a91e03b858 --- /dev/null +++ b/src/crimson/osd/replicated_backend.h @@ -0,0 +1,23 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include "include/buffer_fwd.h" +#include "osd/osd_types.h" +#include "pg_backend.h" + +class ReplicatedBackend : public PGBackend +{ +public: + ReplicatedBackend(shard_id_t shard, + CollectionRef coll, + ceph::os::CyanStore* store); +private: + seastar::future _read(const hobject_t& hoid, + uint64_t off, + uint64_t len, + uint32_t flags) override; +}; -- 2.39.5