git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd,crimson/osd: pass message using intrusive_ptr<> 37304/head
authorKefu Chai <kchai@redhat.com>
Tue, 22 Sep 2020 08:11:24 +0000 (16:11 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 23 Sep 2020 14:54:51 +0000 (22:54 +0800)
for two reasons:

* crimson::osd::PG::send_cluster_message() accepts a `Message*`
  pointer, and then hands it over to `shard_services.send_to_osd()`,
  which expects a `Ref<Message>`. So the raw pointer is used to
  construct an `intrusive_ptr<Message>`, which increments the
  refcount of that Message instance by one. But that Message
  was owned by nobody before that, so we end up with an
  `intrusive_ptr<Message>` with a refcount of 2 and only a single
  owner — hence the memory leak.
* osd: use Connection::send_message2(), which accepts a
  MessageRef and is less error-prone, in the sense of preventing
  memory leaks of Messages.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/pg.h
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/PGBackend.cc
src/osd/PGBackend.h
src/osd/PeeringState.cc
src/osd/PeeringState.h
src/osd/PrimaryLogPG.h
src/osd/ReplicatedBackend.cc

index 3b193697ae45d8dca96e0e18e6c89b7507e9ff0c..2e9f6023cdc191e5dfc3fdf9220c937c73e86e7d 100644 (file)
@@ -157,9 +157,9 @@ public:
   }
 
   void send_cluster_message(
-    int osd, Message *m,
+    int osd, MessageRef m,
     epoch_t epoch, bool share_map_update=false) final {
-    (void)shard_services.send_to_osd(osd, MessageRef{m, false}, epoch);
+    (void)shard_services.send_to_osd(osd, m, epoch);
   }
 
   void send_pg_created(pg_t pgid) final {
index dc25f604c35aaf4b1198e94cecb720d7485a9d28..074320fcd90fef521439f6a044384ab047e33631 100644 (file)
@@ -262,8 +262,8 @@ public:
   std::pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
   void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
   void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
-  void send_message_osd_cluster(Message *m, Connection *con) {
-    con->send_message(m);
+  void send_message_osd_cluster(MessageRef m, Connection *con) {
+    con->send_message2(m);
   }
   void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
     con->send_message(m);
index 3d220732ccc79bfea978407b7be64c7e1a278235..9bd8aa5f1fea62e272a2b74e7b6619e30d6d4e94 100644 (file)
@@ -799,7 +799,7 @@ void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
 }
 
 void PG::send_cluster_message(
-  int target, Message *m,
+  int target, MessageRef m,
   epoch_t epoch, bool share_map_update=false)
 {
   ConnectionRef con = osd->get_con_osd_cluster(
index b9f4e37477f4adc706a6aeec6f279c67e98ec430..3f7c1cd7c9bb709169a43193af88785d45f95cd6 100644 (file)
@@ -662,7 +662,7 @@ public:
   bool dne() { return info.dne(); }
 
   virtual void send_cluster_message(
-    int osd, Message *m, epoch_t epoch, bool share_map_update) override;
+    int osd, MessageRef m, epoch_t epoch, bool share_map_update) override;
 
 protected:
   epoch_t get_last_peering_reset() const {
index 4025afc8b8d635099cce805493009e951fc321a8..b6357b12639a9054cef56cc2d38771cc45ecd22d 100644 (file)
@@ -143,7 +143,7 @@ void PGBackend::handle_recovery_delete(OpRequestRef op)
     get_parent()->remove_missing_object(p.first, p.second, gather.new_sub());
   }
 
-  MOSDPGRecoveryDeleteReply *reply = new MOSDPGRecoveryDeleteReply;
+  auto reply = make_message<MOSDPGRecoveryDeleteReply>();
   reply->from = get_parent()->whoami_shard();
   reply->set_priority(m->get_priority());
   reply->pgid = spg_t(get_parent()->get_info().pgid.pgid, m->from.shard);
index ad49626c5836528427bcd51f09bfc690028a7201..12bdfc0d113b43be1d1ed44444c2cfdbc4298d0a 100644 (file)
@@ -282,7 +282,7 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
       virtual void send_message_osd_cluster(
        std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) = 0;
      virtual void send_message_osd_cluster(
-       Message *m, Connection *con) = 0;
+       MessageRef, Connection *con) = 0;
      virtual void send_message_osd_cluster(
        Message *m, const ConnectionRef& con) = 0;
      virtual ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) = 0;
