pg->get(); // we're copying the pointer
}
void finish(int r) {
- pg->lock();
pg->op_applied(repop);
- repop->put();
- pg->unlock();
pg->put();
}
};
pg->get(); // we're copying the pointer
}
void finish(int r) {
- pg->lock();
pg->op_commit(repop);
- repop->put();
- pg->unlock();
pg->put();
}
};
repop->tls.push_back(&repop->ctx->op_t);
repop->tls.push_back(&repop->ctx->local_t);
+ repop->obc->ondisk_write_lock();
+
Context *oncommit = new C_OSD_OpCommit(this, repop);
Context *onapplied = new C_OSD_OpApplied(this, repop);
int r = osd->store->queue_transactions(repop->tls, onapplied, oncommit);
void ReplicatedPG::op_applied(RepGather *repop)
{
+ lock();
dout(10) << "op_applied " << *repop << dendl;
// discard my reference to the buffer
mode.write_applied();
dout(10) << "op_applied mode now " << mode << " (finish_write)" << dendl;
+ repop->obc->ondisk_write_unlock();
+
put_object_context(repop->obc);
repop->obc = 0;
}
eval_repop(repop);
+
+ repop->put();
+ unlock();
}
void ReplicatedPG::op_commit(RepGather *repop)
{
+ lock();
+
if (repop->aborted) {
dout(10) << "op_commit " << *repop << " -- aborted" << dendl;
} else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) {
last_complete_ondisk = repop->pg_local_last_complete;
eval_repop(repop);
}
+
+ repop->put();
+ unlock();
}
// send op
osd_reqid_t rid;
- rid.name = entity_name_t::OSD(osd->whoami);
- rid.tid = osd->get_tid();
+ tid_t tid = osd->get_tid();
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, CEPH_OSD_FLAG_ACK,
- osd->osdmap->get_epoch(), rid.tid, v);
+ osd->osdmap->get_epoch(), tid, v);
subop->ops = vector<OSDOp>(1);
subop->ops[0].op.op = CEPH_OSD_OP_PULL;
subop->data_subset.swap(data_subset);
osd->logger->inc(l_osd_r_pushb, bl.length());
// send
- osd_reqid_t rid;
- rid.name = entity_name_t::OSD(osd->whoami);
- rid.tid = osd->get_tid();
+ osd_reqid_t rid; // useless?
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
- osd->osdmap->get_epoch(), rid.tid, oi.version);
+ osd->osdmap->get_epoch(), osd->get_tid(), oi.version);
subop->ops = vector<OSDOp>(1);
subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
subop->ops[0].op.extent.offset = 0;
dout(10) << " log.complete_to = " << log.complete_to->version << dendl;
}
+ // XXX: track ObjectContext
+
// apply to disk!
write_info(*t);
int r = osd->store->queue_transaction(t,
put_object_context(headobc);
+ // XXX: track objectcontext!
int tr = osd->store->queue_transaction(t);
assert(tr == 0);
missing.got(latest->soid, latest->version);
bool registered;
ObjectState obs;
+ Mutex lock;
+ Cond cond;
+ int unstable_writes, readers, writers_waiting, readers_waiting;
+
ObjectContext(const sobject_t& s) :
- ref(0), registered(false), obs(s) {}
+ ref(0), registered(false), obs(s),
+ lock("ReplicatedPG::ObjectContext::lock"),
+ unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0) {}
void get() { ++ref; }
+
+ // do simple synchronous mutual exclusion, for now. now waitqueues or anything fancy.
+ void ondisk_write_lock() {
+ lock.Lock();
+ writers_waiting++;
+ while (readers_waiting || readers)
+ cond.Wait(lock);
+ writers_waiting--;
+ unstable_writes++;
+ lock.Unlock();
+ }
+ void ondisk_write_unlock() {
+ lock.Lock();
+ assert(unstable_writes > 0);
+ unstable_writes--;
+ if (!unstable_writes && readers_waiting)
+ cond.Signal();
+ lock.Unlock();
+ }
+ void ondisk_read_lock() {
+ lock.Lock();
+ readers_waiting++;
+ while (unstable_writes)
+ cond.Wait(lock);
+ readers_waiting--;
+ readers++;
+ lock.Unlock();
+ }
+ void ondisk_read_unlock() {
+ lock.Lock();
+ assert(readers > 0);
+ readers--;
+ if (!readers && writers_waiting)
+ cond.Signal();
+ lock.Unlock();
+ }
};