]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: add mutual exclusion while writes are applied
author Sage Weil <sage@newdream.net>
Sat, 30 Jan 2010 00:35:04 +0000 (16:35 -0800)
committer Sage Weil <sage@newdream.net>
Mon, 1 Feb 2010 21:51:14 +0000 (13:51 -0800)
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 999ee7c302b5f0bceafdb06e8df06697f3151372..85283a4ba59a5778ba1847c1c0fcb54834e02326 100644 (file)
@@ -1809,10 +1809,7 @@ public:
     pg->get();    // we're copying the pointer
   }
   void finish(int r) {
-    pg->lock();
     pg->op_applied(repop);
-    repop->put();
-    pg->unlock();
     pg->put();
   }
 };
@@ -1828,10 +1825,7 @@ public:
     pg->get();    // we're copying the pointer
   }
   void finish(int r) {
-    pg->lock();
     pg->op_commit(repop);
-    repop->put();
-    pg->unlock();
     pg->put();
   }
 };
@@ -1847,6 +1841,8 @@ void ReplicatedPG::apply_repop(RepGather *repop)
   repop->tls.push_back(&repop->ctx->op_t);
   repop->tls.push_back(&repop->ctx->local_t);
 
+  repop->obc->ondisk_write_lock();
+
   Context *oncommit = new C_OSD_OpCommit(this, repop);
   Context *onapplied = new C_OSD_OpApplied(this, repop);
   int r = osd->store->queue_transactions(repop->tls, onapplied, oncommit);
@@ -1858,6 +1854,7 @@ void ReplicatedPG::apply_repop(RepGather *repop)
 
 void ReplicatedPG::op_applied(RepGather *repop)
 {
+  lock();
   dout(10) << "op_applied " << *repop << dendl;
 
   // discard my reference to the buffer
@@ -1881,6 +1878,8 @@ void ReplicatedPG::op_applied(RepGather *repop)
   mode.write_applied();
   dout(10) << "op_applied mode now " << mode << " (finish_write)" << dendl;
 
+  repop->obc->ondisk_write_unlock();
+
   put_object_context(repop->obc);
   repop->obc = 0;
 
@@ -1922,10 +1921,15 @@ void ReplicatedPG::op_applied(RepGather *repop)
   }   
 
   eval_repop(repop);
+
+  repop->put();
+  unlock();
 }
 
 void ReplicatedPG::op_commit(RepGather *repop)
 {
+  lock();
+
   if (repop->aborted) {
     dout(10) << "op_commit " << *repop << " -- aborted" << dendl;
   } else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) {
@@ -1937,6 +1941,9 @@ void ReplicatedPG::op_commit(RepGather *repop)
     last_complete_ondisk = repop->pg_local_last_complete;
     eval_repop(repop);
   }
+
+  repop->put();
+  unlock();
 }
 
 
@@ -2772,10 +2779,9 @@ bool ReplicatedPG::pull(const sobject_t& soid)
 
   // send op
   osd_reqid_t rid;
-  rid.name = entity_name_t::OSD(osd->whoami);
-  rid.tid = osd->get_tid();
+  tid_t tid = osd->get_tid();
   MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, CEPH_OSD_FLAG_ACK,
-                                  osd->osdmap->get_epoch(), rid.tid, v);
+                                  osd->osdmap->get_epoch(), tid, v);
   subop->ops = vector<OSDOp>(1);
   subop->ops[0].op.op = CEPH_OSD_OP_PULL;
   subop->data_subset.swap(data_subset);
@@ -2916,11 +2922,9 @@ void ReplicatedPG::push(const sobject_t& soid, int peer,
   osd->logger->inc(l_osd_r_pushb, bl.length());
   
   // send
-  osd_reqid_t rid;
-  rid.name = entity_name_t::OSD(osd->whoami);
-  rid.tid = osd->get_tid();
+  osd_reqid_t rid;  // useless?
   MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
-                                  osd->osdmap->get_epoch(), rid.tid, oi.version);
+                                  osd->osdmap->get_epoch(), osd->get_tid(), oi.version);
   subop->ops = vector<OSDOp>(1);
   subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
   subop->ops[0].op.extent.offset = 0;
@@ -3179,6 +3183,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
       dout(10) << " log.complete_to = " << log.complete_to->version << dendl;
   }
 
+  // XXX: track ObjectContext
+
   // apply to disk!
   write_info(*t);
   int r = osd->store->queue_transaction(t,
@@ -3411,6 +3417,7 @@ int ReplicatedPG::recover_primary(int max)
 
            put_object_context(headobc);
 
+           // XXX: track objectcontext!
            int tr = osd->store->queue_transaction(t);
            assert(tr == 0);
            missing.got(latest->soid, latest->version);
index a92ee3d5cc44d8417a3f7f9ee2c3259eeab550ec..f1b139089b5bee74c20c3d55c4e13b7edb06635f 100644 (file)
@@ -226,10 +226,52 @@ public:
     bool registered; 
     ObjectState obs;
 
+    Mutex lock;
+    Cond cond;
+    int unstable_writes, readers, writers_waiting, readers_waiting;
+
     ObjectContext(const sobject_t& s) :
-      ref(0), registered(false), obs(s) {}
+      ref(0), registered(false), obs(s),
+      lock("ReplicatedPG::ObjectContext::lock"),
+      unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0) {}
 
     void get() { ++ref; }
+
+    // do simple synchronous mutual exclusion, for now.  no waitqueues or anything fancy.
+    void ondisk_write_lock() {
+      lock.Lock();
+      writers_waiting++;
+      while (readers_waiting || readers)
+       cond.Wait(lock);
+      writers_waiting--;
+      unstable_writes++;
+      lock.Unlock();
+    }
+    void ondisk_write_unlock() {
+      lock.Lock();
+      assert(unstable_writes > 0);
+      unstable_writes--;
+      if (!unstable_writes && readers_waiting)
+       cond.Signal();
+      lock.Unlock();
+    }
+    void ondisk_read_lock() {
+      lock.Lock();
+      readers_waiting++;
+      while (unstable_writes)
+       cond.Wait(lock);
+      readers_waiting--;
+      readers++;
+      lock.Unlock();
+    }
+    void ondisk_read_unlock() {
+      lock.Lock();
+      assert(readers > 0);
+      readers--;
+      if (!readers && writers_waiting)
+       cond.Signal();
+      lock.Unlock();
+    }
   };