From: Sage Weil Date: Sat, 30 Jan 2010 00:35:04 +0000 (-0800) Subject: osd: add mutual exclusion while writes are applied X-Git-Tag: v0.20~431^2~31 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3f3aa0bdcffac90744897ce1a4add87826309f9c;p=ceph.git osd: add mutual exclusion while writes are applied --- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 999ee7c302b5f..85283a4ba59a5 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1809,10 +1809,7 @@ public: pg->get(); // we're copying the pointer } void finish(int r) { - pg->lock(); pg->op_applied(repop); - repop->put(); - pg->unlock(); pg->put(); } }; @@ -1828,10 +1825,7 @@ public: pg->get(); // we're copying the pointer } void finish(int r) { - pg->lock(); pg->op_commit(repop); - repop->put(); - pg->unlock(); pg->put(); } }; @@ -1847,6 +1841,8 @@ void ReplicatedPG::apply_repop(RepGather *repop) repop->tls.push_back(&repop->ctx->op_t); repop->tls.push_back(&repop->ctx->local_t); + repop->obc->ondisk_write_lock(); + Context *oncommit = new C_OSD_OpCommit(this, repop); Context *onapplied = new C_OSD_OpApplied(this, repop); int r = osd->store->queue_transactions(repop->tls, onapplied, oncommit); @@ -1858,6 +1854,7 @@ void ReplicatedPG::apply_repop(RepGather *repop) void ReplicatedPG::op_applied(RepGather *repop) { + lock(); dout(10) << "op_applied " << *repop << dendl; // discard my reference to the buffer @@ -1881,6 +1878,8 @@ void ReplicatedPG::op_applied(RepGather *repop) mode.write_applied(); dout(10) << "op_applied mode now " << mode << " (finish_write)" << dendl; + repop->obc->ondisk_write_unlock(); + put_object_context(repop->obc); repop->obc = 0; @@ -1922,10 +1921,15 @@ void ReplicatedPG::op_applied(RepGather *repop) } eval_repop(repop); + + repop->put(); + unlock(); } void ReplicatedPG::op_commit(RepGather *repop) { + lock(); + if (repop->aborted) { dout(10) << "op_commit " << *repop << " -- aborted" << dendl; } else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) { @@ -1937,6 +1941,9 @@ void ReplicatedPG::op_commit(RepGather *repop) last_complete_ondisk = repop->pg_local_last_complete; eval_repop(repop); } + + repop->put(); + unlock(); } @@ -2772,10 +2779,9 @@ bool ReplicatedPG::pull(const sobject_t& soid) // send op osd_reqid_t rid; - rid.name = entity_name_t::OSD(osd->whoami); - rid.tid = osd->get_tid(); + tid_t tid = osd->get_tid(); MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, CEPH_OSD_FLAG_ACK, - osd->osdmap->get_epoch(), rid.tid, v); + osd->osdmap->get_epoch(), tid, v); subop->ops = vector(1); subop->ops[0].op.op = CEPH_OSD_OP_PULL; subop->data_subset.swap(data_subset); @@ -2916,11 +2922,9 @@ void ReplicatedPG::push(const sobject_t& soid, int peer, osd->logger->inc(l_osd_r_pushb, bl.length()); // send - osd_reqid_t rid; - rid.name = entity_name_t::OSD(osd->whoami); - rid.tid = osd->get_tid(); + osd_reqid_t rid; // useless? MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0, - osd->osdmap->get_epoch(), rid.tid, oi.version); + osd->osdmap->get_epoch(), osd->get_tid(), oi.version); subop->ops = vector(1); subop->ops[0].op.op = CEPH_OSD_OP_PUSH; subop->ops[0].op.extent.offset = 0; @@ -3179,6 +3183,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) dout(10) << " log.complete_to = " << log.complete_to->version << dendl; } + // XXX: track ObjectContext + // apply to disk! write_info(*t); int r = osd->store->queue_transaction(t, @@ -3411,6 +3417,7 @@ int ReplicatedPG::recover_primary(int max) put_object_context(headobc); + // XXX: track objectcontext! int tr = osd->store->queue_transaction(t); assert(tr == 0); missing.got(latest->soid, latest->version); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index a92ee3d5cc44d..f1b139089b5be 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -226,10 +226,52 @@ public: bool registered; ObjectState obs; + Mutex lock; + Cond cond; + int unstable_writes, readers, writers_waiting, readers_waiting; + ObjectContext(const sobject_t& s) : - ref(0), registered(false), obs(s) {} + ref(0), registered(false), obs(s), + lock("ReplicatedPG::ObjectContext::lock"), + unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0) {} void get() { ++ref; } + + // do simple synchronous mutual exclusion, for now. now waitqueues or anything fancy. + void ondisk_write_lock() { + lock.Lock(); + writers_waiting++; + while (readers_waiting || readers) + cond.Wait(lock); + writers_waiting--; + unstable_writes++; + lock.Unlock(); + } + void ondisk_write_unlock() { + lock.Lock(); + assert(unstable_writes > 0); + unstable_writes--; + if (!unstable_writes && readers_waiting) + cond.Signal(); + lock.Unlock(); + } + void ondisk_read_lock() { + lock.Lock(); + readers_waiting++; + while (unstable_writes) + cond.Wait(lock); + readers_waiting--; + readers++; + lock.Unlock(); + } + void ondisk_read_unlock() { + lock.Lock(); + assert(readers > 0); + readers--; + if (!readers && writers_waiting) + cond.Signal(); + lock.Unlock(); + } };