put_repop(repop);
} else {
+ assert(0); // for now
+
// failure
get_repop(repop);
void OSD::op_rep_modify_safe(MOSDOp *op)
{
- //object_t oid = op->get_oid();
+ // hack: hack_blah is true until 'ack' has been sent.
if (op->hack_blah) {
dout(0) << "got rep_modify_safe before rep_modify applied, waiting" << endl;
g_timer.add_event_after(1, new C_OSD_RepModifySafe(this, op));
- } else
- //lock_object(oid); // ... just to make sure the original write is finished with *op
- {
+ } else {
dout(10) << "rep_modify_safe on op " << *op << endl;
- MOSDOpReply *ack2 = new MOSDOpReply(op, 0, osdmap, true);
- messenger->send_message(ack2, op->get_asker());
+ MOSDOpReply *safe = new MOSDOpReply(op, 0, osdmap, true);
+ messenger->send_message(safe, op->get_asker());
delete op;
}
- //unlock_object(oid);
}
void OSD::op_rep_modify(MOSDOp *op)
// when we introduce unordered messaging.. FIXME
object_t oid = op->get_oid();
- //lock_object(oid);
- {
- version_t ov = 0;
- if (store->exists(oid))
- store->getattr(oid, "version", &ov, sizeof(ov));
- if (op->get_old_version() != ov)
- dout(0) << "rep_modify old version is " << ov << " msg sez " << op->get_old_version() << endl;
- assert(op->get_old_version() == ov);
-
- // PG
- PG *pg = get_pg(op->get_pg());
- assert(pg);
-
- dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
-
- int r = 0;
- Context *onsafe = 0;
- if (op->get_op() == OSD_OP_REP_WRITE) {
- // write
- assert(op->get_data().length() == op->get_length());
- onsafe = new C_OSD_RepModifySafe(this, op);
- op->hack_blah = true;
- r = apply_write(op, op->get_version(), onsafe);
- if (ov == 0) pg->add_object(store, oid);
-
- logger->inc("r_wr");
- logger->inc("r_wrb", op->get_length());
- op->hack_blah = false;
- } else if (op->get_op() == OSD_OP_REP_DELETE) {
- // delete
- store->collection_remove(pg->get_pgid(), op->get_oid());
- r = store->remove(oid);
- } else if (op->get_op() == OSD_OP_REP_TRUNCATE) {
- // truncate
- r = store->truncate(oid, op->get_offset());
- } else assert(0);
+ version_t ov = 0;
+ if (store->exists(oid))
+ store->getattr(oid, "version", &ov, sizeof(ov));
+ if (op->get_old_version() != ov)
+ dout(0) << "rep_modify old version is " << ov << " msg sez " << op->get_old_version() << endl;
+ assert(op->get_old_version() == ov);
+
+ // PG
+ PG *pg = get_pg(op->get_pg());
+ assert(pg);
+
+ dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
+
+ int r = 0;
+ Context *onsafe = 0;
+
+ op->hack_blah = true; // hack: make sure any 'safe' goes out _after_ our ack
+
+ if (op->get_op() == OSD_OP_REP_WRITE) {
+ // write
+ assert(op->get_data().length() == op->get_length());
+ onsafe = new C_OSD_RepModifySafe(this, op);
+ r = apply_write(op, op->get_version(), onsafe);
+ if (ov == 0) pg->add_object(store, oid);
- if (onsafe) {
- // ack
- MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
- messenger->send_message(ack, op->get_asker());
- } else {
- // safe, safe
- MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true);
- messenger->send_message(ack, op->get_asker());
- delete op;
- }
+ logger->inc("r_wr");
+ logger->inc("r_wrb", op->get_length());
+ } else if (op->get_op() == OSD_OP_REP_DELETE) {
+ // delete
+ store->collection_remove(pg->get_pgid(), op->get_oid());
+ r = store->remove(oid);
+ } else if (op->get_op() == OSD_OP_REP_TRUNCATE) {
+ // truncate
+ r = store->truncate(oid, op->get_offset());
+ } else assert(0);
+
+ if (onsafe) {
+ // ack
+ MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
+ messenger->send_message(ack, op->get_asker());
+ } else {
+ // safe, safe
+ MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true);
+ messenger->send_message(ack, op->get_asker());
+ delete op;
}
- //unlock_object(oid);
+
+ op->hack_blah = false; // hack: make sure any 'safe' goes out _after_ our ack
}
unlock_object(oid);
// finish
- dout(10) << "finish op " << op << endl;
osd_lock.Lock();
{
+ dout(10) << "dequeue_op finish op " << op << endl;
assert(pending_ops > 0);
pending_ops--;
if (pending_ops == 0 && waiting_for_no_ops)
void OSD::op_read(MOSDOp *op)
{
object_t oid = op->get_oid();
- //lock_object(oid);
- {
- // read into a buffer
- bufferlist bl;
- long got = store->read(oid,
- op->get_length(), op->get_offset(),
- bl);
- // set up reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
- if (got >= 0) {
- reply->set_result(0);
- reply->set_data(bl);
- reply->set_length(got);
-
- logger->inc("c_rd");
- logger->inc("c_rdb", got);
+
+ // read into a buffer
+ bufferlist bl;
+ long got = store->read(oid,
+ op->get_length(), op->get_offset(),
+ bl);
+ // set up reply
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
+ if (got >= 0) {
+ reply->set_result(0);
+ reply->set_data(bl);
+ reply->set_length(got);
- } else {
- reply->set_result(got); // error
- reply->set_length(0);
- }
-
- dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
+ logger->inc("c_rd");
+ logger->inc("c_rdb", got);
- logger->inc("rd");
- if (got >= 0) logger->inc("rdb", got);
-
- // send it
- messenger->send_message(reply, op->get_asker());
+ } else {
+ reply->set_result(got); // error
+ reply->set_length(0);
}
- //unlock_object(oid);
+
+ dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
+
+ logger->inc("rd");
+ if (got >= 0) logger->inc("rdb", got);
+
+ // send it
+ messenger->send_message(reply, op->get_asker());
+
delete op;
}
void OSD::op_stat(MOSDOp *op)
{
object_t oid = op->get_oid();
- //lock_object(oid);
- {
- struct stat st;
- memset(&st, sizeof(st), 0);
- int r = store->stat(oid, &st);
-
- dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
-
- MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap, true);
- reply->set_object_size(st.st_size);
- messenger->send_message(reply, op->get_asker());
-
- logger->inc("stat");
- }
- //unlock_object(oid);
+
+ struct stat st;
+ memset(&st, sizeof(st), 0);
+ int r = store->stat(oid, &st);
+
+ dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
+
+ MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap, true);
+ reply->set_object_size(st.st_size);
+ messenger->send_message(reply, op->get_asker());
+
+ logger->inc("stat");
delete op;
}
{
dout(10) << "op_modify_safe on op " << *repop->op << endl;
get_repop(repop);
+ assert(!repop->local_safe);
repop->local_safe = true;
put_repop(repop);
}
pg->add_object(store, oid); // FIXME : be careful w/ locking
get_repop(repop);
+ assert(!repop->local_ack);
repop->local_ack = true;
put_repop(repop);
// truncate
r = store->truncate(oid, op->get_offset());
get_repop(repop);
+ assert(!repop->local_ack);
+ assert(!repop->local_safe);
repop->local_ack = true;
repop->local_safe = true;
put_repop(repop);
pg->remove_object(store, op->get_oid()); // be careful with locking
r = store->remove(oid);
get_repop(repop);
+ assert(!repop->local_ack);
+ assert(!repop->local_safe);
repop->local_ack = true;
repop->local_safe = true;
put_repop(repop);