]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
PG: process peering events in a queue
authorSamuel Just <sam.just@inktank.com>
Thu, 31 May 2012 04:51:29 +0000 (21:51 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 5 Jul 2012 17:14:57 +0000 (10:14 -0700)
Peering events are now queued via queue_peering_event in the
peering_queue.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h

index ab88df6a759c6296dbad439a69075656f8c0d598..48c2053728e2e77652c5d69949a1e6e58ba64082 100644 (file)
@@ -645,6 +645,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   admin_ops_hook(NULL),
   op_queue_len(0),
   op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
+  peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
   map_lock("OSD::map_lock"),
   peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
   map_cache_lock("OSD::map_cache_lock"),
@@ -1347,6 +1348,8 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
                          C_Contexts **pfin)
 {
   PG *pg;
+  ObjectStore::Transaction *t = 0;
+  C_Contexts *fin = 0;
 
   if (!_have_pg(info.pgid)) {
     // same primary?
@@ -1389,8 +1392,8 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
     }
 
     // ok, create PG locally using provided Info and History
-    *pt = new ObjectStore::Transaction;
-    *pfin = new C_Contexts(g_ceph_context);
+    t = new ObjectStore::Transaction;
+    fin = new C_Contexts(g_ceph_context);
     pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, pi, **pt);
       
     created++;
@@ -1409,8 +1412,21 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
       pg->unlock();
       return NULL;
     }
-    *pt = new ObjectStore::Transaction;
-    *pfin = new C_Contexts(g_ceph_context);
+    t = new ObjectStore::Transaction;
+    fin = new C_Contexts(g_ceph_context);
+  }
+  if (pt) {
+    assert(pfin);
+    *pt = t;
+    *pfin = fin;
+  } else if (t && t->empty()) {
+    delete t;
+    delete fin;
+  } else {
+    int tr = store->queue_transaction(
+      &pg->osr,
+      t, new ObjectStore::C_DeleteTransaction(t), fin);
+    assert(tr == 0);
   }
   return pg;
 }
@@ -4309,8 +4325,9 @@ void OSD::handle_pg_create(OpRequestRef op)
  * content for, and they are primary for.
  */
 
-void OSD::do_notifies(map< int, vector<pair<pg_info_t,pg_interval_map_t> > >& notify_list,
-                     epoch_t query_epoch)
+void OSD::do_notifies(
+  map< int,vector<pair<pg_info_t,pg_interval_map_t> > >& notify_list,
+  epoch_t query_epoch)
 {
   for (map< int, vector<pair<pg_info_t,pg_interval_map_t> > >::iterator it = notify_list.begin();
        it != notify_list.end();
@@ -4399,32 +4416,13 @@ void OSD::handle_pg_notify(OpRequestRef op)
       continue;
     }
 
-    ObjectStore::Transaction *t;
-    C_Contexts *fin;
-    pg = get_or_create_pg(it->first, it->second, m->get_epoch(), from, created, true, &t, &fin);
+    pg = get_or_create_pg(it->first, it->second,
+                         m->get_epoch(), from, created, true);
     if (!pg)
       continue;
-
-    PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
-    pg->handle_notify(m->get_epoch(), m->get_query_epoch(), from, *it, &rctx);
-    pg->write_if_dirty(*t);
-
-    if (!t->empty()) {
-      int tr = store->queue_transaction(
-       &pg->osr,
-       t, new ObjectStore::C_DeleteTransaction(t), fin);
-      assert(tr == 0);
-    } else {
-      delete t;
-      delete fin;
-    }
+    pg->queue_notify(m->get_epoch(), m->get_query_epoch(), from, it->first);
     pg->unlock();
   }
-  
-  do_queries(query_map);
-  do_infos(info_map);
-  
-  maybe_update_heartbeat_peers();
 }
 
 void OSD::handle_pg_log(OpRequestRef op)
@@ -4445,35 +4443,13 @@ void OSD::handle_pg_log(OpRequestRef op)
   }
 
   int created = 0;
-  ObjectStore::Transaction *t;
-  C_Contexts *fin;  
   PG *pg = get_or_create_pg(m->info, m->past_intervals, m->get_epoch(), 
-                           from, created, false, &t, &fin);
-  if (!pg) {
+                           from, created, false);
+  if (!pg)
     return;
-  }
-
   op->mark_started();
