]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD: move the peer_epoch and map sharing infrastructure into OSDService
authorGreg Farnum <greg@inktank.com>
Fri, 25 Apr 2014 21:00:00 +0000 (14:00 -0700)
committerGreg Farnum <greg@inktank.com>
Mon, 5 May 2014 22:29:21 +0000 (15:29 -0700)
None of this code requires OSD-internal data or acquring locks from
anybody else.

Signed-off-by: Greg Farnum <greg@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc

index cfc6e34b54f5ffb1274e96103821e2a7e487cb75..812714b22264a224e572e8d4f81c2dd99fb413f5 100644 (file)
@@ -194,6 +194,7 @@ OSDService::OSDService(OSD *osd) :
   class_handler(osd->class_handler),
   publish_lock("OSDService::publish_lock"),
   pre_publish_lock("OSDService::pre_publish_lock"),
+  peer_map_epoch_lock("OSDService::peer_map_epoch_lock"),
   sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
   scrubs_active(0),
   agent_lock("OSD::agent_lock"),
@@ -941,7 +942,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   op_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
   peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
   map_lock("OSD::map_lock"),
-  peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
   pg_map_lock("OSD::pg_map_lock"),
   debug_drop_pg_create_probability(cct->_conf->osd_debug_drop_pg_create_probability),
   debug_drop_pg_create_duration(cct->_conf->osd_debug_drop_pg_create_duration),
@@ -3026,11 +3026,11 @@ void OSD::handle_osd_ping(MOSDPing *m)
       m->get_connection()->get_messenger()->send_message(r, m->get_connection());
 
       if (curmap->is_up(from)) {
-       note_peer_epoch(from, m->map_epoch);
+       service.note_peer_epoch(from, m->map_epoch);
        if (is_active()) {
          ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
          if (con) {
-           _share_map_outgoing(from, con.get());
+           service.share_map_outgoing(from, con.get());
          }
        }
       } else if (!curmap->exists(from) ||
@@ -3073,11 +3073,11 @@ void OSD::handle_osd_ping(MOSDPing *m)
 
       if (m->map_epoch &&
          curmap->is_up(from)) {
-       note_peer_epoch(from, m->map_epoch);
+       service.note_peer_epoch(from, m->map_epoch);
        if (is_active()) {
          ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
          if (con) {
-           _share_map_outgoing(from, con.get());
+           service.share_map_outgoing(from, con.get());
          }
        }
       }
@@ -3978,7 +3978,7 @@ void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epo
   }
   const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
   Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
-  osd->_share_map_outgoing(peer, peer_con, next_map);
+  share_map_outgoing(peer, peer_con, next_map);
   osd->cluster_messenger->send_message(m, peer_inst);
   release_map(next_map);
 }
@@ -4417,7 +4417,7 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
 
          // send them the latest diff to ensure they realize the mapping
          // has changed.
-         send_incremental_map(osdmap->get_epoch() - 1, con, osdmap);
+         service.send_incremental_map(osdmap->get_epoch() - 1, con, osdmap);
 
          // do not reply; they will get newer maps and realize they
          // need to resend.
@@ -4665,7 +4665,7 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
 // --------------------------------------
 // dispatch
 
-epoch_t OSD::get_peer_epoch(int peer)
+epoch_t OSDService::get_peer_epoch(int peer)
 {
   Mutex::Locker l(peer_map_epoch_lock);
   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
@@ -4674,7 +4674,7 @@ epoch_t OSD::get_peer_epoch(int peer)
   return p->second;
 }
 
-epoch_t OSD::note_peer_epoch(int peer, epoch_t e)
+epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
 {
   Mutex::Locker l(peer_map_epoch_lock);
   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
@@ -4693,7 +4693,7 @@ epoch_t OSD::note_peer_epoch(int peer, epoch_t e)
   }
 }
  
