From: Kefu Chai Date: Sat, 13 Jul 2019 02:39:48 +0000 (+0800) Subject: crimson/osd: replicate transaction to peers X-Git-Tag: v15.1.0~1909^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4add5fd47b257b7c37b32006eaf540926853c87f;p=ceph.git crimson/osd: replicate transaction to peers * handle `MOSDRepOpReply` message in osd, and pass it all the way down to `PGBackend`. Signed-off-by: Kefu Chai --- diff --git a/src/crimson/osd/acked_peers.h b/src/crimson/osd/acked_peers.h new file mode 100644 index 000000000000..90199013b3a8 --- /dev/null +++ b/src/crimson/osd/acked_peers.h @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#if BOOST_VERSION >= 106900 +#include +#else +#include +#endif + +namespace ceph::osd { + struct peer_shard_t { + pg_shard_t shard; + eversion_t last_complete_ondisk; + }; +#if BOOST_VERSION >= 106900 + // small_vector is is_nothrow_move_constructible<> since 1.69 + // 2 + 1 = 3, which is the default value of "osd_pool_default_size" + using acked_peers_t = boost::container::small_vector; +#else + using acked_peers_t = std::vector; +#endif +} diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 66f8dbdff7f8..010b52121c7c 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -1,12 +1,14 @@ #include "ec_backend.h" + #include "crimson/os/cyan_collection.h" +#include "crimson/osd/shard_services.h" ECBackend::ECBackend(shard_id_t shard, ECBackend::CollectionRef coll, - ceph::os::FuturizedStore* store, + ceph::osd::ShardServices& shard_services, const ec_profile_t&, uint64_t) - : PGBackend{shard, coll, store} + : PGBackend{shard, coll, &shard_services.get_store()} { // todo } @@ -19,3 +21,15 @@ seastar::future ECBackend::_read(const hobject_t& hoid, // todo return seastar::make_ready_future(); } + +seastar::future +ECBackend::_submit_transaction(std::set&& pg_shards, + const hobject_t& hoid, + ceph::os::Transaction&& txn, + osd_reqid_t req_id, + epoch_t min_epoch, epoch_t max_epoch, + eversion_t ver) +{ + // todo + return seastar::make_ready_future(); +} diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index 4bf6b113a79c..107c048217b1 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -13,7 +13,8 @@ class ECBackend : public PGBackend { public: ECBackend(shard_id_t shard, - CollectionRef, ceph::os::FuturizedStore*, + CollectionRef coll, + ceph::osd::ShardServices& shard_services, const ec_profile_t& ec_profile, uint64_t stripe_width); private: @@ -21,6 +22,13 @@ private: uint64_t off, uint64_t len, uint32_t flags) override; + seastar::future + _submit_transaction(std::set&& pg_shards, + const hobject_t& hoid, + ceph::os::Transaction&& txn, + osd_reqid_t req_id, + epoch_t min_epoch, epoch_t max_epoch, + eversion_t ver) final; CollectionRef coll; ceph::os::FuturizedStore* store; }; diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 044df1a80d9a..05fc4d06d11e 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -16,22 +16,24 @@ #include "messages/MOSDMap.h" #include "messages/MOSDOp.h" #include "messages/MOSDPGLog.h" +#include "messages/MOSDRepOpReply.h" #include "messages/MPGStats.h" +#include "os/Transaction.h" +#include "osd/PGPeeringEvent.h" +#include "osd/PeeringState.h" + #include "crimson/mon/MonClient.h" #include "crimson/net/Connection.h" #include "crimson/net/Messenger.h" #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_object.h" #include "crimson/os/futurized_store.h" -#include "os/Transaction.h" #include "crimson/osd/heartbeat.h" #include "crimson/osd/osd_meta.h" #include "crimson/osd/pg.h" #include "crimson/osd/pg_backend.h" #include "crimson/osd/pg_meta.h" -#include "osd/PGPeeringEvent.h" -#include "osd/PeeringState.h" #include "crimson/osd/osd_operations/compound_peering_request.h" #include "crimson/osd/osd_operations/peering_event.h" #include "crimson/osd/osd_operations/pg_advance_map.h" @@ -477,6 +479,8 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m) return seastar::now(); case MSG_OSD_PG_LOG: return handle_pg_log(conn, boost::static_pointer_cast(m)); + case MSG_OSD_REPOPREPLY: + return handle_rep_op_reply(conn, boost::static_pointer_cast(m)); default: logger().info("{} unhandled message {}", __func__, *m); return seastar::now(); @@ -852,6 +856,18 @@ seastar::future<> OSD::handle_osd_op(ceph::net::Connection* conn, return seastar::now(); } +seastar::future<> OSD::handle_rep_op_reply(ceph::net::Connection* conn, + Ref m) +{ + const auto& pgs = pg_map.get_pgs(); + if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) { + pg->second->handle_rep_op_reply(conn, *m); + } else { + logger().warn("stale reply: {}", *m); + } + return seastar::now(); +} + bool OSD::should_restart() const { if (!osdmap->is_up(whoami)) { diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 6345b33dc7b3..5578e268a50d 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -34,6 +34,7 @@ class MOSDMap; class MOSDOp; +class MOSDRepOpReply; class OSDMap; class OSDMeta; class Heartbeat; @@ -164,6 +165,8 @@ private: Ref m); seastar::future<> handle_osd_op(ceph::net::Connection* conn, Ref m); + seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn, + Ref m); seastar::future<> handle_pg_log(ceph::net::Connection* conn, Ref m); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 2b89a78513db..2efcc416b36f 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -80,10 +80,11 @@ PG::PG( osdmap{osdmap}, backend( PGBackend::create( - pgid, + pgid.pgid, + pg_shard, pool, coll_ref, - &shard_services.get_store(), + shard_services, profile)), peering_state( shard_services.get_cct(), @@ -411,6 +412,27 @@ seastar::future PG::do_pgnls(bufferlist& indata, }); } +seastar::future<> PG::submit_transaction(boost::local_shared_ptr&& os, + ceph::os::Transaction&& txn, + const MOSDOp& req) +{ + epoch_t map_epoch = get_osdmap_epoch(); + eversion_t at_version{map_epoch, projected_last_update.version + 1}; + return backend->mutate_object(peering_state.get_acting_recovery_backfill(), + std::move(os), + std::move(txn), + req, + peering_state.get_last_peering_reset(), + map_epoch, + at_version).then([this](auto acked) { + for (const auto& peer : acked) { + peering_state.update_peer_last_complete_ondisk( + peer.shard, peer.last_complete_ondisk); + } + return seastar::now(); + }); +} + seastar::future> PG::do_osd_ops(Ref m) { return seastar::do_with(std::move(m), ceph::os::Transaction{}, @@ -425,8 +447,11 @@ seastar::future> PG::do_osd_ops(Ref m) return do_osd_op(*pos, osd_op, txn); }).then([&txn,m,this,os=std::move(os)]() mutable { // XXX: the entire lambda could be scheduled conditionally. ::if_then()? - return txn.empty() ? seastar::now() - : backend->mutate_object(std::move(os), std::move(txn), *m); + if (txn.empty()) { + return seastar::now(); + } else { + return submit_transaction(std::move(os), std::move(txn), *m); + } }); }).then([m,this] { auto reply = make_message(m.get(), 0, get_osdmap_epoch(), @@ -459,4 +484,10 @@ seastar::future<> PG::handle_op(ceph::net::Connection* conn, }); } +void PG::handle_rep_op_reply(ceph::net::Connection* conn, + const MOSDRepOpReply& m) +{ + backend->got_rep_op_reply(m); +} + } diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 5f3692b3729c..21e724c2a009 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -417,6 +417,9 @@ public: void handle_initialize(PeeringCtx &rctx); seastar::future<> handle_op(ceph::net::Connection* conn, Ref m); + void handle_rep_op_reply(ceph::net::Connection* conn, + const MOSDRepOpReply& m); + void print(std::ostream& os) const; private: @@ -431,6 +434,9 @@ private: seastar::future do_pgnls(ceph::bufferlist& indata, const std::string& nspace, uint64_t limit); + seastar::future<> submit_transaction(boost::local_shared_ptr&& os, + ceph::os::Transaction&& txn, + const MOSDOp& req); private: OSDMapGate osdmap_gate; diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index c31e88539667..fe1431d7f5d6 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -25,17 +25,19 @@ namespace { } } -std::unique_ptr PGBackend::create(const spg_t pgid, +std::unique_ptr PGBackend::create(pg_t pgid, + const pg_shard_t pg_shard, const pg_pool_t& pool, ceph::os::CollectionRef coll, - ceph::os::FuturizedStore* store, + ceph::osd::ShardServices& shard_services, const ec_profile_t& ec_profile) { switch (pool.type) { case pg_pool_t::TYPE_REPLICATED: - return std::make_unique(pgid.shard, coll, store); + return std::make_unique(pgid, pg_shard, + coll, shard_services); case pg_pool_t::TYPE_ERASURE: - return std::make_unique(pgid.shard, coll, store, + return std::make_unique(pg_shard.shard, coll, shard_services, std::move(ec_profile), pool.stripe_width); default: @@ -157,11 +159,15 @@ PGBackend::_load_ss(const hobject_t& oid) }); } -seastar::future<> +seastar::future PGBackend::mutate_object( + std::set pg_shards, cached_os_t&& os, ceph::os::Transaction&& txn, - const MOSDOp& m) + const MOSDOp& m, + epoch_t min_epoch, + epoch_t map_epoch, + eversion_t ver) { logger().trace("mutate_object: num_ops={}", txn.get_num_ops()); if (os->exists) { @@ -185,7 +191,8 @@ PGBackend::mutate_object( // reset cached ObjectState without enforcing eviction os->oi = object_info_t(os->oi.soid); } - return store->do_transaction(coll, std::move(txn)); + return _submit_transaction(std::move(pg_shards), os->oi.soid, std::move(txn), + m.get_reqid(), min_epoch, map_epoch, ver); } seastar::future<> diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index 69380a64a4df..fbc2653d74c0 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -10,12 +10,18 @@ #include "crimson/os/futurized_store.h" #include "crimson/os/cyan_collection.h" +#include "crimson/osd/acked_peers.h" #include "crimson/common/shared_lru.h" #include "os/Transaction.h" #include "osd/osd_types.h" #include "osd/osd_internal_types.h" struct hobject_t; +class MOSDRepOpReply; + +namespace ceph::osd { + class ShardServices; +} class PGBackend { @@ -26,10 +32,11 @@ protected: public: PGBackend(shard_id_t shard, CollectionRef coll, ceph::os::FuturizedStore* store); virtual ~PGBackend() = default; - static std::unique_ptr create(const spg_t pgid, + static std::unique_ptr create(pg_t pgid, + const pg_shard_t pg_shard, const pg_pool_t& pool, ceph::os::CollectionRef coll, - ceph::os::FuturizedStore* store, + ceph::osd::ShardServices& shard_services, const ec_profile_t& ec_profile); using cached_os_t = boost::local_shared_ptr; seastar::future get_object_state(const hobject_t& oid); @@ -51,14 +58,20 @@ public: ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans); - seastar::future<> mutate_object( + seastar::future mutate_object( + std::set pg_shards, cached_os_t&& os, ceph::os::Transaction&& txn, - const MOSDOp& m); + const MOSDOp& m, + epoch_t min_epoch, + epoch_t map_epoch, + eversion_t ver); seastar::future, hobject_t> list_objects( const hobject_t& start, uint64_t limit); + virtual void got_rep_op_reply(const MOSDRepOpReply&) {} + protected: const shard_id_t shard; CollectionRef coll; @@ -75,4 +88,11 @@ private: size_t length, uint32_t flags) = 0; bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn); + virtual seastar::future + _submit_transaction(std::set&& pg_shards, + const hobject_t& hoid, + ceph::os::Transaction&& txn, + osd_reqid_t req_id, + epoch_t min_epoch, epoch_t max_epoch, + eversion_t ver) = 0; }; diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 2a6b9b7fc6e8..44ca03af29f1 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -1,13 +1,27 @@ #include "replicated_backend.h" +#include "messages/MOSDRepOpReply.h" + +#include "crimson/common/log.h" #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_object.h" #include "crimson/os/futurized_store.h" +#include "crimson/osd/shard_services.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } +} -ReplicatedBackend::ReplicatedBackend(shard_id_t shard, +ReplicatedBackend::ReplicatedBackend(pg_t pgid, + pg_shard_t whoami, ReplicatedBackend::CollectionRef coll, - ceph::os::FuturizedStore* store) - : PGBackend{shard, coll, store} + ceph::osd::ShardServices& shard_services) + : PGBackend{whoami.shard, coll, &shard_services.get_store()}, + pgid{pgid}, + whoami{whoami}, + shard_services{shard_services} {} seastar::future ReplicatedBackend::_read(const hobject_t& hoid, @@ -17,3 +31,65 @@ seastar::future ReplicatedBackend::_read(const hobject_t& hoid, { return store->read(coll, ghobject_t{hoid}, off, len, flags); } + +seastar::future +ReplicatedBackend::_submit_transaction(std::set&& pg_shards, + const hobject_t& hoid, + ceph::os::Transaction&& txn, + osd_reqid_t req_id, + epoch_t min_epoch, epoch_t map_epoch, + eversion_t ver) +{ + const ceph_tid_t tid = next_txn_id++; + auto pending_txn = + pending_trans.emplace(tid, pending_on_t{pg_shards.size()}).first; + bufferlist encoded_txn; + encode(txn, encoded_txn); + + return seastar::parallel_for_each(std::move(pg_shards), + [=, encoded_txn=std::move(encoded_txn), txn=std::move(txn)] + (auto pg_shard) mutable { + if (pg_shard == whoami) { + return shard_services.get_store().do_transaction(coll,std::move(txn)); + } else { + auto m = make_message(req_id, whoami, + spg_t{pgid, pg_shard.shard}, hoid, + CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, + map_epoch, min_epoch, + tid, ver); + m->set_data(encoded_txn); + pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); + // TODO: set more stuff. e.g., pg_states + return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch); + } + }).then([&peers=pending_txn->second] { + if (--peers.pending == 0) { + peers.all_committed.set_value(); + } + return peers.all_committed.get_future(); + }).then([tid, pending_txn, this] { + pending_txn->second.all_committed = {}; + auto acked_peers = std::move(pending_txn->second.acked_peers); + pending_trans.erase(pending_txn); + return seastar::make_ready_future(std::move(acked_peers)); + }); +} + +void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply) +{ + auto found = pending_trans.find(reply.get_tid()); + if (found == pending_trans.end()) { + logger().warn("{}: no matched pending rep op: {}", __func__, reply); + return; + } + auto& peers = found->second; + for (auto& peer : peers.acked_peers) { + if (peer.shard == reply.from) { + peer.last_complete_ondisk = reply.get_last_complete_ondisk(); + if (--peers.pending == 0) { + peers.all_committed.set_value(); + } + return; + } + } +} diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h index e96060f948f1..ca9ccb1db4f2 100644 --- a/src/crimson/osd/replicated_backend.h +++ b/src/crimson/osd/replicated_backend.h @@ -7,17 +7,45 @@ #include #include "include/buffer_fwd.h" #include "osd/osd_types.h" + +#include "acked_peers.h" #include "pg_backend.h" +namespace ceph::osd { + class ShardServices; +} + class ReplicatedBackend : public PGBackend { public: - ReplicatedBackend(shard_id_t shard, + ReplicatedBackend(pg_t pgid, pg_shard_t whoami, CollectionRef coll, - ceph::os::FuturizedStore* store); + ceph::osd::ShardServices& shard_services); + void got_rep_op_reply(const MOSDRepOpReply& reply) final; private: seastar::future _read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override; + seastar::future + _submit_transaction(std::set&& pg_shards, + const hobject_t& hoid, + ceph::os::Transaction&& txn, + osd_reqid_t req_id, + epoch_t min_epoch, epoch_t max_epoch, + eversion_t ver) final; + const pg_t pgid; + const pg_shard_t whoami; + ceph::osd::ShardServices& shard_services; + ceph_tid_t next_txn_id = 0; + struct pending_on_t { + pending_on_t(size_t pending) + : pending{static_cast(pending)} + {} + unsigned pending; + ceph::osd::acked_peers_t acked_peers; + seastar::promise<> all_committed; + }; + using pending_transactions_t = std::map; + pending_transactions_t pending_trans; };