]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: do not apply_transaction in finish_recovery
authorSage Weil <sage@newdream.net>
Tue, 10 Nov 2009 22:51:20 +0000 (14:51 -0800)
committerSage Weil <sage@newdream.net>
Tue, 10 Nov 2009 23:01:22 +0000 (15:01 -0800)
finish_recovery needs to set up a callback for when the current set of
changes commit to disk (to kickstart cleanup of strya replicas etc).  We
can't call apply_transaction this deep inside the call chain without
causing problems.  So, pass a list of completion contexts all the way down
so that we can set up the completion callback.

src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc

index 28b5c0c77928566ccd1846632e639e8173187b59..b4825a168b6760d1913655c2d66a495359e9f88e 100644 (file)
@@ -1749,6 +1749,7 @@ void OSD::handle_osd_map(MOSDMap *m)
   assert(osd_lock.is_locked());
 
   ObjectStore::Transaction t;
+  C_Contexts *fin = new C_Contexts;
   
   logger->inc(l_osd_map);
 
@@ -1932,7 +1933,7 @@ void OSD::handle_osd_map(MOSDMap *m)
     if (osdmap->is_up(whoami) &&
        osdmap->get_addr(whoami) == messenger->get_myaddr()) {
       // yay!
-      activate_map(t);
+      activate_map(t, fin->contexts);
 
       // process waiters
       take_waiters(waiting_for_osdmap);
@@ -1970,7 +1971,7 @@ void OSD::handle_osd_map(MOSDMap *m)
 
   // superblock and commit
   write_superblock(t);
-  int r = store->apply_transaction(t);
+  int r = store->apply_transaction(t, fin);
   if (r) {
     char buf[80];
     dout(0) << "error writing map: " << r << " " << strerror_r(-r, buf, sizeof(buf)) << dendl;
@@ -2221,7 +2222,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
   }
 }
 
-void OSD::activate_map(ObjectStore::Transaction& t)
+void OSD::activate_map(ObjectStore::Transaction& t, list<Context*>& tfin)
 {
   assert(osd_lock.is_locked());
 
@@ -2249,7 +2250,7 @@ void OSD::activate_map(ObjectStore::Transaction& t)
       // i am (inactive) primary
       if (!pg->is_peering() || 
          (pg->need_up_thru && up_thru >= pg->info.history.same_acting_since))
-       pg->peer(t, query_map, &info_map);
+       pg->peer(t, tfin, query_map, &info_map);
     }
     else if (pg->is_stray() &&
             pg->get_primary() >= 0) {
@@ -2503,6 +2504,7 @@ void OSD::kick_pg_split_queue()
 
     // create and lock children
     ObjectStore::Transaction t;
+    C_Contexts *fin = new C_Contexts;
     map<pg_t,PG*> children;
     for (set<pg_t>::iterator q = p->second.begin();
         q != p->second.end();
@@ -2529,12 +2531,12 @@ void OSD::kick_pg_split_queue()
 
       wake_pg_waiters(pg->info.pgid);
 
-      pg->peer(t, query_map, &info_map);
+      pg->peer(t, fin->contexts, query_map, &info_map);
       pg->update_stats();
       pg->unlock();
       created++;
     }
-    int tr = store->apply_transaction(t);
+    int tr = store->apply_transaction(t, fin);
     assert(tr == 0);
 
     // remove from queue
@@ -2624,6 +2626,7 @@ void OSD::handle_pg_create(MOSDPGCreate *m)
   map<int, MOSDPGInfo*> info_map;
 
   ObjectStore::Transaction t;
+  C_Contexts *fin = new C_Contexts;
   vector<PG*> to_peer;
 
   for (map<pg_t,MOSDPGCreate::create_rec>::iterator p = m->mkpg.begin();
@@ -2700,18 +2703,18 @@ void OSD::handle_pg_create(MOSDPGCreate *m)
     }
   }
 
-  int tr = store->apply_transaction(t);
-  assert(tr == 0);
-
   for (vector<PG*>::iterator p = to_peer.begin(); p != to_peer.end(); p++) {
     PG *pg = *p;
     pg->lock();
     wake_pg_waiters(pg->info.pgid);
-    pg->peer(t, query_map, &info_map);
+    pg->peer(t, fin->contexts, query_map, &info_map);
     pg->update_stats();
     pg->unlock();
   }
 
+  int tr = store->apply_transaction(t, fin);
+  assert(tr == 0);
+
   do_queries(query_map);
   do_infos(info_map);
 
