]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: use OSDService send_message helper from PG context
authorSage Weil <sage@inktank.com>
Thu, 29 Nov 2012 17:21:49 +0000 (09:21 -0800)
committerSage Weil <sage@inktank.com>
Thu, 29 Nov 2012 20:39:44 +0000 (12:39 -0800)
Use the OSDService helper to send messages to peers.  This ensures that if
we are on an older OSDMap the messages don't actually get sent to down
OSDs that handle_osd_map has done mark_down() on.

Signed-off-by: Sage Weil <sage@inktank.com>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/ReplicatedPG.cc

index eef6d15f7db0bf66b1fa2b1afb9e05378f281319..da4b18ebeb785c3540122603aa462a768f1f4319 100644 (file)
@@ -4654,7 +4654,7 @@ void OSD::do_notifies(
              << " on " << it->second.size() << " PGs" << dendl;
       MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
                                         it->second);
-      cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first));
+      service.send_message_osd_cluster(it->first, m, curmap->get_epoch());
     } else {
       dout(7) << "do_notify osd." << it->first
              << " sending seperate messages" << dendl;
@@ -4666,7 +4666,7 @@ void OSD::do_notifies(
        list[0] = *i;
        MOSDPGNotify *m = new MOSDPGNotify(i->first.epoch_sent,
                                           list);
-       cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first));
+       service.send_message_osd_cluster(it->first, m, curmap->get_epoch());
       }
     }
   }
@@ -4692,7 +4692,7 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
       dout(7) << "do_queries querying osd." << who
              << " on " << pit->second.size() << " PGs" << dendl;
       MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second);
-      cluster_messenger->send_message(m, curmap->get_cluster_inst(who));
+      service.send_message_osd_cluster(who, m, curmap->get_epoch());
     } else {
       dout(7) << "do_queries querying osd." << who
              << " sending seperate messages "
@@ -4703,7 +4703,7 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
        map<pg_t, pg_query_t> to_send;
        to_send.insert(*i);
        MOSDPGQuery *m = new MOSDPGQuery(i->second.epoch_sent, to_send);
-       cluster_messenger->send_message(m, curmap->get_cluster_inst(who));
+       service.send_message_osd_cluster(who, m, curmap->get_epoch());
       }
     }
   }
@@ -4729,7 +4729,7 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
     if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
       MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
       m->pg_list = p->second;
-      cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first));
+      service.send_message_osd_cluster(p->first, m, curmap->get_epoch());
     } else {
       for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i =
             p->second.begin();
@@ -4739,7 +4739,7 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
        to_send[0] = *i;
        MOSDPGInfo *m = new MOSDPGInfo(i->first.epoch_sent);
        m->pg_list = to_send;
-       cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first));
+       service.send_message_osd_cluster(p->first, m, curmap->get_epoch());
       }
     }
   }
@@ -5113,8 +5113,7 @@ void OSD::handle_pg_query(OpRequestRef op)
       MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty,
                                      it->second.epoch_sent);
       _share_map_outgoing(osdmap->get_cluster_inst(from));
-      cluster_messenger->send_message(mlog,
-                                     osdmap->get_cluster_inst(from));
+      service.send_message_osd_cluster(from, mlog, osdmap->get_epoch());
     } else {
       notify_list[from].push_back(make_pair(pg_notify_t(it->second.epoch_sent,
                                                        osdmap->get_epoch(),
index 77935ea2d7b8b4d67d825e777b389327bcd61562..21a6f514dbc2f2f4e7813f398b08384f673e95e9 100644 (file)
@@ -1538,7 +1538,7 @@ void PG::activate(ObjectStore::Transaction& t,
       if (m) {
        dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
        //m->log.print(cout);
-       osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+       osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
       }
 
       // peer now has 
@@ -1747,7 +1747,7 @@ void PG::_activate_committed(epoch_t e)
                                info);
     i.info.history.last_epoch_started = e;
     m->pg_list.push_back(make_pair(i, pg_interval_map_t()));
-    osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(acting[0]));
+    osd->send_message_osd_cluster(acting[0], m, get_osdmap()->get_epoch());
   }
 
   if (dirty_info) {
@@ -1954,8 +1954,7 @@ void PG::purge_strays()
       MOSDPGRemove *m = new MOSDPGRemove(
        get_osdmap()->get_epoch(),
        to_remove);
-      osd->cluster_messenger->send_message(
-       m, get_osdmap()->get_cluster_inst(*p));
+      osd->send_message_osd_cluster(*p, m, get_osdmap()->get_epoch());
       stray_purged.insert(*p);
     } else {
       dout(10) << "not sending PGRemove to down osd." << *p << dendl;
@@ -2268,9 +2267,10 @@ void PG::trim_peers()
   dout(10) << "trim_peers " << pg_trim_to << dendl;
   if (pg_trim_to != eversion_t()) {
     for (unsigned i=1; i<acting.size(); i++)
-      osd->cluster_messenger->send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
-                                                 pg_trim_to),
-                                  get_osdmap()->get_cluster_inst(acting[i]));
+      osd->send_message_osd_cluster(acting[i],
+                                   new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
+                                                  pg_trim_to),
+                                   get_osdmap()->get_epoch());
   }
 }
 
