]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: make OSDService messenger helpers return ConnectionRef
authorSamuel Just <sam.just@inktank.com>
Fri, 30 Nov 2012 19:08:55 +0000 (11:08 -0800)
committerSamuel Just <sam.just@inktank.com>
Fri, 30 Nov 2012 19:08:55 +0000 (11:08 -0800)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/msg/DispatchQueue.h
src/msg/Message.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc

index ad4584829d1cff3ea25291c2a68ee00f30b2e629..ea44c165d56d680be76e0d8a953c3c9fd975d530 100644 (file)
@@ -33,9 +33,6 @@ class SimpleMessenger;
 class Message;
 class Connection;
 
-typedef boost::intrusive_ptr<Connection> ConnectionRef;
-typedef boost::intrusive_ptr<Message> MessageRef;
-
 /**
  * The DispatchQueue contains all the Pipes which have Messages
  * they want to be dispatched, carefully organized by Message priority
index b6a113f771fb8edad60ac3850ee0dacdf82fe2ac..fc434ed9b85a49a1d31cc1f5b1a9319fc237a4c3 100644 (file)
 #include <stdlib.h>
 #include <ostream>
 
+#include <boost/intrusive_ptr.hpp>
+// Because intusive_ptr clobbers our assert...
+#include "include/assert.h"
+
 #include "include/types.h"
 #include "include/buffer.h"
 #include "common/Throttle.h"
@@ -261,6 +265,7 @@ public:
     rx_buffers.erase(tid);
   }
 };
+typedef boost::intrusive_ptr<Connection> ConnectionRef;
 
 
 
@@ -466,6 +471,7 @@ public:
 
   void encode(uint64_t features, bool datacrc);
 };
+typedef boost::intrusive_ptr<Message> MessageRef;
 
 extern Message *decode_message(CephContext *cct, ceph_msg_header &header,
                               ceph_msg_footer& footer, bufferlist& front,
index 9c57b66c4b89b35d15b47758f1c134e40731b926..feff33d494309051a474cba20c0ccfc47a2d62f0 100644 (file)
@@ -1781,11 +1781,12 @@ void OSD::_add_heartbeat_peer(int p)
 
   map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);
   if (i == heartbeat_peers.end()) {
-    Connection *con = service.get_con_osd_hb(p, osdmap->get_epoch());
+    ConnectionRef con = service.get_con_osd_hb(p, osdmap->get_epoch());
     if (!con)
       return;
     hi = &heartbeat_peers[p];
-    hi->con = con;
+    hi->con = con.get();
+    hi->con->get();
     hi->peer = p;
     hi->con->set_priv(new HeartbeatSession(p));
     dout(10) << "_add_heartbeat_peer: new peer osd." << p
@@ -1913,10 +1914,9 @@ void OSD::handle_osd_ping(MOSDPing *m)
       if (curmap->is_up(from)) {
        note_peer_epoch(from, m->map_epoch);
        if (is_active()) {
-         Connection *con = service.get_con_osd_cluster(from, curmap->get_epoch());
+         ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
          if (con) {
-           _share_map_outgoing(from, con);
-           con->put();
+           _share_map_outgoing(from, con.get());
          }
        }
       }
@@ -1939,10 +1939,9 @@ void OSD::handle_osd_ping(MOSDPing *m)
          curmap->is_up(from)) {
        note_peer_epoch(from, m->map_epoch);
        if (is_active()) {
-         Connection *con = service.get_con_osd_cluster(from, curmap->get_epoch());
+         ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
          if (con) {
-           _share_map_outgoing(from, con);
-           con->put();
+           _share_map_outgoing(from, con.get());
          }
        }
       }
@@ -2087,14 +2086,15 @@ bool OSD::heartbeat_reset(Connection *con)
     map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
     if (p != heartbeat_peers.end() &&
        p->second.con == con) {
-      Connection *newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
+      ConnectionRef newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
       if (!newcon) {
        dout(10) << "heartbeat_reset reopen failed hb con " << con << " but failed to reopen" << dendl;
        s->put();
        return true;
       }
       dout(10) << "heartbeat_reset reopen failed hb con " << con << dendl;
-      p->second.con = newcon;
+      p->second.con = newcon.get();
+      p->second.con->get();
       p->second.con->set_priv(s);
     } else {
       dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
@@ -2510,7 +2510,7 @@ void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epo
   osd->cluster_messenger->send_message(m, next_osdmap->get_cluster_inst(peer));
 }
 
-Connection *OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
+ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
 {
   Mutex::Locker l(pre_publish_lock);
 
@@ -2521,10 +2521,13 @@ Connection *OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
       next_osdmap->get_info(peer).up_from > from_epoch) {
     return NULL;
   }
-  return osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer));
+  ConnectionRef ret(
+    osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer)));
+  ret->put(); // Ref from get_connection
+  return ret;
 }
 
-Connection *OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+ConnectionRef OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
 {
   Mutex::Locker l(pre_publish_lock);
 
@@ -2535,7 +2538,10 @@ Connection *OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
       next_osdmap->get_info(peer).up_from > from_epoch) {
     return NULL;
   }
-  return osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer));
+  ConnectionRef ret(
+    osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer)));
+  ret->put(); // Ref from get_connection
+  return ret;
 }
 
 void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
@@ -4688,16 +4694,16 @@ void OSD::do_notifies(
     }
     if (!curmap->is_up(it->first))
       continue;
-    Connection *con = service.get_con_osd_cluster(it->first, curmap->get_epoch());
+    ConnectionRef con = service.get_con_osd_cluster(it->first, curmap->get_epoch());
     if (!con)
       continue;
-    _share_map_outgoing(it->first, con, curmap);
+    _share_map_outgoing(it->first, con.get(), curmap);
     if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
       dout(7) << "do_notify osd." << it->first
              << " on " << it->second.size() << " PGs" << dendl;
       MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
                                         it->second);
-      cluster_messenger->send_message(m, con);
+      cluster_messenger->send_message(m, con.get());
     } else {
       dout(7) << "do_notify osd." << it->first
              << " sending seperate messages" << dendl;
@@ -4709,10 +4715,9 @@ void OSD::do_notifies(
        list[0] = *i;
        MOSDPGNotify *m = new MOSDPGNotify(i->first.epoch_sent,
                                           list);
-       cluster_messenger->send_message(m, con);
+       cluster_messenger->send_message(m, con.get());
       }
     }
-    con->put();
   }
 }
 
@@ -4729,15 +4734,15 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
     if (!curmap->is_up(pit->first))
       continue;
     int who = pit->first;
-    Connection *con = service.get_con_osd_cluster(who, curmap->get_epoch());
+    ConnectionRef con = service.get_con_osd_cluster(who, curmap->get_epoch());
     if (!con)
       continue;
-    _share_map_outgoing(who, con, curmap);
+    _share_map_outgoing(who, con.get(), curmap);
     if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
       dout(7) << "do_queries querying osd." << who
              << " on " << pit->second.size() << " PGs" << dendl;
       MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second);
-      cluster_messenger->send_message(m, con);
+      cluster_messenger->send_message(m, con.get());
     } else {
       dout(7) << "do_queries querying osd." << who
              << " sending seperate messages "
@@ -4748,10 +4753,9 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
        map<pg_t, pg_query_t> to_send;
        to_send.insert(*i);
        MOSDPGQuery *m = new MOSDPGQuery(i->second.epoch_sent, to_send);
-       cluster_messenger->send_message(m, con);
+       cluster_messenger->send_message(m, con.get());
       }
     }
-    con->put();
   }
 }
 
@@ -4769,14 +4773,14 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
         ++i) {
       dout(20) << "Sending info " << i->first.info << " to osd." << p->first << dendl;
     }
-    Connection *con = service.get_con_osd_cluster(p->first, curmap->get_epoch());
+    ConnectionRef con = service.get_con_osd_cluster(p->first, curmap->get_epoch());
     if (!con)
       continue;
-    _share_map_outgoing(p->first, con, curmap);
+    _share_map_outgoing(p->first, con.get(), curmap);
     if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
       MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
       m->pg_list = p->second;
-      cluster_messenger->send_message(m, con);
+      cluster_messenger->send_message(m, con.get());
     } else {
       for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i =
             p->second.begin();
@@ -4786,10 +4790,9 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
        to_send[0] = *i;
        MOSDPGInfo *m = new MOSDPGInfo(i->first.epoch_sent);
        m->pg_list = to_send;
-       cluster_messenger->send_message(m, con);
+       cluster_messenger->send_message(m, con.get());
       }
     }
-    con->put();
   }
   info_map.clear();
 }
@@ -5158,13 +5161,12 @@ void OSD::handle_pg_query(OpRequestRef op)
     pg_info_t empty(pgid);
     if (it->second.type == pg_query_t::LOG ||
        it->second.type == pg_query_t::FULLLOG) {
-      Connection *con = service.get_con_osd_cluster(from, osdmap->get_epoch());
+      ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch());
       if (con) {
        MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty,
                                        it->second.epoch_sent);
-       _share_map_outgoing(from, con, osdmap);
-       cluster_messenger->send_message(mlog, con);
-       con->put();
+       _share_map_outgoing(from, con.get(), osdmap);
+       cluster_messenger->send_message(mlog, con.get());
       }
     } else {
       notify_list[from].push_back(make_pair(pg_notify_t(it->second.epoch_sent,
index 2b623efa339ec157d9979cc9c671710be7b46aae..a749ec178db5d575334cc07b582546cf36a81810 100644 (file)
@@ -225,8 +225,8 @@ public:
     Mutex::Locker l(pre_publish_lock);
     next_osdmap = map;
   }
-  Connection *get_con_osd_cluster(int peer, epoch_t from_epoch);
-  Connection *get_con_osd_hb(int peer, epoch_t from_epoch);
+  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
+  ConnectionRef get_con_osd_hb(int peer, epoch_t from_epoch);
   void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
 
   // -- scrub scheduling --
index d71fe50a0e94f3d628b0e4b9d1e4b11ccf064fe6..55348e6c786e13fdf8a599b22add2e914fe684da 100644 (file)
@@ -3383,7 +3383,7 @@ void PG::scrub()
     OSDMapRef curmap = osd->get_osdmap();
     scrubber.is_chunky = true;
     for (unsigned i=1; i<acting.size(); i++) {
-      Connection *con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch());
+      ConnectionRef con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch());
       if (!con)
        continue;
       if (!(con->features & CEPH_FEATURE_CHUNKY_SCRUB)) {
@@ -3391,10 +3391,8 @@ void PG::scrub()
                  << " does not support chunky scrubs, falling back to classic"
                  << dendl;
         scrubber.is_chunky = false;
-       con->put();
         break;
       }
-      con->put();
     }
 
     if (scrubber.is_chunky) {
@@ -4226,11 +4224,10 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch)
 
   dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
 
-  Connection *con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch());
+  ConnectionRef con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch());
   if (con) {
-    osd->osd->_share_map_outgoing(from, con, get_osdmap());
-    osd->cluster_messenger->send_message(mlog, con);
-    con->put();
+    osd->osd->_share_map_outgoing(from, con.get(), get_osdmap());
+    osd->cluster_messenger->send_message(mlog, con.get());
   } else {
     mlog->put();
   }
@@ -5248,7 +5245,8 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con
   context< RecoveryMachine >().log_enter(state_name);
   PG *pg = context< RecoveryMachine >().pg;
   pg->state_set(PG_STATE_BACKFILL_WAIT);
-  Connection *con = pg->osd->get_con_osd_cluster(pg->backfill_target, pg->get_osdmap()->get_epoch());
+  ConnectionRef con = pg->osd->get_con_osd_cluster(
+    pg->backfill_target, pg->get_osdmap()->get_epoch());
   if (con) {
     if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) {
       pg->osd->cluster_messenger->send_message(
@@ -5256,11 +5254,10 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con
          MBackfillReserve::REQUEST,
          pg->info.pgid,
          pg->get_osdmap()->get_epoch()),
-       con);
+       con.get());
     } else {
       post_event(RemoteBackfillReserved());
     }
-    con->put();
   }
 }
 
@@ -5495,18 +5492,17 @@ PG::RecoveryState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_con
   }
 
   if (acting_osd_it != context< Active >().sorted_acting_set.end()) {
-    Connection *con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch());
+    ConnectionRef con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch());
     if (con) {
       if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
        pg->osd->cluster_messenger->send_message(
           new MRecoveryReserve(MRecoveryReserve::REQUEST,
                               pg->info.pgid,
                               pg->get_osdmap()->get_epoch()),
-         con);
+         con.get());
       } else {
        post_event(RemoteRecoveryReserved());
       }
-      con->put();
     }
     ++acting_osd_it;
   } else {
@@ -5543,16 +5539,15 @@ void PG::RecoveryState::Recovering::release_reservations()
         ++i) {
     if (*i == pg->osd->whoami) // skip myself
       continue;
-    Connection *con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch());
+    ConnectionRef con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch());
     if (con) {
       if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
        pg->osd->cluster_messenger->send_message(
           new MRecoveryReserve(MRecoveryReserve::RELEASE,
                               pg->info.pgid,
                               pg->get_osdmap()->get_epoch()),
-         con);
+         con.get());
       }
-      con->put();
     }
   }
 }