From: Kefu Chai
Date: Tue, 22 Sep 2020 08:11:24 +0000 (+0800)
Subject: osd,crimson/osd: pass message using intrusive_ptr<>
X-Git-Tag: v16.1.0~934^2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d4a71c3c5369c7a881822a6eb7624e3e80cc53de;p=ceph.git

osd,crimson/osd: pass message using intrusive_ptr<>

for two reasons:

* crimson::osd::PG::send_cluster_message() accepts a `Message*` pointer and
  then hands it over to `shard_services.send_to_osd()`, which expects a
  `Ref`. So the raw pointer is used to construct an `intrusive_ptr`, which
  increments the refcount of that Message instance by one. But that Message
  was owned by nobody before that, so we end up with an `intrusive_ptr`
  whose refcount is 2 while it has only a single owner. Hence the memory
  leak.
* osd: to use Connection::send_message2(), which accepts a MessageRef and is
  less error-prone, in the sense that it helps prevent memory leaks of
  Messages.

Signed-off-by: Kefu Chai

---
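[ Editor's note, not part of the original patch: the refcount mismatch
  described in the first bullet can be reproduced with the minimal,
  stand-alone sketch below. It uses a hypothetical Msg type and MsgRef
  alias (none of this is Ceph code) whose refcount starts at 1, mirroring
  Ceph's RefCountedObject, to show why wrapping an already-owned raw
  pointer in a boost::intrusive_ptr with the default add_ref=true leaks,
  while creating the object as a ref from the start, as make_message<T>()
  does in the diff below, does not. ]

#include <boost/intrusive_ptr.hpp>
#include <atomic>
#include <iostream>

struct Msg {
  std::atomic<int> nref{1};  // a freshly constructed object already holds one ref
};
inline void intrusive_ptr_add_ref(Msg *m) { ++m->nref; }
inline void intrusive_ptr_release(Msg *m) { if (--m->nref == 0) delete m; }

using MsgRef = boost::intrusive_ptr<Msg>;

void send(MsgRef) { /* hand the ref off to the messenger */ }

int main() {
  // Leaky pattern: wrapping the raw pointer uses add_ref=true by default,
  // so nref goes 1 -> 2 even though there is only one logical owner. After
  // the last MsgRef is dropped, nref settles at 1 and the Msg is never freed.
  Msg *raw = new Msg;
  send(MsgRef{raw});
  std::cout << "after leaky send: nref=" << raw->nref.load() << "\n";  // 1 -> leaked

  // Safe pattern: adopt the initial ref instead of adding a new one, so the
  // MsgRef is the single owner and the object is freed when it goes away.
  MsgRef owned{new Msg, false};
  send(owned);
  return 0;  // 'owned' releases the last ref here
}

[ This is why the patch below switches the interfaces to take MessageRef and
  creates messages with make_message<>() up front, so a single owning ref
  travels the whole send path. ]
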
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 3b193697ae45d..2e9f6023cdc19 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -157,9 +157,9 @@ public:
   }
 
   void send_cluster_message(
-    int osd, Message *m,
+    int osd, MessageRef m,
     epoch_t epoch, bool share_map_update=false) final {
-    (void)shard_services.send_to_osd(osd, MessageRef{m, false}, epoch);
+    (void)shard_services.send_to_osd(osd, m, epoch);
   }
 
   void send_pg_created(pg_t pgid) final {
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index dc25f604c35aa..074320fcd90fe 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -262,8 +262,8 @@ public:
   std::pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
   void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
   void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
-  void send_message_osd_cluster(Message *m, Connection *con) {
-    con->send_message(m);
+  void send_message_osd_cluster(MessageRef m, Connection *con) {
+    con->send_message2(m);
   }
   void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
     con->send_message(m);
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 3d220732ccc79..9bd8aa5f1fea6 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -799,7 +799,7 @@ void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
 }
 
 void PG::send_cluster_message(
-  int target, Message *m,
+  int target, MessageRef m,
   epoch_t epoch, bool share_map_update=false)
 {
   ConnectionRef con = osd->get_con_osd_cluster(
diff --git a/src/osd/PG.h b/src/osd/PG.h
index b9f4e37477f4a..3f7c1cd7c9bb7 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -662,7 +662,7 @@ public:
   bool dne() { return info.dne(); }
 
   virtual void send_cluster_message(
-    int osd, Message *m, epoch_t epoch, bool share_map_update) override;
+    int osd, MessageRef m, epoch_t epoch, bool share_map_update) override;
 
 protected:
   epoch_t get_last_peering_reset() const {
diff --git a/src/osd/PGBackend.cc b/src/osd/PGBackend.cc
index 4025afc8b8d63..b6357b12639a9 100644
--- a/src/osd/PGBackend.cc
+++ b/src/osd/PGBackend.cc
@@ -143,7 +143,7 @@ void PGBackend::handle_recovery_delete(OpRequestRef op)
     get_parent()->remove_missing_object(p.first, p.second, gather.new_sub());
   }
 
-  MOSDPGRecoveryDeleteReply *reply = new MOSDPGRecoveryDeleteReply;
+  auto reply = make_message<MOSDPGRecoveryDeleteReply>();
   reply->from = get_parent()->whoami_shard();
   reply->set_priority(m->get_priority());
   reply->pgid = spg_t(get_parent()->get_info().pgid.pgid, m->from.shard);
diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h
index ad49626c58365..12bdfc0d113b4 100644
--- a/src/osd/PGBackend.h
+++ b/src/osd/PGBackend.h
@@ -282,7 +282,7 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
    virtual void send_message_osd_cluster(
      std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) = 0;
    virtual void send_message_osd_cluster(
-     Message *m, Connection *con) = 0;
+     MessageRef, Connection *con) = 0;
    virtual void send_message_osd_cluster(
      Message *m, const ConnectionRef& con) = 0;
    virtual ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) = 0;
diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc
index 3ac4fa8cda464..c7637c004de2f 100644
--- a/src/osd/PeeringState.cc
+++ b/src/osd/PeeringState.cc
@@ -287,7 +287,7 @@ void PeeringState::purge_strays()
       psdout(10) << "sending PGRemove to osd." << *p << dendl;
       vector<spg_t> to_remove;
       to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
-      MOSDPGRemove *m = new MOSDPGRemove(
+      auto m = make_message<MOSDPGRemove>(
        get_osdmap_epoch(),
        to_remove);
       pl->send_cluster_message(p->osd, m, get_osdmap_epoch());
@@ -1164,7 +1164,7 @@ void PeeringState::send_lease()
     }
     pl->send_cluster_message(
       peer.osd,
-      new MOSDPGLease(epoch,
+      make_message<MOSDPGLease>(epoch,
                      spg_t(spgid.pgid, peer.shard),
                      get_lease()),
      epoch);
@@ -1464,7 +1464,7 @@ void PeeringState::reject_reservation()
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     primary.osd,
-    new MBackfillReserve(
+    make_message<MBackfillReserve>(
      MBackfillReserve::REJECT_TOOFULL,
      spg_t(info.pgid.pgid, primary.shard),
      get_osdmap_epoch()),
@@ -2783,7 +2783,7 @@ void PeeringState::activate(
       psdout(10) << "activate peer osd." << peer << " " << pi << dendl;
 
-      MOSDPGLog *m = 0;
+      MRef<MOSDPGLog> m;
       ceph_assert(peer_missing.count(peer));
       pg_missing_t& pm = peer_missing[peer];
 
@@ -2810,7 +2810,7 @@ void PeeringState::activate(
       } else {
        psdout(10) << "activate peer osd." << peer
                   << " is up to date, but sending pg_log anyway" << dendl;
-       m = new MOSDPGLog(
+       m = make_message<MOSDPGLog>(
          i->shard, pg_whoami.shard,
          get_osdmap_epoch(), info,
          last_peering_reset);
@@ -2847,7 +2847,7 @@ void PeeringState::activate(
        // initialize peer with our purged_snaps.
        pi.purged_snaps = info.purged_snaps;
 
-       m = new MOSDPGLog(
+       m = make_message<MOSDPGLog>(
          i->shard, pg_whoami.shard,
          get_osdmap_epoch(), pi,
          last_peering_reset /* epoch to create pg at */);
@@ -2862,7 +2862,7 @@ void PeeringState::activate(
       } else {
        // catch up
        ceph_assert(pg_log.get_tail() <= pi.last_update);
-       m = new MOSDPGLog(
+       m = make_message<MOSDPGLog>(
          i->shard, pg_whoami.shard,
          get_osdmap_epoch(), info,
          last_peering_reset /* epoch to create pg at */);
@@ -3009,21 +3009,23 @@ void PeeringState::share_pg_info()
       peer->second.last_interval_started = info.last_interval_started;
       peer->second.history.merge(info.history);
     }
-    Message* m = nullptr;
+    MessageRef m;
     if (last_require_osd_release >= ceph_release_t::octopus) {
-      m = new MOSDPGInfo2{spg_t{info.pgid.pgid, pg_shard.shard},
+      m = make_message<MOSDPGInfo2>(spg_t{info.pgid.pgid, pg_shard.shard},
                          info,
                          get_osdmap_epoch(),
                          get_osdmap_epoch(),
-                         get_lease(), {}};
+                         std::optional{get_lease()},
+                         std::nullopt);
     } else {
-      m = new MOSDPGInfo{get_osdmap_epoch(),
-         {pg_notify_t{pg_shard.shard,
-                      pg_whoami.shard,
-                      get_osdmap_epoch(),
-                      get_osdmap_epoch(),
-                      info,
-                      past_intervals}}};
+      m = make_message<MOSDPGInfo>(get_osdmap_epoch(),
+         MOSDPGInfo::pg_list_t{
+           pg_notify_t{pg_shard.shard,
+                       pg_whoami.shard,
+                       get_osdmap_epoch(),
+                       get_osdmap_epoch(),
+                       info,
+                       past_intervals}});
     }
     pl->send_cluster_message(pg_shard.osd, m, get_osdmap_epoch());
   }
@@ -3150,7 +3152,7 @@ void PeeringState::fulfill_log(
   ceph_assert(from == primary);
   ceph_assert(query.type != pg_query_t::INFO);
 
-  MOSDPGLog *mlog = new MOSDPGLog(
+  auto mlog = make_message<MOSDPGLog>(
     from.shard, pg_whoami.shard,
     get_osdmap_epoch(),
     info, query_epoch);
@@ -4397,7 +4399,7 @@ void PeeringState::recovery_committed_to(eversion_t version)
     // we are fully up to date.  tell the primary!
     pl->send_cluster_message(
       get_primary().osd,
-      new MOSDPGTrim(
+      make_message<MOSDPGTrim>(
        get_osdmap_epoch(),
        spg_t(info.pgid.pgid, primary.shard),
        last_complete_ondisk),
@@ -5040,7 +5042,7 @@ void PeeringState::Backfilling::backfill_release_reservations()
     ceph_assert(*it != ps->pg_whoami);
     pl->send_cluster_message(
       it->osd,
-      new MBackfillReserve(
+      make_message<MBackfillReserve>(
        MBackfillReserve::RELEASE,
        spg_t(ps->info.pgid.pgid, it->shard),
        ps->get_osdmap_epoch()),
@@ -5164,7 +5166,7 @@ PeeringState::WaitRemoteBackfillReserved::react(const RemoteBackfillReserved &ev
     ceph_assert(*backfill_osd_it != ps->pg_whoami);
     pl->send_cluster_message(
       backfill_osd_it->osd,
-      new MBackfillReserve(
+      make_message<MBackfillReserve>(
        MBackfillReserve::REQUEST,
        spg_t(context< PeeringMachine >().spgid.pgid, backfill_osd_it->shard),
        ps->get_osdmap_epoch(),
@@ -5204,7 +5206,7 @@ void PeeringState::WaitRemoteBackfillReserved::retry()
     ceph_assert(*it != ps->pg_whoami);
     pl->send_cluster_message(
       it->osd,
-      new MBackfillReserve(
+      make_message<MBackfillReserve>(
        MBackfillReserve::RELEASE,
        spg_t(context< PeeringMachine >().spgid.pgid, it->shard),
        ps->get_osdmap_epoch()),
@@ -5377,7 +5379,7 @@ PeeringState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt)
   DECLARE_LOCALS;
   pl->send_cluster_message(
     ps->primary.osd,
-    new MRecoveryReserve(
+    make_message<MRecoveryReserve>(
      MRecoveryReserve::GRANT,
      spg_t(ps->info.pgid.pgid, ps->primary.shard),
      ps->get_osdmap_epoch()),
@@ -5485,7 +5487,7 @@ PeeringState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
 
     pl->send_cluster_message(
       ps->primary.osd,
-      new MBackfillReserve(
+      make_message<MBackfillReserve>(
        MBackfillReserve::GRANT,
        spg_t(ps->info.pgid.pgid, ps->primary.shard),
        ps->get_osdmap_epoch()),
@@ -5542,7 +5544,7 @@ PeeringState::RepRecovering::react(const RemoteRecoveryPreempted &)
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     ps->primary.osd,
-    new MRecoveryReserve(
+    make_message<MRecoveryReserve>(
      MRecoveryReserve::REVOKE,
      spg_t(ps->info.pgid.pgid, ps->primary.shard),
      ps->get_osdmap_epoch()),
@@ -5559,7 +5561,7 @@ PeeringState::RepRecovering::react(const BackfillTooFull &)
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     ps->primary.osd,
-    new MBackfillReserve(
+    make_message<MBackfillReserve>(
      MBackfillReserve::REVOKE_TOOFULL,
      spg_t(ps->info.pgid.pgid, ps->primary.shard),
      ps->get_osdmap_epoch()),
@@ -5576,7 +5578,7 @@ PeeringState::RepRecovering::react(const RemoteBackfillPreempted &)
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     ps->primary.osd,
-    new MBackfillReserve(
+    make_message<MBackfillReserve>(
      MBackfillReserve::REVOKE,
      spg_t(ps->info.pgid.pgid, ps->primary.shard),
      ps->get_osdmap_epoch()),
@@ -5680,7 +5682,7 @@ PeeringState::WaitRemoteRecoveryReserved::react(const RemoteRecoveryReserved &ev
     ceph_assert(*remote_recovery_reservation_it != ps->pg_whoami);
     pl->send_cluster_message(
      remote_recovery_reservation_it->osd,
-     new MRecoveryReserve(
+     make_message<MRecoveryReserve>(
       MRecoveryReserve::REQUEST,
       spg_t(context< PeeringMachine >().spgid.pgid,
             remote_recovery_reservation_it->shard),
@@ -5730,7 +5732,7 @@ void PeeringState::Recovering::release_reservations(bool cancel)
       continue;
     pl->send_cluster_message(
       i->osd,
-      new MRecoveryReserve(
+      make_message<MRecoveryReserve>(
        MRecoveryReserve::RELEASE,
        spg_t(ps->info.pgid.pgid, i->shard),
        ps->get_osdmap_epoch()),
@@ -6429,7 +6431,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(const MLease& l)
   ps->proc_lease(l.lease);
   pl->send_cluster_message(
     ps->get_primary().osd,
-    new MOSDPGLeaseAck(epoch,
+    make_message<MOSDPGLeaseAck>(epoch,
                       spg_t(spgid.pgid, ps->get_primary().shard),
                       ps->get_lease_ack()),
    epoch);
diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h
index 74f2250589c6c..0695486509377 100644
--- a/src/osd/PeeringState.h
+++ b/src/osd/PeeringState.h
@@ -270,7 +270,7 @@ public:
 
   /// Send cluster message to osd
   virtual void send_cluster_message(
-    int osd, Message *m, epoch_t epoch, bool share_map_update=false) = 0;
+    int osd, MessageRef m, epoch_t epoch, bool share_map_update=false) = 0;
 
   /// Send pg_created to mon
   virtual void send_pg_created(pg_t pgid) = 0;
diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h
index b34b62ca5d964..479994d80eb84 100644
--- a/src/osd/PrimaryLogPG.h
+++ b/src/osd/PrimaryLogPG.h
@@ -548,7 +548,7 @@ public:
     osd->send_message_osd_cluster(messages, from_epoch);
   }
   void send_message_osd_cluster(
-    Message *m, Connection *con) override {
+    MessageRef m, Connection *con) override {
     osd->send_message_osd_cluster(m, con);
   }
   void send_message_osd_cluster(
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
index 7c71aa26fe7a2..d7ffc0533958d 100644
--- a/src/osd/ReplicatedBackend.cc
+++ b/src/osd/ReplicatedBackend.cc
@@ -59,7 +59,7 @@ class PG_SendMessageOnConn: public Context {
     Message *reply,
     ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
   void finish(int) override {
-    pg->send_message_osd_cluster(reply, conn.get());
+    pg->send_message_osd_cluster(MessageRef(reply, false), conn.get());
   }
 };