--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/version.hpp>
+#if BOOST_VERSION >= 106900
+#include <boost/container/small_vector.hpp>
+#else
+#include <vector>
+#endif
+
+#include "osd/osd_types.h"  // for pg_shard_t and eversion_t
+
+namespace ceph::osd {
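+ // a peer's commit ack: the replying shard and its last-complete-on-disk version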
+ struct peer_shard_t {
+ pg_shard_t shard;
+ eversion_t last_complete_ondisk;
+ };
+#if BOOST_VERSION >= 106900
+ // boost::container::small_vector has been is_nothrow_move_constructible<>
+ // since Boost 1.69.
+ // 2 inline peers + 1 primary = 3, the default value of
+ // "osd_pool_default_size", so the common case avoids heap allocation.
+ using acked_peers_t = boost::container::small_vector<peer_shard_t, 2>;
+#else
+ using acked_peers_t = std::vector<peer_shard_t>;
+#endif
+}
#include "ec_backend.h"
+
#include "crimson/os/cyan_collection.h"
+#include "crimson/osd/shard_services.h"
ECBackend::ECBackend(shard_id_t shard,
ECBackend::CollectionRef coll,
- ceph::os::FuturizedStore* store,
+ ceph::osd::ShardServices& shard_services,
const ec_profile_t&,
uint64_t)
- : PGBackend{shard, coll, store}
+ : PGBackend{shard, coll, &shard_services.get_store()}
{
// todo
}
// todo
return seastar::make_ready_future<bufferlist>();
}
+
+seastar::future<ceph::osd::acked_peers_t>
+ECBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
+ const hobject_t& hoid,
+ ceph::os::Transaction&& txn,
+ osd_reqid_t req_id,
+ epoch_t min_epoch, epoch_t max_epoch,
+ eversion_t ver)
+{
+ // todo
+ return seastar::make_ready_future<ceph::osd::acked_peers_t>();
+}
{
public:
ECBackend(shard_id_t shard,
- CollectionRef, ceph::os::FuturizedStore*,
+ CollectionRef coll,
+ ceph::osd::ShardServices& shard_services,
const ec_profile_t& ec_profile,
uint64_t stripe_width);
private:
uint64_t off,
uint64_t len,
uint32_t flags) override;
+ seastar::future<ceph::osd::acked_peers_t>
+ _submit_transaction(std::set<pg_shard_t>&& pg_shards,
+ const hobject_t& hoid,
+ ceph::os::Transaction&& txn,
+ osd_reqid_t req_id,
+ epoch_t min_epoch, epoch_t max_epoch,
+ eversion_t ver) final;
CollectionRef coll;
ceph::os::FuturizedStore* store;
};
#include "messages/MOSDMap.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDPGLog.h"
+#include "messages/MOSDRepOpReply.h"
#include "messages/MPGStats.h"
+#include "os/Transaction.h"
+#include "osd/PGPeeringEvent.h"
+#include "osd/PeeringState.h"
+
#include "crimson/mon/MonClient.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyan_collection.h"
#include "crimson/os/cyan_object.h"
#include "crimson/os/futurized_store.h"
-#include "os/Transaction.h"
#include "crimson/osd/heartbeat.h"
#include "crimson/osd/osd_meta.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/pg_meta.h"
-#include "osd/PGPeeringEvent.h"
-#include "osd/PeeringState.h"
#include "crimson/osd/osd_operations/compound_peering_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
return seastar::now();
case MSG_OSD_PG_LOG:
return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
+ case MSG_OSD_REPOPREPLY:
+ return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
default:
logger().info("{} unhandled message {}", __func__, *m);
return seastar::now();
return seastar::now();
}
+seastar::future<> OSD::handle_rep_op_reply(ceph::net::Connection* conn,
+ Ref<MOSDRepOpReply> m)
+{
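+ // route the commit ack to the PG that issued the rep op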
+ const auto& pgs = pg_map.get_pgs();
+ if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) {
+ pg->second->handle_rep_op_reply(conn, *m);
+ } else {
+ logger().warn("stale reply: {}", *m);
+ }
+ return seastar::now();
+}
+
bool OSD::should_restart() const
{
if (!osdmap->is_up(whoami)) {
class MOSDMap;
class MOSDOp;
+class MOSDRepOpReply;
class OSDMap;
class OSDMeta;
class Heartbeat;
Ref<MOSDMap> m);
seastar::future<> handle_osd_op(ceph::net::Connection* conn,
Ref<MOSDOp> m);
+ seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn,
+ Ref<MOSDRepOpReply> m);
seastar::future<> handle_pg_log(ceph::net::Connection* conn,
Ref<MOSDPGLog> m);
osdmap{osdmap},
backend(
PGBackend::create(
- pgid,
+ pgid.pgid,
+ pg_shard,
pool,
coll_ref,
- &shard_services.get_store(),
+ shard_services,
profile)),
peering_state(
shard_services.get_cct(),
});
}
+seastar::future<> PG::submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
+ ceph::os::Transaction&& txn,
+ const MOSDOp& req)
+{
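+ // the version to stamp on this write: current map epoch, next projected version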
+ epoch_t map_epoch = get_osdmap_epoch();
+ eversion_t at_version{map_epoch, projected_last_update.version + 1};
+ return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
+ std::move(os),
+ std::move(txn),
+ req,
+ peering_state.get_last_peering_reset(),
+ map_epoch,
+ at_version).then([this](auto acked) {
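+ // remember each replica's last-complete-on-disk version in the peering state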
+ for (const auto& peer : acked) {
+ peering_state.update_peer_last_complete_ondisk(
+ peer.shard, peer.last_complete_ondisk);
+ }
+ return seastar::now();
+ });
+}
+
seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
{
return seastar::do_with(std::move(m), ceph::os::Transaction{},
return do_osd_op(*pos, osd_op, txn);
}).then([&txn,m,this,os=std::move(os)]() mutable {
// XXX: the entire lambda could be scheduled conditionally. ::if_then()?
- return txn.empty() ? seastar::now()
- : backend->mutate_object(std::move(os), std::move(txn), *m);
+ if (txn.empty()) {
+ return seastar::now();
+ } else {
+ return submit_transaction(std::move(os), std::move(txn), *m);
+ }
});
}).then([m,this] {
auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
});
}
+void PG::handle_rep_op_reply(ceph::net::Connection* conn,
+ const MOSDRepOpReply& m)
+{
+ backend->got_rep_op_reply(m);
+}
+
}
void handle_initialize(PeeringCtx &rctx);
seastar::future<> handle_op(ceph::net::Connection* conn,
Ref<MOSDOp> m);
+ void handle_rep_op_reply(ceph::net::Connection* conn,
+ const MOSDRepOpReply& m);
+
void print(std::ostream& os) const;
private:
seastar::future<ceph::bufferlist> do_pgnls(ceph::bufferlist& indata,
const std::string& nspace,
uint64_t limit);
+ seastar::future<> submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
+ ceph::os::Transaction&& txn,
+ const MOSDOp& req);
private:
OSDMapGate osdmap_gate;
}
}
-std::unique_ptr<PGBackend> PGBackend::create(const spg_t pgid,
+std::unique_ptr<PGBackend> PGBackend::create(pg_t pgid,
+ const pg_shard_t pg_shard,
const pg_pool_t& pool,
ceph::os::CollectionRef coll,
- ceph::os::FuturizedStore* store,
+ ceph::osd::ShardServices& shard_services,
const ec_profile_t& ec_profile)
{
switch (pool.type) {
case pg_pool_t::TYPE_REPLICATED:
- return std::make_unique<ReplicatedBackend>(pgid.shard, coll, store);
+ return std::make_unique<ReplicatedBackend>(pgid, pg_shard,
+ coll, shard_services);
case pg_pool_t::TYPE_ERASURE:
- return std::make_unique<ECBackend>(pgid.shard, coll, store,
+ return std::make_unique<ECBackend>(pg_shard.shard, coll, shard_services,
std::move(ec_profile),
pool.stripe_width);
default:
});
}
-seastar::future<>
+seastar::future<ceph::osd::acked_peers_t>
PGBackend::mutate_object(
+ std::set<pg_shard_t> pg_shards,
cached_os_t&& os,
ceph::os::Transaction&& txn,
- const MOSDOp& m)
+ const MOSDOp& m,
+ epoch_t min_epoch,
+ epoch_t map_epoch,
+ eversion_t ver)
{
logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
if (os->exists) {
// reset cached ObjectState without enforcing eviction
os->oi = object_info_t(os->oi.soid);
}
- return store->do_transaction(coll, std::move(txn));
+ return _submit_transaction(std::move(pg_shards), os->oi.soid, std::move(txn),
+ m.get_reqid(), min_epoch, map_epoch, ver);
}
seastar::future<>
#include "crimson/os/futurized_store.h"
#include "crimson/os/cyan_collection.h"
+#include "crimson/osd/acked_peers.h"
#include "crimson/common/shared_lru.h"
#include "os/Transaction.h"
#include "osd/osd_types.h"
#include "osd/osd_internal_types.h"
struct hobject_t;
+class MOSDRepOpReply;
+
+namespace ceph::osd {
+ class ShardServices;
+}
class PGBackend
{
public:
PGBackend(shard_id_t shard, CollectionRef coll, ceph::os::FuturizedStore* store);
virtual ~PGBackend() = default;
- static std::unique_ptr<PGBackend> create(const spg_t pgid,
+ static std::unique_ptr<PGBackend> create(pg_t pgid,
+ const pg_shard_t pg_shard,
const pg_pool_t& pool,
ceph::os::CollectionRef coll,
- ceph::os::FuturizedStore* store,
+ ceph::osd::ShardServices& shard_services,
const ec_profile_t& ec_profile);
using cached_os_t = boost::local_shared_ptr<ObjectState>;
seastar::future<cached_os_t> get_object_state(const hobject_t& oid);
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
- seastar::future<> mutate_object(
+ seastar::future<ceph::osd::acked_peers_t> mutate_object(
+ std::set<pg_shard_t> pg_shards,
cached_os_t&& os,
ceph::os::Transaction&& txn,
- const MOSDOp& m);
+ const MOSDOp& m,
+ epoch_t min_epoch,
+ epoch_t map_epoch,
+ eversion_t ver);
seastar::future<std::vector<hobject_t>, hobject_t> list_objects(
const hobject_t& start,
uint64_t limit);
+ virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
+
protected:
const shard_id_t shard;
CollectionRef coll;
size_t length,
uint32_t flags) = 0;
bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn);
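+ // fan the transaction out to the given pg shards; the returned future
+ // resolves once all of them have committed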
+ virtual seastar::future<ceph::osd::acked_peers_t>
+ _submit_transaction(std::set<pg_shard_t>&& pg_shards,
+ const hobject_t& hoid,
+ ceph::os::Transaction&& txn,
+ osd_reqid_t req_id,
+ epoch_t min_epoch, epoch_t max_epoch,
+ eversion_t ver) = 0;
};
#include "replicated_backend.h"
+#include "messages/MOSDRepOpReply.h"
+
+#include "crimson/common/log.h"
#include "crimson/os/cyan_collection.h"
#include "crimson/os/cyan_object.h"
#include "crimson/os/futurized_store.h"
+#include "crimson/osd/shard_services.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+}
-ReplicatedBackend::ReplicatedBackend(shard_id_t shard,
+ReplicatedBackend::ReplicatedBackend(pg_t pgid,
+ pg_shard_t whoami,
ReplicatedBackend::CollectionRef coll,
- ceph::os::FuturizedStore* store)
- : PGBackend{shard, coll, store}
+ ceph::osd::ShardServices& shard_services)
+ : PGBackend{whoami.shard, coll, &shard_services.get_store()},
+ pgid{pgid},
+ whoami{whoami},
+ shard_services{shard_services}
{}
seastar::future<bufferlist> ReplicatedBackend::_read(const hobject_t& hoid,
{
return store->read(coll, ghobject_t{hoid}, off, len, flags);
}
+
+seastar::future<ceph::osd::acked_peers_t>
+ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
+ const hobject_t& hoid,
+ ceph::os::Transaction&& txn,
+ osd_reqid_t req_id,
+ epoch_t min_epoch, epoch_t map_epoch,
+ eversion_t ver)
+{
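+ // allocate a tid for this write and start tracking which shards still have
+ // to commit it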
+ const ceph_tid_t tid = next_txn_id++;
+ auto pending_txn =
+ pending_trans.emplace(tid, pending_on_t{pg_shards.size()}).first;
+ bufferlist encoded_txn;
+ encode(txn, encoded_txn);
+
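+ // apply the transaction on our own shard and send the encoded transaction
+ // to every remote shard as an MOSDRepOp; remote commit acks come back via
+ // got_rep_op_reply()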
+ return seastar::parallel_for_each(std::move(pg_shards),
+ [=, encoded_txn=std::move(encoded_txn), txn=std::move(txn)]
+ (auto pg_shard) mutable {
+ if (pg_shard == whoami) {
+ return shard_services.get_store().do_transaction(coll, std::move(txn));
+ } else {
+ auto m = make_message<MOSDRepOp>(req_id, whoami,
+ spg_t{pgid, pg_shard.shard}, hoid,
+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
+ map_epoch, min_epoch,
+ tid, ver);
+ m->set_data(encoded_txn);
+ pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
+ // TODO: set more fields, e.g. pg_states
+ return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
+ }
+ }).then([&peers=pending_txn->second] {
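+ // the local commit and all sends have completed; count ourselves and wait
+ // until every remote shard has acked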
+ if (--peers.pending == 0) {
+ peers.all_committed.set_value();
+ }
+ return peers.all_committed.get_future();
+ }).then([tid, pending_txn, this] {
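+ // every shard has committed: hand the collected acks back to the caller
+ // and drop the bookkeeping entry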
+ pending_txn->second.all_committed = {};
+ auto acked_peers = std::move(pending_txn->second.acked_peers);
+ pending_trans.erase(pending_txn);
+ return seastar::make_ready_future<ceph::osd::acked_peers_t>(std::move(acked_peers));
+ });
+}
+
+void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply)
+{
+ auto found = pending_trans.find(reply.get_tid());
+ if (found == pending_trans.end()) {
+ logger().warn("{}: no matched pending rep op: {}", __func__, reply);
+ return;
+ }
+ auto& peers = found->second;
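+ // record the peer's last-complete-on-disk version and, once every peer has
+ // replied, wake up the waiter in _submit_transaction()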
+ for (auto& peer : peers.acked_peers) {
+ if (peer.shard == reply.from) {
+ peer.last_complete_ondisk = reply.get_last_complete_ondisk();
+ if (--peers.pending == 0) {
+ peers.all_committed.set_value();
+ }
+ return;
+ }
+ }
+}
#include <seastar/core/future.hh>
#include "include/buffer_fwd.h"
#include "osd/osd_types.h"
+
+#include "acked_peers.h"
#include "pg_backend.h"
+namespace ceph::osd {
+ class ShardServices;
+}
+
class ReplicatedBackend : public PGBackend
{
public:
- ReplicatedBackend(shard_id_t shard,
+ ReplicatedBackend(pg_t pgid, pg_shard_t whoami,
CollectionRef coll,
- ceph::os::FuturizedStore* store);
+ ceph::osd::ShardServices& shard_services);
+ void got_rep_op_reply(const MOSDRepOpReply& reply) final;
private:
seastar::future<ceph::bufferlist> _read(const hobject_t& hoid,
uint64_t off,
uint64_t len,
uint32_t flags) override;
+ seastar::future<ceph::osd::acked_peers_t>
+ _submit_transaction(std::set<pg_shard_t>&& pg_shards,
+ const hobject_t& hoid,
+ ceph::os::Transaction&& txn,
+ osd_reqid_t req_id,
+ epoch_t min_epoch, epoch_t max_epoch,
+ eversion_t ver) final;
+ const pg_t pgid;
+ const pg_shard_t whoami;
+ ceph::osd::ShardServices& shard_services;
+ ceph_tid_t next_txn_id = 0;
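+ // per-transaction bookkeeping: how many shards (local + remote) still have
+ // to commit, the acks collected so far, and a promise fulfilled once all of
+ // them are in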
+ struct pending_on_t {
+ pending_on_t(size_t pending)
+ : pending{static_cast<unsigned>(pending)}
+ {}
+ unsigned pending;
+ ceph::osd::acked_peers_t acked_peers;
+ seastar::promise<> all_committed;
+ };
+ using pending_transactions_t = std::map<ceph_tid_t, pending_on_t>;
+ pending_transactions_t pending_trans;
};