-void OSD::forget_peer_epoch(int peer, epoch_t as_of) 
+void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
 {
   Mutex::Locker l(peer_map_epoch_lock);
   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
@@ -4737,7 +4737,7 @@ bool OSDService::should_share_map(entity_name_t name, Connection *con,
       (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
        osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
     // remember
-    epoch_t has = MAX(osd->get_peer_epoch(name.num()), epoch);
+    epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
 
     // share?
     if (has < osdmap->get_epoch()) {
@@ -4773,7 +4773,7 @@ void OSDService::share_map_incoming(
           << " < " << osdmap->get_epoch() << dendl;
       // we know the Session is valid or we wouldn't be sending
       *sent_epoch_p = osdmap->get_epoch();
-      osd->send_incremental_map(epoch, con, osdmap);
+      send_incremental_map(epoch, con, osdmap);
     } else if (con->get_messenger() == osd->cluster_messenger &&
         osdmap->is_up(name.num()) &&
         (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
@@ -4781,17 +4781,17 @@ void OSDService::share_map_incoming(
       dout(10) << name << " " << con->get_peer_addr()
                       << " has old map " << epoch << " < "
                       << osdmap->get_epoch() << dendl;
-      osd->note_peer_epoch(name.num(), osdmap->get_epoch());
-      osd->send_incremental_map(epoch, con, osdmap);
+      note_peer_epoch(name.num(), osdmap->get_epoch());
+      send_incremental_map(epoch, con, osdmap);
     }
   }
 }
 
 
-void OSD::_share_map_outgoing(int peer, Connection *con, OSDMapRef map)
+void OSDService::share_map_outgoing(int peer, Connection *con, OSDMapRef map)
 {
   if (!map)
-    map = service.get_osdmap();
+    map = get_osdmap();
 
   // send map?
   epoch_t pe = get_peer_epoch(peer);
@@ -5586,7 +5586,7 @@ void OSD::note_down_osd(int peer)
 
 void OSD::note_up_osd(int peer)
 {
-  forget_peer_epoch(peer, osdmap->get_epoch() - 1);
+  service.forget_peer_epoch(peer, osdmap->get_epoch() - 1);
 }
 
 struct C_OnMapApply : public Context {
@@ -6246,12 +6246,12 @@ void OSD::activate_map()
 }
 
 
-MOSDMap *OSD::build_incremental_map_msg(epoch_t since, epoch_t to,
-                                        OSDSuperblock& superblock)
+MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
+                                               OSDSuperblock& sblock)
 {
   MOSDMap *m = new MOSDMap(monc->get_fsid());
-  m->oldest_map = superblock.oldest_map;
-  m->newest_map = superblock.newest_map;
+  m->oldest_map = sblock.oldest_map;
+  m->newest_map = sblock.newest_map;
   
   for (epoch_t e = to; e > since; e--) {
     bufferlist bl;
@@ -6272,7 +6272,7 @@ MOSDMap *OSD::build_incremental_map_msg(epoch_t since, epoch_t to,
   return m;
 }
 
-void OSD::send_map(MOSDMap *m, Connection *con)
+void OSDService::send_map(MOSDMap *m, Connection *con)
 {
   Messenger *msgr = client_messenger;
   if (entity_name_t::TYPE_OSD == con->get_peer_type())
@@ -6280,8 +6280,8 @@ void OSD::send_map(MOSDMap *m, Connection *con)
   msgr->send_message(m, con);
 }
 
-void OSD::send_incremental_map(epoch_t since, Connection *con,
-                               OSDMapRef& osdmap)
+void OSDService::send_incremental_map(epoch_t since, Connection *con,
+                                      OSDMapRef& osdmap)
 {
   epoch_t to = osdmap->get_epoch();
   dout(10) << "send_incremental_map " << since << " -> " << to
@@ -6289,12 +6289,12 @@ void OSD::send_incremental_map(epoch_t since, Connection *con,
 
   MOSDMap *m = NULL;
   while (!m) {
-    OSDSuperblock superblock(service.get_superblock());
-    if (since < superblock.oldest_map) {
+    OSDSuperblock sblock(get_superblock());
+    if (since < sblock.oldest_map) {
       // just send latest full map
       MOSDMap *m = new MOSDMap(monc->get_fsid());
-      m->oldest_map = superblock.oldest_map;
-      m->newest_map = superblock.newest_map;
+      m->oldest_map = sblock.oldest_map;
+      m->newest_map = sblock.newest_map;
       get_map_bl(to, m->maps[to]);
       send_map(m, con);
       return;
@@ -6308,7 +6308,7 @@ void OSD::send_incremental_map(epoch_t since, Connection *con,
     
     if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
       to = since + cct->_conf->osd_map_message_max;
-    m = build_incremental_map_msg(since, to, superblock);
+    m = build_incremental_map_msg(since, to, sblock);
   }
   send_map(m, con);
 }
@@ -6827,7 +6827,7 @@ void OSD::do_notifies(
       it->first, curmap->get_epoch());
     if (!con)
       continue;
-    _share_map_outgoing(it->first, con.get(), curmap);
+    service.share_map_outgoing(it->first, con.get(), curmap);
     if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) {
       dout(7) << "do_notify osd " << it->first
              << " on " << it->second.size() << " PGs" << dendl;
@@ -6867,7 +6867,7 @@ void OSD::do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
     ConnectionRef con = service.get_con_osd_cluster(who, curmap->get_epoch());
     if (!con)
       continue;
-    _share_map_outgoing(who, con.get(), curmap);
+    service.share_map_outgoing(who, con.get(), curmap);
     if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) {
       dout(7) << "do_queries querying osd." << who
              << " on " << pit->second.size() << " PGs" << dendl;
@@ -6911,7 +6911,7 @@ void OSD::do_infos(map<int,
       p->first, curmap->get_epoch());
     if (!con)
       continue;
-    _share_map_outgoing(p->first, con.get(), curmap);
+    service.share_map_outgoing(p->first, con.get(), curmap);
     if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) {
       MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
       m->pg_list = p->second;
@@ -7289,7 +7289,7 @@ void OSD::handle_pg_query(OpRequestRef op)
          it->second.from, it->second.to,
          osdmap->get_epoch(), empty,
          it->second.epoch_sent);
-       _share_map_outgoing(from, con.get(), osdmap);
+       service.share_map_outgoing(from, con.get(), osdmap);
        cluster_messenger->send_message(mlog, con.get());
       }
     } else {
index 8863c5aa265448c4b4daf4ca87b35f8b72fe7930..91cbfd6066cea4cc3e9fd0ea92905808962e8953 100644 (file)
@@ -420,10 +420,24 @@ public:
     }
   }
 
+private:
+  Mutex peer_map_epoch_lock;
+  map<int, epoch_t> peer_map_epoch;
+public:
+  epoch_t get_peer_epoch(int p);
+  epoch_t note_peer_epoch(int p, epoch_t e);
+  void forget_peer_epoch(int p, epoch_t e);
+
+  void send_map(class MOSDMap *m, Connection *con);
+  void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
+  MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
+                                       OSDSuperblock& superblock);
   bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
                         OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
   void share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch,
                           OSDMapRef& osdmap, epoch_t *sent_epoch_p);
+  void share_map_outgoing(int peer, Connection *con,
+                          OSDMapRef map = OSDMapRef());
 
   ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
   pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
@@ -1373,18 +1387,8 @@ private:
   RWLock          map_lock;
   list<OpRequestRef>  waiting_for_osdmap;
 
-  Mutex peer_map_epoch_lock;
-  map<int, epoch_t> peer_map_epoch;
-  
-  epoch_t get_peer_epoch(int p);
-  epoch_t note_peer_epoch(int p, epoch_t e);
-  void forget_peer_epoch(int p, epoch_t e);
-
   friend struct send_map_on_destruct;
 
-  void _share_map_outgoing(int peer, Connection *con,
-                          OSDMapRef map = OSDMapRef());
-
   void wait_for_new_map(OpRequestRef op);
   void handle_osd_map(class MOSDMap *m);
   void note_down_osd(int osd);
@@ -1422,14 +1426,6 @@ private:
   void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
     return service.pin_map_inc_bl(e, bl);
   }
-  bool get_inc_map_bl(epoch_t e, bufferlist& bl) {
-    return service.get_inc_map_bl(e, bl);
-  }
-
-  MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
-                                     OSDSuperblock& superblock);
-  void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
-  void send_map(MOSDMap *m, Connection *con);
 
 protected:
   // -- placement groups --
index 2bbb440568b092a5e0df3c848c8edd15c2afda76..312b56da8e7953f9b5ec75f25df254d599d844a3 100644 (file)
@@ -4465,7 +4465,7 @@ void PG::fulfill_log(
   ConnectionRef con = osd->get_con_osd_cluster(
     from.osd, get_osdmap()->get_epoch());
   if (con) {
-    osd->osd->_share_map_outgoing(from.osd, con.get(), get_osdmap());
+    osd->share_map_outgoing(from.osd, con.get(), get_osdmap());
     osd->send_message_osd_cluster(mlog, con.get());
   } else {
     mlog->put();