index 3ac4fa8cda4643ed7296305a5d01344524828331..c7637c004de2f25a62f97b6ad89269493f862547 100644 (file)
@@ -287,7 +287,7 @@ void PeeringState::purge_strays()
       psdout(10) << "sending PGRemove to osd." << *p << dendl;
       vector<spg_t> to_remove;
       to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
-      MOSDPGRemove *m = new MOSDPGRemove(
+      auto m = make_message<MOSDPGRemove>(
        get_osdmap_epoch(),
        to_remove);
       pl->send_cluster_message(p->osd, m, get_osdmap_epoch());
@@ -1164,7 +1164,7 @@ void PeeringState::send_lease()
     }
     pl->send_cluster_message(
       peer.osd,
-      new MOSDPGLease(epoch,
+      make_message<MOSDPGLease>(epoch,
                      spg_t(spgid.pgid, peer.shard),
                      get_lease()),
       epoch);
@@ -1464,7 +1464,7 @@ void PeeringState::reject_reservation()
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     primary.osd,
-    new MBackfillReserve(
+    make_message<MBackfillReserve>(
       MBackfillReserve::REJECT_TOOFULL,
       spg_t(info.pgid.pgid, primary.shard),
       get_osdmap_epoch()),
@@ -2783,7 +2783,7 @@ void PeeringState::activate(
 
       psdout(10) << "activate peer osd." << peer << " " << pi << dendl;
 
-      MOSDPGLog *m = 0;
+      MRef<MOSDPGLog> m;
       ceph_assert(peer_missing.count(peer));
       pg_missing_t& pm = peer_missing[peer];
 
@@ -2810,7 +2810,7 @@ void PeeringState::activate(
        } else {
          psdout(10) << "activate peer osd." << peer
                     << " is up to date, but sending pg_log anyway" << dendl;
-         m = new MOSDPGLog(
+         m = make_message<MOSDPGLog>(
            i->shard, pg_whoami.shard,
            get_osdmap_epoch(), info,
            last_peering_reset);
@@ -2847,7 +2847,7 @@ void PeeringState::activate(
        // initialize peer with our purged_snaps.
        pi.purged_snaps = info.purged_snaps;
 
-       m = new MOSDPGLog(
+       m = make_message<MOSDPGLog>(
          i->shard, pg_whoami.shard,
          get_osdmap_epoch(), pi,
          last_peering_reset /* epoch to create pg at */);
@@ -2862,7 +2862,7 @@ void PeeringState::activate(
       } else {
        // catch up
        ceph_assert(pg_log.get_tail() <= pi.last_update);
-       m = new MOSDPGLog(
+       m = make_message<MOSDPGLog>(
          i->shard, pg_whoami.shard,
          get_osdmap_epoch(), info,
          last_peering_reset /* epoch to create pg at */);
@@ -3009,21 +3009,23 @@ void PeeringState::share_pg_info()
       peer->second.last_interval_started = info.last_interval_started;
       peer->second.history.merge(info.history);
     }
-    Message* m = nullptr;
+    MessageRef m;
     if (last_require_osd_release >= ceph_release_t::octopus) {
-      m = new MOSDPGInfo2{spg_t{info.pgid.pgid, pg_shard.shard},
+      m = make_message<MOSDPGInfo2>(spg_t{info.pgid.pgid, pg_shard.shard},
                          info,
                          get_osdmap_epoch(),
                          get_osdmap_epoch(),
-                         get_lease(), {}};
+                         std::optional<pg_lease_t>{get_lease()},
+                         std::nullopt);
     } else {
-      m = new MOSDPGInfo{get_osdmap_epoch(),
-                        {pg_notify_t{pg_shard.shard,
-                                     pg_whoami.shard,
-                                     get_osdmap_epoch(),
-                                     get_osdmap_epoch(),
-                                     info,
-                                     past_intervals}}};
+      m = make_message<MOSDPGInfo>(get_osdmap_epoch(),
+             MOSDPGInfo::pg_list_t{
+               pg_notify_t{pg_shard.shard,
+                           pg_whoami.shard,
+                           get_osdmap_epoch(),
+                           get_osdmap_epoch(),
+                           info,
+                           past_intervals}});
     }
     pl->send_cluster_message(pg_shard.osd, m, get_osdmap_epoch());
   }
@@ -3150,7 +3152,7 @@ void PeeringState::fulfill_log(
   ceph_assert(from == primary);
   ceph_assert(query.type != pg_query_t::INFO);
 
-  MOSDPGLog *mlog = new MOSDPGLog(
+  auto mlog = make_message<MOSDPGLog>(
     from.shard, pg_whoami.shard,
     get_osdmap_epoch(),
     info, query_epoch);
@@ -4397,7 +4399,7 @@ void PeeringState::recovery_committed_to(eversion_t version)
       // we are fully up to date.  tell the primary!
       pl->send_cluster_message(
        get_primary().osd,
-       new MOSDPGTrim(
+       make_message<MOSDPGTrim>(
          get_osdmap_epoch(),
          spg_t(info.pgid.pgid, primary.shard),
          last_complete_ondisk),
@@ -5040,7 +5042,7 @@ void PeeringState::Backfilling::backfill_release_reservations()
     ceph_assert(*it != ps->pg_whoami);
     pl->send_cluster_message(
       it->osd,
-      new MBackfillReserve(
+      make_message<MBackfillReserve>(
        MBackfillReserve::RELEASE,
        spg_t(ps->info.pgid.pgid, it->shard),
        ps->get_osdmap_epoch()),
@@ -5164,7 +5166,7 @@ PeeringState::WaitRemoteBackfillReserved::react(const RemoteBackfillReserved &ev
     ceph_assert(*backfill_osd_it != ps->pg_whoami);
     pl->send_cluster_message(
       backfill_osd_it->osd,
-      new MBackfillReserve(
+      make_message<MBackfillReserve>(
        MBackfillReserve::REQUEST,
        spg_t(context< PeeringMachine >().spgid.pgid, backfill_osd_it->shard),
        ps->get_osdmap_epoch(),
@@ -5204,7 +5206,7 @@ void PeeringState::WaitRemoteBackfillReserved::retry()
     ceph_assert(*it != ps->pg_whoami);
     pl->send_cluster_message(
       it->osd,
-      new MBackfillReserve(
+      make_message<MBackfillReserve>(
        MBackfillReserve::RELEASE,
        spg_t(context< PeeringMachine >().spgid.pgid, it->shard),
        ps->get_osdmap_epoch()),
@@ -5377,7 +5379,7 @@ PeeringState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt)
   DECLARE_LOCALS;
   pl->send_cluster_message(
     ps->primary.osd,
-    new MRecoveryReserve(
+    make_message<MRecoveryReserve>(
       MRecoveryReserve::GRANT,
       spg_t(ps->info.pgid.pgid, ps->primary.shard),
       ps->get_osdmap_epoch()),
@@ -5485,7 +5487,7 @@ PeeringState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
 
   pl->send_cluster_message(
       ps->primary.osd,
-      new MBackfillReserve(
+      make_message<MBackfillReserve>(
        MBackfillReserve::GRANT,
        spg_t(ps->info.pgid.pgid, ps->primary.shard),
        ps->get_osdmap_epoch()),
@@ -5542,7 +5544,7 @@ PeeringState::RepRecovering::react(const RemoteRecoveryPreempted &)
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     ps->primary.osd,
-    new MRecoveryReserve(
+    make_message<MRecoveryReserve>(
       MRecoveryReserve::REVOKE,
       spg_t(ps->info.pgid.pgid, ps->primary.shard),
       ps->get_osdmap_epoch()),
@@ -5559,7 +5561,7 @@ PeeringState::RepRecovering::react(const BackfillTooFull &)
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     ps->primary.osd,
-    new MBackfillReserve(
+    make_message<MBackfillReserve>(
       MBackfillReserve::REVOKE_TOOFULL,
       spg_t(ps->info.pgid.pgid, ps->primary.shard),
       ps->get_osdmap_epoch()),
@@ -5576,7 +5578,7 @@ PeeringState::RepRecovering::react(const RemoteBackfillPreempted &)
   pl->unreserve_recovery_space();
   pl->send_cluster_message(
     ps->primary.osd,
-    new MBackfillReserve(
+    make_message<MBackfillReserve>(
       MBackfillReserve::REVOKE,
       spg_t(ps->info.pgid.pgid, ps->primary.shard),
       ps->get_osdmap_epoch()),
@@ -5680,7 +5682,7 @@ PeeringState::WaitRemoteRecoveryReserved::react(const RemoteRecoveryReserved &ev
     ceph_assert(*remote_recovery_reservation_it != ps->pg_whoami);
     pl->send_cluster_message(
       remote_recovery_reservation_it->osd,
-      new MRecoveryReserve(
+      make_message<MRecoveryReserve>(
        MRecoveryReserve::REQUEST,
        spg_t(context< PeeringMachine >().spgid.pgid,
              remote_recovery_reservation_it->shard),
@@ -5730,7 +5732,7 @@ void PeeringState::Recovering::release_reservations(bool cancel)
       continue;
     pl->send_cluster_message(
       i->osd,
-      new MRecoveryReserve(
+      make_message<MRecoveryReserve>(
        MRecoveryReserve::RELEASE,
        spg_t(ps->info.pgid.pgid, i->shard),
        ps->get_osdmap_epoch()),
@@ -6429,7 +6431,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(const MLease& l)
   ps->proc_lease(l.lease);
   pl->send_cluster_message(
     ps->get_primary().osd,
-    new MOSDPGLeaseAck(epoch,
+    make_message<MOSDPGLeaseAck>(epoch,
                       spg_t(spgid.pgid, ps->get_primary().shard),
                       ps->get_lease_ack()),
     epoch);
index 74f2250589c6c85699a82f81db16ecaae46f74ac..0695486509377a8efb27c9c22ac5162175bc75c2 100644 (file)
@@ -270,7 +270,7 @@ public:
 
     /// Send cluster message to osd
     virtual void send_cluster_message(
-      int osd, Message *m, epoch_t epoch, bool share_map_update=false) = 0;
+      int osd, MessageRef m, epoch_t epoch, bool share_map_update=false) = 0;
     /// Send pg_created to mon
     virtual void send_pg_created(pg_t pgid) = 0;
 
index b34b62ca5d96488d2de3a025de1736488a3131df..479994d80eb8425754760bc237d644a93d764963 100644 (file)
@@ -548,7 +548,7 @@ public:
     osd->send_message_osd_cluster(messages, from_epoch);
   }
   void send_message_osd_cluster(
-    Message *m, Connection *con) override {
+    MessageRef m, Connection *con) override {
     osd->send_message_osd_cluster(m, con);
   }
   void send_message_osd_cluster(
index 7c71aa26fe7a2b1c2d91fa2d1371fdb94e53660b..d7ffc0533958da8ce657a664ef308c019d3de370 100644 (file)
@@ -59,7 +59,7 @@ class PG_SendMessageOnConn: public Context {
     Message *reply,
     ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
   void finish(int) override {
-    pg->send_message_osd_cluster(reply, conn.get());
+    pg->send_message_osd_cluster(MessageRef(reply, false), conn.get());
   }
 };