]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: clean up repop code
authorSage Weil <sage@newdream.net>
Tue, 25 Nov 2008 14:45:48 +0000 (06:45 -0800)
committerSage Weil <sage@newdream.net>
Tue, 25 Nov 2008 14:45:48 +0000 (06:45 -0800)
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 4a2effcb419e458f61abd050adfa1f4c02b297ee..0d5fa0f6ebf6a8e8cb08cd499da2f57745ac5e3e 100644 (file)
@@ -1168,38 +1168,57 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
 }
 
 
+
+
+
+
+
 // ========================================================================
 // rep op gather
 
 class C_OSD_ModifyCommit : public Context {
 public:
   ReplicatedPG *pg;
-  tid_t rep_tid;
-  eversion_t pg_last_complete;
-  C_OSD_ModifyCommit(ReplicatedPG *p, tid_t rt, eversion_t lc) : pg(p), rep_tid(rt), pg_last_complete(lc) {
-    pg->get();  // we're copying the pointer
+  ReplicatedPG::RepGather *repop;
+
+  C_OSD_ModifyCommit(ReplicatedPG *p, ReplicatedPG::RepGather *rg) :
+    pg(p), repop(rg) {
+    repop->get();
+    pg->get();    // we're copying the pointer
   }
   void finish(int r) {
     pg->lock();
     if (!pg->is_deleted()) 
-      pg->op_modify_ondisk(rep_tid, pg_last_complete);
+      pg->op_modify_ondisk(repop);
+    repop->put();
     pg->put_unlock();
   }
 };
 
-
-void ReplicatedPG::get_rep_gather(RepGather *repop)
+/** op_modify_commit
+ * transaction commit on the acker.
+ */
+void ReplicatedPG::op_modify_ondisk(RepGather *repop)
 {
-  //repop->lock.Lock();
-  dout(10) << "get_repop " << *repop << dendl;
+  if (repop->aborted) {
+    dout(10) << "op_modify_ondisk " << *repop->op << " -- aborted" << dendl;
+  } else {
+    dout(10) << "op_modify_ondisk " << *repop->op << dendl;
+    assert(repop->waitfor_disk.count(osd->get_nodeid()));
+    repop->waitfor_nvram.erase(osd->get_nodeid());
+    repop->waitfor_disk.erase(osd->get_nodeid());
+    repop->pg_complete_thru[osd->get_nodeid()] = repop->pg_local_last_complete;
+    eval_repop(repop);
+  }
 }
 
+
 void ReplicatedPG::apply_repop(RepGather *repop)
 {
   dout(10) << "apply_repop  applying update on " << *repop << dendl;
   assert(!repop->applied);
 
-  Context *oncommit = new C_OSD_ModifyCommit(this, repop->rep_tid, repop->pg_local_last_complete);
+  Context *oncommit = new C_OSD_ModifyCommit(this, repop);
   unsigned r = osd->store->apply_transaction(repop->t, oncommit);
   if (r)
     dout(-10) << "apply_repop  apply transaction return " << r << " on " << *repop << dendl;
@@ -1245,19 +1264,18 @@ void ReplicatedPG::apply_repop(RepGather *repop)
   }   
   
   update_stats();
-  
 }
 
-void ReplicatedPG::put_rep_gather(RepGather *repop)
+void ReplicatedPG::eval_repop(RepGather *repop)
 {
-  dout(10) << "put_repop " << *repop << dendl;
+  dout(10) << "eval_repop " << *repop << dendl;
   
   // disk?
   if (repop->can_send_disk()) {
     if (repop->op->wants_ondisk()) {
       // send commit.
       MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ONDISK);
-      dout(10) << "put_repop  sending commit on " << *repop << " " << reply << dendl;
+      dout(10) << " sending commit on " << *repop << " " << reply << dendl;
       osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
       repop->sent_disk = true;
     }
@@ -1268,7 +1286,7 @@ void ReplicatedPG::put_rep_gather(RepGather *repop)
     if (repop->op->wants_onnvram()) {
       // send commit.
       MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ONNVRAM);
-      dout(10) << "put_repop  sending onnvram on " << *repop << " " << reply << dendl;
+      dout(10) << " sending onnvram on " << *repop << " " << reply << dendl;
       osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
       repop->sent_nvram = true;
     }
