git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: lots of fixes
author Sage Weil <sage@newdream.net>
Tue, 25 Nov 2008 18:02:48 +0000 (10:02 -0800)
committer Sage Weil <sage@newdream.net>
Tue, 25 Nov 2008 18:02:48 +0000 (10:02 -0800)
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h
src/osd/RAID4PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index cfe3a0555b6d2ae3991e1144c444c7a14a504419..eef9540eda50714e05b0af572d6ff5e4008b1ef8 100644 (file)
@@ -497,6 +497,11 @@ int OSD::shutdown()
   snap_trim_wq.stop();
   dout(10) << "snap trim wq stopped" << dendl;
 
+  // tell pgs we're shutting down
+  for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
+       p != pg_map.end();
+       p++)
+    p->second->on_shutdown();
 
   // zap waiters (bleh, this is messy)
   finished_lock.Lock();
@@ -525,12 +530,12 @@ int OSD::shutdown()
   for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
        p != pg_map.end();
        p++) {
-    delete p->second;
+    PG *pg = p->second;
+    pg->lock();
+    pg->put_unlock();
   }
   pg_map.clear();
 
-  // shut everything else down
-  //monitor->shutdown();
   messenger->shutdown();
   return r;
 }
index 171ab796e01e9cfec25a051131de350d57655b61..42c063f29b9b5d70443f9548a609724efdee6f10 100644 (file)
@@ -470,6 +470,29 @@ void PG::merge_log(ObjectStore::Transaction& t, Log &olog, Missing &omissing, in
   if (changed) {
     write_info(t);
     write_log(t);
+
+    // init complete pointer
+    if (missing.num_missing() == 0 &&
+       info.last_complete != info.last_update) {
+      dout(10) << "merge_log - no missing, moving last_complete " << info.last_complete 
+              << " -> " << info.last_update << dendl;
+      info.last_complete = info.last_update;
+    }
+
+    if (info.last_complete == info.last_update) {
+      dout(10) << "merge_log - complete" << dendl;
+      log.complete_to = log.log.end();
+      log.requested_to = log.log.end();
+    } else {
+      dout(10) << "merge_log - not complete, " << missing << dendl;
+      
+      log.complete_to = log.log.begin();
+      while (log.complete_to->version < info.last_complete) {
+       log.complete_to++;
+       assert(log.complete_to != log.log.end());
+      }
+      log.requested_to = log.complete_to;
+    }
   }
 }
 
@@ -1154,37 +1177,20 @@ void PG::activate(ObjectStore::Transaction& t,
   if (!info.dead_snaps.empty())
     queue_snap_trim();
 
-  // init complete pointer
-  if (missing.num_missing() == 0 &&
-      info.last_complete != info.last_update) {
-    dout(10) << "activate - no missing, moving last_complete " << info.last_complete 
-            << " -> " << info.last_update << dendl;
-    info.last_complete = info.last_update;
-  }
-
   if (info.last_complete == info.last_update) {
     dout(10) << "activate - complete" << dendl;
-    log.complete_to == log.log.end();
+    log.complete_to = log.log.end();
     log.requested_to = log.log.end();
-  } 
-  else if (true) {
+  } else {
     dout(10) << "activate - not complete, " << missing << dendl;
-    
-    // init complete_to
-    log.complete_to = log.log.begin();
-    while (log.complete_to->version < info.last_complete) {
-      log.complete_to++;
-      assert(log.complete_to != log.log.end());
-    }
+    assert(log.complete_to->version >= info.last_complete);
     
     if (is_primary()) {
       // start recovery
       dout(10) << "activate - starting recovery" << dendl;    
-      log.requested_to = log.complete_to;
+      assert(log.requested_to == log.complete_to);
       osd->queue_for_recovery(this);
     }
-  } else {
-    dout(10) << "activate - not complete, " << missing << dendl;
   }
 
   // if primary..
index 4fb43a85239fe212411c667e45a2b22212c56d31..c146f10a925a03a1796902f78d3687260d745868 100644 (file)
@@ -781,6 +781,7 @@ public:
   virtual void on_acker_change() = 0;
   virtual void on_role_change() = 0;
   virtual void on_change() = 0;
+  virtual void on_shutdown() = 0;
 };
 
 WRITE_CLASS_ENCODER(PG::Info::History)
