add_executable(crimson-osd
chained_dispatchers.cc
+ ec_backend.cc
heartbeat.cc
main.cc
osd.cc
osd_meta.cc
pg.cc
+ pg_backend.cc
pg_meta.cc
recovery_machine.cc
recovery_state.cc
- recovery_states.cc)
+ recovery_states.cc
+ replicated_backend.cc)
target_link_libraries(crimson-osd
crimson-common crimson-os crimson fmt::fmt)
--- /dev/null
+#include "ec_backend.h"
+#include "crimson/os/cyan_collection.h"
+
+ECBackend::ECBackend(shard_id_t shard,
+ ECBackend::CollectionRef coll,
+ ceph::os::CyanStore* store,
+ const ec_profile_t&,
+ uint64_t)
+ : PGBackend{shard, coll, store}
+{
+ // todo
+}
+
+seastar::future<bufferlist> ECBackend::_read(const hobject_t& hoid,
+ uint64_t off,
+ uint64_t len,
+ uint32_t flags)
+{
+ // todo
+ return seastar::make_ready_future<bufferlist>();
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <seastar/core/future.hh>
+#include "include/buffer_fwd.h"
+#include "osd/osd_types.h"
+#include "pg_backend.h"
+
+class ECBackend : public PGBackend
+{
+public:
+ ECBackend(shard_id_t shard,
+ CollectionRef, ceph::os::CyanStore*,
+ const ec_profile_t& ec_profile,
+ uint64_t stripe_width);
+private:
+ seastar::future<ceph::bufferlist> _read(const hobject_t& hoid,
+ uint64_t off,
+ uint64_t len,
+ uint32_t flags) override;
+};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <exception>
+
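+// thrown by PGBackend when the requested object or snapshot does not exist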
+class object_not_found : public std::exception {
+};
+
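+// thrown by PGBackend when a full-object read does not match the recorded
+// data digest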
+class object_corrupted : public std::exception {
+};
+
#include "crimson/osd/heartbeat.h"
#include "crimson/osd/osd_meta.h"
#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_backend.h"
#include "crimson/osd/pg_meta.h"
#include "osd/PGPeeringEvent.h"
}).then([pgid, this](pg_pool_t&& pool,
string&& name,
ec_profile_t&& ec_profile) {
+ auto backend = PGBackend::create(pgid, pool, store.get(), ec_profile);
Ref<PG> pg{new PG{pgid,
pg_shard_t{whoami, pgid.shard},
std::move(pool),
std::move(name),
- std::move(ec_profile),
+ std::move(backend),
osdmap,
cluster_msgr}};
return pg->read_state(store.get()).then([pg] {
#include <boost/range/algorithm/max_element.hpp>
#include <boost/range/numeric.hpp>
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGNotify.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
+#include "crimson/os/cyan_collection.h"
#include "crimson/os/cyan_store.h"
+#include "crimson/osd/exceptions.h"
#include "crimson/osd/pg_meta.h"
+#include "pg_backend.h"
#include "recovery_events.h"
#include "recovery_state.h"
pg_shard_t pg_shard,
pg_pool_t&& pool,
std::string&& name,
- ec_profile_t&& ec_profile,
+ std::unique_ptr<PGBackend> backend,
cached_map_t osdmap,
ceph::net::Messenger& msgr)
: pgid{pgid},
pool{std::move(pool)},
recovery_state{*this},
info{pgid},
+ backend{std::move(backend)},
osdmap{osdmap},
msgr{msgr}
{
info.stats.last_change = now;
if (mask & PG_STATE_ACTIVE) {
info.stats.last_became_active = now;
+ if (active_promise) {
+ // wake up any ops blocked in wait_for_active()
+ active_promise->set_value();
+ active_promise.reset();
+ }
}
if (mask & (PG_STATE_ACTIVE | PG_STATE_PEERED) &&
test_state(PG_STATE_ACTIVE | PG_STATE_PEERED)) {
}
}
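+
+// execute a single OSDOp against the object described by oi; only the read
+// ops are handled so far, everything else is accepted as a no-op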
+seastar::future<>
+PG::do_osd_op(const object_info_t& oi, OSDOp* osd_op)
+{
+ switch (const auto& op = osd_op->op; op.op) {
+ case CEPH_OSD_OP_SYNC_READ:
+ [[fallthrough]];
+ case CEPH_OSD_OP_READ:
+ return backend->read(oi,
+ op.extent.offset,
+ op.extent.length,
+ op.extent.truncate_size,
+ op.extent.truncate_seq,
+ op.flags).then([osd_op](bufferlist bl) {
+ osd_op->rval = bl.length();
+ osd_op->outdata = std::move(bl);
+ return seastar::now();
+ });
+ default:
+ return seastar::now();
+ }
+}
+
+seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
+{
+ // todo: issue requests in parallel if they don't write,
+ // with writes being basically a synchronization barrier
+ return seastar::do_with(std::move(m), [this](auto& m) {
+ return seastar::do_for_each(begin(m->ops), end(m->ops),
+ [m,this](OSDOp& osd_op) {
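+ // a request against the SNAPDIR is resolved to the head object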
+ const auto oid = (m->get_snapid() == CEPH_SNAPDIR ?
+ m->get_hobj().get_head() :
+ m->get_hobj());
+ return backend->get_object(oid).then([&osd_op,this](auto oi) {
+ return do_osd_op(oi, &osd_op);
+ }).handle_exception_type([&osd_op](const object_not_found&) {
+ osd_op.rval = -ENOENT;
+ throw;
+ });
+ }).then([=] {
+ auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+ 0, false);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }).handle_exception_type([=](const object_not_found& dne) {
+ auto reply = make_message<MOSDOpReply>(m.get(), -ENOENT, get_osdmap_epoch(),
+ 0, false);
+ reply->set_enoent_reply_versions(info.last_update,
+ info.last_user_version);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ });
+ });
+}
+
seastar::future<> PG::handle_op(ceph::net::ConnectionRef conn,
Ref<MOSDOp> m)
{
return wait_for_active().then([conn, m, this] {
- // todo
- return seastar::now();
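+ // complete any deferred decoding of the client ops before executing them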
+ if (m->finish_decode()) {
+ m->clear_payload();
+ }
+ return do_osd_ops(m);
+ }).then([conn](Ref<MOSDOpReply> reply) {
+ return conn->send(reply);
});
}
#pragma once
+#include <memory>
+#include <optional>
#include <boost/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <boost/smart_ptr/local_shared_ptr.hpp>
#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
#include "crimson/net/Fwd.h"
#include "osd/osd_types.h"
template<typename T> using Ref = boost::intrusive_ptr<T>;
class OSDMap;
class MQuery;
+class MOSDOpReply;
+class PGBackend;
class PGPeeringEvent;
namespace recovery {
class Context;
pg_shard_t pg_shard,
pg_pool_t&& pool,
std::string&& name,
- ec_profile_t&& ec_profile,
+ std::unique_ptr<PGBackend> backend,
cached_map_t osdmap,
ceph::net::Messenger& msgr);
int new_up_primary,
const std::vector<int>& new_acting,
int new_acting_primary);
+ seastar::future<Ref<MOSDOpReply>> do_osd_ops(Ref<MOSDOp> m);
+ seastar::future<> do_osd_op(const object_info_t& oi, OSDOp* op);
+
private:
const spg_t pgid;
pg_shard_t whoami;
seastar::future<> wait_for_active();
std::optional<seastar::shared_promise<>> active_promise;
+ std::unique_ptr<PGBackend> backend;
cached_map_t osdmap;
ceph::net::Messenger& msgr;
--- /dev/null
+#include "pg_backend.h"
+
+#include <algorithm>
+#include <optional>
+#include <stdexcept>
+#include <fmt/ostream.h>
+#include <seastar/core/print.hh>
+
+#include "crimson/common/log.h"
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_object.h"
+#include "crimson/os/cyan_store.h"
+#include "replicated_backend.h"
+#include "ec_backend.h"
+#include "exceptions.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+}
+
+std::unique_ptr<PGBackend> PGBackend::create(const spg_t pgid,
+ const pg_pool_t& pool,
+ ceph::os::CyanStore* store,
+ const ec_profile_t& ec_profile)
+{
+ auto coll = store->open_collection(coll_t{pgid});
+ switch (pool.type) {
+ case pg_pool_t::TYPE_REPLICATED:
+ return std::make_unique<ReplicatedBackend>(pgid.shard, coll, store);
+ case pg_pool_t::TYPE_ERASURE:
+ return std::make_unique<ECBackend>(pgid.shard, coll, store,
+ ec_profile,
+ pool.stripe_width);
+ default:
+ throw std::runtime_error(seastar::format("unsupported pool type '{}'",
+ pool.type));
+ }
+}
+
+PGBackend::PGBackend(shard_id_t shard,
+ CollectionRef coll,
+ ceph::os::CyanStore* store)
+ : shard{shard},
+ coll{coll},
+ store{store}
+{}
+
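+// map oid to the object_info_t of either the head or the clone covering the
+// requested snap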
+seastar::future<object_info_t> PGBackend::get_object(const hobject_t& oid)
+{
+ // want the head?
+ if (oid.snap == CEPH_NOSNAP) {
+ logger().trace("find_object: {}@HEAD", oid);
+ return _load_oi(oid);
+ } else {
+ // we want a snap
+ return _load_ss(oid).then([oid,this](SnapSet ss) {
+ // head?
+ if (oid.snap > ss.seq) {
+ return _load_oi(oid.get_head());
+ } else {
+ // which clone would it be?
+ auto clone = std::lower_bound(begin(ss.clones), end(ss.clones),
+ oid.snap);
+ if (clone == end(ss.clones)) {
+ throw object_not_found{};
+ }
+ // clone
+ auto soid = oid;
+ soid.snap = *clone;
+ // the SnapSet is stored with the head object, so consult the one
+ // already loaded instead of fetching it again from the clone
+ auto clone_snap = ss.clone_snaps.find(soid.snap);
+ assert(clone_snap != end(ss.clone_snaps));
+ if (clone_snap->second.empty()) {
+ logger().trace("find_object: {}@[] -- DNE", soid);
+ throw object_not_found{};
+ }
+ auto first = clone_snap->second.back();
+ auto last = clone_snap->second.front();
+ if (first > oid.snap) {
+ logger().trace("find_object: {}@[{},{}] -- DNE",
+ soid, first, last);
+ throw object_not_found{};
+ }
+ logger().trace("find_object: {}@[{},{}] -- HIT",
+ soid, first, last);
+ return _load_oi(soid);
+ }
+ });
+ }
+}
+
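+// load and decode the object_info_t kept in the object's OI_ATTR xattr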
+seastar::future<object_info_t> PGBackend::_load_oi(const hobject_t& oid)
+{
+ return store->get_attr(coll,
+ ghobject_t{oid, ghobject_t::NO_GEN, shard},
+ OI_ATTR).then([this](auto bp) {
+ object_info_t oi;
+ bufferlist bl;
+ bl.push_back(std::move(bp));
+ oi.decode(bl);
+ return seastar::make_ready_future<object_info_t>(std::move(oi));
+ });
+}
+
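+// load and decode the SnapSet kept in the head object's SS_ATTR xattr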
+seastar::future<SnapSet> PGBackend::_load_ss(const hobject_t& oid)
+{
+ return store->get_attr(coll,
+ ghobject_t{oid, ghobject_t::NO_GEN, shard},
+ SS_ATTR).then([this](auto bp) {
+ bufferlist bl;
+ bl.push_back(std::move(bp));
+ SnapSet snapset{bl};
+ return seastar::make_ready_future<SnapSet>(std::move(snapset));
+ });
+}
+
+seastar::future<bufferlist> PGBackend::read(const object_info_t& oi,
+ size_t offset,
+ size_t length,
+ size_t truncate_size,
+ uint32_t truncate_seq,
+ uint32_t flags)
+{
+ logger().trace("read: {} {}~{}", oi.soid, offset, length);
+ // are we beyond truncate_size?
+ size_t size = oi.size;
+ if ((truncate_seq > oi.truncate_seq) &&
+ (truncate_size < offset + length) &&
+ (truncate_size < size)) {
+ size = truncate_size;
+ }
+ if (!length) {
+ // read the whole object if length is 0
+ length = size;
+ }
+ if (offset >= size) {
+ // the offset is at or beyond the (possibly truncated) size: nothing to read
+ return seastar::make_ready_future<bufferlist>();
+ }
+ std::optional<uint32_t> maybe_crc;
+ if (oi.is_data_digest() && offset == 0 && length >= oi.size) {
+ maybe_crc = oi.data_digest;
+ }
+ return _read(oi.soid, offset, length, flags).then(
+ [maybe_crc, soid=oi.soid, size=oi.size](auto bl) {
+ // whole object? can we verify the checksum?
+ if (maybe_crc && bl.length() == size) {
+ if (auto crc = bl.crc32c(-1); crc != *maybe_crc) {
+ logger().error("full-object read crc {} != expected {} on {}",
+ crc, *maybe_crc, soid);
+ // todo: mark soid missing, perform recovery, and retry
+ throw object_corrupted{};
+ }
+ }
+ return seastar::make_ready_future<bufferlist>(std::move(bl));
+ });
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include <boost/intrusive_ptr.hpp>
+#include <seastar/core/future.hh>
+
+#include "include/buffer_fwd.h"
+#include "osd/osd_types.h"
+
+struct hobject_t;
+namespace ceph::os {
+ class Collection;
+ class CyanStore;
+}
+
+class PGBackend
+{
+protected:
+ using CollectionRef = boost::intrusive_ptr<ceph::os::Collection>;
+ using ec_profile_t = std::map<std::string, std::string>;
+
+public:
+ PGBackend(shard_id_t shard, CollectionRef coll, ceph::os::CyanStore* store);
+ virtual ~PGBackend() = default;
+ static std::unique_ptr<PGBackend> create(const spg_t pgid,
+ const pg_pool_t& pool,
+ ceph::os::CyanStore* store,
+ const ec_profile_t& ec_profile);
+ seastar::future<object_info_t> get_object(const hobject_t& oid);
+ seastar::future<bufferlist> read(const object_info_t& oi,
+ uint64_t off,
+ uint64_t len,
+ size_t truncate_size,
+ uint32_t truncate_seq,
+ uint32_t flags);
+protected:
+ const shard_id_t shard;
+ CollectionRef coll;
+ ceph::os::CyanStore* store;
+
+private:
+ seastar::future<SnapSet> _load_ss(const hobject_t& oid);
+ seastar::future<object_info_t> _load_oi(const hobject_t& oid);
+ virtual seastar::future<bufferlist> _read(const hobject_t& hoid,
+ size_t offset,
+ size_t length,
+ uint32_t flags) = 0;
+};
--- /dev/null
+#include "replicated_backend.h"
+
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_object.h"
+#include "crimson/os/cyan_store.h"
+
+ReplicatedBackend::ReplicatedBackend(shard_id_t shard,
+ ReplicatedBackend::CollectionRef coll,
+ ceph::os::CyanStore* store)
+ : PGBackend{shard, coll, store}
+{}
+
+seastar::future<bufferlist> ReplicatedBackend::_read(const hobject_t& hoid,
+ uint64_t off,
+ uint64_t len,
+ uint32_t flags)
+{
+ return store->read(coll, ghobject_t{hoid}, off, len, flags);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <seastar/core/future.hh>
+#include "include/buffer_fwd.h"
+#include "osd/osd_types.h"
+#include "pg_backend.h"
+
+class ReplicatedBackend : public PGBackend
+{
+public:
+ ReplicatedBackend(shard_id_t shard,
+ CollectionRef coll,
+ ceph::os::CyanStore* store);
+private:
+ seastar::future<ceph::bufferlist> _read(const hobject_t& hoid,
+ uint64_t off,
+ uint64_t len,
+ uint32_t flags) override;
+};