@@ -2788,6 +2791,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
   if (!require_same_or_newer_map(m, m->get_epoch())) return;
 
   ObjectStore::Transaction t;
+  C_Contexts *fin = new C_Contexts;
   
   // look for unknown PGs i'm primary for
   map< int, map<pg_t,PG::Query> > query_map;
@@ -2878,13 +2882,13 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
        pg->state_clear(PG_STATE_CLEAN);
       }
       
-      pg->peer(t, query_map, &info_map);
+      pg->peer(t, fin->contexts, query_map, &info_map);
       pg->update_stats();
     }
     pg->unlock();
   }
   
-  int tr = store->apply_transaction(t);
+  int tr = store->apply_transaction(t, fin);
   assert(tr == 0);
 
   do_queries(query_map);
@@ -2916,6 +2920,7 @@ void OSD::_process_pg_info(epoch_t epoch, int from,
                           int& created)
 {
   ObjectStore::Transaction t;
+  C_Contexts *fin = new C_Contexts;
 
   PG *pg = 0;
   if (!_have_pg(info.pgid)) {
@@ -2972,7 +2977,7 @@ void OSD::_process_pg_info(epoch_t epoch, int from,
        
        // peer
        map< int, map<pg_t,PG::Query> > query_map;
-       pg->peer(t, query_map, info_map);
+       pg->peer(t, fin->contexts, query_map, info_map);
        pg->update_stats();
        do_queries(query_map);
       } else {
@@ -2986,7 +2991,7 @@ void OSD::_process_pg_info(epoch_t epoch, int from,
       // i am REPLICA
       if (!pg->is_active()) {
        pg->merge_log(t, info, log, missing, from);
-       pg->activate(t, info_map);
+       pg->activate(t, fin->contexts, info_map);
       } else {
        // just update our stats
        dout(10) << *pg << " writing updated stats" << dendl;
@@ -2996,7 +3001,7 @@ void OSD::_process_pg_info(epoch_t epoch, int from,
     }
   }
 
-  int tr = store->apply_transaction(t);
+  int tr = store->apply_transaction(t, fin);
   assert(tr == 0);
 
   pg->unlock();
@@ -3442,13 +3447,14 @@ void OSD::generate_backlog(PG *pg)
 
     map< int, map<pg_t,PG::Query> > query_map;    // peer -> PG -> get_summary_since
     ObjectStore::Transaction t;
-    pg->peer(t, query_map, NULL);
+    C_Contexts *fin = new C_Contexts;
+    pg->peer(t, fin->contexts, query_map, NULL);
     do_queries(query_map);
     if (pg->dirty_info)
       pg->write_info(t);
     if (pg->dirty_log)
       pg->write_log(t);
-    int tr = store->apply_transaction(t);
+    int tr = store->apply_transaction(t, fin);
     assert(tr == 0);
   }
 
@@ -3493,8 +3499,9 @@ void OSD::activate_pg(pg_t pgid, utime_t activate_at)
        pg->get_role() == 0 &&
        pg->replay_until == activate_at) {
       ObjectStore::Transaction t;
-      pg->activate(t);
-      int tr = store->apply_transaction(t);
+      C_Contexts *fin = new C_Contexts;
+      pg->activate(t, fin->contexts);
+      int tr = store->apply_transaction(t, fin);
       assert(tr == 0);
     }
     pg->unlock();
index 9ab63f9534868e6f890aee6449d0968e61b605cd..ee8e633aa6c8898744bf4da762ec02aa2b5c49bb 100644 (file)
@@ -399,7 +399,7 @@ private:
   void note_up_osd(int osd);
   
   void advance_map(ObjectStore::Transaction& t);
-  void activate_map(ObjectStore::Transaction& t);
+  void activate_map(ObjectStore::Transaction& t, list<Context*>& tfin);
 
   // osd map cache (past osd maps)
   map<epoch_t,OSDMap*> map_cache;
index b890f28e9d5c5ca61e03e6cd9872c3c3166babcc..457bf92ed064d7207e5d4be18f80dceb865de3b6 100644 (file)
@@ -1202,7 +1202,7 @@ bool PG::recover_master_log(map< int, map<pg_t,Query> >& query_map)
 }
 
 
-void PG::peer(ObjectStore::Transaction& t, 
+void PG::peer(ObjectStore::Transaction& t, list<Context*>& tfin,
               map< int, map<pg_t,Query> >& query_map,
              map<int, MOSDPGInfo*> *activator_map)
 {
@@ -1385,14 +1385,14 @@ void PG::peer(ObjectStore::Transaction& t,
   } 
   else if (!is_active()) {
     // -- ok, activate!
-    activate(t, activator_map);
+    activate(t, tfin, activator_map);
   }
   else if (is_all_uptodate()) 
-    finish_recovery();
+    finish_recovery(t, tfin);
 }
 
 
-void PG::activate(ObjectStore::Transaction& t,
+void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
                  map<int, MOSDPGInfo*> *activator_map)
 {
   assert(!is_active());
@@ -1545,7 +1545,7 @@ void PG::activate(ObjectStore::Transaction& t,
     
     // all clean?
     if (is_all_uptodate()) 
-      finish_recovery();
+      finish_recovery(t, tfin);
     else {
       dout(10) << "activate not all replicas are uptodate, queueing recovery" << dendl;
       osd->queue_for_recovery(this);
@@ -1604,7 +1604,7 @@ struct C_PG_FinishRecovery : public Context {
   }
 };
 
-void PG::finish_recovery()
+void PG::finish_recovery(ObjectStore::Transaction& t, list<Context*>& tfin)
 {
   dout(10) << "finish_recovery" << dendl;
   state_set(PG_STATE_CLEAN);
@@ -1612,15 +1612,13 @@ void PG::finish_recovery()
 
   clear_recovery_state();
 
+  write_info(t);
+
   /*
    * sync all this before purging strays.  but don't block!
    */
   finish_sync_event = new C_PG_FinishRecovery(this);
-
-  ObjectStore::Transaction t;
-  write_info(t);
-  int tr = osd->store->apply_transaction(t, finish_sync_event);
-  assert(tr == 0);
+  tfin.push_back(finish_sync_event);
 }
 
 void PG::_finish_recovery(Context *c)
index 81baeebc3d1d71f44bc92df6fae03c46dd4b215b..57e24551ff6d2718a83a5edbf28228dfe3a6c7a8 100644 (file)
@@ -775,10 +775,10 @@ public:
 
   bool choose_acting(int newest_update_osd);
   bool recover_master_log(map< int, map<pg_t,Query> >& query_map);
-  void peer(ObjectStore::Transaction& t, 
+  void peer(ObjectStore::Transaction& t, list<Context*>& tfin,
            map< int, map<pg_t,Query> >& query_map,
            map<int, MOSDPGInfo*> *activator_map=0);
-  void activate(ObjectStore::Transaction& t, 
+  void activate(ObjectStore::Transaction& t, list<Context*>& tfin,
                map<int, MOSDPGInfo*> *activator_map=0);
 
   virtual void clean_up_local(ObjectStore::Transaction& t) = 0;
@@ -789,7 +789,7 @@ public:
 
   Context *finish_sync_event;
 
-  void finish_recovery();
+  void finish_recovery(ObjectStore::Transaction& t, list<Context*>& tfin);
   void _finish_recovery(Context *c);
   void cancel_recovery();
   void clear_recovery_state();
index dd30a7c9e940c076001185221f7b456ec2dd1cb0..aabc74546b7a659d113dd54065e811f95d006f53 100644 (file)
@@ -3443,7 +3443,11 @@ int ReplicatedPG::recover_primary(int max)
   uptodate_set.insert(osd->whoami);
   if (is_all_uptodate()) {
     dout(-7) << "recover_primary complete" << dendl;
-    finish_recovery();
+    ObjectStore::Transaction t;
+    C_Contexts *fin = new C_Contexts;
+    finish_recovery(t, fin->contexts);
+    int tr = osd->store->apply_transaction(t, fin);
+    assert(tr == 0);
   } else {
     dout(-10) << "recover_primary primary now complete, starting peer recovery" << dendl;
   }
@@ -3492,7 +3496,11 @@ int ReplicatedPG::recover_replicas(int max)
   dout(-10) << "recover_replicas - nothing to do!" << dendl;
 
   if (is_all_uptodate()) {
-    finish_recovery();
+    ObjectStore::Transaction t;
+    C_Contexts *fin = new C_Contexts;
+    finish_recovery(t, fin->contexts);
+    int tr = osd->store->apply_transaction(t, fin);
+    assert(tr == 0);
   } else {
     dout(10) << "recover_replicas not all uptodate, acting " << acting << ", uptodate " << uptodate_set << dendl;
   }