@@ -1283,7 +1301,7 @@ void ReplicatedPG::put_rep_gather(RepGather *repop)
     if (repop->op->wants_ack()) {
       // send ack
       MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ACK);
-      dout(10) << "put_repop  sending ack on " << *repop << " " << reply << dendl;
+      dout(10) << " sending ack on " << *repop << " " << reply << dendl;
       osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
       repop->sent_ack = true;
     }
@@ -1305,20 +1323,19 @@ void ReplicatedPG::put_rep_gather(RepGather *repop)
       }
       
       if (min > peers_complete_thru) {
-        dout(10) << "put_repop  peers_complete_thru " 
-                                << peers_complete_thru << " -> " << min
-                                << dendl;
+        dout(10) << " peers_complete_thru " 
+                << peers_complete_thru << " -> " << min
+                << dendl;
         peers_complete_thru = min;
       }
     }
 
-    dout(10) << "put_repop  deleting " << *repop << dendl;
-       
-    assert(rep_gather.count(repop->rep_tid));
-    rep_gather.erase(repop->rep_tid);
-       
-    delete repop->op;
-    delete repop;
+    dout(10) << " removing " << *repop << dendl;
+    assert(!repop_queue.empty());
+    assert(repop_queue.front() == repop);
+    repop_queue.pop_front();
+    repop_map.erase(repop->rep_tid);
+    repop->put();
   }
 }
 
@@ -1347,42 +1364,32 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
   osd->messenger->send_message(wr, osd->osdmap->get_inst(dest));
 }
 
-ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv,
-                                                     SnapSet& snapset, SnapContext& snapc)
+ReplicatedPG::RepGather *ReplicatedPG::new_repop(MOSDOp *op, tid_t rep_tid, eversion_t nv,
+                                                SnapSet& snapset, SnapContext& snapc)
 {
-  dout(10) << "new_rep_gather rep_tid " << rep_tid << " on " << *op << dendl;
+  dout(10) << "new_repop rep_tid " << rep_tid << " on " << *op << dendl;
+
   RepGather *repop = new RepGather(op, rep_tid, nv, info.last_complete,
                                   snapset, snapc);
   
-  // osds. commits all come to me.
-  for (unsigned i=0; i<acting.size(); i++) {
-    int osd = acting[i];
-    repop->osds.insert(osd);
-    repop->waitfor_disk.insert(osd);
-  }
-
-  // primary.  all osds ack to me.
+  // initialize gather sets
   for (unsigned i=0; i<acting.size(); i++) {
     int osd = acting[i];
     repop->waitfor_ack.insert(osd);
+    repop->waitfor_disk.insert(osd);
   }
 
   repop->start = g_clock.now();
 
-  rep_gather[repop->rep_tid] = repop;
-
-  // anyone waiting?  (acks that got here before the op did)
-  if (waiting_for_repop.count(repop->rep_tid)) {
-    osd->take_waiters(waiting_for_repop[repop->rep_tid]);
-    waiting_for_repop.erase(repop->rep_tid);
-  }
+  repop_queue.push_back(repop);
+  repop_map[repop->rep_tid] = repop;
+  repop->get();
 
   return repop;
 }
  
 
