PG *pg = open_pg(it->first);
assert(pg);
- dout(10) << " " << *pg << " remote state " << it->second.state
+ dout(10) << " " << *pg << " osd" << from << " remote state " << it->second.state
<< " w/ " << it->second.objects.size() << " objects, "
<< it->second.deleted.size() << " deleted" << endl;
for (map<int, PGPeer*>::iterator pit = pg->get_peers().begin();
pit != pg->get_peers().end();
pit++) {
+ dout(10) << " " << *pg << " peer osd" << pit->first << " state " << pit->second->get_state() << endl;
if (!pit->second->is_active()) fully = false;
}
logger->inc("op");
- // do the op
- switch (op->get_op()) {
+ // replication ops?
+ if (OSD_OP_IS_REP(op->get_op())) {
+ // replication/recovery
+ switch (op->get_op()) {
+ case OSD_OP_REP_PULL:
+ op_rep_pull(op);
+ break;
+ case OSD_OP_REP_PUSH:
+ op_rep_push(op);
+ break;
+ case OSD_OP_REP_REMOVE:
+ op_rep_remove(op);
+ break;
+ case OSD_OP_REP_WRITE:
+ op_rep_write(op);
+ break;
+ default:
+ assert(0);
+ }
+ } else {
+ // regular op
- // normal
-
- case OSD_OP_READ:
- op_read(op);
- break;
- case OSD_OP_WRITE:
- op_write(op);
- break;
- case OSD_OP_DELETE:
- op_delete(op);
- break;
- case OSD_OP_TRUNCATE:
- op_truncate(op);
- break;
- case OSD_OP_STAT:
- op_stat(op);
- break;
+ pg_t pgid = op->get_pg();
+ PG *pg = open_pg(pgid);
- // replication/recovery
- case OSD_OP_REP_PULL:
- op_rep_pull(op);
- break;
- case OSD_OP_REP_PUSH:
- op_rep_push(op);
- break;
- case OSD_OP_REP_REMOVE:
- op_rep_remove(op);
- break;
- case OSD_OP_REP_WRITE:
- op_rep_write(op);
- break;
-
- default:
- assert(0);
+ // PG must be peered for all client ops.
+ if (!pg) {
+ dout(7) << "op_write pg " << hex << pgid << dec << " dne (yet)" << endl;
+ waiting_for_pg[pgid].push_back(op);
+ return;
+ }
+ if (!pg->is_peered()) {
+ dout(7) << "op_write " << *pg << " not peered (yet)" << endl;
+ waiting_for_pg_peered[pgid].push_back(op);
+ return;
+ }
+
+ // do op
+ switch (op->get_op()) {
+ case OSD_OP_READ:
+ op_read(op, pg);
+ break;
+ case OSD_OP_WRITE:
+ op_write(op, pg);
+ break;
+ case OSD_OP_DELETE:
+ op_delete(op, pg);
+ break;
+ case OSD_OP_TRUNCATE:
+ op_truncate(op, pg);
+ break;
+ case OSD_OP_STAT:
+ op_stat(op, pg);
+ break;
+ default:
+ assert(0);
+ }
}
// finish
osd_lock.Unlock();
}
-void OSD::op_read(MOSDOp *r)
+
+
+// READ OPS
+
+bool OSD::object_complete(PG *pg, object_t oid, Message *op)
{
+ //v = 0;
+
+ if (pg->is_complete()) {
+ /*
+ if (store->exists(oid)) {
+ store->getattr(oid, "version", &v, sizeof(v));
+ assert(v>0);
+ }
+ */
+ } else {
+ if (pg->objects.count(oid)) {
+ //v = pg->objects[oid];
+
+ if (pg->objects_loc.count(oid)) {
+ // proxying, wait.
+ dout(7) << "object " << hex << oid << dec << /*" v " << v << */" in " << *pg
+ << " exists but not local (yet)" << endl;
+ waiting_for_object[oid].push_back(op);
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+void OSD::op_read(MOSDOp *op, PG *pg)
+{
+ object_t oid = op->get_oid();
+ lock_object(oid);
+
+ // version? clean?
+ if (!object_complete(pg, oid, op)) {
+ unlock_object(oid);
+ return;
+ }
+
// read into a buffer
- bufferptr bptr = new buffer(r->get_length()); // prealloc space for entire read
- long got = store->read(r->get_oid(),
- r->get_length(), r->get_offset(),
+ bufferptr bptr = new buffer(op->get_length()); // prealloc space for entire read
+ long got = store->read(oid,
+ op->get_length(), op->get_offset(),
bptr.c_str());
// set up reply
- MOSDOpReply *reply = new MOSDOpReply(r, 0, osdmap);
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap);
if (got >= 0) {
bptr.set_length(got); // properly size the buffer
reply->set_length(0);
}
- dout(12) << "read got " << got << " / " << r->get_length() << " bytes from obj " << hex << r->get_oid() << dec << endl;
+ dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
logger->inc("rd");
if (got >= 0) logger->inc("rdb", got);
// send it
- messenger->send_message(reply, r->get_asker());
+ messenger->send_message(reply, op->get_asker());
+
+ delete op;
+
+ unlock_object(oid);
+}
+
+void OSD::op_stat(MOSDOp *op, PG *pg)
+{
+ object_t oid = op->get_oid();
+ lock_object(oid);
+
+ // version? clean?
+ if (!object_complete(pg, oid, op)) {
+ unlock_object(oid);
+ return;
+ }
+
+ struct stat st;
+ memset(&st, sizeof(st), 0);
+ int r = store->stat(oid, &st);
+
+ dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
+
+ MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap);
+ reply->set_object_size(st.st_size);
+ messenger->send_message(reply, op->get_asker());
+
+ logger->inc("stat");
+ delete op;
- delete r;
+ unlock_object(oid);
}
-// -- osd_write
+// WRITE OPS
void OSD::apply_write(MOSDOp *op, bool write_sync, version_t v)
{
store->setattr(op->get_oid(), "version", &v, sizeof(v));
}
-
-void OSD::op_write(MOSDOp *op)
+bool OSD::object_clean(PG *pg, object_t oid, version_t& v, Message *op)
{
- // PG
- pg_t pgid = op->get_pg();
- PG *pg = open_pg(pgid);
- if (!pg) {
- dout(7) << "op_write pg " << hex << pgid << dec << " dne (yet)" << endl;
- waiting_for_pg[pgid].push_back(op);
- return;
- }
- if (!pg->is_peered()) {
- dout(7) << "op_write " << *pg << " not peered (yet)" << endl;
- waiting_for_pg_peered[pgid].push_back(op);
- return;
- }
-
- object_t oid = op->get_oid();
-
- lock_object(oid);
-
- // version
- version_t v = 0; // 0 == dne (yet)
-
+ v = 0;
+
if (pg->is_complete() && pg->is_clean()) {
// PG is complete+clean, easy shmeasy!
if (store->exists(oid)) {
- // inc version
store->getattr(oid, "version", &v, sizeof(v));
+ assert(v>0);
}
} else {
// PG is recovering|replicating, blech.
dout(10) << " pg not clean, checking if " << hex << oid << dec << " v " << v << " is specifically clean yet!" << endl;
// object (logically) exists
if (!pg->existant_object_is_clean(oid, v)) {
- dout(7) << "op_write " << hex << oid << dec << " v " << v << " in " << *pg
+ dout(7) << "object " << hex << oid << dec << " v " << v << " in " << *pg
<< " exists but is not clean" << endl;
waiting_for_clean_object[oid].push_back(op);
- unlock_object(oid);
- return;
+ return false;
}
} else {
// object (logically) dne
if (store->exists(oid) ||
!pg->nonexistant_object_is_clean(oid)) {
- dout(7) << "op_write " << hex << oid << dec << " v " << v << " in " << *pg
+ dout(7) << "object " << hex << oid << dec << " v " << v << " in " << *pg
<< " dne but is not clean" << endl;
waiting_for_clean_object[oid].push_back(op);
- unlock_object(oid);
- return;
+ return false;
}
}
}
+ return true;
+}
+
+void OSD::op_write(MOSDOp *op, PG *pg)
+{
+ object_t oid = op->get_oid();
+
+ lock_object(oid);
+
+ // version? clean?
+ version_t v = 0; // 0 == dne (yet)
+ if (!object_clean(pg, oid, v, op)) {
+ unlock_object(oid);
+ return;
+ }
v++; // we're good!
dout(12) << "op_write " << hex << oid << dec << " v " << v << endl;
replica_write_tids[op].insert(tid);
replica_writes[tid] = op;
- replica_pg_osd_tids[pgid][osd].insert(tid);
+ replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid);
}
replica_write_lock.Unlock();
// write
apply_write(op, true, v);
- PG *r = open_pg(pgid);
if (v == 1) {
// put new object in proper collection
- r->add_object(store, oid);
+ pg->add_object(store, oid);
}
// reply?
unlock_object(oid);
}
-/*
-void OSD::handle_mkfs(MOSDMkfs *op)
-{
- dout(3) << "MKFS" << endl;
-
- // wipe store
- int r = store->mkfs();
-
- // create PGs
- list<pg_t> pg_list;
- for (int nrep = 2; nrep < 4; nrep++) {
- ps_t maxps = 1LL << osdmap->get_pg_bits();
- for (pg_t ps = 0; ps < maxps; ps++) {
- pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep);
- vector<int> acting;
- osdmap->pg_to_acting_osds(pgid, acting);
-
- if (acting[0] == whoami) {
- PG *pg = create_pg(pgid);
- pg->acting = acting;
- pg->calc_role(whoami);
- pg->state_set(PG_STATE_COMPLETE);
+void OSD::op_delete(MOSDOp *op, PG *pg)
+{
+ object_t oid = op->get_oid();
- dout(7) << "created " << *pg << endl;
+ lock_object(oid);
- pg_list.push_back(pgid);
- }
- }
+ // version? clean?
+ version_t v = 0; // 0 == dne (yet)
+ if (!object_clean(pg, oid, v, op)) {
+ unlock_object(oid);
+ return;
}
+ v++; // we're good!
- // activate!
- if (osdmap)
- activate_map(pg_list);
-
- // reply!
- messenger->send_message(new MOSDMkfsAck(op), op->get_asker());
-
- delete op;
-}
-*/
-
-void OSD::op_delete(MOSDOp *op)
-{
- int r = store->remove(op->get_oid());
+ int r = store->remove(oid);
dout(12) << "delete on " << hex << op->get_oid() << dec << " r = " << r << endl;
// "ack"
logger->inc("rm");
delete op;
+
+ unlock_object(oid);
}
-void OSD::op_truncate(MOSDOp *op)
+void OSD::op_truncate(MOSDOp *op, PG *pg)
{
- int r = store->truncate(op->get_oid(), op->get_offset());
+ object_t oid = op->get_oid();
+
+ lock_object(oid);
+
+ // version? clean?
+ version_t v = 0; // 0 == dne (yet)
+ if (!object_clean(pg, oid, v, op)) {
+ unlock_object(oid);
+ return;
+ }
+ v++; // we're good!
+
+ int r = store->truncate(oid, op->get_offset());
dout(3) << "truncate on " << hex << op->get_oid() << dec << " at " << op->get_offset() << " r = " << r << endl;
// "ack"
logger->inc("trunc");
delete op;
-}
-void OSD::op_stat(MOSDOp *op)
-{
- struct stat st;
- memset(&st, sizeof(st), 0);
- int r = store->stat(op->get_oid(), &st);
-
- dout(3) << "stat on " << hex << op->get_oid() << dec << " r = " << r << " size = " << st.st_size << endl;
-
- MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap);
- reply->set_object_size(st.st_size);
- messenger->send_message(reply, op->get_asker());
-
- logger->inc("stat");
- delete op;
+ unlock_object(oid);
}
void doop(OSD *u, MOSDOp *p) {
bool PG::existant_object_is_clean(object_t o, version_t v)
{
assert(is_peered() && !is_clean());
-
+
return objects_unrep.count(o) ? false:true;
/*
it != peers.end();
it++) {
//if (!it->second->is_active()) continue;
+ if (it->second->is_complete()) continue;
if (it->second->peer_state.objects.count(o)) {
return false;
}
map<object_t, version_t> local_objects;
scan_local_objects(local_objects, store);
+ dout(10) << " " << local_objects.size() << " local objects" << endl;
+
objects = local_objects; // start w/ local object set.
- // newest objects -> objects
- for (map<int, PGPeer*>::iterator pit = peers.begin();
- pit != peers.end();
- pit++) {
- for (map<object_t, version_t>::iterator oit = pit->second->peer_state.objects.begin();
- oit != pit->second->peer_state.objects.end();
- oit++) {
- // know this object?
- if (objects.count(oit->first)) {
- object_t v = objects[oit->first];
- if (oit->second < v) // older?
- continue; // useless
- else if (oit->second == v) // same?
- objects_nrep[oit->first]++; // not quite accurate bc local_objects isn't included in nrep
- else { // newer!
+ if (!is_complete()) {
+ // newest objects -> objects
+ for (map<int, PGPeer*>::iterator pit = peers.begin();
+ pit != peers.end();
+ pit++) {
+ for (map<object_t, version_t>::iterator oit = pit->second->peer_state.objects.begin();
+ oit != pit->second->peer_state.objects.end();
+ oit++) {
+ // know this object?
+ if (objects.count(oit->first)) {
+ object_t v = objects[oit->first];
+ if (oit->second < v) // older?
+ continue; // useless
+ else if (oit->second == v) // same?
+ objects_nrep[oit->first]++; // not quite accurate bc local_objects isn't included in nrep
+ else { // newer!
+ objects[oit->first] = oit->second;
+ objects_nrep[oit->first] = 0;
+ objects_loc[oit->first] = pit->first; // note location. this will overwrite and be lame.
+ }
+ } else {
+ // newly seen object!
objects[oit->first] = oit->second;
objects_nrep[oit->first] = 0;
objects_loc[oit->first] = pit->first; // note location. this will overwrite and be lame.
}
- } else {
- // newly seen object!
- objects[oit->first] = oit->second;
- objects_nrep[oit->first] = 0;
- objects_loc[oit->first] = pit->first; // note location. this will overwrite and be lame.
}
}
- }
- /*
- // remove deleted objects
- assim_deleted_objects(deleted_objects); // locally
+ /*
+ // remove deleted objects
+ assim_deleted_objects(deleted_objects); // locally
for (map<int, PGPeer*>::iterator pit = peers.begin();
- pit != peers.end();
+ pit != peers.end();
pit++)
assim_deleted_objects(pit->second->peer_state.deleted); // on peers
*/
- // just cleanup old local objects
- // FIXME: do this async?
- for (map<object_t, version_t>::iterator it = local_objects.begin();
- it != local_objects.end();
- it++) {
- if (objects.count(it->first) && objects[it->first] == it->second) continue; // same!
-
- dout(10) << " local o " << hex << it->first << dec << " v " << it->second << " old, removing" << endl;
- store->remove(it->first);
- local_objects.erase(it->first);
- }
+ // just cleanup old local objects
+ // FIXME: do this async?
+ for (map<object_t, version_t>::iterator it = local_objects.begin();
+ it != local_objects.end();
+ it++) {
+ if (objects.count(it->first) && objects[it->first] == it->second) continue; // same!
+
+ dout(10) << " local o " << hex << it->first << dec << " v " << it->second << " old, removing" << endl;
+ store->remove(it->first);
+ local_objects.erase(it->first);
+ }
+ // get complete PG
+ plan_pull();
+ }
- // make remote action plans!
- plan_pull();
+ // sync up replicas
plan_push_cleanup();
}
}
if (pull_plan.empty()) {
- dout(10) << "nothing to pull, marking complete" << endl;
+ dout(10) << " nothing to pull, marking complete" << endl;
mark_complete();
}
}
PGPeer *pgp = peers[acting[r]];
assert(pgp);
+ if (pgp->is_complete()) continue;
+
if (pgp->peer_state.objects.count(oit->first) == 0 ||
oit->second < pgp->peer_state.objects[oit->first]) {
dout(10) << " o " << hex << oit->first << dec << " v " << oit->second << " old|dne on osd" << pgp->get_peer() << ", pushing" << endl;
if (role == 0) continue; // skip primary
PGPeer *pgp = pit->second;
+ assert(pgp->is_active());
+ if (pgp->is_complete()) {
+ dout(12) << " peer osd" << pit->first << " is complete" << endl;
+ continue;
+ }
+ dout(12) << " peer osd" << pit->first << " is !complete" << endl;
+
for (map<object_t, version_t>::iterator oit = pit->second->peer_state.objects.begin();
oit != pit->second->peer_state.objects.end();
oit++) {
}
if (push_plan.empty() && clean_plan.empty()) {
- dout(10) << "nothing to push|clean, marking clean" << endl;
+ dout(10) << " nothing to push|clean, marking clean" << endl;
mark_clean();
}
}