]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD,PG: added helper methods for creating and dispatching RecoveryCtxs
authorSamuel Just <sam.just@inktank.com>
Mon, 18 Jun 2012 17:08:11 +0000 (10:08 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 5 Jul 2012 17:14:59 +0000 (10:14 -0700)
This is simpler than having to update all of the RecoveryCtx users
whenever we change the types in RecoveryCtx.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h

index a2972bf126a092bc61fcdd8a67d080e764b79562..a65b87061ed5ac4c7ec52f807afe1655f3a45696 100644 (file)
@@ -1364,7 +1364,7 @@ void OSD::load_pgs()
     int role = osdmap->calc_pg_role(whoami, pg->acting);
     pg->set_role(role);
 
-    PG::RecoveryCtx rctx(0, 0, 0, 0, 0);
+    PG::RecoveryCtx rctx(0, 0, 0, 0, 0, 0);
     pg->handle_loaded(&rctx);
 
     dout(10) << "load_pgs loaded " << *pg << " " << pg->log << dendl;
@@ -1382,8 +1382,6 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
                          epoch_t epoch, int from, int& created, bool primary)
 {
   PG *pg;
-  ObjectStore::Transaction *t = 0;
-  C_Contexts *fin = 0;
 
   if (!_have_pg(info.pgid)) {
     // same primary?
@@ -1426,17 +1424,11 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
     }
 
     // ok, create PG locally using provided Info and History
-    t = new ObjectStore::Transaction;
-    fin = new C_Contexts(g_ceph_context);
-    map< int, vector<pair<pg_notify_t, pg_interval_map_t> > >  notify_list;  // primary -> list
-    map< int, map<pg_t,pg_query_t> > query_map;    // peer -> PG -> get_summary_since
-    map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > info_map;  // peer -> message
-    PG::RecoveryCtx rctx(&query_map, &info_map, &notify_list, &fin->contexts, t);
-    pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, pi, *t);
+    PG::RecoveryCtx rctx = create_context();
+    pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, pi,
+                        *rctx.transaction);
     pg->handle_create(&rctx);
-    do_notifies(notify_list);
-    do_queries(query_map);
-    do_infos(info_map);
+    dispatch_context(rctx, pg);
       
     created++;
     dout(10) << *pg << " is new" << dendl;
@@ -1454,17 +1446,6 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
       pg->unlock();
       return NULL;
     }
-    t = new ObjectStore::Transaction;
-    fin = new C_Contexts(g_ceph_context);
-  }
-  if (t && t->empty()) {
-    delete t;
-    delete fin;
-  } else if (t) {
-    int tr = store->queue_transaction(
-      &pg->osr,
-      t, new ObjectStore::C_DeleteTransaction(t), fin);
-    assert(tr == 0);
   }
   return pg;
 }
@@ -3943,6 +3924,7 @@ void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction&
 
   split_pg(parent, children, t); 
 
+#if 0
   // reset pg
   map< int, vector<pair<pg_notify_t, pg_interval_map_t> > >  notify_list;  // primary -> list
   map< int, map<pg_t,pg_query_t> > query_map;    // peer -> PG -> get_summary_since
@@ -3966,6 +3948,7 @@ void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction&
   do_notifies(notify_list);
   do_queries(query_map);
   do_infos(info_map);
+#endif
 }
 
 void OSD::split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t)
@@ -4173,23 +4156,18 @@ void OSD::handle_pg_create(OpRequestRef op)
                                         osdmap->get_epoch());
     
     if (can_create_pg(pgid)) {
-      ObjectStore::Transaction *t = new ObjectStore::Transaction;
-      C_Contexts *fin = new C_Contexts(g_ceph_context);
       pg_interval_map_t pi;
+      PG::RecoveryCtx rctx = create_context();
       PG *pg = _create_lock_pg(pgid, true, false,
-                              0, creating_pgs[pgid].acting, creating_pgs[pgid].acting, history, pi,
-                              *t);
+                              0, creating_pgs[pgid].acting, creating_pgs[pgid].acting,
+                              history, pi,
+                              *rctx.transaction);
       creating_pgs.erase(pgid);
-
       wake_pg_waiters(pg->info.pgid);
-      PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
       pg->handle_create(&rctx);
-      pg->write_if_dirty(*t);
+      pg->write_if_dirty(*rctx.transaction);
       pg->update_stats();
-
-      int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
-      assert(tr == 0);
-
+      dispatch_context(rctx, pg);
       pg->unlock();
       num_created++;
     }
@@ -4205,6 +4183,57 @@ void OSD::handle_pg_create(OpRequestRef op)
 // ----------------------------------------
 // peering and recovery
 
