]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
fixed osd bug on failure that caused clients to hang
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 26 Nov 2007 21:26:34 +0000 (21:26 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 26 Nov 2007 21:26:34 +0000 (21:26 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2116 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/client/Client.cc
trunk/ceph/osd/OSD.cc
trunk/ceph/osd/PG.h
trunk/ceph/osd/ReplicatedPG.cc
trunk/ceph/osd/ReplicatedPG.h
trunk/ceph/osdc/Objecter.cc

index 3241b5c882818dd4b939b04c5e161b196a308e41..cdf56673ed8edba8d841342297835e912d27149a 100644 (file)
@@ -583,7 +583,7 @@ int Client::choose_target_mds(MClientRequest *req)
   if (!diri || g_conf.client_use_random_mds) {
     // no root info, pick a random MDS
     mds = mdsmap->get_random_in_mds();
-    dout(0) << "random mds" << mds << dendl;
+    dout(10) << "random mds" << mds << dendl;
     if (mds < 0) mds = 0;
 
     if (0) {
index 9e5210a4bc862120c2d29589c6777651062b8eb7..2f0828f9461d1384acd609d72089de4b3432b786 100644 (file)
@@ -1223,17 +1223,6 @@ void OSD::handle_osd_map(MOSDMap *m)
         if (osd == whoami) continue;
         messenger->mark_down(osdmap->get_addr(i->first));
         peer_map_epoch.erase(entity_name_t::OSD(i->first));
-      
-        // kick any replica ops
-        for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
-             it != pg_map.end();
-             it++) {
-          PG *pg = it->second;
-
-         pg->lock();
-         pg->note_failed_osd(osd);
-         pg->unlock();
-        }
       }
       for (map<int32_t,entity_addr_t>::iterator i = inc.new_up.begin();
            i != inc.new_up.end();
@@ -1416,10 +1405,13 @@ void OSD::advance_map(ObjectStore::Transaction& t)
       if (oldrole == 0 || pg->get_role() == 0)
         pg->clear_primary_state();
 
+      // pg->on_*
+      for (int i=0; i<oldacting.size(); i++)
+       if (osdmap->is_down(oldacting[i]))
+         pg->on_osd_failure(oldacting[i]);
       pg->on_change();
-      if (oldacker != pg->get_acker() && oldacker == whoami) {
+      if (oldacker != pg->get_acker() && oldacker == whoami)
        pg->on_acker_change();
-      }
 
       if (role != oldrole) {
         // old primary?
@@ -2159,7 +2151,6 @@ void OSD::handle_op(MOSDOp *op)
     // REGULAR OP (non-replication)
 
     // note original source
-    op->set_client_inst( op->get_source_inst() );
     op->clear_payload();    // and hose encoded payload (in case we forward)
 
     // have pg?
@@ -2357,6 +2348,7 @@ void OSD::handle_op_reply(MOSDOpReply *op)
  */
 void OSD::enqueue_op(PG *pg, Message *op)
 {
+  dout(15) << *pg << " enqueue_op " << op << " " << *op << dendl;
   // add to pg's op_queue
   pg->op_queue.push_back(op);
   pending_ops++;
index 3a8ebb9b8e4b1625e84d4b09f43a22fd04fc8984..cf1174dd9163875bb3edfd2678873e3103a66212 100644 (file)
@@ -656,8 +656,7 @@ public:
   virtual bool is_missing_object(object_t oid) = 0;
   virtual void wait_for_missing_object(object_t oid, MOSDOp *op) = 0;
 
-  virtual void note_failed_osd(int osd) = 0;
-
+  virtual void on_osd_failure(int osd) = 0;
   virtual void on_acker_change() = 0;
   virtual void on_role_change() = 0;
   virtual void on_change() = 0;
index 07ab10fadbd20545ed8b25ea12e53eb6dbc09932..327bc6d46ac26380261b204cb0f0cb3825231bbd 100644 (file)
@@ -858,6 +858,8 @@ void ReplicatedPG::put_rep_gather(RepGather *repop)
       MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false);
       dout(10) << "put_repop  sending ack on " << *repop << " " << reply << dendl;
       osd->messenger->send_message(reply, repop->op->get_client_inst());
+    } else {
+      dout(10) << "put_repop  NOT sending ack on " << *repop << dendl;
     }
     repop->sent_ack = true;
 
@@ -1650,15 +1652,15 @@ void ReplicatedPG::op_push(MOSDOp *op)
 
 
 
-void ReplicatedPG::note_failed_osd(int o)
+void ReplicatedPG::on_osd_failure(int o)
 {
-  dout(10) << "note_failed_osd " << o << dendl;
+  dout(10) << "on_osd_failure " << o << dendl;
   // do async; repop_ack() may modify pg->repop_gather
   list<RepGather*> ls;  
   for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
        p != rep_gather.end();
        p++) {
-    //dout(-1) << "checking repop tid " << p->first << dendl;
+    dout(-1) << "checking repop tid " << p->first << dendl;
     if (p->second->waitfor_ack.count(o) ||
        p->second->waitfor_commit.count(o)) 
       ls.push_back(p->second);
@@ -1677,26 +1679,42 @@ void ReplicatedPG::on_acker_change()
 
 void ReplicatedPG::on_change()
 {
-  // apply repops
-  for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
-       p != rep_gather.end();
-       p++) {
-    if (!p->second->applied)
-      apply_repop(p->second);
-    delete p->second->op;
-    delete p->second;
+  dout(10) << "on_change" << dendl;
+
+  if (g_conf.osd_rep == OSD_REP_PRIMARY ||
+      g_conf.osd_rep == OSD_REP_SPLAY) {
+    // apply all local repops
+    //  (pg is inactive; we will repeer)
+    for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
+        p != rep_gather.end();
+        p++) 
+      if (!p->second->applied)
+       apply_repop(p->second);
+  }
+  else if (g_conf.osd_rep == OSD_REP_CHAIN) {
+    // apply all local repops
+    //  (pg is inactive; we will repeer)
+    //  note: because we hose rep_gather, clients must resubmit ops on ANY pg membership change.
+    for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
+        p != rep_gather.end();
+        p++) {
+      if (!p->second->applied)
+       apply_repop(p->second);
+      delete p->second->op;
+      delete p->second;
+    }
+    rep_gather.clear();
+    
+    // and discard repop waiters (chain/splay artifact)
+    for (hash_map<tid_t, list<Message*> >::iterator p = waiting_for_repop.begin();
+        p != waiting_for_repop.end();
+        p++)
+      for (list<Message*>::iterator pm = p->second.begin();
+          pm != p->second.end();
+          pm++)
+       delete *pm;
+    waiting_for_repop.clear();
   }
-  rep_gather.clear();
-  
-  // and discard repop waiters (chain/splay artifact)
-  for (hash_map<tid_t, list<Message*> >::iterator p = waiting_for_repop.begin();
-       p != waiting_for_repop.end();
-       p++)
-    for (list<Message*>::iterator pm = p->second.begin();
-        pm != p->second.end();
-        pm++)
-      delete *pm;
-  waiting_for_repop.clear();
 }
 
 
index 628b3a6eeda455480597647c38247abbeefda195..0d4066ad69c674a9d984e99ea5358d36b512ae41 100644 (file)
@@ -147,7 +147,7 @@ public:
   bool is_missing_object(object_t oid);
   void wait_for_missing_object(object_t oid, MOSDOp *op);
 
-  void note_failed_osd(int o);
+  void on_osd_failure(int o);
   void on_acker_change();
   void on_role_change();
   void on_change();
index a96578c275fdc28a909433f440efdb5d451f448e..ab957aac0a43aeaaff85b841a689b308809293bf 100644 (file)
@@ -274,9 +274,9 @@ void Objecter::tick()
        i++) {
     if (!i->second.active_tids.empty() &&
        i->second.last < cutoff) {
-      dout(10) << "tick pg " << i->first << " is laggy" << dendl;
+      dout(10) << "tick pg " << i->first << " is laggy: " << i->second.active_tids << dendl;
       maybe_request_map();
-      break;
+      //break;
     }
   }
 
@@ -295,8 +295,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     break;
 
   case OSD_OP_STAT:
-       handle_osd_stat_reply(m);
-       break;
+    handle_osd_stat_reply(m);
+    break;
     
   case OSD_OP_WRNOOP:
   case OSD_OP_WRITE:
@@ -801,7 +801,7 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
 
   dout(7) << "handle_osd_modify_reply " << tid 
           << (m->get_commit() ? " commit":" ack")
-          << " v " << m->get_version()
+          << " v " << m->get_version() << " in " << m->get_pg()
           << dendl;
   OSDModify *wr = op_modify[ tid ];
 
@@ -828,6 +828,7 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
     // remove from tid/osd maps
     assert(pg.active_tids.count(tid));
     pg.active_tids.erase(tid);
+    dout(15) << "handle_osd_modify_reply pg " << m->get_pg() << " still has " << pg.active_tids << dendl;
     if (pg.active_tids.empty()) close_pg( m->get_pg() );
 
     // commit.