index d767f3b975c2967114581a62f87c6ff852074338..608b95133c545ecebeebc6eeb5fae52414b08e09 100644 (file)
@@ -68,6 +68,7 @@ public:
   void on_acker_change();
   void on_role_change();
   void on_change();
+  void on_shutdown() {}
 };
 
 
index 0d5fa0f6ebf6a8e8cb08cd499da2f57745ac5e3e..d2773386f6820f1f1930a4e475c2fb24066f4566 100644 (file)
@@ -34,7 +34,8 @@
 #undef dout_prefix
 #define dout_prefix _prefix(this, osd->whoami, osd->osdmap)
 static ostream& _prefix(PG *pg, int whoami, OSDMap *osdmap) {
-  return *_dout << dbeginl<< pthread_self() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " " << *pg << " ";
+  return *_dout << dbeginl << pthread_self() << " osd" << whoami
+               << " " << (osdmap ? osdmap->get_epoch():0) << " " << *pg << " ";
 }
 
 
@@ -1201,12 +1202,12 @@ public:
 void ReplicatedPG::op_modify_ondisk(RepGather *repop)
 {
   if (repop->aborted) {
-    dout(10) << "op_modify_ondisk " << *repop->op << " -- aborted" << dendl;
+    dout(10) << "op_modify_ondisk " << *repop << " -- aborted" << dendl;
   } else {
-    dout(10) << "op_modify_ondisk " << *repop->op << dendl;
+    dout(10) << "op_modify_ondisk " << *repop << dendl;
     assert(repop->waitfor_disk.count(osd->get_nodeid()));
-    repop->waitfor_nvram.erase(osd->get_nodeid());
     repop->waitfor_disk.erase(osd->get_nodeid());
+    repop->waitfor_nvram.erase(osd->get_nodeid());
     repop->pg_complete_thru[osd->get_nodeid()] = repop->pg_local_last_complete;
     eval_repop(repop);
   }
@@ -1381,7 +1382,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(MOSDOp *op, tid_t rep_tid, ever
 
   repop->start = g_clock.now();
 
-  repop_queue.push_back(repop);
+  repop_queue.push_back(&repop->queue_item);
   repop_map[repop->rep_tid] = repop;
   repop->get();
 
@@ -2304,9 +2305,9 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
   missing_loc.erase(poid.oid);
 
   // raise last_complete?
-  assert(log.complete_to != log.log.end());
   while (log.complete_to != log.log.end()) {
-    if (missing.missing.count(log.complete_to->oid)) break;
+    if (missing.missing.count(log.complete_to->oid))
+      break;
     if (info.last_complete < log.complete_to->version)
       info.last_complete = log.complete_to->version;
     log.complete_to++;
@@ -2364,9 +2365,10 @@ void ReplicatedPG::on_osd_failure(int o)
   dout(10) << "on_osd_failure " << o << dendl;
 
   // artificially ack failed osds
-  deque<RepGather*>::iterator p = repop_queue.begin();
-  while (p != repop_queue.end()) {
-    RepGather *repop = *p++;
+  xlist<RepGather*>::iterator p = repop_queue.begin();
+  while (!p.end()) {
+    RepGather *repop = *p;
+    ++p;
     dout(-1) << " artificialling acking repop tid " << repop->rep_tid << dendl;
     if (repop->waitfor_ack.count(o) ||
        repop->waitfor_nvram.count(o) ||
@@ -2392,44 +2394,62 @@ void ReplicatedPG::on_acker_change()
   dout(10) << "on_acker_change" << dendl;
 }
 
+void ReplicatedPG::on_shutdown()  // PG shutdown hook: applies and releases every repop still in repop_queue
+{
+  dout(10) << "on_shutdown" << dendl;
+
+  // Walk repop_queue: apply each repop that has not been applied yet,
+  // then unlink it from the queue and drop one reference to it.
+  xlist<RepGather*>::iterator p = repop_queue.begin();
+  while (!p.end()) {
+    RepGather *repop = *p;
+    ++p;  // advance before the current item is unlinked from the list below
+    if (!repop->applied)
+      apply_repop(repop);
+    repop->queue_item.remove_myself();
+    repop->put();  // NOTE(review): repop_map entry is not erased here, unlike on_change()'s non-primary path -- confirm intended
+  }
+}
+
 void ReplicatedPG::on_change()
 {
   dout(10) << "on_change" << dendl;
 
   // apply all local repops
   //  (pg is inactive; we will repeer)
-  for (deque<RepGather*>::iterator p = repop_queue.begin();
-       p != repop_queue.end();
-       p++) 
+  for (xlist<RepGather*>::iterator p = repop_queue.begin();
+       !p.end(); ++p)
     if (!(*p)->applied)
       apply_repop(*p);
 
-  deque<RepGather*>::iterator p = repop_queue.begin(); 
-  while (p != repop_queue.end()) {
+  xlist<RepGather*>::iterator p = repop_queue.begin(); 
+  while (!p.end()) {
     RepGather *repop = *p;
+    ++p;
 
     if (!is_primary()) {
       // no longer primary.  hose repops.
       dout(-1) << " aborting repop tid " << repop->rep_tid << dendl;
       repop->aborted = true;
-      repop_queue.erase(p++);
+      repop->queue_item.remove_myself();
       repop_map.erase(repop->rep_tid);
       repop->put();
     } else {
       // still primary. artificially ack+commit any replicas who dropped out of the pg
-      p++;
       dout(-1) << " checking for dropped osds on repop tid " << repop->rep_tid << dendl;
       set<int> all;
       set_union(repop->waitfor_disk.begin(), repop->waitfor_disk.end(),
                repop->waitfor_ack.begin(), repop->waitfor_ack.end(),
                inserter(all, all.begin()));
       for (set<int>::iterator q = all.begin(); q != all.end(); q++) {
+       if (*q == osd->get_nodeid())
+         continue;
        bool have = false;
        for (unsigned i=1; i<acting.size(); i++)
          if (acting[i] == *q) 
            have = true;
        if (!have)
-         repop_ack(repop, -EIO, true, *q);
+         repop_ack(repop, -EIO, CEPH_OSD_OP_ONDISK, *q);
       }
     }
   }
index 61ff11b3f8d3934d3e37d606d508cdbe253e48ce..651f894f91fcaaf8b4146b6012564116e5273935 100644 (file)
@@ -28,6 +28,7 @@ public:
    */
   class RepGather {
   public:
+    xlist<RepGather*>::item queue_item;
     int nref;
 
     class MOSDOp *op;
@@ -54,6 +55,7 @@ public:
     
     RepGather(MOSDOp *o, tid_t rt, eversion_t av, eversion_t lc,
              SnapSet& ss, SnapContext& sc) :
+      queue_item(this),
       nref(1), op(o), rep_tid(rt),
       applied(false), aborted(false),
       sent_ack(false), sent_nvram(false), sent_disk(false),
@@ -87,6 +89,7 @@ public:
       if (--nref == 0) {
        delete op;
        delete this;
+       generic_dout(0) << "deleting " << this << dendl;
       }
     }
   };
@@ -94,7 +97,7 @@ public:
 protected:
   // replica ops
   // [primary|tail]
-  deque<RepGather*> repop_queue;
+  xlist<RepGather*> repop_queue;
   map<tid_t, RepGather*> repop_map;
 
   void apply_repop(RepGather *repop);
@@ -205,6 +208,7 @@ public:
   void on_acker_change();
   void on_role_change();
   void on_change();
+  void on_shutdown();
 };
 
 
@@ -212,11 +216,11 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
 {
   out << "repgather(" << &repop << " rep_tid=" << repop.rep_tid 
       << " wfack=" << repop.waitfor_ack
-      << " wfnvram=" << repop.waitfor_nvram
+    //<< " wfnvram=" << repop.waitfor_nvram
       << " wfdisk=" << repop.waitfor_disk;
   out << " pct=" << repop.pg_complete_thru;
   out << " op=" << *(repop.op);
-  out << " repop=" << &repop;
+  out << " " << &repop;
   out << ")";
   return out;
 }