dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
- int r = 0;
Context *oncommit = 0;
op->hack_blah = true; // hack: make sure any 'commit' goes out _after_ our ack
-
+
+ oncommit = new C_OSD_RepModifyCommit(this, op);
+ op_apply(op, op->get_version(), oncommit);
+
if (op->get_op() == OSD_OP_REP_WRITE) {
- // write
- assert(op->get_data().length() == op->get_length());
- oncommit = new C_OSD_RepModifyCommit(this, op);
- r = apply_write(op, op->get_version(), oncommit);
- store->collection_add(pg->get_pgid(), oid);
-
- logger->inc("r_wr");
+ logger->inc("r_wr");
logger->inc("r_wrb", op->get_length());
- } else if (op->get_op() == OSD_OP_REP_DELETE) {
- // delete
- store->collection_remove(pg->get_pgid(), op->get_oid());
- r = store->remove(oid);
- } else if (op->get_op() == OSD_OP_REP_TRUNCATE) {
- // truncate
- r = store->truncate(oid, op->get_offset());
- } else assert(0);
-
+ }
+
// update pg version too
pg->info.last_update = op->get_version();
if (pg->info.last_complete == ov)
}
-
-
// =========================================================
// OPS
//messenger->send_message(reply, op->get_asker());
// do it
- int r;
- if (op->get_op() == OSD_OP_WRITE) {
+ Context *oncommit = new C_OSD_WriteCommit(this, repop);
+ op_apply(op, nv, oncommit);
+
+ get_repop(repop);
+ assert(repop->waitfor_ack.count(0));
+ repop->waitfor_ack.erase(0);
+ put_repop(repop);
+
+ if (op->get_op() == OSD_OP_WRITE)
+ {
+ logger->inc("c_wr");
+ logger->inc("c_wrb", op->get_length());
+ }
+
+ //unlock_object(oid);
+}
+
+
+
+void OSD::op_apply(MOSDOp *op, version_t version, Context* oncommit)
+{
+ object_t oid = op->get_oid();
+ pg_t pgid = op->get_pg();
+ int r;
+
+ if ((op->get_op() == OSD_OP_WRLOCK)||
+ (op->get_op() == OSD_OP_REP_WRLOCK)){
+ //lock object
+ r = store->setattr(oid, "wrlock", &op->get_source(), sizeof(msg_addr_t), oncommit);
+ }
+ else if ((op->get_op() == OSD_OP_WRUNLOCK) ||
+ (op->get_op() == OSD_OP_REP_WRUNLOCK)) {
+ //unlock object
+ r = store->rmattr(oid, "wrlock", oncommit);
+ }
+ else if ((op->get_op() == OSD_OP_WRITE) ||
+ (op->get_op() == OSD_OP_REP_WRITE)){
// write
assert(op->get_data().length() == op->get_length());
- Context *oncommit = new C_OSD_WriteCommit(this, repop);
- r = apply_write(op, nv, oncommit);
+ r = apply_write(op, version, oncommit);
// put new object in proper collection
store->collection_add(pgid, oid); // FIXME : be careful w/ locking
-
- get_repop(repop);
- assert(repop->waitfor_ack.count(0));
- repop->waitfor_ack.erase(0);
- put_repop(repop);
-
- logger->inc("c_wr");
- logger->inc("c_wrb", op->get_length());
}
- else if (op->get_op() == OSD_OP_TRUNCATE) {
+ else if ((op->get_op() == OSD_OP_TRUNCATE)||
+ (op->get_op() == OSD_OP_REP_TRUNCATE)) {
// truncate
- r = store->truncate(oid, op->get_offset());
- get_repop(repop);
- assert(repop->waitfor_ack.count(0));
- assert(repop->waitfor_commit.count(0));
- repop->waitfor_ack.erase(0);
- repop->waitfor_commit.erase(0);
- put_repop(repop);
+ r = store->truncate(oid, op->get_offset(), oncommit);
}
- else if (op->get_op() == OSD_OP_DELETE) {
+ else if ((op->get_op() == OSD_OP_DELETE)||
+ (op->get_op() == OSD_OP_REP_DELETE)) {
// delete
store->collection_remove(pgid, oid); // be careful with locking
- r = store->remove(oid);
- get_repop(repop);
- assert(repop->waitfor_ack.count(0));
- assert(repop->waitfor_commit.count(0));
- repop->waitfor_ack.erase(0);
- repop->waitfor_commit.erase(0);
- put_repop(repop);
+ r = store->remove(oid, oncommit);
}
else assert(0);
-
- //unlock_object(oid);
}
-
-
-