Context *oncommit = new C_OSD_OpCommit(this, repop);
Context *onapplied = new C_OSD_OpApplied(this, repop);
- int r = osd->store->queue_transactions(repop->tls, onapplied, oncommit);
+ Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc);
+ int r = osd->store->queue_transactions(repop->tls, onapplied, oncommit, onapplied_sync);
if (r) {
dout(-10) << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl;
assert(0);
void ReplicatedPG::op_applied(RepGather *repop)
{
- repop->obc->ondisk_write_unlock();
-
lock();
dout(10) << "op_applied " << *repop << dendl;
void ReplicatedPG::_wrote_pushed_object(ObjectStore::Transaction *t, ObjectContext *obc)
{
dout(10) << "_wrote_pushed_object " << *obc << dendl;
- obc->ondisk_write_unlock();
lock();
put_object_context(obc);
unlock();
// track ObjectContext
Context *onreadable = 0;
+ Context *onreadable_sync = 0;
if (is_primary()) {
dout(10) << " setting up obc for " << soid << dendl;
ObjectContext *obc = 0;
obc->obs.oi.decode(oibl);
onreadable = new C_OSD_WrotePushedObject(this, t, obc);
+ onreadable_sync = new C_OSD_OndiskWriteUnlock(obc);
} else {
onreadable = new ObjectStore::C_DeleteTransaction(t);
}
int r = osd->store->queue_transaction(t,
onreadable,
new C_OSD_Commit(this, info.history.same_acting_since,
- info.last_complete));
+ info.last_complete),
+ onreadable_sync);
assert(r == 0);
osd->logger->inc(l_osd_r_pull);
rm->pg->sub_op_modify_commit(rm);
}
};
+ struct C_OSD_OndiskWriteUnlock : public Context {
+ ObjectContext *obc;
+ C_OSD_OndiskWriteUnlock(ObjectContext *o) : obc(o) {}
+ void finish(int r) {
+ obc->ondisk_write_unlock();
+ }
+ };
struct C_OSD_WrotePushedObject : public Context {
ReplicatedPG *pg;
ObjectStore::Transaction *t;