+PG::RecoveryCtx OSD::create_context()
+{
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  C_Contexts *on_applied = new C_Contexts(g_ceph_context);
+  C_Contexts *on_safe = new C_Contexts(g_ceph_context);
+  map< int, map<pg_t,pg_query_t> > *query_map =
+    new map<int, map<pg_t, pg_query_t> >;
+  map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list =
+    new map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >;
+  map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map =
+    new map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >;
+  PG::RecoveryCtx rctx(query_map, info_map, notify_list,
+                      on_applied, on_safe, t);
+  return rctx;
+}
+
+void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg)
+{
+  if (!ctx.transaction->empty()) {
+    ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
+    int tr = store->queue_transaction(
+      &pg->osr,
+      ctx.transaction, ctx.on_applied, ctx.on_safe);
+    assert(tr == 0);
+    ctx.transaction = new ObjectStore::Transaction;
+    ctx.on_applied = new C_Contexts(g_ceph_context);
+    ctx.on_safe = new C_Contexts(g_ceph_context);
+  }
+}
+
+void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg)
+{
+  do_notifies(*ctx.notify_list);
+  delete ctx.notify_list;
+  do_queries(*ctx.query_map);
+  delete ctx.query_map;
+  do_infos(*ctx.info_map);
+  delete ctx.info_map;
+  if (ctx.transaction->empty() || !pg) {
+    delete ctx.transaction;
+    delete ctx.on_applied;
+    delete ctx.on_safe;
+  } else {
+    ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
+    int tr = store->queue_transaction(
+      &pg->osr,
+      ctx.transaction, ctx.on_applied, ctx.on_safe);
+    assert(tr == 0);
+  }
+}
+
 /** do_notifies
  * Send an MOSDPGNotify to a primary, with a list of PGs that I have
  * content for, and they are primary for.
@@ -4847,15 +4876,8 @@ void OSD::do_recovery(PG *pg)
     dout(20) << "  active was " << recovery_oids[pg->info.pgid] << dendl;
 #endif
     
-    ObjectStore::Transaction *t = new ObjectStore::Transaction;
-    C_Contexts *fin = new C_Contexts(g_ceph_context);
-    map< int, vector<pair<pg_notify_t, pg_interval_map_t> > >  notify_list;  // primary -> list
-    map< int, map<pg_t,pg_query_t> > query_map;    // peer -> PG -> get_summary_since
-    map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > info_map;  // peer -> message
-    PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
-
+    PG::RecoveryCtx rctx = create_context();
     int started = pg->start_recovery_ops(max, &rctx);
-    
     dout(10) << "do_recovery started " << started
             << " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active << " rops) on "
             << *pg << dendl;
@@ -4867,8 +4889,8 @@ void OSD::do_recovery(PG *pg)
      * out while trying to pull.
      */
     if (!started && pg->have_unfound()) {
-      pg->discover_all_missing(query_map);
-      if (!query_map.size()) {
+      pg->discover_all_missing(*rctx.query_map);
+      if (!rctx.query_map->size()) {
        dout(10) << "do_recovery  no luck, giving up on this pg for now" << dendl;
        recovery_wq.lock();
        pg->recovery_item.remove_myself();      // sigh...
@@ -4876,19 +4898,9 @@ void OSD::do_recovery(PG *pg)
       }
     }
 
-    do_notifies(notify_list);
-    do_queries(query_map);
-    do_infos(info_map);
-
-    pg->write_if_dirty(*t);
 
-    if (!t->empty()) {
-      int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
-      assert(tr == 0);
-    } else {
-      delete t;
-      delete fin;
-    }
+    pg->write_if_dirty(*rctx.transaction);
+    dispatch_context(rctx, pg);
     pg->unlock();
   }
   pg->put();
@@ -5274,12 +5286,10 @@ void OSDService::queue_for_removal(epoch_t epoch, int osdnum, pg_t pgid) {
 
 void OSD::process_peering_event(PG *pg)
 {
-  map< int, map<pg_t, pg_query_t> > query_map;
-  map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list;
-  map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > info_map;  // peer -> message
   bool need_up_thru = false;
   epoch_t same_interval_since;
   OSDMapRef curmap;
+  PG::RecoveryCtx rctx = create_context();
   {
     map_lock.get_read();
     pg->lock();
@@ -5289,25 +5299,13 @@ void OSD::process_peering_event(PG *pg)
       pg->unlock();
       return;
     }
-    ObjectStore::Transaction *t = new ObjectStore::Transaction;
-    C_Contexts *pfin = new C_Contexts(g_ceph_context);
-    PG::RecoveryCtx rctx(&query_map, &info_map, &notify_list,
-                        &pfin->contexts, t);
     advance_pg(curmap->get_epoch(), pg, &rctx);
     if (!pg->peering_queue.empty()) {
       PG::CephPeeringEvtRef evt = pg->peering_queue.front();
       pg->peering_queue.pop_front();
       pg->handle_peering_event(evt, &rctx);
     }
-    if (!t->empty()) {
-      int tr = store->queue_transaction(
-       &pg->osr,
-       t, new ObjectStore::C_DeleteTransaction(t), pfin);
-      assert(tr == 0);
-    } else {
-      delete t;
-      delete pfin;
-    }
+    dispatch_context_transaction(rctx, pg);
     need_up_thru = pg->need_up_thru;
     same_interval_since = pg->info.history.same_interval_since;
     pg->unlock();
@@ -5316,9 +5314,7 @@ void OSD::process_peering_event(PG *pg)
     Mutex::Locker l(osd_lock);
     if (need_up_thru)
       queue_want_up_thru(same_interval_since);
-    do_notifies(notify_list);
-    do_queries(query_map);
-    do_infos(info_map);
+    dispatch_context(rctx, 0);
   }
   service.send_pg_temp();
 }