-void ReplicatedPG::repop_ack(RepGather *repop,
-                            int result, int ack_type,
+void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
                             int fromosd, eversion_t pg_complete_thru)
 {
   MOSDOp *op = repop->op;
@@ -1392,34 +1399,25 @@ void ReplicatedPG::repop_ack(RepGather *repop,
          << " ack_type " << ack_type
          << " from osd" << fromosd
           << dendl;
-
-  get_rep_gather(repop);
-  {
-    if (ack_type & CEPH_OSD_OP_ONDISK) {
-      // disk
-      assert(repop->waitfor_disk.count(fromosd));      
-      repop->waitfor_disk.erase(fromosd);
-      repop->waitfor_nvram.erase(fromosd);
-      repop->waitfor_ack.erase(fromosd);
-      repop->pg_complete_thru[fromosd] = pg_complete_thru;
-    } else if (ack_type & CEPH_OSD_OP_ONNVRAM) {
-      // nvram
-      repop->waitfor_nvram.erase(fromosd);
-      repop->waitfor_ack.erase(fromosd);
-    } else {
-      // ack
-      repop->waitfor_ack.erase(fromosd);
-    }
+  
+  if (ack_type & CEPH_OSD_OP_ONDISK) {
+    // disk
+    assert(repop->waitfor_disk.count(fromosd));      
+    repop->waitfor_disk.erase(fromosd);
+    repop->waitfor_nvram.erase(fromosd);
+    repop->waitfor_ack.erase(fromosd);
+    repop->pg_complete_thru[fromosd] = pg_complete_thru;
+  } else if (ack_type & CEPH_OSD_OP_ONNVRAM) {
+    // nvram
+    repop->waitfor_nvram.erase(fromosd);
+    repop->waitfor_ack.erase(fromosd);
+  } else {
+    // ack
+    repop->waitfor_ack.erase(fromosd);
   }
-  put_rep_gather(repop);
-}
-
-
-
-
-
-
 
+  eval_repop(repop);
+}
 
 
 
@@ -1436,77 +1434,9 @@ void ReplicatedPG::repop_ack(RepGather *repop,
 
 
 
-/** op_modify_commit
- * transaction commit on the acker.
- */
-void ReplicatedPG::op_modify_ondisk(tid_t rep_tid, eversion_t pg_complete_thru)
-{
-  if (rep_gather.count(rep_tid)) {
-    RepGather *repop = rep_gather[rep_tid];
-    
-    dout(10) << "op_modify_ondisk " << *repop->op << dendl;
-    get_rep_gather(repop);
-    {
-      assert(repop->waitfor_disk.count(osd->get_nodeid()));
-      repop->waitfor_nvram.erase(osd->get_nodeid());
-      repop->waitfor_disk.erase(osd->get_nodeid());
-      repop->pg_complete_thru[osd->get_nodeid()] = pg_complete_thru;
-    }
-    put_rep_gather(repop);
-    dout(10) << "op_modify_ondisk done on " << repop << dendl;
-  } else {
-    dout(10) << "op_modify_ondisk rep_tid " << rep_tid << " dne" << dendl;
-  }
-}
 
 
 
-// commit (to disk) callback
-class C_OSD_RepModifyCommit : public Context {
-public:
-  ReplicatedPG *pg;
-  MOSDSubOp *op;
-  int destosd;
-
-  eversion_t pg_last_complete;
-
-  Mutex lock;
-  Cond cond;
-  bool acked;
-  bool waiting;
-
-  C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDSubOp *oo, int dosd, eversion_t lc) : 
-    pg(p), op(oo), destosd(dosd), pg_last_complete(lc),
-    lock("C_OSD_RepModifyCommit::lock"),
-    acked(false), waiting(false) { 
-    pg->get();  // we're copying the pointer.
-  }
-  void finish(int r) {
-    lock.Lock();
-    assert(!waiting);
-    while (!acked) {
-      waiting = true;
-      cond.Wait(lock);
-    }
-    assert(acked);
-    lock.Unlock();
-
-    pg->lock();
-    pg->sub_op_modify_ondisk(op, destosd, pg_last_complete);
-    pg->put_unlock();
-  }
-  void ack() {
-    lock.Lock();
-    assert(!acked);
-    acked = true;
-    if (waiting) cond.Signal();
-
-    // discard my reference to buffer
-    op->get_data().clear();
-
-    lock.Unlock();
-  }
-};
 
 void ReplicatedPG::op_modify(MOSDOp *op)
 {
@@ -1645,7 +1575,8 @@ void ReplicatedPG::op_modify(MOSDOp *op)
 
   // issue replica writes
   tid_t rep_tid = osd->get_tid();
-  RepGather *repop = new_rep_gather(op, rep_tid, av, snapset, snapc);
+  RepGather *repop = new_repop(op, rep_tid, av, snapset, snapc);
+
   for (unsigned i=1; i<acting.size(); i++)
     issue_repop(repop, acting[i], now);
 
@@ -1660,18 +1591,67 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   
   // (logical) local ack.
   // (if alone, this will apply the update.)
-  get_rep_gather(repop);
-  {
-    assert(repop->waitfor_ack.count(whoami));
-    repop->waitfor_ack.erase(whoami);
-  }
-  put_rep_gather(repop);
+  assert(repop->waitfor_ack.count(whoami));
+  repop->waitfor_ack.erase(whoami);
+  eval_repop(repop);
+  repop->put();
 }
 
 
 
+
+
 // sub op modify
 
+
+// commit (to disk) callback
+class C_OSD_RepModifyCommit : public Context {
+public:
+  ReplicatedPG *pg;
+
+  MOSDSubOp *op;
+  int destosd;
+
+  eversion_t pg_last_complete;
+
+  Mutex lock;
+  Cond cond;
+  bool acked;
+  bool waiting;
+
+  C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDSubOp *oo, int dosd, eversion_t lc) : 
+    pg(p), op(oo), destosd(dosd), pg_last_complete(lc),
+    lock("C_OSD_RepModifyCommit::lock"),
+    acked(false), waiting(false) { 
+    pg->get();  // we're copying the pointer.
+  }
+  void finish(int r) {
+    lock.Lock();
+    assert(!waiting);
+    while (!acked) {
+      waiting = true;
+      cond.Wait(lock);
+    }
+    assert(acked);
+    lock.Unlock();
+
+    pg->lock();
+    pg->sub_op_modify_ondisk(op, destosd, pg_last_complete);
+    pg->put_unlock();
+  }
+  void ack() {
+    lock.Lock();
+    assert(!acked);
+    acked = true;
+    if (waiting) cond.Signal();
+
+    // discard my reference to buffer
+    op->get_data().clear();
+
+    lock.Unlock();
+  }
+};
+
 void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
 {
   pobject_t poid = op->poid;
@@ -1763,17 +1743,15 @@ void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r)
   
   osd->take_peer_stat(fromosd, r->get_peer_stat());
   