@@ -2996,8 +2996,7 @@ void PG::_request_scrub_map_classic(int replica, eversion_t version)
   MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
                                              last_update_applied,
                                               get_osdmap()->get_epoch());
-  osd->cluster_messenger->send_message(repscrubop,
-                                       get_osdmap()->get_cluster_inst(replica));
+  osd->send_message_osd_cluster(replica, repscrubop, get_osdmap()->get_epoch());
 }
 
 // send scrub v3 messages (chunky scrub)
@@ -3010,8 +3009,7 @@ void PG::_request_scrub_map(int replica, eversion_t version,
   MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
                                               get_osdmap()->get_epoch(),
                                               start, end, deep);
-  osd->cluster_messenger->send_message(repscrubop,
-                                       get_osdmap()->get_cluster_inst(replica));
+  osd->send_message_osd_cluster(replica, repscrubop, get_osdmap()->get_epoch());
 }
 
 void PG::sub_op_scrub_reserve(OpRequestRef op)
@@ -3116,7 +3114,7 @@ void PG::scrub_reserve_replicas()
     MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
                                      get_osdmap()->get_epoch(), osd->get_tid(), v);
     subop->ops = scrub;
-    osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(acting[i]));
+    osd->send_message_osd_cluster(acting[i], subop, get_osdmap()->get_epoch());
   }
 }
 
@@ -3132,7 +3130,7 @@ void PG::scrub_unreserve_replicas()
     MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
                                      get_osdmap()->get_epoch(), osd->get_tid(), v);
     subop->ops = scrub;
-    osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(acting[i]));
+    osd->send_message_osd_cluster(acting[i], subop, get_osdmap()->get_epoch());
   }
 }
 
@@ -4140,7 +4138,7 @@ void PG::share_pg_info()
          get_osdmap()->get_epoch(),
          info),
        pg_interval_map_t()));
-    osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+    osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
   }
 }
 
@@ -4172,7 +4170,7 @@ void PG::share_pg_log()
     }
     pinfo.last_update = m->log.head;
 
-    osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+    osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
   }
 }
 
@@ -4226,8 +4224,7 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch)
 
   osd->osd->_share_map_outgoing(get_osdmap()->get_cluster_inst(from),
                                get_osdmap());
-  osd->cluster_messenger->send_message(mlog, 
-                                      get_osdmap()->get_cluster_inst(from));
+  osd->send_message_osd_cluster(from, mlog, get_osdmap()->get_epoch());
 }
 
 
@@ -5246,12 +5243,13 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con
     pg->osd->cluster_messenger->get_connection(
       pg->get_osdmap()->get_cluster_inst(pg->backfill_target));
   if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) {
-    pg->osd->cluster_messenger->send_message(
+    pg->osd->send_message_osd_cluster(
+      pg->backfill_target,
       new MBackfillReserve(
        MBackfillReserve::REQUEST,
        pg->info.pgid,
        pg->get_osdmap()->get_epoch()),
-      pg->get_osdmap()->get_cluster_inst(pg->backfill_target));
+      pg->get_osdmap()->get_epoch());
   } else {
     post_event(RemoteBackfillReserved());
   }
@@ -5353,12 +5351,13 @@ boost::statechart::result
 PG::RecoveryState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt)
 {
   PG *pg = context< RecoveryMachine >().pg;
-  pg->osd->cluster_messenger->send_message(
+  pg->osd->send_message_osd_cluster(
+    pg->acting[0],
     new MRecoveryReserve(
       MRecoveryReserve::GRANT,
       pg->info.pgid,
       pg->get_osdmap()->get_epoch()),
-    pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+    pg->get_osdmap()->get_epoch());
   return transit<RepRecovering>();
 }
 
@@ -5400,12 +5399,13 @@ boost::statechart::result
 PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
 {
   PG *pg = context< RecoveryMachine >().pg;
-  pg->osd->cluster_messenger->send_message(
+  pg->osd->send_message_osd_cluster(
+    pg->acting[0],
     new MBackfillReserve(
       MBackfillReserve::GRANT,
       pg->info.pgid,
       pg->get_osdmap()->get_epoch()),
-    pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+    pg->get_osdmap()->get_epoch());
   return transit<RepRecovering>();
 }
 
@@ -5413,12 +5413,13 @@ boost::statechart::result
 PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteReservationRejected &evt)
 {
   PG *pg = context< RecoveryMachine >().pg;
-  pg->osd->cluster_messenger->send_message(
+  pg->osd->send_message_osd_cluster(
+    pg->acting[0],
     new MBackfillReserve(
       MBackfillReserve::REJECT,
       pg->info.pgid,
       pg->get_osdmap()->get_epoch()),
-    pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+    pg->get_osdmap()->get_epoch());
   return transit<RepNotRecovering>();
 }
 
@@ -5489,12 +5490,13 @@ PG::RecoveryState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_con
       pg->osd->cluster_messenger->get_connection(
         pg->get_osdmap()->get_cluster_inst(*acting_osd_it));
     if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
-      pg->osd->cluster_messenger->send_message(
+      pg->osd->send_message_osd_cluster(
+       *acting_osd_it,
         new MRecoveryReserve(
           MRecoveryReserve::REQUEST,
           pg->info.pgid,
           pg->get_osdmap()->get_epoch()),
-        pg->get_osdmap()->get_cluster_inst(*acting_osd_it));
+       pg->get_osdmap()->get_epoch());
     } else {
       post_event(RemoteRecoveryReserved());
     }
@@ -5537,12 +5539,13 @@ void PG::RecoveryState::Recovering::release_reservations()
       pg->osd->cluster_messenger->get_connection(
         pg->get_osdmap()->get_cluster_inst(*i));
     if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
-      pg->osd->cluster_messenger->send_message(
+      pg->osd->send_message_osd_cluster(
+       *i,
         new MRecoveryReserve(
           MRecoveryReserve::RELEASE,
           pg->info.pgid,
           pg->get_osdmap()->get_epoch()),
-        pg->get_osdmap()->get_cluster_inst(*i));
+        pg->get_osdmap()->get_epoch());
     }
   }
 }