-
-  map< int, map<pg_t,pg_query_t> > query_map;
-  map< int, MOSDPGInfo* > info_map;
-  PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
-  pg->handle_log(m->get_epoch(), m->get_query_epoch(), from, m, &rctx);
-  pg->write_if_dirty(*t);
+  pg->queue_log(m->get_epoch(), m->get_query_epoch(), from, m);
   pg->unlock();
-  do_queries(query_map);
-  do_infos(info_map);
-
-  if (!t->empty()) {
-    int tr = store->queue_transaction(
-      &pg->osr,
-      t, new ObjectStore::C_DeleteTransaction(t), fin);
-    assert(!tr);
-  } else {
-    delete t;
-    delete fin;
-  }
-  maybe_update_heartbeat_peers();
 }
 
 void OSD::handle_pg_info(OpRequestRef op)
@@ -4490,8 +4466,6 @@ void OSD::handle_pg_info(OpRequestRef op)
 
   op->mark_started();
 
-  map< int, MOSDPGInfo* > info_map;
-
   int created = 0;
 
   for (vector<pair<pg_info_t,pg_interval_map_t> >::iterator p = m->pg_list.begin();
@@ -4502,31 +4476,13 @@ void OSD::handle_pg_info(OpRequestRef op)
       continue;
     }
 
-    ObjectStore::Transaction *t = 0;
-    C_Contexts *fin = 0;
     PG *pg = get_or_create_pg(p->first, p->second, m->get_epoch(), 
-                             from, created, false, &t, &fin);
+                             from, created, false);
     if (!pg)
       continue;
-
-    PG::RecoveryCtx rctx(0, &info_map, 0, &fin->contexts, t);
-
-    pg->handle_info(m->get_epoch(), m->get_epoch(), from, *p, &rctx);
-    pg->write_if_dirty(*t);
-
-    if (!t->empty()) {
-      int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
-      assert(!tr);
-    } else {
-      delete t;
-      delete fin;
-    }
+    pg->queue_info(m->get_epoch(), m->get_epoch(), from,  p->first);
     pg->unlock();
   }
-
-  do_infos(info_map);
-
-  maybe_update_heartbeat_peers();
 }
 
 void OSD::handle_pg_trim(OpRequestRef op)
@@ -4738,14 +4694,10 @@ void OSD::handle_pg_query(OpRequestRef op)
     }
 
     pg = _lookup_lock_pg(pgid);
-
-    // ok, process query!
-    PG::RecoveryCtx rctx(0, 0, &notify_list, 0, 0);
-    pg->handle_query(m->get_epoch(), m->get_epoch(),
-                    from, it->second, &rctx);
+    pg->queue_query(m->get_epoch(), m->get_epoch(),
+                   from, it->second);
     pg->unlock();
   }
-  
   do_notifies(notify_list, m->get_epoch());
 }
 
@@ -5565,6 +5517,55 @@ PG *OSD::OpWQ::_dequeue()
   return pg;
 }
 