index 4999878890998e8fd7d96385c2dc3ec7538ece7c..03b3822536595f7f5a806092ed4defa84ba64209 100644 (file)
@@ -785,6 +785,9 @@ protected:
   }
 
   // -- generic pg peering --
+  PG::RecoveryCtx create_context();
+  void dispatch_context(PG::RecoveryCtx &ctx, PG *pg);
+  void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg);
   void do_notifies(map< int,vector<pair<pg_notify_t, pg_interval_map_t> > >& notify_list);
   void do_queries(map< int, map<pg_t,pg_query_t> >& query_map);
   void do_infos(map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >& info_map);
index efbe3dcdb5d29991a2b9d0f8b13a5ab2d76e0506..eeb416d9036f89d67d4d34e8c41d4e9321e6f8d4 100644 (file)
@@ -4471,7 +4471,7 @@ PG::RecoveryState::Active::Active(my_context ctx)
   assert(pg->is_primary());
   dout(10) << "In Active, about to call activate" << dendl;
   pg->activate(*context< RecoveryMachine >().get_cur_transaction(),
-              *context< RecoveryMachine >().get_context_list(),
+              *context< RecoveryMachine >().get_on_safe_context_list(),
               *context< RecoveryMachine >().get_query_map(),
               context< RecoveryMachine >().get_info_map());
   assert(pg->is_active());
@@ -4611,7 +4611,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const RecoveryComplet
 
   assert(!pg->needs_recovery());
   pg->finish_recovery(*context< RecoveryMachine >().get_cur_transaction(),
-                     *context< RecoveryMachine >().get_context_list());
+                     *context< RecoveryMachine >().get_on_safe_context_list());
   return discard_event();
 }
 
@@ -4695,7 +4695,7 @@ PG::RecoveryState::ReplicaActive::ReplicaActive(my_context ctx)
   pg->last_peering_reset = pg->get_osdmap()->get_epoch();
 
   pg->activate(*context< RecoveryMachine >().get_cur_transaction(),
-              *context< RecoveryMachine >().get_context_list(),
+              *context< RecoveryMachine >().get_on_safe_context_list(),
               query_map, NULL);
   dout(10) << "Activate Finished" << dendl;
 }
index 051bd7401ca484ab0d67aca839148a051c5d15c0..04c45e31299737e27b2fbad651c890bdf5faffae 100644 (file)
@@ -481,18 +481,20 @@ public:
     map< int, map<pg_t, pg_query_t> > *query_map;
     map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map;
     map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list;
-    list< Context* > *context_list;
+    C_Contexts *on_applied;
+    C_Contexts *on_safe;
     ObjectStore::Transaction *transaction;
-    RecoveryCtx() : query_map(0), info_map(0), notify_list(0),
-                   context_list(0), transaction(0) {}
     RecoveryCtx(map< int, map<pg_t, pg_query_t> > *query_map,
                map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map,
                map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list,
-               list< Context* > *context_list,
+               C_Contexts *on_applied,
+               C_Contexts *on_safe,
                ObjectStore::Transaction *transaction)
       : query_map(query_map), info_map(info_map), 
        notify_list(notify_list),
-       context_list(context_list), transaction(transaction) {}
+       on_applied(on_applied),
+       on_safe(on_safe),
+       transaction(transaction) {}
   };
 
   struct NamedState {
@@ -991,9 +993,14 @@ public:
        return state->rctx->info_map;
       }
 
-      list< Context* > *get_context_list() {
-       assert(state->rctx->context_list);
-       return state->rctx->context_list;
+      list< Context* > *get_on_safe_context_list() {
+       assert(state->rctx->on_safe);
+       return &(state->rctx->on_safe->contexts);
+      }
+
+      list< Context * > *get_on_applied_context_list() {
+       assert(state->rctx->on_applied);
+       return &(state->rctx->on_applied->contexts);
       }
 
       void send_notify(int to, const pg_notify_t &info, const pg_interval_map_t &pi) {