-  if (rep_gather.count(rep_tid)) {
+  if (repop_map.count(rep_tid)) {
     // oh, good.
-    repop_ack(rep_gather[rep_tid], 
+    repop_ack(repop_map[rep_tid], 
              r->get_result(), r->ack_type,
              fromosd, 
              r->get_pg_complete_thru());
-    delete r;
-  } else {
-    // early ack.
-    waiting_for_repop[rep_tid].push_back(r);
   }
+
+  delete r;
 }
 
 
@@ -2385,14 +2363,15 @@ void ReplicatedPG::on_osd_failure(int o)
 {
   dout(10) << "on_osd_failure " << o << dendl;
 
-  hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
-  while (p != rep_gather.end()) {
-    RepGather *repop = p->second;
-    p++;
-    dout(-1) << "checking repop tid " << repop->rep_tid << dendl;
+  // artificially ack failed osds
+  deque<RepGather*>::iterator p = repop_queue.begin();
+  while (p != repop_queue.end()) {
+    RepGather *repop = *p++;
+    dout(-1) << " artificialling acking repop tid " << repop->rep_tid << dendl;
     if (repop->waitfor_ack.count(o) ||
+       repop->waitfor_nvram.count(o) ||
        repop->waitfor_disk.count(o))
-      repop_ack(repop, -1, true, o);
+      repop_ack(repop, -EIO, CEPH_OSD_OP_ONDISK, o);
   }
   
   // remove from pushing map
@@ -2419,26 +2398,27 @@ void ReplicatedPG::on_change()
 
   // apply all local repops
   //  (pg is inactive; we will repeer)
-  for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
-       p != rep_gather.end();
+  for (deque<RepGather*>::iterator p = repop_queue.begin();
+       p != repop_queue.end();
        p++) 
-    if (!p->second->applied)
-      apply_repop(p->second);
+    if (!(*p)->applied)
+      apply_repop(*p);
 
-  hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin(); 
-  while (p != rep_gather.end()) {
-    RepGather *repop = p->second;
+  deque<RepGather*>::iterator p = repop_queue.begin(); 
+  while (p != repop_queue.end()) {
+    RepGather *repop = *p;
 
-    if (acting.empty() || acting[0] != osd->get_nodeid()) {
+    if (!is_primary()) {
       // no longer primary.  hose repops.
-      dout(-1) << "no longer primary, aborting repop tid " << repop->rep_tid << dendl;
-      rep_gather.erase(p++);
-      delete repop->op;
-      delete repop;
+      dout(-1) << " aborting repop tid " << repop->rep_tid << dendl;
+      repop->aborted = true;
+      repop_queue.erase(p++);
+      repop_map.erase(repop->rep_tid);
+      repop->put();
     } else {
       // still primary. artificially ack+commit any replicas who dropped out of the pg
       p++;
-      dout(-1) << "checking repop tid " << repop->rep_tid << dendl;
+      dout(-1) << " checking for dropped osds on repop tid " << repop->rep_tid << dendl;
       set<int> all;
       set_union(repop->waitfor_disk.begin(), repop->waitfor_disk.end(),
                repop->waitfor_ack.begin(), repop->waitfor_ack.end(),
index d33427dcc1797a88105e49bfba5a324ce6ba66a0..61ff11b3f8d3934d3e37d606d508cdbe253e48ce 100644 (file)
@@ -28,34 +28,34 @@ public:
    */
   class RepGather {
   public:
+    int nref;
+
     class MOSDOp *op;
     tid_t rep_tid;
 
     ObjectStore::Transaction t;
-    bool applied;
+    bool applied, aborted;
     pg_stat_t stats;
 
     set<int>  waitfor_ack;
     set<int>  waitfor_nvram;
     set<int>  waitfor_disk;
+    bool sent_ack, sent_nvram, sent_disk;
     
     utime_t   start;
-
-    bool sent_ack, sent_nvram, sent_disk;
     
-    set<int>         osds;
     eversion_t       old_version, at_version;
 
     SnapSet snapset;
     SnapContext snapc;
 
-    eversion_t       pg_local_last_complete;
+    eversion_t          pg_local_last_complete;
     map<int,eversion_t> pg_complete_thru;
     
     RepGather(MOSDOp *o, tid_t rt, eversion_t av, eversion_t lc,
              SnapSet& ss, SnapContext& sc) :
-      op(o), rep_tid(rt),
-      applied(false),
+      nref(1), op(o), rep_tid(rt),
+      applied(false), aborted(false),
       sent_ack(false), sent_nvram(false), sent_disk(false),
       at_version(av), 
       snapset(ss), snapc(sc),
@@ -79,28 +79,38 @@ public:
     bool can_delete() { 
       return waitfor_ack.empty() && waitfor_nvram.empty() && waitfor_disk.empty(); 
     }
+
+    void get() {
+      nref++;
+    }
+    void put() {
+      if (--nref == 0) {
+       delete op;
+       delete this;
+      }
+    }
   };
 
 protected:
   // replica ops
   // [primary|tail]
-  hash_map<tid_t, RepGather*>            rep_gather;
-  hash_map<tid_t, list<class Message*> > waiting_for_repop;
-
-  // load balancing
-  set<object_t> balancing_reads;
-  set<object_t> unbalancing_reads;
-  hash_map<object_t, list<Message*> > waiting_for_unbalanced_reads;  // i.e. primary-lock
+  deque<RepGather*> repop_queue;
+  map<tid_t, RepGather*> repop_map;
 
-  void get_rep_gather(RepGather*);
   void apply_repop(RepGather *repop);
-  void put_rep_gather(RepGather*);
+  void eval_repop(RepGather*);
   void issue_repop(RepGather *repop, int dest, utime_t now);
-  RepGather *new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv,
-                           SnapSet& snapset, SnapContext& snapc);
+  RepGather *new_repop(MOSDOp *op, tid_t rep_tid, eversion_t nv,
+                      SnapSet& snapset, SnapContext& snapc);
   void repop_ack(RepGather *repop,
                  int result, int ack_type,
                  int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
+
+  // load balancing
+  set<object_t> balancing_reads;
+  set<object_t> unbalancing_reads;
+  hash_map<object_t, list<Message*> > waiting_for_unbalanced_reads;  // i.e. primary-lock
+
   
   // push/pull
   map<object_t, pair<eversion_t, int> > pulling;  // which objects are currently being pulled, and from where
@@ -122,7 +132,7 @@ protected:
 
 
   // modify
-  void op_modify_ondisk(tid_t rep_tid, eversion_t pg_complete_thru);
+  void op_modify_ondisk(RepGather *repop);
   void sub_op_modify_ondisk(MOSDSubOp *op, int ackerosd, eversion_t last_complete);
 
   void _make_clone(ObjectStore::Transaction& t,