+void OSD::queue_for_peering(PG *pg)
+{
+  peering_wq._enqueue(pg);
+}
+
+void OSD::process_peering_event(PG *pg)
+{
+  map< int, map<pg_t, pg_query_t> > query_map;
+  map< int, vector<pair<pg_info_t, pg_interval_map_t> > > notify_list;
+  map<int,MOSDPGInfo*> info_map;  // peer -> message
+  {
+    pg->lock();
+    if (pg->peering_queue.empty()) {
+      pg->unlock();
+      return;
+    }
+    ObjectStore::Transaction *t = new ObjectStore::Transaction;
+    C_Contexts *pfin = new C_Contexts(g_ceph_context);
+    PG::RecoveryCtx rctx(&query_map, &info_map, &notify_list,
+                        &pfin->contexts, t);
+    PG::CephPeeringEvtRef evt = pg->peering_queue.front();
+    pg->peering_queue.pop_front();
+    pg->handle_peering_event(evt, &rctx);
+    if (!t->empty()) {
+      int tr = store->queue_transaction(
+       &pg->osr,
+       t, new ObjectStore::C_DeleteTransaction(t), pfin);
+      assert(tr == 0);
+    } else {
+      delete t;
+      delete pfin;
+    }
+    pg->unlock();
+  }
+  epoch_t current_epoch;
+  {
+    map_lock.get_read();
+    current_epoch = osdmap->get_epoch();
+    map_lock.put_read();
+  }
+  do_notifies(notify_list, current_epoch);
+  do_queries(query_map);
+  do_infos(info_map);
+  {
+    Mutex::Locker l(osd_lock);
+    maybe_update_heartbeat_peers();
+  }
+}
+
 /*
  * requeue ops at _front_ of queue.  these are previously queued
  * operations that need to get requeued ahead of anything the dispatch
index f7e7d8eac4e95756fe739a4156e1c5cb491d1d78..e88c5554f45239bfcf81926e51fc7a9fc2aaec4e 100644 (file)
@@ -379,6 +379,43 @@ private:
     o->dequeue_op(pg);
   };
 
+  // -- peering queue --
+  struct PeeringWQ : public ThreadPool::WorkQueue<PG> {
+    deque<PG*> peering_queue;
+    OSD *osd;
+    PeeringWQ(OSD *o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<PG>(
+       "OSD::PeeringWQ", ti, ti*10, tp), osd(o) {}
+
+    void _dequeue(PG *pg) {
+      assert(0);
+    }
+    bool _enqueue(PG *pg) {
+      pg->get();
+      peering_queue.push_back(pg);
+      return true;
+    }
+    bool _empty() {
+      return peering_queue.empty();
+    }
+    PG *_dequeue() {
+      if (peering_queue.empty())
+       return 0;
+      PG *retval = peering_queue.front();
+      peering_queue.pop_front();
+      return retval;
+    }
+    void _process(PG *pg) {
+      osd->process_peering_event(pg);
+      pg->put();
+    }
+    void _clear() {
+      assert(peering_queue.empty());
+    }
+  } peering_wq;
+
+  void queue_for_peering(PG *pg);
+  void process_peering_event(PG *pg);
 
   friend class PG;
   friend class ReplicatedPG;
@@ -471,15 +508,18 @@ protected:
   PG   *_lookup_lock_pg_with_map_lock_held(pg_t pgid);
   PG   *_open_lock_pg(pg_t pg, bool no_lockdep_check=false, bool hold_map_lock=false);
   PG   *_create_lock_pg(pg_t pgid, bool newly_created, bool hold_map_lock,
-                       int role, vector<int>& up, vector<int>& acting, pg_history_t history,
+                       int role, vector<int>& up, vector<int>& acting,
+                       pg_history_t history,
                        pg_interval_map_t& pi, ObjectStore::Transaction& t);
 
   PG *lookup_lock_raw_pg(pg_t pgid);
 
-  PG *get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
-                      epoch_t epoch, int from, int& pcreated, bool primary,
-                      ObjectStore::Transaction **pt,
-                      C_Contexts **pfin);
+  PG *get_or_create_pg(const pg_info_t& info,
+                      pg_interval_map_t& pi,
+                      epoch_t epoch, int from, int& pcreated,
+                      bool primary,
+                      ObjectStore::Transaction **pt = 0,
+                      C_Contexts **pfin = 0);
   
   void load_pgs();
   void calc_priors_during(pg_t pgid, epoch_t start, epoch_t end, set<int>& pset);
index 8e0095a7b6cc61db0ea044d3eddb877a221e391d..11a61cdd16f2f7d8575464b62be1b24cc5ba9033 100644 (file)
@@ -3890,53 +3890,53 @@ void PG::handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx)
   recovery_state.handle_event(evt, rctx);
 }
 
-void PG::handle_notify(epoch_t msg_epoch,
-                      epoch_t query_epoch,
-                      int from, pg_info_t& i,
-                      RecoveryCtx *rctx)
+void PG::queue_peering_event(CephPeeringEvtRef evt)
+{
+  if (old_peering_evt(evt))
+    return;
+  peering_queue.push_back(evt);
+  osd->queue_for_peering(this);
+}
+
+void PG::queue_notify(epoch_t msg_epoch,
+                     epoch_t query_epoch,
+                     int from, pg_info_t& i)
 {
   dout(10) << "handle_notify " << i << " from osd." << from << dendl;
-  handle_peering_event(
+  queue_peering_event(
     CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
-                                        MNotifyRec(from, i))),
-    rctx);
+                                        MNotifyRec(from, i))));
 }
 
-void PG::handle_info(epoch_t msg_epoch,
+void PG::queue_info(epoch_t msg_epoch,
                     epoch_t query_epoch,
-                    int from, pg_info_t& i,
-                    RecoveryCtx *rctx)
+                    int from, pg_info_t& i)
 {
   dout(10) << "handle_info " << i << " from osd." << from << dendl;
-  handle_peering_event(
+  queue_peering_event(
     CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
-                                        MInfoRec(from, i))),
-    rctx);
+                                        MInfoRec(from, i))));
 }
 
-void PG::handle_log(epoch_t msg_epoch,
-                   epoch_t query_epoch,
-                   int from,
-                   MOSDPGLog *msg,
-                   RecoveryCtx *rctx)
+void PG::queue_log(epoch_t msg_epoch,
+                  epoch_t query_epoch,
+                  int from,
+                  MOSDPGLog *msg)
 {
   dout(10) << "handle_log " << *msg << " from osd." << from << dendl;
-  handle_peering_event(
+  queue_peering_event(
     CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
-                                        MLogRec(from, msg))),
-    rctx);
+                                        MLogRec(from, msg))));
 }
 
-void PG::handle_query(epoch_t msg_epoch,
-                     epoch_t query_epoch,
-                     int from, const pg_query_t& q,
-                     RecoveryCtx *rctx)
+void PG::queue_query(epoch_t msg_epoch,
+                    epoch_t query_epoch,
+                    int from, const pg_query_t& q)
 {
   dout(10) << "handle_query " << q << " from osd." << from << dendl;
-  handle_peering_event(
+  queue_peering_event(
     CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
-                                        MQuery(from, q, query_epoch))),
-    rctx);
+                                        MQuery(from, q, query_epoch))));
 }
 
 void PG::handle_advance_map(OSDMapRef osdmap, OSDMapRef lastmap,
@@ -4639,7 +4639,9 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MQuery& query)
     pair<int, pg_info_t> notify_info;
     pg->update_history_from_master(query.query.history);
     pg->fulfill_info(query.from, query.query, notify_info);
-    context< RecoveryMachine >().send_notify(notify_info.first, notify_info.second);
+    context< RecoveryMachine >().send_notify(notify_info.first,
+                                            notify_info.second,
+                                            pg->past_intervals);
   } else {
     pg->fulfill_log(query.from, query.query, query.query_epoch);
   }
index fe5ed0644d77721f9361931fb7f9dc54d6ac69e1..14e5495a838e739b6126be2942b23fefbbe58aa0 100644 (file)
@@ -798,6 +798,7 @@ public:
     const boost::statechart::event_base &get_event() { return *evt; }
   };
   typedef std::tr1::shared_ptr<CephPeeringEvt> CephPeeringEvtRef;
+  list<CephPeeringEvtRef> peering_queue;  // op queue
 
   struct QueryState : boost::statechart::event< QueryState > {
     Formatter *f;
@@ -1224,7 +1225,7 @@ public:
       machine.initiate();
     }
 
-    void handle_event(boost::statechart::event_base &evt,
+    void handle_event(const boost::statechart::event_base &evt,
                      RecoveryCtx *rctx) {
       start_handle(rctx);
       machine.process_event(evt);
@@ -1237,6 +1238,7 @@ public:
       machine.process_event(evt->get_event());
       end_handle();
     }
+
   } recovery_state;
 
 
@@ -1367,17 +1369,16 @@ public:
   }
 
   // recovery bits
+  void queue_peering_event(CephPeeringEvtRef evt);
   void handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx);
-  void handle_notify(epoch_t msg_epoch, epoch_t query_epoch,
-                    int from, pg_info_t& i, RecoveryCtx *rctx);
-  void handle_info(epoch_t msg_epoch, epoch_t query_epoch,
-                  int from, pg_info_t& i, RecoveryCtx *rctx);
-  void handle_log(epoch_t msg_epoch, epoch_t query_epoch, int from,
-                 MOSDPGLog *msg,
-                 RecoveryCtx *rctx);
-  void handle_query(epoch_t msg_epoch, epoch_t query_epoch,
-                   int from, const pg_query_t& q,
-                   RecoveryCtx *rctx);
+  void queue_notify(epoch_t msg_epoch, epoch_t query_epoch,
+                   int from, pg_info_t& i);
+  void queue_info(epoch_t msg_epoch, epoch_t query_epoch,
+                 int from, pg_info_t& i);
+  void queue_log(epoch_t msg_epoch, epoch_t query_epoch, int from,
+                MOSDPGLog *msg);
+  void queue_query(epoch_t msg_epoch, epoch_t query_epoch,
+                  int from, const pg_query_t& q);
   void handle_advance_map(OSDMapRef osdmap, OSDMapRef lastmap,
                          vector<int>& newup, vector<int>& newacting,
                          RecoveryCtx *rctx);