}
};
+
+void Ebofs::encode_onode(Onode *on, bufferlist& bl, unsigned& off)
+{
+ // onode
+ struct ebofs_onode eo;
+ eo.onode_loc = on->onode_loc;
+ eo.object_id = on->object_id;
+ eo.object_size = on->object_size;
+ eo.object_blocks = on->object_blocks;
+ eo.num_attr = on->attr.size();
+ eo.num_extents = on->extents.size();
+ bl.copy_in(off, sizeof(eo), (char*)&eo);
+ off += sizeof(eo);
+
+ // attr
+ for (map<string, AttrVal>::iterator i = on->attr.begin();
+ i != on->attr.end();
+ i++) {
+ bl.copy_in(off, i->first.length()+1, i->first.c_str());
+ off += i->first.length()+1;
+ bl.copy_in(off, sizeof(int), (char*)&i->second.len);
+ off += sizeof(int);
+ bl.copy_in(off, i->second.len, i->second.data);
+ off += i->second.len;
+ dout(15) << "write_onode " << *on << " attr " << i->first << " len " << i->second.len << endl;
+ }
+
+ // extents
+ for (unsigned i=0; i<on->extents.size(); i++) {
+ bl.copy_in(off, sizeof(Extent), (char*)&on->extents[i]);
+ off += sizeof(Extent);
+ dout(15) << "write_onode " << *on << " ex " << i << ": " << on->extents[i] << endl;
+ }
+}
+
void Ebofs::write_onode(Onode *on)
{
// buffer
dout(10) << "write_onode " << *on << " to " << on->onode_loc << endl;
- struct ebofs_onode *eo = (struct ebofs_onode*)bl.c_str();
- eo->onode_loc = on->onode_loc;
- eo->object_id = on->object_id;
- eo->object_size = on->object_size;
- eo->object_blocks = on->object_blocks;
- eo->num_attr = on->attr.size();
- eo->num_extents = on->extents.size();
-
- // attr
- unsigned off = sizeof(*eo);
- for (map<string, AttrVal>::iterator i = on->attr.begin();
- i != on->attr.end();
- i++) {
- bl.copy_in(off, i->first.length()+1, i->first.c_str());
- off += i->first.length()+1;
- bl.copy_in(off, sizeof(int), (char*)&i->second.len);
- off += sizeof(int);
- bl.copy_in(off, i->second.len, i->second.data);
- off += i->second.len;
- dout(15) << "write_onode " << *on << " attr " << i->first << " len " << i->second.len << endl;
- }
-
- // extents
- for (unsigned i=0; i<on->extents.size(); i++) {
- bl.copy_in(off, sizeof(Extent), (char*)&on->extents[i]);
- off += sizeof(Extent);
- dout(15) << "write_onode " << *on << " ex " << i << ": " << on->extents[i] << endl;
- }
+ unsigned off = 0;
+ encode_onode(on, bl, off);
+ assert(off == bytes);
// write
dev.write( on->onode_loc.start, on->onode_loc.length, bl,
}
}
+void Ebofs::encode_cnode(Cnode *cn, bufferlist& bl, unsigned& off)
+{
+ // cnode
+ struct ebofs_cnode ec;
+ ec.cnode_loc = cn->cnode_loc;
+ ec.coll_id = cn->coll_id;
+ ec.num_attr = cn->attr.size();
+ bl.copy_in(off, sizeof(ec), (char*)&ec);
+ off += sizeof(ec);
+
+ // attr
+ for (map<string, AttrVal >::iterator i = cn->attr.begin();
+ i != cn->attr.end();
+ i++) {
+ bl.copy_in(off, i->first.length()+1, i->first.c_str());
+ off += i->first.length()+1;
+ bl.copy_in(off, sizeof(int), (char*)&i->second.len);
+ off += sizeof(int);
+ bl.copy_in(off, i->second.len, i->second.data);
+ off += i->second.len;
+
+ dout(15) << "write_cnode " << *cn << " attr " << i->first << " len " << i->second.len << endl;
+ }
+}
+
void Ebofs::write_cnode(Cnode *cn)
{
// allocate buffer
dout(10) << "write_cnode " << *cn << " to " << cn->cnode_loc << endl;
- struct ebofs_cnode ec;
- ec.cnode_loc = cn->cnode_loc;
- ec.coll_id = cn->coll_id;
- ec.num_attr = cn->attr.size();
-
- bl.copy_in(0, sizeof(ec), (char*)&ec);
-
- // attr
- unsigned off = sizeof(ec);
- for (map<string, AttrVal >::iterator i = cn->attr.begin();
- i != cn->attr.end();
- i++) {
- bl.copy_in(off, i->first.length()+1, i->first.c_str());
- off += i->first.length()+1;
- bl.copy_in(off, sizeof(int), (char*)&i->second.len);
- off += sizeof(int);
- bl.copy_in(off, i->second.len, i->second.data);
- off += i->second.len;
+ unsigned off = 0;
+ encode_cnode(cn, bl, off);
+ assert(off == bytes);
- dout(15) << "write_cnode " << *cn << " attr " << i->first << " len " << i->second.len << endl;
- }
-
// write
dev.write( cn->cnode_loc.start, cn->cnode_loc.length, bl,
new C_E_InodeFlush(this), "write_cnode" );
// check clock regularly
g_clock.now();
+ osd_lock.Lock();
switch (m->get_type()) {
{
// no map? starting up?
if (!osdmap) {
- osd_lock.Lock();
dout(7) << "no OSDMap, asking MDS" << endl;
if (waiting_for_osdmap.empty())
messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
waiting_for_osdmap.push_back(m);
- osd_lock.Unlock();
- return;
+ break;
}
// need OSDMap
if (!finished.empty()) {
list<Message*> waiting;
waiting.splice(waiting.begin(), finished);
+
+ osd_lock.Unlock();
+
for (list<Message*>::iterator it = waiting.begin();
it != waiting.end();
it++) {
dispatch(*it);
}
+ return;
}
-
+
+ osd_lock.Unlock();
}
messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
- osd_lock.Lock();
waiting_for_osdmap.push_back(m);
- osd_lock.Unlock();
}
// wait for ops to finish
wait_for_no_ops();
- osd_lock.Lock(); // actually, don't need this if we finish all ops?
-
if (m->is_mkfs()) {
dout(1) << "MKFS" << endl;
- /* done on init() now
- if (!g_conf.osd_mkfs)
- store->mkfs();
- */
}
if (!osdmap ||
dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
}
- osd_lock.Unlock();
-
if (m->is_mkfs()) {
// ack
messenger->send_message(new MGenericMessage(MSG_OSD_MKFS_ACK),
void OSD::op_rep_pull(MOSDOp *op)
{
- dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+ long got = 0;
lock_object(op->get_oid());
-
- // get object size
- struct stat st;
- int r = store->stat(op->get_oid(), &st);
- assert(r == 0);
-
- // check version
- version_t v = 0;
- store->getattr(op->get_oid(), "version", &v, sizeof(v));
- assert(v == op->get_version());
-
- // read
- bufferlist bl;
- long got = store->read(op->get_oid(),
- st.st_size, 0,
- bl);
- assert(got == st.st_size);
-
- // reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
- reply->set_result(0);
- reply->set_data(bl);
- reply->set_length(got);
- reply->set_offset(0);
-
- messenger->send_message(reply, op->get_asker());
-
+ {
+ dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+
+ // get object size
+ struct stat st;
+ int r = store->stat(op->get_oid(), &st);
+ assert(r == 0);
+
+ // check version
+ version_t v = 0;
+ store->getattr(op->get_oid(), "version", &v, sizeof(v));
+ assert(v == op->get_version());
+
+ // read
+ bufferlist bl;
+ got = store->read(op->get_oid(),
+ st.st_size, 0,
+ bl);
+ assert(got == st.st_size);
+
+ // reply
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
+ reply->set_result(0);
+ reply->set_data(bl);
+ reply->set_length(got);
+ reply->set_offset(0);
+
+ messenger->send_message(reply, op->get_asker());
+ }
unlock_object(op->get_oid());
delete op;
dout(7) << "rep_pull_reply " << hex << o << dec << " v " << v << " size " << op->get_length() << endl;
- osd_lock.Lock();
PGPeer *p = pull_ops[op->get_tid()];
PG *pg = p->pg;
assert(p); // FIXME: how will this work?
assert(p->is_pulling(o));
assert(p->pulling_version(o) == v);
- osd_lock.Unlock();
// write it and add it to the PG
store->write(o, op->get_length(), 0, op->get_data());
p->pg->add_object(store, o);
-
+
store->setattr(o, "version", &v, sizeof(v));
// close out pull op.
- osd_lock.Lock();
pull_ops.erase(op->get_tid());
pg->pulled(o, v, p);
// more?
do_recovery(pg);
- osd_lock.Unlock();
-
delete op;
}
void OSD::op_rep_push(MOSDOp *op)
{
- dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
lock_object(op->get_oid());
+ {
+ dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+
+ PG *pg = get_pg(op->get_pg());
+ assert(pg);
- PG *pg = get_pg(op->get_pg());
- assert(pg);
-
- // exists?
- if (store->exists(op->get_oid())) {
- store->truncate(op->get_oid(), 0);
-
- version_t ov = 0;
- store->getattr(op->get_oid(), "version", &ov, sizeof(ov));
- assert(ov <= op->get_version());
+ // exists?
+ if (store->exists(op->get_oid())) {
+ store->truncate(op->get_oid(), 0);
+
+ version_t ov = 0;
+ store->getattr(op->get_oid(), "version", &ov, sizeof(ov));
+ assert(ov <= op->get_version());
+ }
+
+ logger->inc("r_push");
+ logger->inc("r_pushb", op->get_length());
+
+ // write out buffers
+ int r = store->write(op->get_oid(),
+ op->get_length(), 0,
+ op->get_data(),
+ false); // FIXME
+ pg->add_object(store, op->get_oid());
+ assert(r >= 0);
+
+ // set version
+ version_t v = op->get_version();
+ store->setattr(op->get_oid(), "version", &v, sizeof(v));
+
+ // reply
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
+ messenger->send_message(reply, op->get_asker());
+
}
-
- logger->inc("r_push");
- logger->inc("r_pushb", op->get_length());
-
- // write out buffers
- int r = store->write(op->get_oid(),
- op->get_length(), 0,
- op->get_data(),
- false); // FIXME
- pg->add_object(store, op->get_oid());
- assert(r >= 0);
-
- // set version
- version_t v = op->get_version();
- store->setattr(op->get_oid(), "version", &v, sizeof(v));
-
- // reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
- messenger->send_message(reply, op->get_asker());
-
unlock_object(op->get_oid());
delete op;
}
dout(7) << "rep_push_reply " << hex << oid << dec << endl;
- osd_lock.Lock();
PGPeer *p = push_ops[op->get_tid()];
PG *pg = p->pg;
assert(p); // FIXME: how will this work?
// more?
do_recovery(pg);
- osd_lock.Unlock();
-
delete op;
}
void OSD::op_rep_remove(MOSDOp *op)
{
- dout(7) << "rep_remove on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
lock_object(op->get_oid());
-
- // sanity checks
- assert(store->exists(op->get_oid()));
-
- version_t v = 0;
- store->getattr(op->get_oid(), "version", &v, sizeof(v));
- assert(v == op->get_version());
-
- // remove
- store->collection_remove(op->get_pg(), op->get_oid());
- int r = store->remove(op->get_oid());
- assert(r == 0);
-
- // reply
- messenger->send_message(new MOSDOpReply(op, r, osdmap, true),
- op->get_asker());
-
+ {
+ dout(7) << "rep_remove on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+
+ // sanity checks
+ assert(store->exists(op->get_oid()));
+
+ version_t v = 0;
+ store->getattr(op->get_oid(), "version", &v, sizeof(v));
+ assert(v == op->get_version());
+
+ // remove
+ store->collection_remove(op->get_pg(), op->get_oid());
+ int r = store->remove(op->get_oid());
+ assert(r == 0);
+
+ // reply
+ messenger->send_message(new MOSDOpReply(op, r, osdmap, true),
+ op->get_asker());
+ }
unlock_object(op->get_oid());
delete op;
}
version_t v = op->get_version();
dout(7) << "rep_remove_reply " << hex << oid << dec << endl;
- osd_lock.Lock();
PGPeer *p = remove_ops[op->get_tid()];
PG *pg = p->pg;
assert(p); // FIXME: how will this work?
// more?
do_recovery(pg);
- osd_lock.Unlock();
-
delete op;
}
void OSD::op_rep_modify_sync(MOSDOp *op)
{
- osd_lock.Lock();
+ object_t oid = op->get_oid();
+ lock_object(oid);
{
dout(2) << "rep_modify_sync on op " << op << endl;
MOSDOpReply *ack2 = new MOSDOpReply(op, 0, osdmap, true);
messenger->send_message(ack2, op->get_asker());
delete op;
}
- osd_lock.Unlock();
+ unlock_object(oid);
}
void OSD::op_rep_modify(MOSDOp *op)
{
// when we introduce unordered messaging.. FIXME
object_t oid = op->get_oid();
- version_t ov = 0;
- if (store->exists(oid))
- store->getattr(oid, "version", &ov, sizeof(ov));
- //dout(15) << "rep_modify old versoin is " << ov << " msg sez " << op->get_old_version() << endl;
- assert(op->get_old_version() == ov);
- // PG
- PG *pg = get_pg(op->get_pg());
- assert(pg);
-
- dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
-
- int r = 0;
- Context *onsync = 0;
- if (op->get_op() == OSD_OP_REP_WRITE) {
- // write
- assert(op->get_data().length() == op->get_length());
- onsync = new C_OSD_RepModifySync(this, op);
- r = apply_write(op, op->get_version(), onsync);
- if (ov == 0) pg->add_object(store, oid);
-
- logger->inc("r_wr");
- logger->inc("r_wrb", op->get_length());
- } else if (op->get_op() == OSD_OP_REP_DELETE) {
- // delete
- store->collection_remove(pg->get_pgid(), op->get_oid());
- r = store->remove(oid);
- } else if (op->get_op() == OSD_OP_REP_TRUNCATE) {
- // truncate
- r = store->truncate(oid, op->get_offset());
- } else assert(0);
-
- if (onsync) {
- // ack
- MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
- messenger->send_message(ack, op->get_asker());
- } else {
- // sync, safe
- MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true);
- messenger->send_message(ack, op->get_asker());
- delete op;
+ lock_object(oid);
+ {
+ version_t ov = 0;
+ if (store->exists(oid))
+ store->getattr(oid, "version", &ov, sizeof(ov));
+ //dout(15) << "rep_modify old versoin is " << ov << " msg sez " << op->get_old_version() << endl;
+ assert(op->get_old_version() == ov);
+
+ // PG
+ PG *pg = get_pg(op->get_pg());
+ assert(pg);
+
+ dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
+
+ int r = 0;
+ Context *onsync = 0;
+ if (op->get_op() == OSD_OP_REP_WRITE) {
+ // write
+ assert(op->get_data().length() == op->get_length());
+ onsync = new C_OSD_RepModifySync(this, op);
+ r = apply_write(op, op->get_version(), onsync);
+ if (ov == 0) pg->add_object(store, oid);
+
+ logger->inc("r_wr");
+ logger->inc("r_wrb", op->get_length());
+ } else if (op->get_op() == OSD_OP_REP_DELETE) {
+ // delete
+ store->collection_remove(pg->get_pgid(), op->get_oid());
+ r = store->remove(oid);
+ } else if (op->get_op() == OSD_OP_REP_TRUNCATE) {
+ // truncate
+ r = store->truncate(oid, op->get_offset());
+ } else assert(0);
+
+ if (onsync) {
+ // ack
+ MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
+ messenger->send_message(ack, op->get_asker());
+ } else {
+ // sync, safe
+ MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true);
+ messenger->send_message(ack, op->get_asker());
+ delete op;
+ }
}
+ unlock_object(oid);
}
void OSD::handle_op(MOSDOp *op)
{
- osd_lock.Lock();
-
pg_t pgid = op->get_pg();
PG *pg = get_pg(pgid);
// op's is newer
dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl;
wait_for_new_map(op);
- osd_lock.Unlock();
return;
}
op->get_asker());
delete op;
}
- osd_lock.Unlock();
return;
}
if (!pg) {
dout(7) << "hit non-existent pg " << hex << op->get_pg() << dec << ", waiting" << endl;
waiting_for_pg[pgid].push_back(op);
- osd_lock.Unlock();
return;
}
else {
if (!pg->is_peered()) {
dout(7) << "op_write " << *pg << " not peered (yet)" << endl;
pg->waiting_for_peered.push_back(op);
- osd_lock.Unlock();
return;
}
if (!pg->objects_pulling.count(oid))
pull_replica(pg, oid);
pg->waiting_for_missing_object[oid].push_back(op);
- osd_lock.Unlock();
return;
}
}
pg->waiting_for_clean_object[oid].push_back(op);
if (pg->objects_pushing.count(oid) == 0)
push_replica(pg, oid);
- osd_lock.Unlock();
return;
}
pg->waiting_for_clean_object[oid].push_back(op);
if (pg->objects_removing.count(oid) == 0)
remove_replica(pg, oid);
- osd_lock.Unlock();
return;
}
}
dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary changed on pg " << hex << op->get_pg() << dec << endl;
MOSDOpReply *fail = new MOSDOpReply(op, -1, osdmap, false);
messenger->send_message(fail, op->get_asker());
- osd_lock.Unlock();
return;
} else {
dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary same on pg " << hex << op->get_pg() << dec << endl;
}
// queue op
- if (g_conf.osd_maxthreads < 1) {
- pending_ops++;
- do_op(op);
- } else
- queue_op(op);
-
- osd_lock.Unlock();
-}
-
-void OSD::queue_op(MOSDOp *op) {
- // inc pending count
- osd_lock.Lock();
pending_ops++;
- osd_lock.Unlock();
-
- threadpool->put_op(op);
+ if (g_conf.osd_maxthreads < 1) {
+ osd_lock.Unlock();
+ do_op(op); // or, just do it now
+ osd_lock.Lock();
+ } else {
+ threadpool->put_op(op);
+ }
}
-
void doop(OSD *u, MOSDOp *p) {
u->do_op(p);
}
void OSD::wait_for_no_ops()
{
- osd_lock.Lock();
if (pending_ops > 0) {
dout(7) << "wait_for_no_ops - waiting for " << pending_ops << endl;
waiting_for_no_ops = true;
assert(pending_ops == 0);
}
dout(7) << "wait_for_no_ops - none" << endl;
- osd_lock.Unlock();
}
{
object_t oid = op->get_oid();
lock_object(oid);
-
- // read into a buffer
- bufferlist bl;
- long got = store->read(oid,
- op->get_length(), op->get_offset(),
- bl);
- // set up reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
- if (got >= 0) {
- reply->set_result(0);
- reply->set_data(bl);
- reply->set_length(got);
-
- logger->inc("c_rd");
- logger->inc("c_rdb", got);
-
- } else {
- reply->set_result(got); // error
- reply->set_length(0);
+ {
+ // read into a buffer
+ bufferlist bl;
+ long got = store->read(oid,
+ op->get_length(), op->get_offset(),
+ bl);
+ // set up reply
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
+ if (got >= 0) {
+ reply->set_result(0);
+ reply->set_data(bl);
+ reply->set_length(got);
+
+ logger->inc("c_rd");
+ logger->inc("c_rdb", got);
+
+ } else {
+ reply->set_result(got); // error
+ reply->set_length(0);
+ }
+
+ dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
+
+ logger->inc("rd");
+ if (got >= 0) logger->inc("rdb", got);
+
+ // send it
+ messenger->send_message(reply, op->get_asker());
}
-
- dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
-
- logger->inc("rd");
- if (got >= 0) logger->inc("rdb", got);
-
- // send it
- messenger->send_message(reply, op->get_asker());
-
- delete op;
-
unlock_object(oid);
+ delete op;
}
void OSD::op_stat(MOSDOp *op)
{
object_t oid = op->get_oid();
lock_object(oid);
+ {
+ struct stat st;
+ memset(&st, sizeof(st), 0);
+ int r = store->stat(oid, &st);
+
+ dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
+
+ MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap, true);
+ reply->set_object_size(st.st_size);
+ messenger->send_message(reply, op->get_asker());
+
+ logger->inc("stat");
+ }
+ unlock_object(oid);
- struct stat st;
- memset(&st, sizeof(st), 0);
- int r = store->stat(oid, &st);
-
- dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
-
- MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap, true);
- reply->set_object_size(st.st_size);
- messenger->send_message(reply, op->get_asker());
-
- logger->inc("stat");
delete op;
-
- unlock_object(oid);
}
void OSD::op_modify_sync(OSDReplicaOp *repop)
{
- osd_lock.Lock();
+ object_t oid = repop->op->get_oid();
+ lock_object(oid);
{
dout(2) << "op_modify_sync on op " << repop->op << endl;
repop->local_sync = true;
if (repop->can_send_sync()) {
- dout(2) << "op_modify_sync on " << hex << repop->op->get_oid() << dec << " op " << repop->op << endl;
+ dout(2) << "op_modify_sync on " << hex << oid << dec << " op " << repop->op << endl;
if (repop->op->wants_safe()) {
MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap, true);
messenger->send_message(reply, repop->op->get_asker());
delete repop;
}
}
- osd_lock.Unlock();
+ unlock_object(oid);
}
void OSD::op_modify(MOSDOp *op)
if (op->get_op() == OSD_OP_TRUNCATE) opname = "op_truncate";
lock_object(oid);
-
- // version? clean?
- version_t ov = 0; // 0 == dne (yet)
- store->getattr(oid, "version", &ov, sizeof(ov));
- version_t nv = messenger->peek_lamport();
- assert(nv > ov);
-
- dout(12) << opname << " " << hex << oid << dec << " v " << nv << " off " << op->get_offset() << " len " << op->get_length() << endl;
-
- // issue replica writes
- OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov);
-
- osd_lock.Lock();
- PG *pg = get_pg(op->get_pg());
- for (unsigned i=1; i<pg->acting.size(); i++) {
- issue_replica_op(pg, repop, pg->acting[i]);
- }
- osd_lock.Unlock();
-
- // pre-ack
- //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
- //messenger->send_message(reply, op->get_asker());
-
- // do it
- int r;
- if (op->get_op() == OSD_OP_WRITE) {
- // write
- assert(op->get_data().length() == op->get_length());
- Context *onsync = new C_OSD_WriteSync(this, repop);
- r = apply_write(op, nv, onsync);
-
- // put new object in proper collection
- if (ov == 0) pg->add_object(store, oid);
-
- repop->local_ack = true;
-
- logger->inc("c_wr");
- logger->inc("c_wrb", op->get_length());
- }
- else if (op->get_op() == OSD_OP_TRUNCATE) {
- // truncate
- r = store->truncate(oid, op->get_offset());
- repop->local_ack = true;
- repop->local_sync = true;
- }
- else if (op->get_op() == OSD_OP_DELETE) {
- // delete
- pg->remove_object(store, op->get_oid());
- r = store->remove(oid);
- repop->local_ack = true;
- repop->local_sync = true;
- }
- else assert(0);
-
- // can we reply yet?
- osd_lock.Lock();
{
+ // version? clean?
+ version_t ov = 0; // 0 == dne (yet)
+ store->getattr(oid, "version", &ov, sizeof(ov));
+ version_t nv = messenger->peek_lamport();
+ assert(nv > ov);
+
+ dout(12) << opname << " " << hex << oid << dec << " v " << nv << " off " << op->get_offset() << " len " << op->get_length() << endl;
+
+ // issue replica writes
+ OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov);
+
+ PG *pg = get_pg(op->get_pg());
+ for (unsigned i=1; i<pg->acting.size(); i++) {
+ issue_replica_op(pg, repop, pg->acting[i]);
+ }
+
+ // pre-ack
+ //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+ //messenger->send_message(reply, op->get_asker());
+
+ // do it
+ int r;
+ if (op->get_op() == OSD_OP_WRITE) {
+ // write
+ assert(op->get_data().length() == op->get_length());
+ Context *onsync = new C_OSD_WriteSync(this, repop);
+ r = apply_write(op, nv, onsync);
+
+ // put new object in proper collection
+ if (ov == 0) pg->add_object(store, oid);
+
+ repop->local_ack = true;
+
+ logger->inc("c_wr");
+ logger->inc("c_wrb", op->get_length());
+ }
+ else if (op->get_op() == OSD_OP_TRUNCATE) {
+ // truncate
+ r = store->truncate(oid, op->get_offset());
+ repop->local_ack = true;
+ repop->local_sync = true;
+ }
+ else if (op->get_op() == OSD_OP_DELETE) {
+ // delete
+ pg->remove_object(store, op->get_oid());
+ r = store->remove(oid);
+ repop->local_ack = true;
+ repop->local_sync = true;
+ }
+ else assert(0);
+
+ // can we reply yet?
if (repop->can_send_sync()) {
dout(10) << opname << " sending sync on " << op << endl;
MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
messenger->send_message(reply, op->get_asker());
}
}
- osd_lock.Unlock();
-
unlock_object(oid);
}