From: Amnon Hanuhov Date: Thu, 3 Jun 2021 11:47:00 +0000 (+0300) Subject: crimson/net: Use MessageURef in messenger internals X-Git-Tag: v17.1.0~1598^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c9a96891a9d5e50d36388c635db9534a04b2a4d3;p=ceph.git crimson/net: Use MessageURef in messenger internals Signed-off-by: Amnon Hanuhov --- diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index db888fe470f8..5d259c29e13c 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -70,7 +70,7 @@ void Client::ms_handle_connect(crimson::net::ConnectionRef c) gate.dispatch_in_background(__func__, *this, [this, c] { if (conn == c) { // ask for the mgrconfigure message - auto m = ceph::make_message(); + auto m = crimson::make_message(); m->daemon_name = local_conf()->name.get_id(); return conn->send(std::move(m)); } else { diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h index ad7e1fde54e8..17d62d034771 100644 --- a/src/crimson/mgr/client.h +++ b/src/crimson/mgr/client.h @@ -24,7 +24,7 @@ namespace crimson::mgr // implement WithStats if you want to report stats to mgr periodically class WithStats { public: - virtual MessageRef get_stats() const = 0; + virtual MessageURef get_stats() const = 0; virtual ~WithStats() {} }; diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 4e10dded230e..a494a6baacbc 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -127,8 +127,6 @@ class Connection : public seastar::enable_shared_from_this { /// send a message over a connection that has completed its handshake virtual seastar::future<> send(MessageURef msg) = 0; - // The version with MessageRef will be dropped in the future - virtual seastar::future<> send(MessageRef msg) = 0; /// send a keepalive message over a connection that has completed its /// handshake diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 804a80923d64..30ee539d2567 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -110,7 +110,7 @@ ceph::bufferlist Protocol::sweep_messages_and_move_to_sent( return bl; } -seastar::future<> Protocol::send(MessageRef msg) +seastar::future<> Protocol::send(MessageURef msg) { if (write_state != write_state_t::drop) { conn.out_q.push_back(std::move(msg)); @@ -153,7 +153,7 @@ void Protocol::requeue_sent() conn.out_seq -= conn.sent.size(); logger().debug("{} requeue {} items, revert out_seq to {}", conn, conn.sent.size(), conn.out_seq); - for (MessageRef& msg : conn.sent) { + for (MessageURef& msg : conn.sent) { msg->clear_payload(); msg->set_seq(0); } @@ -204,7 +204,7 @@ void Protocol::ack_writes(seq_num_t seq) } while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) { logger().trace("{} got ack seq {} >= {}, pop {}", - conn, seq, conn.sent.front()->get_seq(), conn.sent.front()); + conn, seq, conn.sent.front()->get_seq(), *conn.sent.front()); conn.sent.pop_front(); } } diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index ce88629ba6c2..0343f026025a 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -56,7 +56,7 @@ class Protocol { virtual void trigger_close() = 0; virtual ceph::bufferlist do_sweep_messages( - const std::deque& msgs, + const std::deque& msgs, size_t num_msgs, bool require_keepalive, std::optional keepalive_ack, @@ -90,7 +90,7 @@ class Protocol { // the write state-machine public: - seastar::future<> send(MessageRef msg); + seastar::future<> send(MessageURef msg); seastar::future<> keepalive(); // TODO: encapsulate a SessionedSender class diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 15376fe8a142..370085a43a12 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1792,7 +1792,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, // READY state ceph::bufferlist ProtocolV2::do_sweep_messages( - const std::deque& msgs, + const std::deque& msgs, size_t num_msgs, bool require_keepalive, std::optional _keepalive_ack, @@ -1818,7 +1818,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE); } - std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { + std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) { // TODO: move to common code // set priority msg->get_header().src = messenger.get_myname(); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index be9a22816687..319802690cc3 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -31,7 +31,7 @@ class ProtocolV2 final : public Protocol { void trigger_close() override; ceph::bufferlist do_sweep_messages( - const std::deque& msgs, + const std::deque& msgs, size_t num_msgs, bool require_keepalive, std::optional keepalive_ack, diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 630dbdfa8c8a..a119b9a96c67 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -70,12 +70,6 @@ bool SocketConnection::peer_wins() const } seastar::future<> SocketConnection::send(MessageURef msg) -{ - assert(seastar::this_shard_id() == shard_id()); - return protocol->send(MessageRef{msg.release(), false}); -} - -seastar::future<> SocketConnection::send(MessageRef msg) { assert(seastar::this_shard_id() == shard_id()); return protocol->send(std::move(msg)); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 068d8886ac4f..e2bdc24853d4 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -44,9 +44,9 @@ class SocketConnection : public Connection { bool update_rx_seq(seq_num_t seq); // messages to be resent after connection gets reset - std::deque out_q; + std::deque out_q; // messages sent, but not yet acked by peer - std::deque sent; + std::deque sent; seastar::shard_id shard_id() const; @@ -70,7 +70,6 @@ class SocketConnection : public Connection { #endif seastar::future<> send(MessageURef msg) override; - seastar::future<> send(MessageRef msg) override; seastar::future<> keepalive() override; diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 539e3a65b673..65cf8cc459d9 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -760,11 +760,11 @@ void OSD::update_stats() }); } -MessageRef OSD::get_stats() const +MessageURef OSD::get_stats() const { // todo: m-to-n: collect stats using map-reduce // MPGStats::had_map_for is not used since PGMonitor was removed - auto m = ceph::make_message(monc->get_fsid(), osdmap->get_epoch()); + auto m = crimson::make_message(monc->get_fsid(), osdmap->get_epoch()); m->osd_stat = osd_stat; for (auto [pgid, pg] : pg_map.get_pgs()) { if (pg->is_primary()) { diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index ebce16bf8ccf..59b7cf50c008 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -105,7 +105,7 @@ class OSD final : public crimson::net::Dispatcher, osd_stat_t osd_stat; uint32_t osd_stat_seq = 0; void update_stats(); - MessageRef get_stats() const final; + MessageURef get_stats() const final; // AuthHandler methods void handle_authentication(const EntityName& name, diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 93b633afb5ac..2244791ffe4c 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -141,8 +141,8 @@ ClientRequest::process_pg_op( Ref &pg) { return pg->do_pg_ops(m) - .then_interruptible([this, pg=std::move(pg)](Ref reply) { - return conn->send(reply); + .then_interruptible([this, pg=std::move(pg)](MURef reply) { + return conn->send(std::move(reply)); }); } @@ -214,10 +214,10 @@ ClientRequest::do_process(Ref& pg, crimson::osd::ObjectContextRef obc) }).then_interruptible( [this, pg, all_completed=std::move(all_completed)]() mutable { return all_completed.safe_then_interruptible( - [this, pg](Ref reply) { + [this, pg](MURef reply) { return with_blocking_future_interruptible( handle.enter(pp(*pg).send_reply)).then_interruptible( - [this, reply=std::move(reply)] { + [this, reply=std::move(reply)]() mutable{ return conn->send(std::move(reply)); }); }, crimson::ct_error::eagain::handle([this, pg]() mutable { diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 066798417a48..c7ad0e1360dc 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -736,7 +736,7 @@ PG::do_osd_ops_execute( })); } -PG::do_osd_ops_iertr::future>> +PG::do_osd_ops_iertr::future>> PG::do_osd_ops( Ref m, ObjectContextRef obc, @@ -745,7 +745,7 @@ PG::do_osd_ops( if (__builtin_expect(stopping, false)) { throw crimson::common::system_shutdown_exception(); } - return do_osd_ops_execute>( + return do_osd_ops_execute>( seastar::make_lw_shared( std::move(obc), op_info, get_pool().info, get_backend(), *m), m->ops, @@ -757,7 +757,7 @@ PG::do_osd_ops( if (result > 0 && !rvec) { result = 0; } - auto reply = ceph::make_message(m.get(), + auto reply = crimson::make_message(m.get(), result, get_osdmap_epoch(), 0, @@ -767,16 +767,16 @@ PG::do_osd_ops( "do_osd_ops: {} - object {} sending reply", *m, m->get_hobj()); - return do_osd_ops_iertr::make_ready_future>( + return do_osd_ops_iertr::make_ready_future>( std::move(reply)); }, [m, this] (const std::error_code& e) { - auto reply = ceph::make_message( + auto reply = crimson::make_message( m.get(), -e.value(), get_osdmap_epoch(), 0, false); reply->set_enoent_reply_versions( peering_state.get_info().last_update, peering_state.get_info().last_user_version); - return do_osd_ops_iertr::make_ready_future>(std::move(reply)); + return do_osd_ops_iertr::make_ready_future>(std::move(reply)); }); } @@ -798,7 +798,7 @@ PG::do_osd_ops( std::move(failure_func)); } -PG::interruptible_future> PG::do_pg_ops(Ref m) +PG::interruptible_future> PG::do_pg_ops(Ref m) { if (__builtin_expect(stopping, false)) { throw crimson::common::system_shutdown_exception(); @@ -810,16 +810,16 @@ PG::interruptible_future> PG::do_pg_ops(Ref m) logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op)); return ox->execute_op(osd_op); }).then_interruptible([m, this, ox = std::move(ox)] { - auto reply = ceph::make_message(m.get(), 0, get_osdmap_epoch(), + auto reply = crimson::make_message(m.get(), 0, get_osdmap_epoch(), CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false); - return seastar::make_ready_future>(std::move(reply)); + return seastar::make_ready_future>(std::move(reply)); }).handle_exception_type_interruptible([=](const crimson::osd::error& e) { - auto reply = ceph::make_message( + auto reply = crimson::make_message( m.get(), -e.code().value(), get_osdmap_epoch(), 0, false); reply->set_enoent_reply_versions(peering_state.get_info().last_update, peering_state.get_info().last_user_version); - return seastar::make_ready_future>(std::move(reply)); + return seastar::make_ready_future>(std::move(reply)); }); } diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index d56decf724a7..f8c531f03596 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -571,7 +571,7 @@ private: using pg_rep_op_fut_t = std::tuple, do_osd_ops_iertr::future>; - do_osd_ops_iertr::future>> do_osd_ops( + do_osd_ops_iertr::future>> do_osd_ops( Ref m, ObjectContextRef obc, const OpInfo &op_info); @@ -594,7 +594,7 @@ private: const OpInfo &op_info, SuccessFunc&& success_func, FailureFunc&& failure_func); - interruptible_future> do_pg_ops(Ref m); + interruptible_future> do_pg_ops(Ref m); std::tuple, interruptible_future<>> submit_transaction( const OpInfo& op_info, diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc index 1b8c865f21f4..e879cb9ec14c 100644 --- a/src/crimson/osd/pg_recovery.cc +++ b/src/crimson/osd/pg_recovery.cc @@ -459,7 +459,7 @@ void PGRecovery::enqueue_drop( // allocate a pair if target is seen for the first time auto& req = backfill_drop_requests[target]; if (!req) { - req = ceph::make_message( + req = crimson::make_message( spg_t(pg->get_pgid().pgid, target.shard), pg->get_osdmap_epoch()); } req->ls.emplace_back(obj, v); diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h index 931b6d7a9c6d..7120342dd583 100644 --- a/src/crimson/osd/pg_recovery.h +++ b/src/crimson/osd/pg_recovery.h @@ -89,7 +89,7 @@ private: // backfill begin std::unique_ptr backfill_state; std::map> backfill_drop_requests; + MURef> backfill_drop_requests; template void start_backfill_recovery(