]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: refactor op_applied logic
authorSage Weil <sage@newdream.net>
Thu, 17 Dec 2009 19:31:12 +0000 (11:31 -0800)
committerSage Weil <sage@newdream.net>
Mon, 25 Jan 2010 21:59:56 +0000 (13:59 -0800)
Behavior is still essentially unchanged (for now).

src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 99f92b2153b7ad5e2e0eb37ca144cb448d358ffa..7ca75276a104ed2d428db673099b48347e6dd9b8 100644 (file)
@@ -642,11 +642,6 @@ void ReplicatedPG::do_op(MOSDOp *op)
   if (mode.is_rmw_mode())
     apply_repop(repop);
 
-  // (logical) local ack.
-  //  (if alone and delayed, this will apply the update.)
-  int whoami = osd->get_nodeid();
-  assert(repop->waitfor_ack.count(whoami));
-  repop->waitfor_ack.erase(whoami);
   eval_repop(repop);
   repop->put();
 
@@ -1807,6 +1802,25 @@ void ReplicatedPG::log_op(vector<Log::Entry>& logv, eversion_t trim_to,
 // ========================================================================
 // rep op gather
 
+class C_OSD_OpApplied : public Context {
+public:
+  ReplicatedPG *pg;
+  ReplicatedPG::RepGather *repop;
+
+  C_OSD_OpApplied(ReplicatedPG *p, ReplicatedPG::RepGather *rg) :
+    pg(p), repop(rg) {
+    repop->get();
+    pg->get();    // we're copying the pointer
+  }
+  void finish(int r) {
+    pg->lock();
+    pg->op_applied(repop);
+    repop->put();
+    pg->unlock();
+    pg->put();
+  }
+};
+
 class C_OSD_OpCommit : public Context {
 public:
   ReplicatedPG *pg;
@@ -1826,25 +1840,6 @@ public:
   }
 };
 
-/** op_commit
- * transaction commit on the acker.
- */
-void ReplicatedPG::op_ondisk(RepGather *repop)
-{
-  if (repop->aborted) {
-    dout(10) << "op_ondisk " << *repop << " -- aborted" << dendl;
-  } else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) {
-    dout(10) << "op_ondisk " << *repop << " -- already marked ondisk" << dendl;
-  } else {
-    dout(10) << "op_ondisk " << *repop << dendl;
-    repop->waitfor_disk.erase(osd->get_nodeid());
-    repop->waitfor_nvram.erase(osd->get_nodeid());
-    last_complete_ondisk = repop->pg_local_last_complete;
-    eval_repop(repop);
-  }
-}
-
-
 void ReplicatedPG::apply_repop(RepGather *repop)
 {
   dout(10) << "apply_repop  applying update on " << *repop << dendl;
@@ -1860,13 +1855,24 @@ void ReplicatedPG::apply_repop(RepGather *repop)
     dout(-10) << "apply_repop  apply transaction return " << r << " on " << *repop << dendl;
     assert(0);
   }
-  
+  op_applied(repop);
+}
+
+void ReplicatedPG::op_applied(RepGather *repop)
+{
+  dout(10) << "op_applied " << *repop << dendl;
+
   // discard my reference to the buffer
   repop->ctx->op->get_data().clear();
-  tls.clear();
   repop->ctx->op_t.clear_data();
   
   repop->applied = true;
+
+  // (logical) local ack.
+  int whoami = osd->get_nodeid();
+  assert(repop->waitfor_ack.count(whoami));
+  repop->waitfor_ack.erase(whoami);
   
   if (repop->ctx->clone_obc) {
     put_object_context(repop->ctx->clone_obc);
@@ -1874,7 +1880,7 @@ void ReplicatedPG::apply_repop(RepGather *repop)
   }
 
   dout(10) << "apply_repop mode was " << mode << dendl;
-  mode.finish_write();
+  mode.write_applied();
   dout(10) << "apply_repop mode now " << mode << " (finish_write)" << dendl;
 
   put_object_context(repop->obc);
@@ -1916,9 +1922,25 @@ void ReplicatedPG::apply_repop(RepGather *repop)
     }
     break;
   }   
-  
 }
 
+void ReplicatedPG::op_ondisk(RepGather *repop)
+{
+  if (repop->aborted) {
+    dout(10) << "op_ondisk " << *repop << " -- aborted" << dendl;
+  } else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) {
+    dout(10) << "op_ondisk " << *repop << " -- already marked ondisk" << dendl;
+  } else {
+    dout(10) << "op_ondisk " << *repop << dendl;
+    repop->waitfor_disk.erase(osd->get_nodeid());
+    repop->waitfor_nvram.erase(osd->get_nodeid());
+    last_complete_ondisk = repop->pg_local_last_complete;
+    eval_repop(repop);
+  }
+}
+
+
+
 void ReplicatedPG::eval_repop(RepGather *repop)
 {
   dout(10) << "eval_repop " << *repop << dendl;
@@ -1928,7 +1950,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
   // apply?
   if (!repop->applied &&
       mode.is_delayed_mode() &&
-      repop->waitfor_ack.empty())  // all replicas have acked
+      repop->waitfor_ack.size() == 1)  // all other replicas have acked
     apply_repop(repop);
 
   // disk?
@@ -2032,7 +2054,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContext *
   RepGather *repop = new RepGather(ctx, obc, noop, rep_tid, info.last_complete);
 
   dout(10) << "new_repop mode was " << mode << dendl;
-  mode.start_write();
+  mode.write_start();
   obc->get();  // we take a ref
   dout(10) << "new_repop mode now " << mode << " (start_write)" << dendl;
 
index cdfd023a355e279f407b3e3632956b229845ec61..81be3ce0cd4f21cb6143a4149041097691a9c4a7 100644 (file)
@@ -174,11 +174,11 @@ public:
       return state == RMW || state == RMW_FLUSHING;
     }
 
-    void start_write() {
+    void write_start() {
       num_wr++;
       assert(state == DELAYED || state == RMW);
     }
-    void finish_write() {
+    void write_applied() {
       assert(num_wr > 0);
       --num_wr;
       if (num_wr == 0) {
@@ -186,6 +186,8 @@ public:
        wake = true;
       }
     }
+    void write_commit() {
+    }
   };
 
 
@@ -323,6 +325,8 @@ protected:
   map<tid_t, RepGather*> repop_map;
 
   void apply_repop(RepGather *repop);
+  void op_applied(RepGather *repop);
+  void op_ondisk(RepGather *repop);
   void eval_repop(RepGather*);
   void issue_repop(RepGather *repop, int dest, utime_t now,
                   bool old_exists, __u64 old_size, eversion_t old_version);
@@ -331,6 +335,8 @@ protected:
                  int result, int ack_type,
                  int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
 
+  friend class C_OSD_OpCommit;
+  friend class C_OSD_OpApplied;
 
   // projected object info
   map<sobject_t, ObjectContext*> object_contexts;
@@ -386,7 +392,6 @@ protected:
 
 
   // low level ops
-  void op_ondisk(RepGather *repop);
 
   void _make_clone(ObjectStore::Transaction& t,
                   const sobject_t& head, const sobject_t& coid,
@@ -398,7 +403,6 @@ protected:
   int prepare_transaction(OpContext *ctx);
   void log_op(vector<Log::Entry>& log, eversion_t trim_to, ObjectStore::Transaction& t);
   
-  friend class C_OSD_OpCommit;
   friend class C_OSD_RepModifyCommit;
 
   // pg on-disk content