index 2513d9d6fe4947f994065a5a9fe5b8ca7940367d..de259eb16739b420c1ca1a005509e5bf1bd592c2 100644 (file)
@@ -3975,7 +3975,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
     }
     
     wr->pg_trim_to = pg_trim_to;
-    osd->cluster_messenger->send_message(wr, get_osdmap()->get_cluster_inst(peer));
+    osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch());
 
     // keep peer_info up to date
     if (pinfo.last_complete == pinfo.last_update)
@@ -4624,7 +4624,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
       // send ack to acker only if we haven't sent a commit already
       MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
       ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
-      osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
+      osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
     }
     
     rm->applied = true;
@@ -4670,7 +4670,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
       MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
       commit->set_last_complete_ondisk(rm->last_complete);
       commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
-      osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
+      osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
     }
     
     rm->committed = true;
@@ -4975,7 +4975,7 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
   subop->ops = vector<OSDOp>(1);
   subop->ops[0].op.op = CEPH_OSD_OP_DELETE;
 
-  osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
+  osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
 }
 
 /*
@@ -5112,8 +5112,7 @@ int ReplicatedPG::send_pull(int prio, int peer,
   subop->recovery_info = recovery_info;
   subop->recovery_progress = progress;
 
-  osd->cluster_messenger->send_message(subop,
-                                      get_osdmap()->get_cluster_inst(peer));
+  osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
 
   osd->logger->inc(l_osd_pull);
   return 0;
@@ -5507,8 +5506,7 @@ int ReplicatedPG::send_push(int prio, int peer,
   subop->recovery_info = recovery_info;
   subop->recovery_progress = new_progress;
   subop->current_progress = progress;
-  osd->cluster_messenger->
-    send_message(subop, get_osdmap()->get_cluster_inst(peer));
+  osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
   if (out_progress)
     *out_progress = new_progress;
   return 0;
@@ -5525,7 +5523,7 @@ void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer)
   subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
   subop->first = false;
   subop->complete = false;
-  osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
+  osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
 }
 
 void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
@@ -5664,10 +5662,10 @@ void ReplicatedPG::_committed_pushed_object(OpRequestRef op, epoch_t same_since,
     if (last_complete_ondisk == info.last_update) {
       if (is_replica()) {
        // we are fully up to date.  tell the primary!
-       osd->cluster_messenger->
-         send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
-                                     last_complete_ondisk),
-                      get_osdmap()->get_cluster_inst(get_primary()));
+       osd->send_message_osd_cluster(get_primary(),
+                                     new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
+                                                    last_complete_ondisk),
+                                     get_osdmap()->get_epoch());
 
        // adjust local snaps!
        adjust_local_snaps();
@@ -6713,7 +6711,7 @@ int ReplicatedPG::recover_backfill(int max)
       epoch_t e = get_osdmap()->get_epoch();
       MOSDPGScan *m = new MOSDPGScan(MOSDPGScan::OP_SCAN_GET_DIGEST, e, e, info.pgid,
                                     pbi.end, hobject_t());
-      osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
+      osd->send_message_osd_cluster(backfill_target, m, get_osdmap()->get_epoch());
       waiting_on_backfill = true;
       start_recovery_op(pbi.end);
       ops++;
@@ -6828,7 +6826,7 @@ int ReplicatedPG::recover_backfill(int max)
     }
     m->last_backfill = bound;
     m->stats = pinfo.stats.stats;
-    osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
+    osd->send_message_osd_cluster(backfill_target, m, get_osdmap()->get_epoch());
   }
 
   dout(10) << " peer num_objects now " << pinfo.stats.stats.sum.num_objects